1use std::{
5 collections::{vec_deque::IterMut, VecDeque},
6 ops::Range,
7};
8
9#[cfg(with_metrics)]
10use linera_base::prometheus_util::MeasureLatency as _;
11use serde::{de::DeserializeOwned, Serialize};
12
13use crate::{
14 batch::Batch,
15 common::{from_bytes_option_or_default, HasherOutput},
16 context::Context,
17 hashable_wrapper::WrappedHashableContainerView,
18 store::ReadableKeyValueStore as _,
19 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
20};
21
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Latency histogram for `QueueView` hash computations, registered lazily
    /// on first use under the name `queue_view_hash_runtime`.
    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "queue_view_hash_runtime",
            "QueueView hash runtime",
            &[],
            // Exponential buckets; 5.0 is the upper bound passed to the helper —
            // NOTE(review): units depend on `exponential_bucket_latencies`, confirm seconds.
            exponential_bucket_latencies(5.0),
        )
    });
}
39
/// Key tags used to build the sub-keys of a `QueueView` on top of the base key.
#[repr(u8)]
enum KeyTag {
    /// Prefix for the key holding the serialized `stored_indices` range.
    Store = MIN_VIEW_TAG,
    /// Prefix for the per-index entry keys of the queue.
    Index,
}
48
/// A view providing a FIFO queue of values of type `T`, with lazily staged
/// mutations that are applied to storage only on `flush`.
#[derive(Debug)]
pub struct QueueView<C, T> {
    /// The storage context.
    context: C,
    /// Range of indices currently persisted in storage.
    stored_indices: Range<usize>,
    /// Number of stored front entries scheduled for deletion at the next flush.
    front_delete_count: usize,
    /// When `true`, all stored state is wiped before the next flush writes.
    delete_storage_first: bool,
    /// Values pushed since the last flush; not yet persisted.
    new_back_values: VecDeque<T>,
}
58
impl<C, T> View for QueueView<C, T>
where
    C: Context,
    T: Serialize + Send + Sync,
{
    /// A single key (the stored index range) is read at load time.
    const NUM_INIT_KEYS: usize = 1;

    type Context = C;

    fn context(&self) -> &C {
        &self.context
    }

    /// Returns the keys to read before constructing the view: only the
    /// `Store` key holding the serialized `stored_indices` range.
    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
    }

    /// Builds the view from the values read for `pre_load`'s keys.
    /// A missing value deserializes to the default (empty) range.
    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let stored_indices =
            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        Ok(Self {
            context,
            stored_indices,
            front_delete_count: 0,
            delete_storage_first: false,
            new_back_values: VecDeque::new(),
        })
    }

    /// Loads the view by reading the `Store` key from storage.
    async fn load(context: C) -> Result<Self, ViewError> {
        let keys = Self::pre_load(&context)?;
        let values = context.store().read_multi_values_bytes(keys).await?;
        Self::post_load(context, &values)
    }

    /// Discards all staged operations; persisted storage is left untouched.
    fn rollback(&mut self) {
        self.delete_storage_first = false;
        self.front_delete_count = 0;
        self.new_back_values.clear();
    }

    /// `true` if a clear, a front deletion, or a push is staged.
    async fn has_pending_changes(&self) -> bool {
        if self.delete_storage_first {
            return true;
        }
        if self.front_delete_count > 0 {
            return true;
        }
        !self.new_back_values.is_empty()
    }

    /// Writes all staged operations into `batch` and resets the staging state.
    /// Returns `true` when the view ends up empty (so the caller may drop it).
    /// NOTE: operation order matters — prefix deletes are emitted before the
    /// puts for new values, so the batch applies correctly.
    fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.delete_storage_first {
            // A staged `clear`: wipe everything under the base key first.
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            delete_view = true;
        }
        if self.stored_count() == 0 {
            // No stored entry survives: drop the whole index prefix at once
            // (redundant after a full clear above, but harmless).
            let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
            batch.delete_key_prefix(key_prefix);
            self.stored_indices = Range::default();
        } else if self.front_delete_count > 0 {
            // Delete exactly the first `front_delete_count` stored entries.
            let deletion_range = self.stored_indices.clone().take(self.front_delete_count);
            self.stored_indices.start += self.front_delete_count;
            for index in deletion_range {
                let key = self
                    .context
                    .base_key()
                    .derive_tag_key(KeyTag::Index as u8, &index)?;
                batch.delete_key(key);
            }
        }
        if !self.new_back_values.is_empty() {
            // New values are appended, so the view is not empty after all.
            delete_view = false;
            for value in &self.new_back_values {
                let key = self
                    .context
                    .base_key()
                    .derive_tag_key(KeyTag::Index as u8, &self.stored_indices.end)?;
                batch.put_key_value(key, value)?;
                self.stored_indices.end += 1;
            }
            self.new_back_values.clear();
        }
        if !self.delete_storage_first || !self.stored_indices.is_empty() {
            // Persist the updated range unless the view stays cleared and empty
            // (in which case the prefix delete above already removed the key).
            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
            batch.put_key_value(key, &self.stored_indices)?;
        }
        self.front_delete_count = 0;
        self.delete_storage_first = false;
        Ok(delete_view)
    }

    /// Stages a full clear: storage is wiped on the next flush and pending
    /// pushes are dropped immediately.
    fn clear(&mut self) {
        self.delete_storage_first = true;
        self.new_back_values.clear();
    }
}
157
158impl<C, T> ClonableView for QueueView<C, T>
159where
160 C: Context,
161 T: Clone + Send + Sync + Serialize,
162{
163 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
164 Ok(QueueView {
165 context: self.context.clone(),
166 stored_indices: self.stored_indices.clone(),
167 front_delete_count: self.front_delete_count,
168 delete_storage_first: self.delete_storage_first,
169 new_back_values: self.new_back_values.clone(),
170 })
171 }
172}
173
174impl<C, T> QueueView<C, T> {
175 fn stored_count(&self) -> usize {
176 if self.delete_storage_first {
177 0
178 } else {
179 self.stored_indices.len() - self.front_delete_count
180 }
181 }
182}
183
impl<'a, C, T> QueueView<C, T>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    /// Reads the stored value at the given absolute `index`, if present.
    async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
        let key = self
            .context
            .base_key()
            .derive_tag_key(KeyTag::Index as u8, &index)?;
        Ok(self.context.store().read_value(&key).await?)
    }

    /// Reads the front value of the queue, if any. Surviving stored entries
    /// come before any newly pushed values.
    pub async fn front(&self) -> Result<Option<T>, ViewError> {
        let stored_remainder = self.stored_count();
        let value = if stored_remainder > 0 {
            // First surviving stored index, skipping pending front deletions.
            self.get(self.stored_indices.end - stored_remainder).await?
        } else {
            self.new_back_values.front().cloned()
        };
        Ok(value)
    }

    /// Reads the back value of the queue, if any. Pending pushes take
    /// precedence since they are logically after the stored entries.
    pub async fn back(&self) -> Result<Option<T>, ViewError> {
        Ok(match self.new_back_values.back() {
            Some(value) => Some(value.clone()),
            None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
            _ => None,
        })
    }

    /// Deletes the front value, if any. Stored entries are only marked for
    /// deletion (applied at flush); pending pushes are removed immediately.
    pub fn delete_front(&mut self) {
        if self.stored_count() > 0 {
            self.front_delete_count += 1;
        } else {
            self.new_back_values.pop_front();
        }
    }

    /// Pushes a value to the end of the queue; persisted at the next flush.
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }

    /// Returns the number of values in the queue (stored plus pending).
    pub fn count(&self) -> usize {
        self.stored_count() + self.new_back_values.len()
    }

    /// Obtains the extra data associated with the context.
    pub fn extra(&self) -> &C::Extra {
        self.context.extra()
    }

    /// Reads the stored values in the given absolute index `range` with a
    /// single multi-read; errors if any index is missing from storage.
    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
        let count = range.len();
        let mut keys = Vec::with_capacity(count);
        for index in range {
            let key = self
                .context
                .base_key()
                .derive_tag_key(KeyTag::Index as u8, &index)?;
            keys.push(key)
        }
        let mut values = Vec::with_capacity(count);
        for entry in self.context.store().read_multi_values(keys).await? {
            match entry {
                None => {
                    return Err(ViewError::MissingEntries);
                }
                Some(value) => values.push(value),
            }
        }
        Ok(values)
    }

    /// Reads the first `count` values of the queue (clamped to the queue
    /// length), drawing from storage first and then from pending pushes.
    pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
        if count > self.count() {
            count = self.count();
        }
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut values = Vec::with_capacity(count);
        if !self.delete_storage_first {
            let stored_remainder = self.stored_count();
            // Absolute index of the first surviving stored entry.
            let start = self.stored_indices.end - stored_remainder;
            if count <= stored_remainder {
                values.extend(self.read_context(start..(start + count)).await?);
            } else {
                // Exhaust storage, then take the remainder from pending pushes.
                values.extend(self.read_context(start..self.stored_indices.end).await?);
                values.extend(
                    self.new_back_values
                        .range(0..(count - stored_remainder))
                        .cloned(),
                );
            }
        } else {
            // Storage is cleared: only pending pushes are visible.
            values.extend(self.new_back_values.range(0..count).cloned());
        }
        Ok(values)
    }

    /// Reads the last `count` values of the queue (clamped to the queue
    /// length), in queue order.
    pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
        if count > self.count() {
            count = self.count();
        }
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut values = Vec::with_capacity(count);
        let new_back_len = self.new_back_values.len();
        if count <= new_back_len || self.delete_storage_first {
            // The tail fits entirely within the pending pushes.
            values.extend(
                self.new_back_values
                    .range((new_back_len - count)..new_back_len)
                    .cloned(),
            );
        } else {
            // Take the tail of storage, then all pending pushes.
            let start = self.stored_indices.end + new_back_len - count;
            values.extend(self.read_context(start..self.stored_indices.end).await?);
            values.extend(self.new_back_values.iter().cloned());
        }
        Ok(values)
    }

    /// Reads all the values of the queue, in order.
    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
        let count = self.count();
        self.read_front(count).await
    }

    /// Loads all surviving stored values into `new_back_values` (in front of
    /// any pending pushes, via the rotate) and marks storage for deletion so
    /// the next flush rewrites everything from memory.
    async fn load_all(&mut self) -> Result<(), ViewError> {
        if !self.delete_storage_first {
            let stored_remainder = self.stored_count();
            let start = self.stored_indices.end - stored_remainder;
            let elements = self.read_context(start..self.stored_indices.end).await?;
            let shift = self.stored_indices.end - start;
            for elt in elements {
                self.new_back_values.push_back(elt);
            }
            // Move the just-appended stored elements before the older pushes.
            self.new_back_values.rotate_right(shift);
            self.delete_storage_first = true;
        }
        Ok(())
    }

    /// Returns a mutable iterator over all values. This loads every value
    /// into memory; mutations are persisted at the next flush (which rewrites
    /// all entries since storage was marked for deletion).
    pub async fn iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
}
455
impl<C, T> HashableView for QueueView<C, T>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    type Hasher = sha3::Sha3_256;

    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        // Hashing never mutates the queue; defer to the shared-reference version.
        self.hash().await
    }

    /// Hashes the BCS serialization of all elements (stored and pending), in
    /// queue order.
    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        // Guard records the elapsed time into the histogram when dropped.
        let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}
476
/// A `QueueView` wrapped so that its hash is computed once and memoized.
pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;
479
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::QueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
        // Unique GraphQL type name: the mangled element type name plus a hash
        // of it, to disambiguate instantiations.
        fn type_name() -> Cow<'static, str> {
            format!(
                "QueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
    where
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Returns up to `count` entries from the front of the queue; all
        // entries when `count` is omitted. (Plain `//` comments on purpose:
        // `///` would become the GraphQL field description.)
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}
512}