1use std::{
5 collections::{vec_deque::IterMut, VecDeque},
6 ops::Range,
7};
8
9use allocative::Allocative;
10#[cfg(with_metrics)]
11use linera_base::prometheus_util::MeasureLatency as _;
12use linera_base::{data_types::ArithmeticError, visit_allocative_simple};
13use serde::{de::DeserializeOwned, Serialize};
14
15use crate::{
16 batch::Batch,
17 common::{from_bytes_option_or_default, HasherOutput},
18 context::Context,
19 hashable_wrapper::WrappedHashableContainerView,
20 historical_hash_wrapper::HistoricallyHashableView,
21 store::ReadableKeyValueStore as _,
22 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
23};
24
/// Prometheus instrumentation for `QueueView`; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::HistogramVec;

    /// Histogram recording how long `QueueView` hash computations take.
    ///
    /// The histogram is registered lazily, on first access.
    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = prometheus_util::exponential_bucket_latencies(5.0);
        prometheus_util::register_histogram_vec(
            "queue_view_hash_runtime",
            "QueueView hash runtime",
            &[],
            buckets,
        )
    });
}
42
/// Key tags used to build the storage keys of a `QueueView`.
///
/// Each tag is appended (as a `u8`) to the base key of the view's context.
#[repr(u8)]
enum KeyTag {
    /// Tag of the single key under which the range of stored indices is persisted.
    Store = MIN_VIEW_TAG,
    /// Tag of the keys under which the queue elements are persisted, one key per index.
    Index,
}
51
/// A view that supports a FIFO queue for values of type `T`.
///
/// Elements already saved are addressed by a `u32` index in storage; modifications are
/// buffered in memory until the view is saved.
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative")]
pub struct QueueView<C, T> {
    /// The view context, giving access to the base key and the underlying store.
    #[allocative(skip)]
    context: C,
    /// The range of indices of the elements that are currently persisted in storage.
    #[allocative(visit = visit_allocative_simple)]
    stored_indices: Range<u32>,
    /// The number of persisted elements to remove from the front on the next save.
    front_delete_count: u32,
    /// Whether everything persisted for this view is deleted before the pending values
    /// are written on the next save.
    delete_storage_first: bool,
    /// The values appended to the back that have not been persisted yet.
    new_back_values: VecDeque<T>,
}
69
70impl<C, T> View for QueueView<C, T>
71where
72 C: Context,
73 T: Serialize + Send + Sync,
74{
75 const NUM_INIT_KEYS: usize = 1;
76
77 type Context = C;
78
79 fn context(&self) -> C {
80 self.context.clone()
81 }
82
83 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
84 Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
85 }
86
87 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
88 let stored_indices =
89 from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
90 Ok(Self {
91 context,
92 stored_indices,
93 front_delete_count: 0,
94 delete_storage_first: false,
95 new_back_values: VecDeque::new(),
96 })
97 }
98
99 fn rollback(&mut self) {
100 self.delete_storage_first = false;
101 self.front_delete_count = 0;
102 self.new_back_values.clear();
103 }
104
105 async fn has_pending_changes(&self) -> bool {
106 if self.delete_storage_first {
107 return true;
108 }
109 if self.front_delete_count > 0 {
110 return true;
111 }
112 !self.new_back_values.is_empty()
113 }
114
115 fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
116 let mut delete_view = false;
117 if self.delete_storage_first {
118 batch.delete_key_prefix(self.context.base_key().bytes.clone());
119 delete_view = true;
120 }
121 let mut new_stored_indices = self.stored_indices.clone();
122 if self.stored_count() == 0 {
123 let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
124 batch.delete_key_prefix(key_prefix);
125 new_stored_indices = Range::default();
126 } else if self.front_delete_count > 0 {
127 let deletion_end = new_stored_indices.start + self.front_delete_count;
128 for index in new_stored_indices.start..deletion_end {
129 let key = self
130 .context
131 .base_key()
132 .derive_tag_key(KeyTag::Index as u8, &index)?;
133 batch.delete_key(key);
134 }
135 new_stored_indices.start = deletion_end;
136 }
137 if !self.new_back_values.is_empty() {
138 delete_view = false;
139 let new_back_len =
140 u32::try_from(self.new_back_values.len()).map_err(|_| ArithmeticError::Overflow)?;
141 new_stored_indices.end = new_stored_indices
142 .end
143 .checked_add(new_back_len)
144 .ok_or(ArithmeticError::Overflow)?;
145 let start = new_stored_indices.end - new_back_len;
146 for (index, value) in (start..).zip(&self.new_back_values) {
147 let key = self
148 .context
149 .base_key()
150 .derive_tag_key(KeyTag::Index as u8, &index)?;
151 batch.put_key_value(key, value)?;
152 }
153 }
154 if !self.delete_storage_first || !new_stored_indices.is_empty() {
155 let key = self.context.base_key().base_tag(KeyTag::Store as u8);
156 batch.put_key_value(key, &new_stored_indices)?;
157 }
158 Ok(delete_view)
159 }
160
161 fn post_save(&mut self) {
162 if self.stored_count() == 0 {
163 self.stored_indices = Range::default();
164 } else if self.front_delete_count > 0 {
165 self.stored_indices.start += self.front_delete_count;
166 }
167 if !self.new_back_values.is_empty() {
168 self.stored_indices.end +=
169 u32::try_from(self.new_back_values.len()).expect("verified in pre_save");
170 self.new_back_values.clear();
171 }
172 self.front_delete_count = 0;
173 self.delete_storage_first = false;
174 }
175
176 fn clear(&mut self) {
177 self.delete_storage_first = true;
178 self.new_back_values.clear();
179 }
180}
181
182impl<C, T> ClonableView for QueueView<C, T>
183where
184 C: Context,
185 T: Clone + Send + Sync + Serialize,
186{
187 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
188 Ok(QueueView {
189 context: self.context.clone(),
190 stored_indices: self.stored_indices.clone(),
191 front_delete_count: self.front_delete_count,
192 delete_storage_first: self.delete_storage_first,
193 new_back_values: self.new_back_values.clone(),
194 })
195 }
196}
197
198impl<C, T> QueueView<C, T> {
199 fn stored_count(&self) -> u32 {
200 if self.delete_storage_first {
201 0
202 } else {
203 (self.stored_indices.end - self.stored_indices.start) - self.front_delete_count
204 }
205 }
206}
207
208impl<'a, C, T> QueueView<C, T>
209where
210 C: Context,
211 T: Send + Sync + Clone + Serialize + DeserializeOwned,
212{
213 async fn get(&self, index: u32) -> Result<Option<T>, ViewError> {
214 let key = self
215 .context
216 .base_key()
217 .derive_tag_key(KeyTag::Index as u8, &index)?;
218 Ok(self.context.store().read_value(&key).await?)
219 }
220
221 pub async fn front(&self) -> Result<Option<T>, ViewError> {
235 let stored_remainder = self.stored_count();
236 let value = if stored_remainder > 0 {
237 self.get(self.stored_indices.end - stored_remainder).await?
238 } else {
239 self.new_back_values.front().cloned()
240 };
241 Ok(value)
242 }
243
244 pub async fn back(&self) -> Result<Option<T>, ViewError> {
258 Ok(match self.new_back_values.back() {
259 Some(value) => Some(value.clone()),
260 None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
261 _ => None,
262 })
263 }
264
265 pub fn delete_front(&mut self) {
279 if self.stored_count() > 0 {
280 self.front_delete_count += 1;
281 } else {
282 self.new_back_values.pop_front();
283 }
284 }
285
286 pub fn push_back(&mut self, value: T) {
300 self.new_back_values.push_back(value);
301 }
302
303 pub fn count(&self) -> usize {
316 self.stored_count() as usize + self.new_back_values.len()
317 }
318
319 pub fn extra(&self) -> &C::Extra {
321 self.context.extra()
322 }
323
324 async fn read_context(&self, range: Range<u32>) -> Result<Vec<T>, ViewError> {
325 let count = range.len();
326 let mut keys = Vec::with_capacity(count);
327 for index in range {
328 let key = self
329 .context
330 .base_key()
331 .derive_tag_key(KeyTag::Index as u8, &index)?;
332 keys.push(key)
333 }
334 let mut values = Vec::with_capacity(count);
335 for entry in self.context.store().read_multi_values(&keys).await? {
336 match entry {
337 None => {
338 return Err(ViewError::MissingEntries("QueueView".into()));
339 }
340 Some(value) => values.push(value),
341 }
342 }
343 Ok(values)
344 }
345
346 pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
360 if count > self.count() {
361 count = self.count();
362 }
363 if count == 0 {
364 return Ok(Vec::new());
365 }
366 let mut values = Vec::with_capacity(count);
367 if !self.delete_storage_first {
368 let stored_remainder = self.stored_count();
369 let start = self.stored_indices.end - stored_remainder;
370 if count <= stored_remainder as usize {
371 let count = u32::try_from(count).map_err(|_| ArithmeticError::Overflow)?;
372 values.extend(self.read_context(start..start + count).await?);
373 } else {
374 values.extend(self.read_context(start..self.stored_indices.end).await?);
375 values.extend(
376 self.new_back_values
377 .range(0..count - stored_remainder as usize)
378 .cloned(),
379 );
380 }
381 } else {
382 values.extend(self.new_back_values.range(0..count).cloned());
383 }
384 Ok(values)
385 }
386
387 pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
401 if count > self.count() {
402 count = self.count();
403 }
404 if count == 0 {
405 return Ok(Vec::new());
406 }
407 let mut values = Vec::with_capacity(count);
408 let new_back_len = self.new_back_values.len();
409 if count <= new_back_len || self.delete_storage_first {
410 values.extend(
411 self.new_back_values
412 .range((new_back_len - count)..new_back_len)
413 .cloned(),
414 );
415 } else {
416 let stored_consumed =
417 u32::try_from(count - new_back_len).map_err(|_| ArithmeticError::Underflow)?;
418 let start = self.stored_indices.end - stored_consumed;
419 values.extend(self.read_context(start..self.stored_indices.end).await?);
420 values.extend(self.new_back_values.iter().cloned());
421 }
422 Ok(values)
423 }
424
425 pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
439 let count = self.count();
440 self.read_front(count).await
441 }
442
443 async fn load_all(&mut self) -> Result<(), ViewError> {
444 if !self.delete_storage_first {
445 let stored_remainder = self.stored_count();
446 let start = self.stored_indices.end - stored_remainder;
447 let elements = self.read_context(start..self.stored_indices.end).await?;
448 for elt in elements {
449 self.new_back_values.push_back(elt);
450 }
451 self.new_back_values.rotate_right(stored_remainder as usize);
452 self.delete_storage_first = true;
457 }
458 Ok(())
459 }
460
461 pub async fn try_iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
477 self.load_all().await?;
478 Ok(self.new_back_values.iter_mut())
479 }
480}
481
482impl<C, T> HashableView for QueueView<C, T>
483where
484 C: Context,
485 T: Send + Sync + Clone + Serialize + DeserializeOwned,
486{
487 type Hasher = sha3::Sha3_256;
488
489 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
490 self.hash().await
491 }
492
493 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
494 #[cfg(with_metrics)]
495 let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
496 let elements = self.elements().await?;
497 let mut hasher = sha3::Sha3_256::default();
498 hasher.update_with_bcs_bytes(&elements)?;
499 Ok(hasher.finalize())
500 }
501}
502
/// A `QueueView` wrapped in a `WrappedHashableContainerView` that uses `HasherOutput`
/// as its hash value type.
// NOTE(review): the wrapper presumably caches the hash of the inner view; confirm in
// `hashable_wrapper`.
pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;
505
/// A `QueueView` wrapped in a `HistoricallyHashableView`.
// NOTE(review): the exact hashing scheme is defined by `historical_hash_wrapper`;
// confirm there before relying on it.
pub type HistoricallyHashedQueueView<C, T> = HistoricallyHashableView<C, QueueView<C, T>>;
508
// GraphQL bindings for `QueueView`; only compiled when `with_graphql` is set.
//
// Plain `//` comments are used on purpose inside the `Object` impl: rustdoc comments
// there would be picked up as schema descriptions and change the generated schema.
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use linera_base::data_types::ArithmeticError;

    use super::QueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
        // The name combines the mangled element type name with a hash of the element
        // type, printed as at least eight hexadecimal digits.
        fn type_name() -> Cow<'static, str> {
            Cow::Owned(format!(
                "QueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            ))
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
    where
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        // Exposed as `count`: the number of elements, failing if it does not fit in `u32`.
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            let total = u32::try_from(self.count()).map_err(|_| ArithmeticError::Overflow)?;
            Ok(total)
        }

        // Returns up to `count` elements from the front, or all of them when `count`
        // is not given.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            let limit = match count {
                Some(limit) => limit,
                None => self.count(),
            };
            let values = self.read_front(limit).await?;
            Ok(values)
        }
    }
}