1use std::collections::{vec_deque::IterMut, VecDeque};
5
6#[cfg(with_metrics)]
7use linera_base::prometheus_util::MeasureLatency as _;
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9
10use crate::{
11 batch::Batch,
12 common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
13 context::Context,
14 hashable_wrapper::WrappedHashableContainerView,
15 store::ReadableKeyValueStore as _,
16 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
17};
18
/// Prometheus metrics of this module; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram observed (through `measure_latency`) while hashing a `BucketQueueView`.
    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        // The registration happens lazily, on the first access to the static.
        let buckets = exponential_bucket_latencies(5.0);
        register_histogram_vec(
            "bucket_queue_view_hash_runtime",
            "BucketQueueView hash runtime",
            &[],
            buckets,
        )
    });
}
36
37#[repr(u8)]
42enum KeyTag {
43 Front = MIN_VIEW_TAG,
45 Store,
47 Index,
49}
50
51#[derive(Clone, Debug, Default, Serialize, Deserialize)]
53struct StoredIndices {
54 indices: Vec<(usize, usize)>,
59 position: usize,
61}
62
63impl StoredIndices {
64 fn len(&self) -> usize {
65 self.indices.len()
66 }
67}
68
/// Location of the front of the queue inside the persisted buckets.
#[derive(Clone, Debug)]
struct Cursor {
    /// `Some((bucket, offset))` gives the position of the bucket in the list of
    /// persisted buckets and the offset of the front element inside that bucket;
    /// `None` means that no persisted element is left to read.
    position: Option<(usize, usize)>,
}

impl Cursor {
    /// Builds the cursor for `number_bucket` persisted buckets.
    ///
    /// Without any bucket there is nothing to point at; otherwise the cursor
    /// starts in the first bucket at offset `position`.
    pub fn new(number_bucket: usize, position: usize) -> Self {
        let position = (number_bucket != 0).then_some((0, position));
        Self { position }
    }

    /// Returns whether the cursor still points at a persisted element.
    pub fn is_incrementable(&self) -> bool {
        let Self { position } = self;
        position.is_some()
    }
}
94
/// A persisted bucket of the queue, whose content may or may not be in memory.
#[derive(Clone, Debug)]
enum Bucket<T> {
    /// The elements of the bucket are available in memory.
    Loaded { data: Vec<T> },
    /// Only the number of elements of the bucket is known.
    NotLoaded { length: usize },
}

impl<T> Bucket<T> {
    /// Returns the number of elements of the bucket, loaded or not.
    fn len(&self) -> usize {
        match *self {
            Self::NotLoaded { length } => length,
            Self::Loaded { ref data } => data.len(),
        }
    }

