1use std::{
5 collections::{vec_deque::IterMut, VecDeque},
6 ops::Range,
7};
8
9#[cfg(with_metrics)]
10use linera_base::prometheus_util::MeasureLatency as _;
11use serde::{de::DeserializeOwned, Serialize};
12
13use crate::{
14 batch::Batch,
15 common::{from_bytes_option_or_default, HasherOutput},
16 context::Context,
17 hashable_wrapper::WrappedHashableContainerView,
18 historical_hash_wrapper::HistoricallyHashableView,
19 store::ReadableKeyValueStore as _,
20 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
21};
22
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::HistogramVec;

    /// Histogram (without labels) used to time the computation of a `QueueView` hash.
    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        // NOTE(review): 5.0 is presumably the upper bound of the latency buckets —
        // confirm the meaning and unit against `exponential_bucket_latencies`.
        let buckets = prometheus_util::exponential_bucket_latencies(5.0);
        prometheus_util::register_histogram_vec(
            "queue_view_hash_runtime",
            "QueueView hash runtime",
            &[],
            buckets,
        )
    });
}
40
41#[repr(u8)]
43enum KeyTag {
44 Store = MIN_VIEW_TAG,
46 Index,
48}
49
/// A first-in, first-out queue of values of type `T` backed by a key-value store.
///
/// Elements are pushed at the back and removed from the front. Changes are staged in
/// memory and only written to a batch by `flush`.
#[derive(Debug)]
pub struct QueueView<C, T> {
    /// The context providing the base key and access to the store.
    context: C,
    /// The range of element indices recorded in storage, as loaded by `post_load` and
    /// last written by `flush`.
    stored_indices: Range<usize>,
    /// The number of stored elements removed from the front since the last `flush` or
    /// `rollback`; the corresponding keys are deleted by the next `flush`.
    front_delete_count: usize,
    /// When set, the stored elements are treated as absent and the next `flush` first
    /// deletes everything under the view's base key.
    delete_storage_first: bool,
    /// The elements held in memory that the next `flush` appends to storage.
    new_back_values: VecDeque<T>,
}
59
60impl<C, T> View for QueueView<C, T>
61where
62 C: Context,
63 T: Serialize + Send + Sync,
64{
65 const NUM_INIT_KEYS: usize = 1;
66
67 type Context = C;
68
69 fn context(&self) -> &C {
70 &self.context
71 }
72
73 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
74 Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
75 }
76
77 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
78 let stored_indices =
79 from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
80 Ok(Self {
81 context,
82 stored_indices,
83 front_delete_count: 0,
84 delete_storage_first: false,
85 new_back_values: VecDeque::new(),
86 })
87 }
88
89 fn rollback(&mut self) {
90 self.delete_storage_first = false;
91 self.front_delete_count = 0;
92 self.new_back_values.clear();
93 }
94
95 async fn has_pending_changes(&self) -> bool {
96 if self.delete_storage_first {
97 return true;
98 }
99 if self.front_delete_count > 0 {
100 return true;
101 }
102 !self.new_back_values.is_empty()
103 }
104
105 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
106 let mut delete_view = false;
107 if self.delete_storage_first {
108 batch.delete_key_prefix(self.context.base_key().bytes.clone());
109 delete_view = true;
110 }
111 if self.stored_count() == 0 {
112 let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
113 batch.delete_key_prefix(key_prefix);
114 self.stored_indices = Range::default();
115 } else if self.front_delete_count > 0 {
116 let deletion_range = self.stored_indices.clone().take(self.front_delete_count);
117 self.stored_indices.start += self.front_delete_count;
118 for index in deletion_range {
119 let key = self
120 .context
121 .base_key()
122 .derive_tag_key(KeyTag::Index as u8, &index)?;
123 batch.delete_key(key);
124 }
125 }
126 if !self.new_back_values.is_empty() {
127 delete_view = false;
128 for value in &self.new_back_values {
129 let key = self
130 .context
131 .base_key()
132 .derive_tag_key(KeyTag::Index as u8, &self.stored_indices.end)?;
133 batch.put_key_value(key, value)?;
134 self.stored_indices.end += 1;
135 }
136 self.new_back_values.clear();
137 }
138 if !self.delete_storage_first || !self.stored_indices.is_empty() {
139 let key = self.context.base_key().base_tag(KeyTag::Store as u8);
140 batch.put_key_value(key, &self.stored_indices)?;
141 }
142 self.front_delete_count = 0;
143 self.delete_storage_first = false;
144 Ok(delete_view)
145 }
146
147 fn clear(&mut self) {
148 self.delete_storage_first = true;
149 self.new_back_values.clear();
150 }
151}
152
153impl<C, T> ClonableView for QueueView<C, T>
154where
155 C: Context,
156 T: Clone + Send + Sync + Serialize,
157{
158 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
159 Ok(QueueView {
160 context: self.context.clone(),
161 stored_indices: self.stored_indices.clone(),
162 front_delete_count: self.front_delete_count,
163 delete_storage_first: self.delete_storage_first,
164 new_back_values: self.new_back_values.clone(),
165 })
166 }
167}
168
169impl<C, T> QueueView<C, T> {
170 fn stored_count(&self) -> usize {
171 if self.delete_storage_first {
172 0
173 } else {
174 self.stored_indices.len() - self.front_delete_count
175 }
176 }
177}
178
179impl<'a, C, T> QueueView<C, T>
180where
181 C: Context,
182 T: Send + Sync + Clone + Serialize + DeserializeOwned,
183{
184 async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
185 let key = self
186 .context
187 .base_key()
188 .derive_tag_key(KeyTag::Index as u8, &index)?;
189 Ok(self.context.store().read_value(&key).await?)
190 }
191
192 pub async fn front(&self) -> Result<Option<T>, ViewError> {
206 let stored_remainder = self.stored_count();
207 let value = if stored_remainder > 0 {
208 self.get(self.stored_indices.end - stored_remainder).await?
209 } else {
210 self.new_back_values.front().cloned()
211 };
212 Ok(value)
213 }
214
215 pub async fn back(&self) -> Result<Option<T>, ViewError> {
229 Ok(match self.new_back_values.back() {
230 Some(value) => Some(value.clone()),
231 None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
232 _ => None,
233 })
234 }
235
236 pub fn delete_front(&mut self) {
250 if self.stored_count() > 0 {
251 self.front_delete_count += 1;
252 } else {
253 self.new_back_values.pop_front();
254 }
255 }
256
257 pub fn push_back(&mut self, value: T) {
271 self.new_back_values.push_back(value);
272 }
273
274 pub fn count(&self) -> usize {
287 self.stored_count() + self.new_back_values.len()
288 }
289
290 pub fn extra(&self) -> &C::Extra {
292 self.context.extra()
293 }
294
295 async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
296 let count = range.len();
297 let mut keys = Vec::with_capacity(count);
298 for index in range {
299 let key = self
300 .context
301 .base_key()
302 .derive_tag_key(KeyTag::Index as u8, &index)?;
303 keys.push(key)
304 }
305 let mut values = Vec::with_capacity(count);
306 for entry in self.context.store().read_multi_values(keys).await? {
307 match entry {
308 None => {
309 return Err(ViewError::MissingEntries);
310 }
311 Some(value) => values.push(value),
312 }
313 }
314 Ok(values)
315 }
316
317 pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
331 if count > self.count() {
332 count = self.count();
333 }
334 if count == 0 {
335 return Ok(Vec::new());
336 }
337 let mut values = Vec::with_capacity(count);
338 if !self.delete_storage_first {
339 let stored_remainder = self.stored_count();
340 let start = self.stored_indices.end - stored_remainder;
341 if count <= stored_remainder {
342 values.extend(self.read_context(start..(start + count)).await?);
343 } else {
344 values.extend(self.read_context(start..self.stored_indices.end).await?);
345 values.extend(
346 self.new_back_values
347 .range(0..(count - stored_remainder))
348 .cloned(),
349 );
350 }
351 } else {
352 values.extend(self.new_back_values.range(0..count).cloned());
353 }
354 Ok(values)
355 }
356
357 pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
371 if count > self.count() {
372 count = self.count();
373 }
374 if count == 0 {
375 return Ok(Vec::new());
376 }
377 let mut values = Vec::with_capacity(count);
378 let new_back_len = self.new_back_values.len();
379 if count <= new_back_len || self.delete_storage_first {
380 values.extend(
381 self.new_back_values
382 .range((new_back_len - count)..new_back_len)
383 .cloned(),
384 );
385 } else {
386 let start = self.stored_indices.end + new_back_len - count;
387 values.extend(self.read_context(start..self.stored_indices.end).await?);
388 values.extend(self.new_back_values.iter().cloned());
389 }
390 Ok(values)
391 }
392
393 pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
407 let count = self.count();
408 self.read_front(count).await
409 }
410
411 async fn load_all(&mut self) -> Result<(), ViewError> {
412 if !self.delete_storage_first {
413 let stored_remainder = self.stored_count();
414 let start = self.stored_indices.end - stored_remainder;
415 let elements = self.read_context(start..self.stored_indices.end).await?;
416 let shift = self.stored_indices.end - start;
417 for elt in elements {
418 self.new_back_values.push_back(elt);
419 }
420 self.new_back_values.rotate_right(shift);
421 self.delete_storage_first = true;
426 }
427 Ok(())
428 }
429
430 pub async fn iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
446 self.load_all().await?;
447 Ok(self.new_back_values.iter_mut())
448 }
449}
450
451impl<C, T> HashableView for QueueView<C, T>
452where
453 C: Context,
454 T: Send + Sync + Clone + Serialize + DeserializeOwned,
455{
456 type Hasher = sha3::Sha3_256;
457
458 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
459 self.hash().await
460 }
461
462 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
463 #[cfg(with_metrics)]
464 let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
465 let elements = self.elements().await?;
466 let mut hasher = sha3::Sha3_256::default();
467 hasher.update_with_bcs_bytes(&elements)?;
468 Ok(hasher.finalize())
469 }
470}
471
/// A [`QueueView`] wrapped in a [`WrappedHashableContainerView`], with [`HasherOutput`]
/// as the wrapper's third type argument.
// NOTE(review): presumably the wrapper stores the hash of the inner view — confirm
// against `hashable_wrapper`.
pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;
474
/// A [`QueueView`] wrapped in a [`HistoricallyHashableView`].
// NOTE(review): the hashing semantics of the wrapper are defined in
// `historical_hash_wrapper` — confirm there before relying on them.
pub type HistoricallyHashedQueueView<C, T> = HistoricallyHashableView<C, QueueView<C, T>>;
477
/// GraphQL bindings exposing a `QueueView` as an object type.
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::QueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
        /// Builds the GraphQL type name from the mangled element type name and the
        /// value of `hash_name::<T>()` printed as at least eight hexadecimal digits.
        fn type_name() -> Cow<'static, str> {
            format!(
                "QueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    // Plain `//` comments are used inside this impl on purpose: `///` comments on
    // resolvers would be picked up as descriptions in the generated schema.
    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
    where
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Number of elements in the queue. A count that does not fit in `u32` is
        // reported as an error instead of being silently truncated.
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            Ok(u32::try_from(self.count())?)
        }

        // The first `count` elements of the queue, or all of them when `count` is not
        // provided.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}