use std::collections::{vec_deque::IterMut, VecDeque};

use allocative::Allocative;
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::{
    batch::Batch,
    common::{from_bytes_option, from_bytes_option_or_default, HasherOutput},
    context::Context,
    hashable_wrapper::WrappedHashableContainerView,
    historical_hash_wrapper::HistoricallyHashableView,
    store::ReadableKeyValueStore as _,
    views::{ClonableView, HashableView, Hasher, View, ViewError, MIN_VIEW_TAG},
};

#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

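    /// The runtime of the hash computation.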
    pub static BUCKET_QUEUE_VIEW_HASH_RUNTIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "bucket_queue_view_hash_runtime",
            "BucketQueueView hash runtime",
            &[],
            exponential_bucket_latencies(5.0),
        )
    });
}

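/// Key tags for building the sub-keys of a [`BucketQueueView`] on top of the base key.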
#[repr(u8)]
enum KeyTag {
    /// Prefix for the first bucket (the one holding the front of the queue).
    Front = MIN_VIEW_TAG,
    /// Prefix for the [`BucketStore`] describing all buckets.
    Store,
    /// Prefix for the remaining buckets, addressed by their index.
    Index,
}

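/// The description of all stored buckets, together with the position of the front value
/// within the first bucket.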
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct BucketStore {
    /// The description (length and key index) of each stored bucket.
    descriptions: Vec<BucketDescription>,
    /// The position of the front value within the first bucket.
    front_position: usize,
}

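/// The description of a single stored bucket: its length and key index.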
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
struct BucketDescription {
    /// The number of values stored in the bucket.
    length: usize,
    /// The index from which the bucket's storage key is derived.
    index: usize,
}

impl BucketStore {
    /// Returns the number of buckets.
    fn len(&self) -> usize {
        self.descriptions.len()
    }
}

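/// The position of the front value of the queue within the stored buckets.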
#[derive(Copy, Clone, Debug, Allocative)]
struct Cursor {
    /// The offset of the bucket containing the front value in `stored_buckets`.
    offset: usize,
    /// The position of the front value within that bucket.
    position: usize,
}

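/// Whether the data of a bucket is loaded in memory or still resides in storage.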
#[derive(Clone, Debug, Allocative)]
enum State<T> {
    /// The bucket's values are loaded in memory.
    Loaded { data: Vec<T> },
    /// Only the number of values is known; the data is still in storage.
    NotLoaded { length: usize },
}

impl<T> Bucket<T> {
    /// Returns the number of values in the bucket.
    fn len(&self) -> usize {
        match &self.state {
            State::Loaded { data } => data.len(),
            State::NotLoaded { length } => *length,
        }
    }

    /// Returns whether the bucket's data is loaded in memory.
    fn is_loaded(&self) -> bool {
        match self.state {
            State::Loaded { .. } => true,
            State::NotLoaded { .. } => false,
        }
    }

    /// Returns the `BucketDescription` matching this bucket.
    fn to_description(&self) -> BucketDescription {
        BucketDescription {
            length: self.len(),
            index: self.index,
        }
    }
}

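/// A bucket of consecutive values of the queue, identified by the index used to derive
/// its storage key.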
#[derive(Clone, Debug, Allocative)]
struct Bucket<T> {
    /// The index from which the bucket's storage key is derived.
    index: usize,
    /// The loading state and, when loaded, the data of the bucket.
    state: State<T>,
}

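/// A view implementing a FIFO queue of values of type `T`, where values are grouped into
/// buckets of at most `N` values so that storage is read and written one bucket at a
/// time instead of one value at a time.
///
/// A minimal usage sketch (not a doctest; `MemoryContext::new_for_testing` is assumed to
/// be the in-memory test context constructor):
/// ```ignore
/// // Assumed test helper providing an in-memory context.
/// let context = MemoryContext::new_for_testing(());
/// let mut queue = BucketQueueView::<_, u8, 5>::load(context).await?;
/// queue.push_back(34);
/// assert_eq!(queue.elements().await?, vec![34]);
/// queue.delete_front().await?;
/// assert_eq!(queue.count(), 0);
/// ```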
#[derive(Debug, Allocative)]
#[allocative(bound = "C, T: Allocative, const N: usize")]
pub struct BucketQueueView<C, T, const N: usize> {
    /// The view's context.
    #[allocative(skip)]
    context: C,
    /// The buckets present in storage.
    stored_buckets: VecDeque<Bucket<T>>,
    /// The values pushed since the last save; they are not yet in storage.
    new_back_values: VecDeque<T>,
    /// The persisted position of the front value within the first stored bucket.
    stored_front_position: usize,
    /// The position of the current front value in `stored_buckets`, or `None` if every
    /// stored value has been deleted.
    cursor: Option<Cursor>,
    /// Whether the stored state should be deleted before saving.
    delete_storage_first: bool,
}

impl<C, T, const N: usize> View for BucketQueueView<C, T, N>
where
    C: Context,
    T: Send + Sync + Clone + Serialize + DeserializeOwned,
{
    const NUM_INIT_KEYS: usize = 2;

    type Context = C;

    fn context(&self) -> C {
        self.context.clone()
    }

    fn pre_load(context: &C) -> Result<Vec<Vec<u8>>, ViewError> {
        let key1 = context.base_key().base_tag(KeyTag::Front as u8);
        let key2 = context.base_key().base_tag(KeyTag::Store as u8);
        Ok(vec![key1, key2])
    }

    fn post_load(context: C, values: &[Option<Vec<u8>>]) -> Result<Self, ViewError> {
        let value1 = values.first().ok_or(ViewError::PostLoadValuesError)?;
        let value2 = values.get(1).ok_or(ViewError::PostLoadValuesError)?;
        let front = from_bytes_option::<Vec<T>>(value1)?;
        let mut stored_buckets = VecDeque::from(match front {
            Some(data) => {
                let bucket = Bucket {
                    index: 0,
                    state: State::Loaded { data },
                };
                vec![bucket]
            }
            None => {
                vec![]
            }
        });
        let bucket_store = from_bytes_option_or_default::<BucketStore>(value2)?;
        // Only the first bucket was read by `pre_load`; the remaining buckets are
        // registered as `NotLoaded` until their data is needed.
        for i in 1..bucket_store.len() {
            let length = bucket_store.descriptions[i].length;
            let index = bucket_store.descriptions[i].index;
            stored_buckets.push_back(Bucket {
                index,
                state: State::NotLoaded { length },
            });
        }
        let cursor = if bucket_store.descriptions.is_empty() {
            None
        } else {
            Some(Cursor {
                offset: 0,
                position: bucket_store.front_position,
            })
        };
        Ok(Self {
            context,
            stored_buckets,
            stored_front_position: bucket_store.front_position,
            new_back_values: VecDeque::new(),
            cursor,
            delete_storage_first: false,
        })
    }

    fn rollback(&mut self) {
        self.delete_storage_first = false;
        self.cursor = if self.stored_buckets.is_empty() {
            None
        } else {
            Some(Cursor {
                offset: 0,
                position: self.stored_front_position,
            })
        };
        self.new_back_values.clear();
    }

    async fn has_pending_changes(&self) -> bool {
        if self.delete_storage_first {
            return true;
        }
        if !self.stored_buckets.is_empty() {
            let Some(cursor) = self.cursor else {
                return true;
            };
            if cursor.offset != 0 || cursor.position != self.stored_front_position {
                return true;
            }
        }
        !self.new_back_values.is_empty()
    }

    fn pre_save(&self, batch: &mut Batch) -> Result<bool, ViewError> {
        let mut delete_view = false;
        let mut descriptions = Vec::new();
        let mut stored_front_position = self.stored_front_position;
        if self.stored_count() == 0 {
            let key_prefix = self.context.base_key().bytes.clone();
            batch.delete_key_prefix(key_prefix);
            delete_view = true;
            stored_front_position = 0;
        } else if let Some(cursor) = self.cursor {
            // Delete the buckets that have been fully consumed.
            for i in 0..cursor.offset {
                let bucket = &self.stored_buckets[i];
                let index = bucket.index;
                let key = self.get_bucket_key(index)?;
                batch.delete_key(key);
            }
            stored_front_position = cursor.position;
            let first_index = self.stored_buckets[cursor.offset].index;
            let start_offset = if first_index != 0 {
                // The front bucket changed: rewrite it under index 0 and drop its old key.
                let key = self.get_bucket_key(first_index)?;
                batch.delete_key(key);
                let key = self.get_bucket_key(0)?;
                let bucket = &self.stored_buckets[cursor.offset];
                let State::Loaded { data } = &bucket.state else {
                    unreachable!("The front bucket is always loaded.");
                };
                batch.put_key_value(key, data)?;
                descriptions.push(BucketDescription {
                    length: bucket.len(),
                    index: 0,
                });
                cursor.offset + 1
            } else {
                cursor.offset
            };
            for bucket in self.stored_buckets.range(start_offset..) {
                descriptions.push(bucket.to_description());
            }
        }
        if !self.new_back_values.is_empty() {
            delete_view = false;
            // Choose the index of the first new bucket: continue after the last
            // remaining stored bucket, or restart from 0 if nothing is stored.
            let mut index = if self.stored_count() == 0 {
                0
            } else if let Some(last_description) = descriptions.last() {
                last_description.index + 1
            } else {
                0
            };
            let mut start = 0;
            while start < self.new_back_values.len() {
                let end = std::cmp::min(start + N, self.new_back_values.len());
                let value_chunk: Vec<_> = self.new_back_values.range(start..end).collect();
                let key = self.get_bucket_key(index)?;
                batch.put_key_value(key, &value_chunk)?;
                descriptions.push(BucketDescription {
                    index,
                    length: end - start,
                });
                index += 1;
                start = end;
            }
        }
        if !delete_view {
            let bucket_store = BucketStore {
                descriptions,
                front_position: stored_front_position,
            };
            let key = self.context.base_key().base_tag(KeyTag::Store as u8);
            batch.put_key_value(key, &bucket_store)?;
        }
        Ok(delete_view)
    }

    fn post_save(&mut self) {
        if self.stored_count() == 0 {
            self.stored_buckets.clear();
            self.stored_front_position = 0;
            self.cursor = None;
        } else if let Some(cursor) = self.cursor {
            // Drop the consumed buckets and make the current front bucket the new bucket 0.
            for _ in 0..cursor.offset {
                self.stored_buckets.pop_front();
            }
            self.cursor = Some(Cursor {
                offset: 0,
                position: cursor.position,
            });
            self.stored_front_position = cursor.position;
            self.stored_buckets[0].index = 0;
        }
        if !self.new_back_values.is_empty() {
            let mut index = match self.stored_buckets.back() {
                Some(bucket) => bucket.index + 1,
                None => 0,
            };
            let new_back_values = std::mem::take(&mut self.new_back_values);
            let new_back_values = new_back_values.into_iter().collect::<Vec<_>>();
            for value_chunk in new_back_values.chunks(N) {
                self.stored_buckets.push_back(Bucket {
                    index,
                    state: State::Loaded {
                        data: value_chunk.to_vec(),
                    },
                });
                index += 1;
            }
            if self.cursor.is_none() {
                self.cursor = Some(Cursor {
                    offset: 0,
                    position: 0,
                });
            }
        }
        self.delete_storage_first = false;
    }

    fn clear(&mut self) {
        self.delete_storage_first = true;
        self.new_back_values.clear();
        self.cursor = None;
    }
}

impl<C: Clone, T: Clone, const N: usize> ClonableView for BucketQueueView<C, T, N>
where
    Self: View,
{
    fn clone_unchecked(&mut self) -> Result<Self, ViewError> {
        Ok(BucketQueueView {
            context: self.context.clone(),
            stored_buckets: self.stored_buckets.clone(),
            new_back_values: self.new_back_values.clone(),
            stored_front_position: self.stored_front_position,
            cursor: self.cursor,
            delete_storage_first: self.delete_storage_first,
        })
    }
}

impl<C: Context, T, const N: usize> BucketQueueView<C, T, N> {
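    /// Returns the storage key of the bucket with the given index.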
    fn get_bucket_key(&self, index: usize) -> Result<Vec<u8>, ViewError> {
        Ok(if index == 0 {
            self.context.base_key().base_tag(KeyTag::Front as u8)
        } else {
            self.context
                .base_key()
                .derive_tag_key(KeyTag::Index as u8, &index)?
        })
    }

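    /// Returns the number of values that are in storage and not yet deleted; values
    /// pushed since the last save are not counted.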
    pub fn stored_count(&self) -> usize {
        if self.delete_storage_first {
            0
        } else {
            let Some(cursor) = self.cursor else {
                return 0;
            };
            let mut stored_count = 0;
            for offset in cursor.offset..self.stored_buckets.len() {
                stored_count += self.stored_buckets[offset].len();
            }
            stored_count -= cursor.position;
            stored_count
        }
    }

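    /// Returns the total number of values in the queue, stored or not.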
    pub fn count(&self) -> usize {
        self.stored_count() + self.new_back_values.len()
    }
}

impl<C: Context, T: DeserializeOwned + Clone, const N: usize> BucketQueueView<C, T, N> {
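    /// Returns a reference to the front value of the queue, if any.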
    pub fn front(&self) -> Option<&T> {
        match self.cursor {
            Some(Cursor { offset, position }) => {
                let bucket = &self.stored_buckets[offset];
                let State::Loaded { data } = &bucket.state else {
                    unreachable!();
                };
                Some(&data[position])
            }
            None => self.new_back_values.front(),
        }
    }

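    /// Returns a mutable reference to the front value of the queue, if any.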
    pub fn front_mut(&mut self) -> Option<&mut T> {
        match self.cursor {
            Some(Cursor { offset, position }) => {
                let bucket = self.stored_buckets.get_mut(offset).unwrap();
                let State::Loaded { data } = &mut bucket.state else {
                    unreachable!();
                };
                Some(data.get_mut(position).unwrap())
            }
            None => self.new_back_values.front_mut(),
        }
    }

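    /// Deletes the front value of the queue, loading the next bucket from storage when
    /// the current front bucket is exhausted.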
    pub async fn delete_front(&mut self) -> Result<(), ViewError> {
        match self.cursor {
            Some(cursor) => {
                let mut offset = cursor.offset;
                let mut position = cursor.position + 1;
                if self.stored_buckets[offset].len() == position {
                    offset += 1;
                    position = 0;
                }
                if offset == self.stored_buckets.len() {
                    self.cursor = None;
                } else {
                    self.cursor = Some(Cursor { offset, position });
                    let bucket = self.stored_buckets.get_mut(offset).unwrap();
                    let index = bucket.index;
                    if !bucket.is_loaded() {
                        let key = self.get_bucket_key(index)?;
                        let data = self.context.store().read_value(&key).await?;
                        let data = match data {
                            Some(value) => value,
                            None => {
                                return Err(ViewError::MissingEntries(
                                    "BucketQueueView::delete_front".into(),
                                ));
                            }
                        };
                        self.stored_buckets[offset].state = State::Loaded { data };
                    }
                }
            }
            None => {
                self.new_back_values.pop_front();
            }
        }
        Ok(())
    }

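    /// Pushes a value to the back of the queue.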
    pub fn push_back(&mut self, value: T) {
        self.new_back_values.push_back(value);
    }

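    /// Returns all the values of the queue, from front to back.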
    pub async fn elements(&self) -> Result<Vec<T>, ViewError> {
        let count = self.count();
        self.read_context(self.cursor, count).await
    }

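    /// Returns a copy of the back value of the queue, if any, loading the last stored
    /// bucket from storage when needed.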
    pub async fn back(&mut self) -> Result<Option<T>, ViewError>
    where
        T: Clone,
    {
        if let Some(value) = self.new_back_values.back() {
            return Ok(Some(value.clone()));
        }
        if self.cursor.is_none() {
            return Ok(None);
        }
        let Some(bucket) = self.stored_buckets.back() else {
            return Ok(None);
        };
        if !bucket.is_loaded() {
            let key = self.get_bucket_key(bucket.index)?;
            let data = self.context.store().read_value(&key).await?;
            let data = match data {
                Some(data) => data,
                None => {
                    return Err(ViewError::MissingEntries("BucketQueueView::back".into()));
                }
            };
            self.stored_buckets.back_mut().unwrap().state = State::Loaded { data };
        }
        let state = &self.stored_buckets.back_mut().unwrap().state;
        let State::Loaded { data } = state else {
            unreachable!();
        };
        Ok(Some(data.last().unwrap().clone()))
    }

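    /// Reads up to `count` values starting at the given cursor into the stored buckets,
    /// continuing with the values pushed since the last save if needed. Buckets that are
    /// not yet loaded are fetched from storage with a single multi-value read.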
    async fn read_context(
        &self,
        cursor: Option<Cursor>,
        count: usize,
    ) -> Result<Vec<T>, ViewError> {
        if count == 0 {
            return Ok(Vec::new());
        }
        let mut elements = Vec::<T>::new();
        let mut count_remain = count;
        if let Some(cursor) = cursor {
            // First pass: collect the keys of the not-yet-loaded buckets that are needed.
            let mut keys = Vec::new();
            let mut position = cursor.position;
            for offset in cursor.offset..self.stored_buckets.len() {
                let bucket = &self.stored_buckets[offset];
                let size = bucket.len() - position;
                if !bucket.is_loaded() {
                    let key = self.get_bucket_key(bucket.index)?;
                    keys.push(key);
                }
                if size >= count_remain {
                    break;
                }
                count_remain -= size;
                position = 0;
            }
            let values = self.context.store().read_multi_values_bytes(&keys).await?;
            // Second pass: assemble the elements, deserializing the buckets read above.
            let mut value_pos = 0;
            count_remain = count;
            let mut position = cursor.position;
            for offset in cursor.offset..self.stored_buckets.len() {
                let bucket = &self.stored_buckets[offset];
                let size = bucket.len() - position;
                let data = match &bucket.state {
                    State::Loaded { data } => data,
                    State::NotLoaded { .. } => {
                        let value = match &values[value_pos] {
                            Some(value) => value,
                            None => {
                                return Err(ViewError::MissingEntries(
                                    "BucketQueueView::read_context".into(),
                                ));
                            }
                        };
                        value_pos += 1;
                        &bcs::from_bytes::<Vec<T>>(value)?
                    }
                };
                elements.extend(data[position..].iter().take(count_remain).cloned());
                if size >= count_remain {
                    return Ok(elements);
                }
                count_remain -= size;
                position = 0;
            }
        }
        let count_read = std::cmp::min(count_remain, self.new_back_values.len());
        elements.extend(self.new_back_values.range(0..count_read).cloned());
        Ok(elements)
    }

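    /// Reads the first `count` values of the queue, from front to back.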
    pub async fn read_front(&self, count: usize) -> Result<Vec<T>, ViewError> {
        let count = std::cmp::min(count, self.count());
        self.read_context(self.cursor, count).await
    }

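    /// Reads the last `count` values of the queue, preserving the front-to-back order.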
    pub async fn read_back(&self, count: usize) -> Result<Vec<T>, ViewError> {
        let count = std::cmp::min(count, self.count());
        if count <= self.new_back_values.len() {
            let start = self.new_back_values.len() - count;
            Ok(self
                .new_back_values
                .range(start..)
                .cloned()
                .collect::<Vec<_>>())
        } else {
            // Skip the first `count() - count` values, then read the remaining `count`.
            let mut increment = self.count() - count;
            let Some(cursor) = self.cursor else {
                unreachable!();
            };
            let mut position = cursor.position;
            for offset in cursor.offset..self.stored_buckets.len() {
                let size = self.stored_buckets[offset].len() - position;
                if increment < size {
                    return self
                        .read_context(
                            Some(Cursor {
                                offset,
                                position: position + increment,
                            }),
                            count,
                        )
                        .await;
                }
                increment -= size;
                position = 0;
            }
            unreachable!();
        }
    }

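    /// Loads every value into `new_back_values` and marks the stored state for deletion,
    /// so that the whole queue is held in memory.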
    async fn load_all(&mut self) -> Result<(), ViewError> {
        if !self.delete_storage_first {
            let elements = self.elements().await?;
            self.new_back_values.clear();
            for elt in elements {
                self.new_back_values.push_back(elt);
            }
            self.cursor = None;
            self.delete_storage_first = true;
        }
        Ok(())
    }

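    /// Returns a mutable iterator over all the values of the queue. This first loads
    /// every value into memory.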
    pub async fn iter_mut(&mut self) -> Result<IterMut<'_, T>, ViewError> {
        self.load_all().await?;
        Ok(self.new_back_values.iter_mut())
    }
}

impl<C: Context, T: Serialize + DeserializeOwned + Send + Sync + Clone, const N: usize> HashableView
    for BucketQueueView<C, T, N>
where
    Self: View,
{
    type Hasher = sha3::Sha3_256;

    async fn hash_mut(&mut self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        self.hash().await
    }

    async fn hash(&self) -> Result<<Self::Hasher as Hasher>::Output, ViewError> {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::BUCKET_QUEUE_VIEW_HASH_RUNTIME.measure_latency();
        let elements = self.elements().await?;
        let mut hasher = sha3::Sha3_256::default();
        hasher.update_with_bcs_bytes(&elements)?;
        Ok(hasher.finalize())
    }
}

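/// Type wrapping `BucketQueueView` while memoizing the hash.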
pub type HashedBucketQueueView<C, T, const N: usize> =
    WrappedHashableContainerView<C, BucketQueueView<C, T, N>, HasherOutput>;

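/// Type wrapping `BucketQueueView` in a [`HistoricallyHashableView`].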
pub type HistoricallyHashedBucketQueueView<C, T, const N: usize> =
    HistoricallyHashableView<C, BucketQueueView<C, T, N>>;

#[cfg(with_graphql)]
mod graphql {
    use std::borrow::Cow;

    use super::BucketQueueView;
    use crate::{
        context::Context,
        graphql::{hash_name, mangle},
    };

    impl<C: Send + Sync, T: async_graphql::OutputType, const N: usize> async_graphql::TypeName
        for BucketQueueView<C, T, N>
    {
        fn type_name() -> Cow<'static, str> {
            format!(
                "BucketQueueView_{}_{:08x}",
                mangle(T::type_name()),
                hash_name::<T>()
            )
            .into()
        }
    }

    #[async_graphql::Object(cache_control(no_cache), name_type)]
    impl<C: Context, T: async_graphql::OutputType, const N: usize> BucketQueueView<C, T, N>
    where
        C: Send + Sync,
        T: serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
    {
        #[graphql(derived(name = "count"))]
        async fn count_(&self) -> Result<u32, async_graphql::Error> {
            Ok(self.count() as u32)
        }

        async fn entries(&self, count: Option<usize>) -> async_graphql::Result<Vec<T>> {
            Ok(self
                .read_front(count.unwrap_or_else(|| self.count()))
                .await?)
        }
    }
}