    /// Returns whether the elements of the bucket are available in memory.
    fn is_loaded(&self) -> bool {
        matches!(self, Self::Loaded { .. })
    }
}
116
117fn stored_indices<T>(stored_data: &VecDeque<(usize, Bucket<T>)>, position: usize) -> StoredIndices {
118 let indices = stored_data
119 .iter()
120 .map(|(index, bucket)| (bucket.len(), *index))
121 .collect::<Vec<_>>();
122 StoredIndices { indices, position }
123}
124
125#[derive(Debug)]
130pub struct BucketQueueView<C, T, const N: usize> {
131 context: C,
132 stored_data: VecDeque<(usize, Bucket<T>)>,
134 new_back_values: VecDeque<T>,
136 stored_position: usize,
138 cursor: Cursor,
140 delete_storage_first: bool,
142}
143
144impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
145where
146 C: Context,
147 T: Send + Sync + Clone + Serialize + DeserializeOwned,
148{
149 const NUM_INIT_KEYS: usize = 2;
150
151 type Context = C;
152
153 fn context(&self) -> &C {
154 &self.context
155 }
156
157 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
158 let key1 = context.base_key().base_tag(KeyTag::Front as u8);
159 let key2 = context.base_key().base_tag(KeyTag::Store as u8);
160 Ok(vec![key1, key2])
161 }
162
163 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
164 let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
165 let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
166 let front = from_bytes_option::<Vec<T>>(value1)?;
167 let mut stored_data = VecDeque::from(match front {
168 Some(front) => {
169 vec![(0, Bucket::Loaded { data: front })]
170 }
171 None => {
172 vec![]
173 }
174 });
175 let stored_indices = from_bytes_option_or_default::<StoredIndices>(value2)?;
176 for i in 1..stored_indices.len() {
177 let length = stored_indices.indices[i].0;
178 let index = stored_indices.indices[i].1;
179 stored_data.push_back((index, Bucket::NotLoaded { length }));
180 }
181 let cursor = Cursor::new(stored_indices.len(), stored_indices.position);
182 Ok(Self {
183 context,
184 stored_data,
185 stored_position: stored_indices.position,
186 new_back_values: VecDeque::new(),
187 cursor,
188 delete_storage_first: false,
189 })
190 }
191
192 async fn load(context: C) -> Result<Self, ViewError> {
193 let keys = Self::pre_load(&context)?;
194 let values = context.store().read_multi_values_bytes(keys).await?;
195 Self::post_load(context, &values)
196 }
197
198 fn rollback(&mut self) {
199 self.delete_storage_first = false;
200 self.cursor = Cursor::new(self.stored_data.len(), self.stored_position);
201 self.new_back_values.clear();
202 }
203
204 async fn has_pending_changes(&self) -> bool {
205 if self.delete_storage_first {
206 return true;
207 }
208 if !self.stored_data.is_empty() {
209 let Some((i_block, position)) = self.cursor.position else {
210 return true;
211 };
212 if i_block != 0 || position != self.stored_position {
213 return true;
214 }
215 }
216 !self.new_back_values.is_empty()
217 }
218
219 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
220 let mut delete_view = false;
221 if self.delete_storage_first {
222 let key_prefix = self.context.base_key().bytes.clone();
223 batch.delete_key_prefix(key_prefix);
224 delete_view = true;
225 }
226 if self.stored_count() == 0 {
227 let key_prefix = self.context.base_key().bytes.clone();
228 batch.delete_key_prefix(key_prefix);
229 self.stored_data.clear();
230 self.stored_position = 0;
231 } else if let Some((i_block, position)) = self.cursor.position {
232 for _ in 0..i_block {
233 let block = self.stored_data.pop_front().unwrap();
234 let index = block.0;
235 let key = self.get_index_key(index)?;
236 batch.delete_key(key);
237 }
238 self.cursor = Cursor {
239 position: Some((0, position)),
240 };
241 self.stored_position = position;
242 let first_index = self.stored_data[0].0;
244 if first_index != 0 {
245 self.stored_data[0].0 = 0;
246 let key = self.get_index_key(first_index)?;
247 batch.delete_key(key);
248 let key = self.get_index_key(0)?;
249 let (_, data0) = self.stored_data.front().unwrap();
250 let Bucket::Loaded { data } = data0 else {
251 unreachable!();
252 };
253 batch.put_key_value(key, &data)?;
254 }
255 }
256 if !self.new_back_values.is_empty() {
257 delete_view = false;
258 let mut unused_index = match self.stored_data.back() {
259 Some((index, _)) => index + 1,
260 None => 0,
261 };
262 let new_back_values = std::mem::take(&mut self.new_back_values);
263 let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
264 for value_chunk in new_back_values.chunks(N) {
265 let key = self.get_index_key(unused_index)?;
266 batch.put_key_value(key, &value_chunk)?;
267 self.stored_data.push_back((
268 unused_index,
269 Bucket::Loaded {
270 data: value_chunk.to_vec(),
271 },
272 ));
273 unused_index += 1;
274 }
275 if !self.cursor.is_incrementable() {
276 self.cursor = Cursor {
277 position: Some((0, 0)),
278 }
279 }
280 }
281 if !self.delete_storage_first || !self.stored_data.is_empty() {
282 let stored_indices = stored_indices(&self.stored_data, self.stored_position);
283 let key = self.context.base_key().base_tag(KeyTag::Store as u8);
284 batch.put_key_value(key, &stored_indices)?;
285 }
286 self.delete_storage_first = false;
287 Ok(delete_view)
288 }
289
290 fn clear(&mut self) {
291 self.delete_storage_first = true;
292 self.new_back_values.clear();
293 self.cursor.position = None;
294 }
295}
296
297impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
298where
299 Self: View,
300{
301 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
302 Ok(BucketQueueView {
303 context: self.context.clone(),
304 stored_data: self.stored_data.clone(),
305 new_back_values: self.new_back_values.clone(),
306 stored_position: self.stored_position,
307 cursor: self.cursor.clone(),
308 delete_storage_first: self.delete_storage_first,
309 })
310 }
311}
312
313impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
314 fn get_index_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
316 Ok(if index == 0 {
317 self.context.base_key().base_tag(KeyTag::Front as u8)
318 } else {
319 self.context
320 .base_key()
321 .derive_tag_key(KeyTag::Index as u8, &index)?
322 })
323 }
324
325 pub fn stored_count(&self) -> usize {
338 if self.delete_storage_first {
339 0
340 } else {
341 let Some((i_block, position)) = self.cursor.position else {
342 return 0;
343 };
344 let mut stored_count = 0;
345 for block in i_block..self.stored_data.len() {
346 stored_count += self.stored_data[block].1.len();
347 }
348 stored_count -= position;
349 stored_count
350 }
351 }
352
353 pub fn count(&self) -> usize {
366 self.stored_count() + self.new_back_values.len()
367 }
368}
369
370impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
371 pub fn front(&self) -> Option<&T> {
385 match self.cursor.position {
386 Some((i_block, position)) => {
387 let block = &self.stored_data[i_block].1;
388 let Bucket::Loaded { data } = block else {
389 unreachable!();
390 };
391 Some(&data[position])
392 }
393 None => self.new_back_values.front(),
394 }
395 }
396
397 pub fn front_mut(&mut self) -> Option<&mut T> {
413 match self.cursor.position {
414 Some((i_block, position)) => {
415 let block = &mut self.stored_data.get_mut(i_block).unwrap().1;
416 let Bucket::Loaded { data } = block else {
417 unreachable!();
418 };
419 Some(data.get_mut(position).unwrap())
420 }
421 None => self.new_back_values.front_mut(),
422 }
423 }
424
425 pub async fn delete_front(&mut self) -> Result<(), ViewError> {
439 match self.cursor.position {
440 Some((mut i_block, mut position)) => {
441 position += 1;
442 if self.stored_data[i_block].1.len() == position {
443 i_block += 1;
444 position = 0;
445 }
446 if i_block == self.stored_data.len() {
447 self.cursor = Cursor { position: None };
448 } else {
449 self.cursor = Cursor {
450 position: Some((i_block, position)),
451 };
452 let (index, bucket) = self.stored_data.get_mut(i_block).unwrap();
453 let index = *index;
454 if !bucket.is_loaded() {
455 let key = self.get_index_key(index)?;
456 let value = self.context.store().read_value_bytes(&key).await?;
457 let value = value.ok_or(ViewError::MissingEntries)?;
458 let data = bcs::from_bytes(&value)?;
459 self.stored_data[i_block].1 = Bucket::Loaded { data };
460 }
461 }
462 }
463 None => {
464 self.new_back_values.pop_front();
465 }
466 }
467 Ok(())
468 }
469
470 pub fn push_back(&mut self, value: T) {
483 self.new_back_values.push_back(value);
484 }
485
486 pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
500 let count = self.count();
501 self.read_context(self.cursor.position, count).await
502 }
503
504 pub async fn back(&mut self) -> Result<Option<T>, ViewError>
518 where
519 T: Clone,
520 {
521 if let Some(value) = self.new_back_values.back() {
522 return Ok(Some(value.clone()));
523 }
524 if self.cursor.position.is_none() {
525 return Ok(None);
526 }
527 let Some((index, bucket)) = self.stored_data.back() else {
528 return Ok(None);
529 };
530 if !bucket.is_loaded() {
531 let key = self.get_index_key(*index)?;
532 let value = self.context.store().read_value_bytes(&key).await?;
533 let value = value.as_ref().ok_or(ViewError::MissingEntries)?;
534 let data = bcs::from_bytes::<Vec<T>>(value)?;
535 self.stored_data.back_mut().unwrap().1 = Bucket::Loaded { data };
536 }
537 let bucket = &self.stored_data.back_mut().unwrap().1;
538 let Bucket::Loaded { data } = bucket else {
539 unreachable!();
540 };
541 Ok(Some(data.last().unwrap().clone()))
542 }
543
544 async fn read_context(
545 &self,
546 position: Option<(usize, usize)>,
547 count: usize,
548 ) -> Result<Vec<T>, ViewError> {
549 if count == 0 {
550 return Ok(Vec::new());
551 }
552 let mut elements = Vec::<T>::new();
553 let mut count_remain = count;
554 if let Some(pair) = position {
555 let mut keys = Vec::new();
556 let (i_block, mut position) = pair;
557 for block in i_block..self.stored_data.len() {
558 let (index, bucket) = &self.stored_data[block];
559 let size = bucket.len() - position;
560 if !bucket.is_loaded() {
561 let key = self.get_index_key(*index)?;
562 keys.push(key);
563 };
564 if size >= count_remain {
565 break;
566 }
567 count_remain -= size;
568 position = 0;
569 }
570 let values = self.context.store().read_multi_values_bytes(keys).await?;
571 position = pair.1;
572 let mut value_pos = 0;
573 count_remain = count;
574 for block in i_block..self.stored_data.len() {
575 let bucket = &self.stored_data[block].1;
576 let size = bucket.len() - position;
577 let vec = match bucket {
578 Bucket::Loaded { data } => data,
579 Bucket::NotLoaded { .. } => {
580 let value = values[value_pos]
581 .as_ref()
582 .ok_or(ViewError::MissingEntries)?;
583 value_pos += 1;
584 &bcs::from_bytes::<Vec<T>>(value)?
585 }
586 };
587 elements.extend(vec[position..].iter().take(count_remain).cloned());
588 if size >= count_remain {
589 return Ok(elements);
590 }
591 count_remain -= size;
592 position = 0;
593 }
594 }
595 let count_read = std::cmp::min(count_remain, self.new_back_values.len());
596 elements.extend(self.new_back_values.range(0..count_read).cloned());
597 Ok(elements)
598 }
599
600 pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
615 let count = std::cmp::min(count, self.count());
616 self.read_context(self.cursor.position, count).await
617 }
618
619 pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
634 let count = std::cmp::min(count, self.count());
635 if count <= self.new_back_values.len() {
636 let start = self.new_back_values.len() - count;
637 Ok(self
638 .new_back_values
639 .range(start..)
640 .cloned()
641 .collect::<Vec<_>>())
642 } else {
643 let mut increment = self.count() - count;
644 let Some((i_block, mut position)) = self.cursor.position else {
645 unreachable!();
646 };
647 for block in i_block..self.stored_data.len() {
648 let size = self.stored_data[block].1.len() - position;
649 if increment < size {
650 return self
651 .read_context(Some((block, position + increment)), count)
652 .await;
653 }
654 increment -= size;
655 position = 0;
656 }
657 unreachable!();
658 }
659 }
660
661 async fn load_all(&mut self) -> Result<(), ViewError> {
662 if !self.delete_storage_first {
663 let elements = self.elements().await?;
664 self.new_back_values.clear();
665 for elt in elements {
666 self.new_back_values.push_back(elt);
667 }
668 self.cursor = Cursor { position: None };
669 self.delete_storage_first = true;
670 }
671 Ok(())
672 }
673
674 pub async fn iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
690 self.load_all().await?;
691 Ok(self.new_back_values.iter_mut())
692 }
693}
694
695impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
696 for BucketQueueView<C, T, N>
697where
698 Self: View,
699{
700 type Hasher = sha3::Sha3_256;
701
702 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
703 self.hash().await
704 }
705
706 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
707 #[cfg(with_metrics)]
708 let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
709 let elements = self.elements().await?;
710 let mut hasher = sha3::Sha3_256::default();
711 hasher.update_with_bcs_bytes(&elements)?;
712 Ok(hasher.finalize())
713 }
714}
715
716pub type HashedBucketQueueView<C, T, const N: usize> =
718 WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
719
/// GraphQL support for `BucketQueueView`; only compiled when `with_graphql` is set.
#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::BucketQueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
        for BucketQueueView<C, T, N>
    {
        /// Builds the GraphQL type name from the mangled name of the element
        /// type and the value of `hash_name` for that type.
        fn type_name() -> Cow<'static, str> {
            let element_name = mangle(T::type_name());
            let element_hash = hash_name::<T>();
            Cow::Owned(format!(
                "BucketQueueView_{}_{:08x}",
                element_name, element_hash
            ))
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
    where
        C: Send + Sync,
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        /// Returns the first `count` elements of the queue, or all of them when
        /// `count` is not provided.
        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            let limit = match count {
                Some(limit) => limit,
                None => self.count(),
            };
            let values = self.read_front(limit).await?;
            Ok(values)
        }
    }
}