use std::{
    collections::{vec_deque::IterMut, VecDeque},
    ops::Range,
};

use allocative::Allocative;
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::visit_allocative_simple;
use serde::{de::DeserializeOwned, Serialize};

use crate::{
    batch::Batch,
    common::{from_bytes_option_or_default, HasherOutput},
    context::Context,
    hashable_wrapper::WrappedHashableContainerView,
    historical_hash_wrapper::HistoricallyHashableView,
    store::ReadableKeyValueStore as _,
    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
};

#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

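    /// Histogram tracking the latency of `QueueView` hash computations.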
    pub static QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "queue_view_hash_runtime",
            "QueueView hash runtime",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });
}

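/// Key tags to create the sub-keys of a `QueueView` on top of the base key.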
#[repr(u8)]
enum KeyTag {
    /// Prefix for the storage of the `stored_indices` range.
    Store = MIN_VIEW_TAG,
    /// Prefix for the individual indices of the queue.
    Index,
}

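/// A view that supports a FIFO queue of values of type `T`, backed by key-value storage.
///
/// A minimal usage sketch (illustrative only; it assumes a value `context` implementing
/// `Context`, e.g. an in-memory test context, is already available):
/// ```ignore
/// let mut queue = QueueView::load(context).await?;
/// queue.push_back(34);
/// queue.push_back(42);
/// assert_eq!(queue.front().await?, Some(34));
/// assert_eq!(queue.count(), 2);
/// ```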
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative")]
pub struct QueueView<C, T> {
    /// The view context.
    #[allocative(skip)]
    context: C,
    /// The range of indices currently persisted in storage.
    #[allocative(visit = visit_allocative_simple)]
    stored_indices: Range<usize>,
    /// The number of stored entries to delete from the front at the next save.
    front_delete_count: usize,
    /// Whether the stored entries should be deleted (the queue was cleared) before saving.
    delete_storage_first: bool,
    /// The values appended to the back of the queue and not yet persisted.
    new_back_values: VecDeque<T>,
}

impl<C, T> View for QueueView<C, T>
where
    C: Context,
    T: Serialize + Send + Sync,
{
    const NUM_INIT_KEYS: usize = 1;

    type Context = C;

    fn context(&self) -> C {
        self.context.clone()
    }

    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        Ok(vec![context.base_key().base_tag(KeyTag::Store as u8)])
    }

    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let stored_indices =
            from_bytes_option_or_default(values.first().ok_or(ViewError::PostLoadValuesError)?)?;
        Ok(Self {
            context,
            stored_indices,
            front_delete_count: 0,
            delete_storage_first: false,
            new_back_values: VecDeque::new(),
        })
    }

    fn rollback(&mut self) {
        self.delete_storage_first = false;
        self.front_delete_count = 0;
        self.new_back_values.clear();
    }

    async fn has_pending_changes(&self) -> bool {
        if self.delete_storage_first {
            return true;
        }
        if self.front_delete_count > 0 {
            return true;
        }
        !self.new_back_values.is_empty()
    }

    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        if self.delete_storage_first {
            batch.delete_key_prefix(self.context.base_key().bytes.clone());
            delete_view = true;
        }
        let mut new_stored_indices = self.stored_indices.clone();
        if self.stored_count() == 0 {
            let key_prefix = self.context.base_key().base_tag(KeyTag::Index as u8);
            batch.delete_key_prefix(key_prefix);
            new_stored_indices = Range::default();
        } else if self.front_delete_count > 0 {
            let deletion_range = self.stored_indices.clone().take(self.front_delete_count);
            new_stored_indices.start += self.front_delete_count;
            for index in deletion_range {
                let key = self
                    .context
                    .base_key()
                    .derive_tag_key(KeyTag::Index as u8, &index)?;
                batch.delete_key(key);
            }
        }
        if !self.new_back_values.is_empty() {
            delete_view = false;
            for value in &self.new_back_values {
                let key = self
                    .context
                    .base_key()
                    .derive_tag_key(KeyTag::Index as u8, &new_stored_indices.end)?;
                batch.put_key_value(key, value)?;
                new_stored_indices.end += 1;
            }
        }
        if !self.delete_storage_first || !new_stored_indices.is_empty() {
            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
            batch.put_key_value(key, &new_stored_indices)?;
        }
        Ok(delete_view)
    }

    fn post_save(&mut self) {
        if self.stored_count() == 0 {
            self.stored_indices = Range::default();
        } else if self.front_delete_count > 0 {
            self.stored_indices.start += self.front_delete_count;
        }
        if !self.new_back_values.is_empty() {
            self.stored_indices.end += self.new_back_values.len();
            self.new_back_values.clear();
        }
        self.front_delete_count = 0;
        self.delete_storage_first = false;
    }

    fn clear(&mut self) {
        self.delete_storage_first = true;
        self.new_back_values.clear();
    }
}

impl<C, T> ClonableView for QueueView<C, T>
where
    C: Context,
    T: Clone + Send + Sync + Serialize,
{
    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
        Ok(QueueView {
            context: self.context.clone(),
            stored_indices: self.stored_indices.clone(),
            front_delete_count: self.front_delete_count,
            delete_storage_first: self.delete_storage_first,
            new_back_values: self.new_back_values.clone(),
        })
    }
}

impl<C, T> QueueView<C, T> {
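    /// Returns the number of entries that are in storage and not yet marked for deletion.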
    fn stored_count(&self) -> usize {
        if self.delete_storage_first {
            0
        } else {
            self.stored_indices.len() - self.front_delete_count
        }
    }
}

impl<'a, C, T> QueueView<C, T>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    /// Reads the value stored under the given index, if present.
    async fn get(&self, index: usize) -> Result<Option<T>, ViewError> {
        let key = self
            .context
            .base_key()
            .derive_tag_key(KeyTag::Index as u8, &index)?;
        Ok(self.context.store().read_value(&key).await?)
    }

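    /// Reads the front value of the queue, if any.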
    pub async fn front(&self) -> Result<Option<T>, ViewError> {
        let stored_remainder = self.stored_count();
        let value = if stored_remainder > 0 {
            self.get(self.stored_indices.end - stored_remainder).await?
        } else {
            self.new_back_values.front().cloned()
        };
        Ok(value)
    }

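    /// Reads the back value of the queue, if any.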
    pub async fn back(&self) -> Result<Option<T>, ViewError> {
        Ok(match self.new_back_values.back() {
            Some(value) => Some(value.clone()),
            None if self.stored_count() > 0 => self.get(self.stored_indices.end - 1).await?,
            _ => None,
        })
    }

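    /// Deletes the front value, if any.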
    pub fn delete_front(&mut self) {
        if self.stored_count() > 0 {
            self.front_delete_count += 1;
        } else {
            self.new_back_values.pop_front();
        }
    }

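    /// Pushes a value to the back of the queue.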
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }

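    /// Returns the number of values in the queue.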
    pub fn count(&self) -> usize {
        self.stored_count() + self.new_back_values.len()
    }

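    /// Obtains the extra data of the context.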
    pub fn extra(&self) -> &C::Extra {
        self.context.extra()
    }

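    /// Reads the values for the given range of stored indices from storage.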
    async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
        let count = range.len();
        let mut keys = Vec::with_capacity(count);
        for index in range {
            let key = self
                .context
                .base_key()
                .derive_tag_key(KeyTag::Index as u8, &index)?;
            keys.push(key);
        }
        let mut values = Vec::with_capacity(count);
        for entry in self.context.store().read_multi_values(&keys).await? {
            match entry {
                None => {
                    return Err(ViewError::MissingEntries("QueueView".into()));
                }
                Some(value) => values.push(value),
            }
        }
        Ok(values)
    }

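    /// Reads the first `count` values of the queue, from the front (or all of them, if the
    /// queue holds fewer than `count`).
    ///
    /// Illustrative sketch (assuming `queue` was loaded as in the struct-level example and
    /// currently holds `[34, 42]`):
    /// ```ignore
    /// assert_eq!(queue.read_front(10).await?, vec![34, 42]);
    /// ```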
    pub async fn read_front(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
        if count > self.count() {
            count = self.count();
        }
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut values = Vec::with_capacity(count);
        if !self.delete_storage_first {
            let stored_remainder = self.stored_count();
            let start = self.stored_indices.end - stored_remainder;
            if count <= stored_remainder {
                values.extend(self.read_context(start..(start + count)).await?);
            } else {
                values.extend(self.read_context(start..self.stored_indices.end).await?);
                values.extend(
                    self.new_back_values
                        .range(0..(count - stored_remainder))
                        .cloned(),
                );
            }
        } else {
            values.extend(self.new_back_values.range(0..count).cloned());
        }
        Ok(values)
    }

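    /// Reads the last `count` values of the queue, from front to back (or all of them, if the
    /// queue holds fewer than `count`).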
    pub async fn read_back(&self, mut count: usize) -> Result<Vec<T>, ViewError> {
        if count > self.count() {
            count = self.count();
        }
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut values = Vec::with_capacity(count);
        let new_back_len = self.new_back_values.len();
        if count <= new_back_len || self.delete_storage_first {
            values.extend(
                self.new_back_values
                    .range((new_back_len - count)..new_back_len)
                    .cloned(),
            );
        } else {
            let start = self.stored_indices.end + new_back_len - count;
            values.extend(self.read_context(start..self.stored_indices.end).await?);
            values.extend(self.new_back_values.iter().cloned());
        }
        Ok(values)
    }

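    /// Reads all the values of the queue, from front to back.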
    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
        let count = self.count();
        self.read_front(count).await
    }

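    /// Loads all the stored values into `new_back_values` and marks the storage entries for
    /// deletion, so that the whole queue can be accessed and mutated in memory.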
    async fn load_all(&mut self) -> Result<(), ViewError> {
        if !self.delete_storage_first {
            let stored_remainder = self.stored_count();
            let start = self.stored_indices.end - stored_remainder;
            let elements = self.read_context(start..self.stored_indices.end).await?;
            let shift = self.stored_indices.end - start;
            for elt in elements {
                self.new_back_values.push_back(elt);
            }
            self.new_back_values.rotate_right(shift);
            self.delete_storage_first = true;
        }
        Ok(())
    }

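    /// Returns a mutable iterator over all the values in the queue. This first loads all the
    /// stored values into memory.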
    pub async fn iter_mut(&'a mut self) -> Result<IterMut<'a, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
}

impl<C, T> HashableView for QueueView<C, T>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    type Hasher = sha3::Sha3_256;

    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.hash().await
    }

    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}

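/// Type wrapping `QueueView` while memoizing the hash.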
pub type HashedQueueView<C, T> = WrappedHashableContainerView<C, QueueView<C, T>, HasherOutput>;

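/// Type wrapping `QueueView` to provide historical hashing via `HistoricallyHashableView`.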
pub type HistoricallyHashedQueueView<C, T> = HistoricallyHashableView<C, QueueView<C, T>>;

#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::QueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType> async_graphql::TypeName for QueueView<C, T> {
        fn type_name() -> Cow<'static, str> {
            format!(
                "QueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType> QueueView<C, T>
    where
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            Ok(self.count() as u32)
        }

        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}