use std::{
    collections::{BTreeMap, BTreeSet, HashSet},
    fmt::Debug,
    iter::Peekable,
    ops::Bound,
    vec::IntoIter,
};

use bcs::serialized_size;
use linera_witty::{WitLoad, WitStore, WitType};
use serde::{Deserialize, Serialize};

use crate::{
    common::{get_interval, get_uleb128_size},
    ViewError,
};
/// A write operation to be applied to a key-value store.
#[derive(Clone, Debug, Eq, PartialEq, WitType, WitLoad, WitStore)]
pub enum WriteOperation {
    /// Deletes the entry with the given key.
    Delete {
        /// The key to delete.
        key: Vec<u8>,
    },
    /// Deletes all entries whose keys start with the given prefix.
    DeletePrefix {
        /// The prefix of the keys to delete.
        key_prefix: Vec<u8>,
    },
    /// Inserts or replaces the value associated with the given key.
    Put {
        /// The key to write to.
        key: Vec<u8>,
        /// The value to write.
        value: Vec<u8>,
    },
}

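/// An ordered list of write operations to be applied to a key-value store.
///
/// A minimal usage sketch (illustrative only, not taken from the crate's own doctests):
/// ```ignore
/// let mut batch = Batch::new();
/// batch.put_key_value_bytes(vec![0, 1], vec![2, 3]);
/// batch.delete_key(vec![0, 1]);
/// assert_eq!(batch.num_operations(), 2);
/// ```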
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Batch {
    /// The write operations, in the order in which they were added.
    pub operations: Vec<WriteOperation>,
}

/// An unordered batch containing only individual key deletions and insertions,
/// with no prefix deletions.
#[derive(Default, Serialize, Deserialize)]
pub struct SimpleUnorderedBatch {
    /// The keys to delete.
    pub deletions: Vec<Vec<u8>>,
    /// The key-value pairs to insert.
    pub insertions: Vec<(Vec<u8>, Vec<u8>)>,
}

/// An unordered batch of deletions and insertions, together with a list of key
/// prefixes to delete.
#[derive(Default, Serialize, Deserialize)]
pub struct UnorderedBatch {
    /// The key prefixes to delete.
    pub key_prefix_deletions: Vec<Vec<u8>>,
    /// The individual deletions and insertions.
    pub simple_unordered_batch: SimpleUnorderedBatch,
}

impl UnorderedBatch {
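    /// Expands all the prefix deletions into individual key deletions, using `db` to
    /// list the keys that currently lie under each prefix. Keys that are also inserted
    /// by this batch are not deleted. For example, if the store contains the keys
    /// `[1, 2, 3]`, `[1, 2, 4]` and `[1, 3, 3]`, expanding a deletion of the prefix
    /// `[1, 2]` yields the individual deletions `[1, 2, 3]` and `[1, 2, 4]`.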
    pub async fn expand_delete_prefixes<DB: DeletePrefixExpander>(
        self,
        db: &DB,
    ) -> Result<SimpleUnorderedBatch, DB::Error> {
        let mut insert_set = HashSet::new();
        for (key, _) in &self.simple_unordered_batch.insertions {
            insert_set.insert(key.clone());
        }
        let insertions = self.simple_unordered_batch.insertions;
        let mut deletions = self.simple_unordered_batch.deletions;
        for key_prefix in self.key_prefix_deletions {
            for short_key in db.expand_delete_prefix(&key_prefix).await?.iter() {
                let mut key = key_prefix.clone();
                key.extend(short_key);
                if !insert_set.contains(&key) {
                    deletions.push(key);
                }
            }
        }
        Ok(SimpleUnorderedBatch {
            deletions,
            insertions,
        })
    }

    /// Expands only those prefix deletions that collide with keys inserted by this
    /// batch, turning them into individual key deletions. Prefix deletions that do not
    /// overlap with any inserted key are kept as prefix deletions.
    pub async fn expand_colliding_prefix_deletions<DB: DeletePrefixExpander>(
        &mut self,
        db: &DB,
    ) -> Result<(), DB::Error> {
        if self.key_prefix_deletions.is_empty() {
            return Ok(());
        }
        let inserted_keys = self
            .simple_unordered_batch
            .insertions
            .iter()
            .map(|x| x.0.clone())
            .collect::<BTreeSet<_>>();
        let mut key_prefix_deletions = Vec::new();
        for key_prefix in &self.key_prefix_deletions {
            if inserted_keys
                .range(get_interval(key_prefix.clone()))
                .next()
                .is_some()
            {
                for short_key in db.expand_delete_prefix(key_prefix).await?.iter() {
                    let mut key = key_prefix.clone();
                    key.extend(short_key);
                    if !inserted_keys.contains(&key) {
                        self.simple_unordered_batch.deletions.push(key);
                    }
                }
            } else {
                key_prefix_deletions.push(key_prefix.to_vec());
            }
        }
        self.key_prefix_deletions = key_prefix_deletions;
        Ok(())
    }

    /// Returns the total number of entries (prefix deletions, deletions and insertions)
    /// in this batch.
    pub fn len(&self) -> usize {
        self.key_prefix_deletions.len() + self.simple_unordered_batch.len()
    }

    /// Returns whether this batch contains no entries.
    pub fn is_empty(&self) -> bool {
        self.key_prefix_deletions.is_empty() && self.simple_unordered_batch.is_empty()
    }
}

/// Checks whether `key` starts with one of the prefixes in `key_prefix_set`.
fn is_prefix_matched(key_prefix_set: &BTreeSet<Vec<u8>>, key: &[u8]) -> bool {
    let range = (Bound::Unbounded, Bound::Included(key.to_vec()));
    let range = key_prefix_set.range(range);
    if let Some(value) = range.last() {
        if value.len() > key.len() {
            return false;
        }
        return value == &key[0..value.len()];
    }
    false
}

impl Batch {
    /// Creates an empty batch.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the total number of bytes of the keys and values in the operations.
    pub fn size(&self) -> usize {
        self.operations
            .iter()
            .map(|operation| match operation {
                WriteOperation::Delete { key } => key.len(),
                WriteOperation::Put { key, value } => key.len() + value.len(),
                WriteOperation::DeletePrefix { key_prefix } => key_prefix.len(),
            })
            .sum()
    }

    /// Returns whether the batch contains no operations.
    pub fn is_empty(&self) -> bool {
        self.operations.is_empty()
    }

    /// Returns the number of operations in the batch.
    pub fn num_operations(&self) -> usize {
        self.operations.len()
    }

    /// Builds a batch by running the given asynchronous `builder` function on a new,
    /// empty batch.
    pub async fn build<F>(builder: F) -> Result<Self, ViewError>
    where
        F: FnOnce(&mut Batch) -> futures::future::BoxFuture<Result<(), ViewError>> + Send + Sync,
    {
        let mut batch = Batch::new();
        builder(&mut batch).await?;
        Ok(batch)
    }

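    /// Simplifies the batch into an [`UnorderedBatch`] by dropping operations that are
    /// overwritten or made redundant by later ones (for example, a `Put` followed by a
    /// `DeletePrefix` covering its key).
    ///
    /// A sketch of the effect, mirroring `test_simplify_batch1` below (`ignore`d, for
    /// illustration only):
    /// ```ignore
    /// let mut batch = Batch::new();
    /// batch.put_key_value_bytes(vec![1, 2], vec![]);
    /// batch.put_key_value_bytes(vec![1, 3, 3], vec![33, 2]);
    /// batch.put_key_value_bytes(vec![1, 2, 3], vec![34, 2]);
    /// batch.delete_key_prefix(vec![1, 2]);
    /// let unordered_batch = batch.simplify();
    /// // Only the prefix deletion and the insertion outside of it survive.
    /// assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
    /// assert_eq!(
    ///     unordered_batch.simple_unordered_batch.insertions,
    ///     vec![(vec![1, 3, 3], vec![33, 2])]
    /// );
    /// ```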
    pub fn simplify(self) -> UnorderedBatch {
        let mut delete_and_insert_map = BTreeMap::new();
        let mut delete_prefix_set = BTreeSet::new();
        for operation in self.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    if is_prefix_matched(&delete_prefix_set, &key) {
                        delete_and_insert_map.remove(&key);
                    } else {
                        delete_and_insert_map.insert(key, None);
                    }
                }
                WriteOperation::Put { key, value } => {
                    delete_and_insert_map.insert(key, Some(value));
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    let keys = delete_and_insert_map
                        .range(get_interval(key_prefix.clone()))
                        .map(|x| x.0.to_vec())
                        .collect::<Vec<_>>();
                    for key in keys {
                        delete_and_insert_map.remove(&key);
                    }
                    if is_prefix_matched(&delete_prefix_set, &key_prefix) {
                        continue;
                    }
                    let key_prefixes = delete_prefix_set
                        .range(get_interval(key_prefix.clone()))
                        .map(|x: &Vec<u8>| x.to_vec())
                        .collect::<Vec<_>>();
                    for key_prefix in key_prefixes {
                        delete_prefix_set.remove(&key_prefix);
                    }
                    delete_prefix_set.insert(key_prefix);
                }
            }
        }
        let key_prefix_deletions = delete_prefix_set.into_iter().collect();
        let mut deletions = Vec::new();
        let mut insertions = Vec::new();
        for (key, val) in delete_and_insert_map {
            match val {
                Some(value) => insertions.push((key, value)),
                None => deletions.push(key),
            }
        }
        let simple_unordered_batch = SimpleUnorderedBatch {
            deletions,
            insertions,
        };
        UnorderedBatch {
            key_prefix_deletions,
            simple_unordered_batch,
        }
    }

    /// Returns whether every `Put` value in the batch is at most `max_value_size` bytes.
    pub fn check_value_size(&self, max_value_size: usize) -> bool {
        for operation in &self.operations {
            if let WriteOperation::Put { key: _, value } = operation {
                if value.len() > max_value_size {
                    return false;
                }
            }
        }
        true
    }

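    /// Inserts a `Put { key, value }` operation into the batch, serializing `value`
    /// with BCS.
    ///
    /// A minimal sketch (`ignore`d, for illustration only):
    /// ```ignore
    /// let mut batch = Batch::new();
    /// batch.put_key_value(vec![0, 1], &"hello".to_string())?;
    /// ```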
    #[inline]
    pub fn put_key_value(
        &mut self,
        key: Vec<u8>,
        value: &impl Serialize,
    ) -> Result<(), bcs::Error> {
        let bytes = bcs::to_bytes(value)?;
        self.put_key_value_bytes(key, bytes);
        Ok(())
    }

    /// Inserts a `Put { key, value }` operation into the batch, with the value given as
    /// raw bytes.
    #[inline]
    pub fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.operations.push(WriteOperation::Put { key, value });
    }

    /// Inserts a `Delete { key }` operation into the batch.
    #[inline]
    pub fn delete_key(&mut self, key: Vec<u8>) {
        self.operations.push(WriteOperation::Delete { key });
    }

    /// Inserts a `DeletePrefix { key_prefix }` operation into the batch.
    #[inline]
    pub fn delete_key_prefix(&mut self, key_prefix: Vec<u8>) {
        self.operations
            .push(WriteOperation::DeletePrefix { key_prefix });
    }
}

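/// A trait for stores that can list the keys lying under a given prefix, so that a
/// prefix deletion can be expanded into individual key deletions.
///
/// A minimal sketch of an implementer over an in-memory map (`MemoryDb` is a
/// hypothetical type, for illustration only); note that the returned keys are the
/// suffixes of the matching keys, relative to `key_prefix`:
/// ```ignore
/// struct MemoryDb(BTreeMap<Vec<u8>, Vec<u8>>);
///
/// impl DeletePrefixExpander for MemoryDb {
///     type Error = std::convert::Infallible;
///
///     async fn expand_delete_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
///         Ok(self
///             .0
///             .range(get_interval(key_prefix.to_vec()))
///             .map(|(key, _)| key[key_prefix.len()..].to_vec())
///             .collect())
///     }
/// }
/// ```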
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait DeletePrefixExpander {
    /// The error type of the store.
    type Error: Debug;

    /// Returns the suffixes of the keys that currently lie under `key_prefix`.
    async fn expand_delete_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
}

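/// A simplified batch that can be built from a [`Batch`] and then written out value by
/// value, for stores that restrict the size or shape of their write batches.
///
/// A minimal usage sketch (`ignore`d, for illustration only; `store` is any
/// [`DeletePrefixExpander`]):
/// ```ignore
/// let simplified = SimpleUnorderedBatch::from_batch(store, batch).await?;
/// let mut writer = simplified.into_iter();
/// ```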
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait SimplifiedBatch: Sized + Send + Sync {
    /// The writer type used to move the values of the batch.
    type Iter: BatchValueWriter<Self>;

    /// Creates a simplified batch from a [`Batch`], using `store` to expand prefix
    /// deletions where needed.
    async fn from_batch<S: DeletePrefixExpander>(store: S, batch: Batch) -> Result<Self, S::Error>;

    /// Converts the batch into a writer over its values.
    fn into_iter(self) -> Self::Iter;

    /// Returns the total number of entries in the batch.
    fn len(&self) -> usize;

    /// Returns the total number of bytes of the keys and values in the batch.
    fn num_bytes(&self) -> usize;

    /// Returns the serialization overhead of the batch, i.e. the size of the ULEB128
    /// length markers of its vectors.
    fn overhead_size(&self) -> usize;

    /// Adds the deletion of a key to the batch.
    fn add_delete(&mut self, key: Vec<u8>);

    /// Adds the insertion of a key-value pair to the batch.
    fn add_insert(&mut self, key: Vec<u8>, value: Vec<u8>);

    /// Returns whether the batch contains no entries.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

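/// A writer that moves the values of a simplified batch, one at a time, into another
/// batch while keeping track of the serialized size written so far.
///
/// A sketch of one way the methods compose to build a size-limited chunk
/// (`MAX_BATCH_SIZE` is a caller-defined limit; `ignore`d, for illustration only):
/// ```ignore
/// let mut writer = simplified_batch.into_iter();
/// let mut chunk = SimpleUnorderedBatch::default();
/// let mut chunk_size = 0;
/// while let Some(next_size) = writer.next_batch_size(&chunk, chunk_size)? {
///     if next_size > MAX_BATCH_SIZE {
///         break;
///     }
///     writer.write_next_value(&mut chunk, &mut chunk_size)?;
/// }
/// ```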
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait BatchValueWriter<Batch> {
    /// Returns whether there are no more values to write.
    fn is_empty(&self) -> bool;

    /// Moves the next value (if any) into `batch`, adds its serialized size to
    /// `batch_size`, and returns whether a value was written.
    fn write_next_value(
        &mut self,
        batch: &mut Batch,
        batch_size: &mut usize,
    ) -> Result<bool, bcs::Error>;

    /// Returns the serialized size that `batch` would have if the next value were
    /// written into it, without consuming that value, or `None` if there is no next
    /// value.
    fn next_batch_size(
        &mut self,
        batch: &Batch,
        batch_size: usize,
    ) -> Result<Option<usize>, bcs::Error>;
}

/// A writer over the deletions and insertions of a [`SimpleUnorderedBatch`].
pub struct SimpleUnorderedBatchIter {
    delete_iter: Peekable<IntoIter<Vec<u8>>>,
    insert_iter: Peekable<IntoIter<(Vec<u8>, Vec<u8>)>>,
}

impl SimplifiedBatch for SimpleUnorderedBatch {
    type Iter = SimpleUnorderedBatchIter;

    fn into_iter(self) -> Self::Iter {
        let delete_iter = self.deletions.into_iter().peekable();
        let insert_iter = self.insertions.into_iter().peekable();
        Self::Iter {
            delete_iter,
            insert_iter,
        }
    }

    fn len(&self) -> usize {
        self.deletions.len() + self.insertions.len()
    }

    fn num_bytes(&self) -> usize {
        let mut total_size = 0;
        for (key, value) in &self.insertions {
            total_size += key.len() + value.len();
        }
        for deletion in &self.deletions {
            total_size += deletion.len();
        }
        total_size
    }

    fn overhead_size(&self) -> usize {
        get_uleb128_size(self.deletions.len()) + get_uleb128_size(self.insertions.len())
    }

    fn add_delete(&mut self, key: Vec<u8>) {
        self.deletions.push(key)
    }

    fn add_insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.insertions.push((key, value))
    }

    async fn from_batch<S: DeletePrefixExpander>(store: S, batch: Batch) -> Result<Self, S::Error> {
        let unordered_batch = batch.simplify();
        unordered_batch.expand_delete_prefixes(&store).await
    }
}

impl BatchValueWriter<SimpleUnorderedBatch> for SimpleUnorderedBatchIter {
    fn is_empty(&self) -> bool {
        self.delete_iter.len() == 0 && self.insert_iter.len() == 0
    }

    fn write_next_value(
        &mut self,
        batch: &mut SimpleUnorderedBatch,
        batch_size: &mut usize,
    ) -> Result<bool, bcs::Error> {
        if let Some(delete) = self.delete_iter.next() {
            *batch_size += serialized_size(&delete)?;
            batch.deletions.push(delete);
            Ok(true)
        } else if let Some((key, value)) = self.insert_iter.next() {
            *batch_size += serialized_size(&key)? + serialized_size(&value)?;
            batch.insertions.push((key, value));
            Ok(true)
        } else {
            Ok(false)
        }
    }

    fn next_batch_size(
        &mut self,
        batch: &SimpleUnorderedBatch,
        batch_size: usize,
    ) -> Result<Option<usize>, bcs::Error> {
        if let Some(delete) = self.delete_iter.peek() {
            let next_size = serialized_size(&delete)?;
            Ok(Some(
                batch_size
                    + next_size
                    + get_uleb128_size(batch.deletions.len() + 1)
                    + get_uleb128_size(batch.insertions.len()),
            ))
        } else if let Some((key, value)) = self.insert_iter.peek() {
            let next_size = serialized_size(&key)? + serialized_size(&value)?;
            Ok(Some(
                batch_size
                    + next_size
                    + get_uleb128_size(batch.deletions.len())
                    + get_uleb128_size(batch.insertions.len() + 1),
            ))
        } else {
            Ok(None)
        }
    }
}

/// A writer over the prefix deletions, deletions and insertions of an [`UnorderedBatch`].
pub struct UnorderedBatchIter {
    delete_prefix_iter: Peekable<IntoIter<Vec<u8>>>,
    insert_deletion_iter: SimpleUnorderedBatchIter,
}

impl SimplifiedBatch for UnorderedBatch {
    type Iter = UnorderedBatchIter;

    fn into_iter(self) -> Self::Iter {
        let delete_prefix_iter = self.key_prefix_deletions.into_iter().peekable();
        let insert_deletion_iter = self.simple_unordered_batch.into_iter();
        Self::Iter {
            delete_prefix_iter,
            insert_deletion_iter,
        }
    }

    fn len(&self) -> usize {
        self.key_prefix_deletions.len() + self.simple_unordered_batch.len()
    }

    fn num_bytes(&self) -> usize {
        let mut total_size = self.simple_unordered_batch.num_bytes();
        for prefix_deletion in &self.key_prefix_deletions {
            total_size += prefix_deletion.len();
        }
        total_size
    }

    fn overhead_size(&self) -> usize {
        get_uleb128_size(self.key_prefix_deletions.len())
            + self.simple_unordered_batch.overhead_size()
    }

    fn add_delete(&mut self, key: Vec<u8>) {
        self.simple_unordered_batch.add_delete(key)
    }

    fn add_insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.simple_unordered_batch.add_insert(key, value)
    }

    async fn from_batch<S: DeletePrefixExpander>(store: S, batch: Batch) -> Result<Self, S::Error> {
        let mut unordered_batch = batch.simplify();
        unordered_batch
            .expand_colliding_prefix_deletions(&store)
            .await?;
        Ok(unordered_batch)
    }
}

impl BatchValueWriter<UnorderedBatch> for UnorderedBatchIter {
    fn is_empty(&self) -> bool {
        self.delete_prefix_iter.len() == 0 && self.insert_deletion_iter.is_empty()
    }

    fn write_next_value(
        &mut self,
        batch: &mut UnorderedBatch,
        batch_size: &mut usize,
    ) -> Result<bool, bcs::Error> {
        if let Some(delete_prefix) = self.delete_prefix_iter.next() {
            *batch_size += serialized_size(&delete_prefix)?;
            batch.key_prefix_deletions.push(delete_prefix);
            Ok(true)
        } else {
            self.insert_deletion_iter
                .write_next_value(&mut batch.simple_unordered_batch, batch_size)
        }
    }

    fn next_batch_size(
        &mut self,
        batch: &UnorderedBatch,
        batch_size: usize,
    ) -> Result<Option<usize>, bcs::Error> {
        if let Some(delete_prefix) = self.delete_prefix_iter.peek() {
            let next_size = serialized_size(&delete_prefix)?;
            Ok(Some(
                batch_size
                    + next_size
                    + get_uleb128_size(batch.key_prefix_deletions.len() + 1)
                    + batch.simple_unordered_batch.overhead_size(),
            ))
        } else {
            let batch_size = batch_size + get_uleb128_size(batch.key_prefix_deletions.len());
            self.insert_deletion_iter
                .next_batch_size(&batch.simple_unordered_batch, batch_size)
        }
    }
}

#[cfg(test)]
mod tests {
    use linera_views::{
        batch::{Batch, SimpleUnorderedBatch, UnorderedBatch},
        context::{Context, MemoryContext},
        store::WritableKeyValueStore as _,
    };

    #[test]
    fn test_simplify_batch1() {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(vec![1, 2], vec![]);
        batch.put_key_value_bytes(vec![1, 3, 3], vec![33, 2]);
        batch.put_key_value_bytes(vec![1, 2, 3], vec![34, 2]);
        batch.delete_key_prefix(vec![1, 2]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert_eq!(
            unordered_batch.simple_unordered_batch.insertions,
            vec![(vec![1, 3, 3], vec![33, 2])]
        );
    }

    #[test]
    fn test_simplify_batch2() {
        let mut batch = Batch::new();
        batch.delete_key(vec![1, 2, 3]);
        batch.delete_key_prefix(vec![1, 2]);
        batch.delete_key(vec![1, 2, 4]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert!(unordered_batch.simple_unordered_batch.insertions.is_empty());
    }

    #[test]
    fn test_simplify_batch3() {
        let mut batch = Batch::new();
        batch.delete_key_prefix(vec![1, 2]);
        batch.put_key_value_bytes(vec![1, 2, 3, 4], vec![]);
        batch.delete_key_prefix(vec![1, 2, 3]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert!(unordered_batch.simple_unordered_batch.insertions.is_empty());
    }

    #[test]
    fn test_simplify_batch4() {
        let mut batch = Batch::new();
        batch.delete_key_prefix(vec![1, 2]);
        batch.put_key_value_bytes(vec![1, 2, 3], vec![4, 5]);
        batch.delete_key(vec![1, 2, 3]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert!(unordered_batch.simple_unordered_batch.insertions.is_empty());
    }

    #[tokio::test]
    async fn test_simplify_batch5() {
        let context = MemoryContext::new_for_testing(());
        let mut batch = Batch::new();
        batch.put_key_value_bytes(vec![1, 2, 3], vec![]);
        batch.put_key_value_bytes(vec![1, 2, 4], vec![]);
        batch.put_key_value_bytes(vec![1, 2, 5], vec![]);
        batch.put_key_value_bytes(vec![1, 3, 3], vec![]);
        context.store().write_batch(batch).await.unwrap();
        let mut batch = Batch::new();
        batch.delete_key_prefix(vec![1, 2]);
        let unordered_batch = batch.simplify();
        let simple_unordered_batch = unordered_batch
            .expand_delete_prefixes(&context)
            .await
            .unwrap();
        assert_eq!(
            simple_unordered_batch.deletions,
            vec![vec![1, 2, 3], vec![1, 2, 4], vec![1, 2, 5]]
        );
        assert!(simple_unordered_batch.insertions.is_empty());
    }

    #[tokio::test]
    async fn test_simplify_batch6() {
        let context = MemoryContext::new_for_testing(());
        let insertions = vec![(vec![1, 2, 3], vec![])];
        let simple_unordered_batch = SimpleUnorderedBatch {
            insertions: insertions.clone(),
            deletions: vec![],
        };
        let key_prefix_deletions = vec![vec![1, 2]];
        let mut unordered_batch = UnorderedBatch {
            simple_unordered_batch,
            key_prefix_deletions,
        };
        unordered_batch
            .expand_colliding_prefix_deletions(&context)
            .await
            .unwrap();
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert_eq!(
            unordered_batch.simple_unordered_batch.insertions,
            insertions
        );
        assert!(unordered_batch.key_prefix_deletions.is_empty());
    }
}