1use std::collections::{vec_deque::IterMut, VecDeque};
5
6#[cfg(with_metrics)]
7use linera_base::prometheus_util::MeasureLatency as _;
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9
10use crate::{
11 batch::Batch,
12 common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
13 context::Context,
14 hashable_wrapper::WrappedHashableContainerView,
15 historical_hash_wrapper::HistoricallyHashableView,
16 store::ReadableKeyValueStore as _,
17 views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
18};
19
20#[cfg(with_metrics)]
21mod metrics {
22 use std::sync::LazyLock;
23
24 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
25 use prometheus::HistogramVec;
26
27 pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
29 register_histogram_vec(
30 "bucket_queue_view_hash_runtime",
31 "BucketQueueView hash runtime",
32 &[],
33 exponential_bucket_latencies(5.0),
34 )
35 });
36}
37
38#[repr(u8)]
43enum KeyTag {
44 Front = MIN_VIEW_TAG,
46 Store,
48 Index,
50}
51
52#[derive(Clone, Debug, Default, Serialize, Deserialize)]
54struct StoredIndices {
55 indices: Vec<(usize, usize)>,
60 position: usize,
62}
63
64impl StoredIndices {
65 fn len(&self) -> usize {
66 self.indices.len()
67 }
68}
69
70#[derive(Clone, Debug)]
76struct Cursor {
77 position: Option<(usize, usize)>,
78}
79
80impl Cursor {
81 pub fn new(number_bucket: usize, position: usize) -> Self {
82 if number_bucket == 0 {
83 Cursor { position: None }
84 } else {
85 Cursor {
86 position: Some((0, position)),
87 }
88 }
89 }
90
91 pub fn is_incrementable(&self) -> bool {
92 self.position.is_some()
93 }
94}
95
96#[derive(Clone, Debug)]
97enum Bucket<T> {
98 Loaded { data: Vec<T> },
99 NotLoaded { length: usize },
100}
101
102impl<T> Bucket<T> {
103 fn len(&self) -> usize {
104 match self {
105 Bucket::Loaded { data } => data.len(),
106 Bucket::NotLoaded { length } => *length,
107 }
108 }
109
110 fn is_loaded(&self) -> bool {
111 match self {
112 Bucket::Loaded { .. } => true,
113 Bucket::NotLoaded { .. } => false,
114 }
115 }
116}
117
118fn stored_indices<T>(stored_data: &VecDeque<(usize, Bucket<T>)>, position: usize) -> StoredIndices {
119 let indices = stored_data
120 .iter()
121 .map(|(index, bucket)| (bucket.len(), *index))
122 .collect::<Vec<_>>();
123 StoredIndices { indices, position }
124}
125
126#[derive(Debug)]
131pub struct BucketQueueView<C, T, const N: usize> {
132 context: C,
133 stored_data: VecDeque<(usize, Bucket<T>)>,
135 new_back_values: VecDeque<T>,
137 stored_position: usize,
139 cursor: Cursor,
141 delete_storage_first: bool,
143}
144
145impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
146where
147 C: Context,
148 T: Send + Sync + Clone + Serialize + DeserializeOwned,
149{
150 const NUM_INIT_KEYS: usize = 2;
151
152 type Context = C;
153
154 fn context(&self) -> &C {
155 &self.context
156 }
157
158 fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
159 let key1 = context.base_key().base_tag(KeyTag::Front as u8);
160 let key2 = context.base_key().base_tag(KeyTag::Store as u8);
161 Ok(vec![key1, key2])
162 }
163
164 fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
165 let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
166 let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
167 let front = from_bytes_option::<Vec<T>>(value1)?;
168 let mut stored_data = VecDeque::from(match front {
169 Some(front) => {
170 vec![(0, Bucket::Loaded { data: front })]
171 }
172 None => {
173 vec![]
174 }
175 });
176 let stored_indices = from_bytes_option_or_default::<StoredIndices>(value2)?;
177 for i in 1..stored_indices.len() {
178 let length = stored_indices.indices[i].0;
179 let index = stored_indices.indices[i].1;
180 stored_data.push_back((index, Bucket::NotLoaded { length }));
181 }
182 let cursor = Cursor::new(stored_indices.len(), stored_indices.position);
183 Ok(Self {
184 context,
185 stored_data,
186 stored_position: stored_indices.position,
187 new_back_values: VecDeque::new(),
188 cursor,
189 delete_storage_first: false,
190 })
191 }
192
193 fn rollback(&mut self) {
194 self.delete_storage_first = false;
195 self.cursor = Cursor::new(self.stored_data.len(), self.stored_position);
196 self.new_back_values.clear();
197 }
198
199 async fn has_pending_changes(&self) -> bool {
200 if self.delete_storage_first {
201 return true;
202 }
203 if !self.stored_data.is_empty() {
204 let Some((i_block, position)) = self.cursor.position else {
205 return true;
206 };
207 if i_block != 0 || position != self.stored_position {
208 return true;
209 }
210 }
211 !self.new_back_values.is_empty()
212 }
213
214 fn flush(&mut self, batch: &mut Batch) -> Result<bool, ViewError> {
215 let mut delete_view = false;
216 if self.delete_storage_first {
217 let key_prefix = self.context.base_key().bytes.clone();
218 batch.delete_key_prefix(key_prefix);
219 delete_view = true;
220 }
221 if self.stored_count() == 0 {
222 let key_prefix = self.context.base_key().bytes.clone();
223 batch.delete_key_prefix(key_prefix);
224 self.stored_data.clear();
225 self.stored_position = 0;
226 } else if let Some((i_block, position)) = self.cursor.position {
227 for _ in 0..i_block {
228 let block = self.stored_data.pop_front().unwrap();
229 let index = block.0;
230 let key = self.get_index_key(index)?;
231 batch.delete_key(key);
232 }
233 self.cursor = Cursor {
234 position: Some((0, position)),
235 };
236 self.stored_position = position;
237 let first_index = self.stored_data[0].0;
239 if first_index != 0 {
240 self.stored_data[0].0 = 0;
241 let key = self.get_index_key(first_index)?;
242 batch.delete_key(key);
243 let key = self.get_index_key(0)?;
244 let (_, data0) = self.stored_data.front().unwrap();
245 let Bucket::Loaded { data } = data0 else {
246 unreachable!();
247 };
248 batch.put_key_value(key, &data)?;
249 }
250 }
251 if !self.new_back_values.is_empty() {
252 delete_view = false;
253 let mut unused_index = match self.stored_data.back() {
254 Some((index, _)) => index + 1,
255 None => 0,
256 };
257 let new_back_values = std::mem::take(&mut self.new_back_values);
258 let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
259 for value_chunk in new_back_values.chunks(N) {
260 let key = self.get_index_key(unused_index)?;
261 batch.put_key_value(key, &value_chunk)?;
262 self.stored_data.push_back((
263 unused_index,
264 Bucket::Loaded {
265 data: value_chunk.to_vec(),
266 },
267 ));
268 unused_index += 1;
269 }
270 if !self.cursor.is_incrementable() {
271 self.cursor = Cursor {
272 position: Some((0, 0)),
273 }
274 }
275 }
276 if !self.delete_storage_first || !self.stored_data.is_empty() {
277 let stored_indices = stored_indices(&self.stored_data, self.stored_position);
278 let key = self.context.base_key().base_tag(KeyTag::Store as u8);
279 batch.put_key_value(key, &stored_indices)?;
280 }
281 self.delete_storage_first = false;
282 Ok(delete_view)
283 }
284
285 fn clear(&mut self) {
286 self.delete_storage_first = true;
287 self.new_back_values.clear();
288 self.cursor.position = None;
289 }
290}
291
292impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
293where
294 Self: View,
295{
296 fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
297 Ok(BucketQueueView {
298 context: self.context.clone(),
299 stored_data: self.stored_data.clone(),
300 new_back_values: self.new_back_values.clone(),
301 stored_position: self.stored_position,
302 cursor: self.cursor.clone(),
303 delete_storage_first: self.delete_storage_first,
304 })
305 }
306}
307
308impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
309 fn get_index_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
311 Ok(if index == 0 {
312 self.context.base_key().base_tag(KeyTag::Front as u8)
313 } else {
314 self.context
315 .base_key()
316 .derive_tag_key(KeyTag::Index as u8, &index)?
317 })
318 }
319
320 pub fn stored_count(&self) -> usize {
333 if self.delete_storage_first {
334 0
335 } else {
336 let Some((i_block, position)) = self.cursor.position else {
337 return 0;
338 };
339 let mut stored_count = 0;
340 for block in i_block..self.stored_data.len() {
341 stored_count += self.stored_data[block].1.len();
342 }
343 stored_count -= position;
344 stored_count
345 }
346 }
347
348 pub fn count(&self) -> usize {
361 self.stored_count() + self.new_back_values.len()
362 }
363}
364
365impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
366 pub fn front(&self) -> Option<&T> {
380 match self.cursor.position {
381 Some((i_block, position)) => {
382 let block = &self.stored_data[i_block].1;
383 let Bucket::Loaded { data } = block else {
384 unreachable!();
385 };
386 Some(&data[position])
387 }
388 None => self.new_back_values.front(),
389 }
390 }
391
392 pub fn front_mut(&mut self) -> Option<&mut T> {
408 match self.cursor.position {
409 Some((i_block, position)) => {
410 let block = &mut self.stored_data.get_mut(i_block).unwrap().1;
411 let Bucket::Loaded { data } = block else {
412 unreachable!();
413 };
414 Some(data.get_mut(position).unwrap())
415 }
416 None => self.new_back_values.front_mut(),
417 }
418 }
419
420 pub async fn delete_front(&mut self) -> Result<(), ViewError> {
434 match self.cursor.position {
435 Some((mut i_block, mut position)) => {
436 position += 1;
437 if self.stored_data[i_block].1.len() == position {
438 i_block += 1;
439 position = 0;
440 }
441 if i_block == self.stored_data.len() {
442 self.cursor = Cursor { position: None };
443 } else {
444 self.cursor = Cursor {
445 position: Some((i_block, position)),
446 };
447 let (index, bucket) = self.stored_data.get_mut(i_block).unwrap();
448 let index = *index;
449 if !bucket.is_loaded() {
450 let key = self.get_index_key(index)?;
451 let value = self.context.store().read_value_bytes(&key).await?;
452 let value = value.ok_or(ViewError::MissingEntries)?;
453 let data = bcs::from_bytes(&value)?;
454 self.stored_data[i_block].1 = Bucket::Loaded { data };
455 }
456 }
457 }
458 None => {
459 self.new_back_values.pop_front();
460 }
461 }
462 Ok(())
463 }
464
465 pub fn push_back(&mut self, value: T) {
478 self.new_back_values.push_back(value);
479 }
480
481 pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
495 let count = self.count();
496 self.read_context(self.cursor.position, count).await
497 }
498
499 pub async fn back(&mut self) -> Result<Option<T>, ViewError>
513 where
514 T: Clone,
515 {
516 if let Some(value) = self.new_back_values.back() {
517 return Ok(Some(value.clone()));
518 }
519 if self.cursor.position.is_none() {
520 return Ok(None);
521 }
522 let Some((index, bucket)) = self.stored_data.back() else {
523 return Ok(None);
524 };
525 if !bucket.is_loaded() {
526 let key = self.get_index_key(*index)?;
527 let value = self.context.store().read_value_bytes(&key).await?;
528 let value = value.as_ref().ok_or(ViewError::MissingEntries)?;
529 let data = bcs::from_bytes::<Vec<T>>(value)?;
530 self.stored_data.back_mut().unwrap().1 = Bucket::Loaded { data };
531 }
532 let bucket = &self.stored_data.back_mut().unwrap().1;
533 let Bucket::Loaded { data } = bucket else {
534 unreachable!();
535 };
536 Ok(Some(data.last().unwrap().clone()))
537 }
538
539 async fn read_context(
540 &self,
541 position: Option<(usize, usize)>,
542 count: usize,
543 ) -> Result<Vec<T>, ViewError> {
544 if count == 0 {
545 return Ok(Vec::new());
546 }
547 let mut elements = Vec::<T>::new();
548 let mut count_remain = count;
549 if let Some(pair) = position {
550 let mut keys = Vec::new();
551 let (i_block, mut position) = pair;
552 for block in i_block..self.stored_data.len() {
553 let (index, bucket) = &self.stored_data[block];
554 let size = bucket.len() - position;
555 if !bucket.is_loaded() {
556 let key = self.get_index_key(*index)?;
557 keys.push(key);
558 };
559 if size >= count_remain {
560 break;
561 }
562 count_remain -= size;
563 position = 0;
564 }
565 let values = self.context.store().read_multi_values_bytes(keys).await?;
566 position = pair.1;
567 let mut value_pos = 0;
568 count_remain = count;
569 for block in i_block..self.stored_data.len() {
570 let bucket = &self.stored_data[block].1;
571 let size = bucket.len() - position;
572 let vec = match bucket {
573 Bucket::Loaded { data } => data,
574 Bucket::NotLoaded { .. } => {
575 let value = values[value_pos]
576 .as_ref()
577 .ok_or(ViewError::MissingEntries)?;
578 value_pos += 1;
579 &bcs::from_bytes::<Vec<T>>(value)?
580 }
581 };
582 elements.extend(vec[position..].iter().take(count_remain).cloned());
583 if size >= count_remain {
584 return Ok(elements);
585 }
586 count_remain -= size;
587 position = 0;
588 }
589 }
590 let count_read = std::cmp::min(count_remain, self.new_back_values.len());
591 elements.extend(self.new_back_values.range(0..count_read).cloned());
592 Ok(elements)
593 }
594
595 pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
610 let count = std::cmp::min(count, self.count());
611 self.read_context(self.cursor.position, count).await
612 }
613
614 pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
629 let count = std::cmp::min(count, self.count());
630 if count <= self.new_back_values.len() {
631 let start = self.new_back_values.len() - count;
632 Ok(self
633 .new_back_values
634 .range(start..)
635 .cloned()
636 .collect::<Vec<_>>())
637 } else {
638 let mut increment = self.count() - count;
639 let Some((i_block, mut position)) = self.cursor.position else {
640 unreachable!();
641 };
642 for block in i_block..self.stored_data.len() {
643 let size = self.stored_data[block].1.len() - position;
644 if increment < size {
645 return self
646 .read_context(Some((block, position + increment)), count)
647 .await;
648 }
649 increment -= size;
650 position = 0;
651 }
652 unreachable!();
653 }
654 }
655
656 async fn load_all(&mut self) -> Result<(), ViewError> {
657 if !self.delete_storage_first {
658 let elements = self.elements().await?;
659 self.new_back_values.clear();
660 for elt in elements {
661 self.new_back_values.push_back(elt);
662 }
663 self.cursor = Cursor { position: None };
664 self.delete_storage_first = true;
665 }
666 Ok(())
667 }
668
669 pub async fn iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
685 self.load_all().await?;
686 Ok(self.new_back_values.iter_mut())
687 }
688}
689
690impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
691 for BucketQueueView<C, T, N>
692where
693 Self: View,
694{
695 type Hasher = sha3::Sha3_256;
696
697 async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
698 self.hash().await
699 }
700
701 async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
702 #[cfg(with_metrics)]
703 let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
704 let elements = self.elements().await?;
705 let mut hasher = sha3::Sha3_256::default();
706 hasher.update_with_bcs_bytes(&elements)?;
707 Ok(hasher.finalize())
708 }
709}
710
711pub type HashedBucketQueueView<C, T, const N: usize> =
713 WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;
714
715pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
717 HistoricallyHashableView<C, BucketQueueView<C, T, N>>;
718
719#[cfg(with_graphql)]
720mod graphql {
721 use std::borrow::Cow;
722
723 use super::BucketQueueView;
724 use crate::{
725 context::Context,
726 graphql::{hash_name, mangle},
727 };
728
729 impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
730 for BucketQueueView<C, T, N>
731 {
732 fn type_name() -> Cow<'static, str> {
733 format!(
734 "BucketQueueView_{}_{:08x}",
735 mangle(T::type_name()),
736 hash_name::<T>()
737 )
738 .into()
739 }
740 }
741
742 #[async_graphql::Object(cache_control(no_cache), name_type)]
743 impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
744 where
745 C: Send + Sync,
746 T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
747 {
748 #[graphql(derived(name = "count"))]
749 async fn count_(&self) -> Result<u32, async_graphql::Error> {
750 Ok(self.count() as u32)
751 }
752
753 async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
754 Ok(self
755 .read_front(count.unwrap_or_else(|| self.count()))
756 .await?)
757 }
758 }
759}