1use std::{
19 collections::{BTreeMap, BTreeSet, HashSet},
20 fmt,
21 iter::Peekable,
22 ops::Bound,
23 vec::IntoIter,
24};
25
26use bcs::serialized_size;
27use custom_debug_derive::Debug;
28use linera_base::hex_debug;
29use linera_witty::{WitLoad, WitStore, WitType};
30use serde::{Deserialize, Serialize};
31
32use crate::{
33 common::{get_key_range_for_prefix, get_uleb128_size},
34 ViewError,
35};
36
37#[derive(Clone, Debug, Eq, PartialEq, WitType, WitLoad, WitStore, Serialize)]
43pub enum WriteOperation {
44 Delete {
46 #[debug(with = "hex_debug")]
48 key: Vec<u8>,
49 },
50 DeletePrefix {
52 #[debug(with = "hex_debug")]
54 key_prefix: Vec<u8>,
55 },
56 Put {
58 #[debug(with = "hex_debug")]
60 key: Vec<u8>,
61 #[debug(with = "hex_debug")]
63 value: Vec<u8>,
64 },
65}
66
67#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)]
69pub struct Batch {
70 pub operations: Vec<WriteOperation>,
72}
73
74#[derive(Default, Serialize, Deserialize)]
77pub struct SimpleUnorderedBatch {
78 pub deletions: Vec<Vec<u8>>,
80 pub insertions: Vec<(Vec<u8>, Vec<u8>)>,
82}
83
84#[derive(Default, Serialize, Deserialize)]
92pub struct UnorderedBatch {
93 pub key_prefix_deletions: Vec<Vec<u8>>,
95 pub simple_unordered_batch: SimpleUnorderedBatch,
97}
98
99impl UnorderedBatch {
100 pub async fn expand_delete_prefixes<DB: DeletePrefixExpander>(
103 self,
104 db: &DB,
105 ) -> Result<SimpleUnorderedBatch, DB::Error> {
106 let mut insert_set = HashSet::new();
107 for (key, _) in &self.simple_unordered_batch.insertions {
108 insert_set.insert(key.clone());
109 }
110 let insertions = self.simple_unordered_batch.insertions;
111 let mut deletions = self.simple_unordered_batch.deletions;
112 for key_prefix in self.key_prefix_deletions {
113 for short_key in &db.expand_delete_prefix(&key_prefix).await? {
114 let mut key = key_prefix.clone();
115 key.extend(short_key);
116 if !insert_set.contains(&key) {
117 deletions.push(key);
118 }
119 }
120 }
121 Ok(SimpleUnorderedBatch {
122 deletions,
123 insertions,
124 })
125 }
126
127 pub fn len(&self) -> usize {
129 self.key_prefix_deletions.len() + self.simple_unordered_batch.len()
130 }
131
132 pub fn is_empty(&self) -> bool {
134 self.key_prefix_deletions.is_empty() && self.simple_unordered_batch.is_empty()
135 }
136}
137
/// Returns whether `key` starts with one of the prefixes stored in `key_prefix_set`.
///
/// Only the greatest element of the set that is lexicographically `<= key` is examined.
/// This is sufficient as long as no element of the set is a prefix of another element;
/// with such nested prefixes a shorter matching prefix could be hidden behind a longer,
/// non-matching one.
fn is_prefix_matched(key_prefix_set: &BTreeSet<Vec<u8>>, key: &[u8]) -> bool {
    // Borrowed bounds avoid copying `key` into a fresh `Vec` for every lookup.
    let range = (Bound::<&[u8]>::Unbounded, Bound::Included(key));
    key_prefix_set
        .range::<[u8], _>(range)
        .next_back()
        .is_some_and(|candidate| key.starts_with(candidate))
}
156
157impl Batch {
158 pub fn new() -> Self {
160 Self::default()
161 }
162
163 pub fn size(&self) -> usize {
165 self.operations
166 .iter()
167 .map(|operation| match operation {
168 WriteOperation::Delete { key } => key.len(),
169 WriteOperation::Put { key, value } => key.len() + value.len(),
170 WriteOperation::DeletePrefix { key_prefix } => key_prefix.len(),
171 })
172 .sum()
173 }
174
175 pub fn is_empty(&self) -> bool {
177 self.operations.is_empty()
178 }
179
180 pub fn num_operations(&self) -> usize {
182 self.operations.len()
183 }
184
185 pub async fn build<F>(builder: F) -> Result<Self, ViewError>
187 where
188 F: FnOnce(&mut Batch) -> futures::future::BoxFuture<Result<(), ViewError>> + Send + Sync,
189 {
190 let mut batch = Batch::new();
191 builder(&mut batch).await?;
192 Ok(batch)
193 }
194
195 pub fn simplify(self) -> UnorderedBatch {
210 let mut delete_and_insert_map = BTreeMap::new();
211 let mut delete_prefix_set = BTreeSet::new();
212 for operation in self.operations {
213 match operation {
214 WriteOperation::Delete { key } => {
215 if is_prefix_matched(&delete_prefix_set, &key) {
218 delete_and_insert_map.remove(&key);
219 } else {
220 delete_and_insert_map.insert(key, None);
221 }
222 }
223 WriteOperation::Put { key, value } => {
224 delete_and_insert_map.insert(key, Some(value));
226 }
227 WriteOperation::DeletePrefix { key_prefix } => {
228 let keys = delete_and_insert_map
230 .range(get_key_range_for_prefix(key_prefix.clone()))
231 .map(|x| x.0.to_vec())
232 .collect::<Vec<_>>();
233 for key in keys {
234 delete_and_insert_map.remove(&key);
235 }
236 if is_prefix_matched(&delete_prefix_set, &key_prefix) {
238 continue;
239 }
240 let key_prefixes = delete_prefix_set
243 .range(get_key_range_for_prefix(key_prefix.clone()))
244 .map(|x: &Vec<u8>| x.to_vec())
245 .collect::<Vec<_>>();
246 for key_prefix in key_prefixes {
248 delete_prefix_set.remove(&key_prefix);
249 }
250 delete_prefix_set.insert(key_prefix);
252 }
253 }
254 }
255 let key_prefix_deletions = delete_prefix_set.into_iter().collect();
256 let mut deletions = Vec::new();
257 let mut insertions = Vec::new();
258 for (key, val) in delete_and_insert_map {
259 match val {
260 Some(value) => insertions.push((key, value)),
261 None => deletions.push(key),
262 }
263 }
264 let simple_unordered_batch = SimpleUnorderedBatch {
265 deletions,
266 insertions,
267 };
268 UnorderedBatch {
269 key_prefix_deletions,
270 simple_unordered_batch,
271 }
272 }
273
274 pub fn check_value_size(&self, max_value_size: usize) -> bool {
276 for operation in &self.operations {
277 if let WriteOperation::Put { key: _, value } = operation {
278 if value.len() > max_value_size {
279 return false;
280 }
281 }
282 }
283 true
284 }
285
286 #[inline]
293 pub fn put_key_value(
294 &mut self,
295 key: Vec<u8>,
296 value: &impl Serialize,
297 ) -> Result<(), bcs::Error> {
298 let bytes = bcs::to_bytes(value)?;
299 self.put_key_value_bytes(key, bytes);
300 Ok(())
301 }
302
303 #[inline]
310 pub fn put_key_value_bytes(&mut self, key: Vec<u8>, value: Vec<u8>) {
311 self.operations.push(WriteOperation::Put { key, value });
312 }
313
314 #[inline]
321 pub fn delete_key(&mut self, key: Vec<u8>) {
322 self.operations.push(WriteOperation::Delete { key });
323 }
324
325 #[inline]
332 pub fn delete_key_prefix(&mut self, key_prefix: Vec<u8>) {
333 self.operations
334 .push(WriteOperation::DeletePrefix { key_prefix });
335 }
336}
337
338#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
344pub trait DeletePrefixExpander {
345 type Error: fmt::Debug;
347
348 async fn expand_delete_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
350}
351
352#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
353pub trait SimplifiedBatch: Sized + Send + Sync {
355 type Iter: BatchValueWriter<Self>;
357
358 async fn from_batch<S: DeletePrefixExpander>(store: S, batch: Batch) -> Result<Self, S::Error>;
360
361 fn into_iter(self) -> Self::Iter;
363
364 fn len(&self) -> usize;
366
367 fn num_bytes(&self) -> usize;
369
370 fn overhead_size(&self) -> usize;
372
373 fn add_delete(&mut self, key: Vec<u8>);
375
376 fn add_insert(&mut self, key: Vec<u8>, value: Vec<u8>);
378
379 fn is_empty(&self) -> bool {
381 self.len() == 0
382 }
383}
384
385#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
388pub trait BatchValueWriter<Batch> {
389 fn is_empty(&self) -> bool;
391
392 fn write_next_value(
396 &mut self,
397 batch: &mut Batch,
398 batch_size: &mut usize,
399 ) -> Result<bool, bcs::Error>;
400
401 fn next_batch_size(
404 &mut self,
405 batch: &Batch,
406 batch_size: usize,
407 ) -> Result<Option<usize>, bcs::Error>;
408}
409
/// The writer over the entries of a [`SimpleUnorderedBatch`]: deletions first, then
/// insertions.
pub struct SimpleUnorderedBatchIter {
    /// The remaining keys to delete.
    delete_iter: Peekable<IntoIter<Vec<u8>>>,
    /// The remaining `(key, value)` pairs to insert.
    insert_iter: Peekable<IntoIter<(Vec<u8>, Vec<u8>)>>,
}
415
416impl SimplifiedBatch for SimpleUnorderedBatch {
417 type Iter = SimpleUnorderedBatchIter;
418
419 fn into_iter(self) -> Self::Iter {
420 let delete_iter = self.deletions.into_iter().peekable();
421 let insert_iter = self.insertions.into_iter().peekable();
422 Self::Iter {
423 delete_iter,
424 insert_iter,
425 }
426 }
427
428 fn len(&self) -> usize {
429 self.deletions.len() + self.insertions.len()
430 }
431
432 fn num_bytes(&self) -> usize {
433 let mut total_size = 0;
434 for (key, value) in &self.insertions {
435 total_size += key.len() + value.len();
436 }
437 for deletion in &self.deletions {
438 total_size += deletion.len();
439 }
440 total_size
441 }
442
443 fn overhead_size(&self) -> usize {
444 get_uleb128_size(self.deletions.len()) + get_uleb128_size(self.insertions.len())
445 }
446
447 fn add_delete(&mut self, key: Vec<u8>) {
448 self.deletions.push(key)
449 }
450
451 fn add_insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
452 self.insertions.push((key, value))
453 }
454
455 async fn from_batch<S: DeletePrefixExpander>(store: S, batch: Batch) -> Result<Self, S::Error> {
456 let unordered_batch = batch.simplify();
457 unordered_batch.expand_delete_prefixes(&store).await
458 }
459}
460
461impl BatchValueWriter<SimpleUnorderedBatch> for SimpleUnorderedBatchIter {
462 fn is_empty(&self) -> bool {
463 self.delete_iter.len() == 0 && self.insert_iter.len() == 0
464 }
465
466 fn write_next_value(
467 &mut self,
468 batch: &mut SimpleUnorderedBatch,
469 batch_size: &mut usize,
470 ) -> Result<bool, bcs::Error> {
471 if let Some(delete) = self.delete_iter.next() {
472 *batch_size += serialized_size(&delete)?;
473 batch.deletions.push(delete);
474 Ok(true)
475 } else if let Some((key, value)) = self.insert_iter.next() {
476 *batch_size += serialized_size(&key)? + serialized_size(&value)?;
477 batch.insertions.push((key, value));
478 Ok(true)
479 } else {
480 Ok(false)
481 }
482 }
483
484 fn next_batch_size(
485 &mut self,
486 batch: &SimpleUnorderedBatch,
487 batch_size: usize,
488 ) -> Result<Option<usize>, bcs::Error> {
489 if let Some(delete) = self.delete_iter.peek() {
490 let next_size = serialized_size(&delete)?;
491 Ok(Some(
492 batch_size
493 + next_size
494 + get_uleb128_size(batch.deletions.len() + 1)
495 + get_uleb128_size(batch.insertions.len()),
496 ))
497 } else if let Some((key, value)) = self.insert_iter.peek() {
498 let next_size = serialized_size(&key)? + serialized_size(&value)?;
499 Ok(Some(
500 batch_size
501 + next_size
502 + get_uleb128_size(batch.deletions.len())
503 + get_uleb128_size(batch.insertions.len() + 1),
504 ))
505 } else {
506 Ok(None)
507 }
508 }
509}
510
511pub struct UnorderedBatchIter {
513 delete_prefix_iter: Peekable<IntoIter<Vec<u8>>>,
514 insert_deletion_iter: SimpleUnorderedBatchIter,
515}
516
517impl SimplifiedBatch for UnorderedBatch {
518 type Iter = UnorderedBatchIter;
519
520 fn into_iter(self) -> Self::Iter {
521 let delete_prefix_iter = self.key_prefix_deletions.into_iter().peekable();
522 let insert_deletion_iter = self.simple_unordered_batch.into_iter();
523 Self::Iter {
524 delete_prefix_iter,
525 insert_deletion_iter,
526 }
527 }
528
529 fn len(&self) -> usize {
530 self.key_prefix_deletions.len() + self.simple_unordered_batch.len()
531 }
532
533 fn num_bytes(&self) -> usize {
534 let mut total_size = self.simple_unordered_batch.num_bytes();
535 for prefix_deletion in &self.key_prefix_deletions {
536 total_size += prefix_deletion.len();
537 }
538 total_size
539 }
540
541 fn overhead_size(&self) -> usize {
542 get_uleb128_size(self.key_prefix_deletions.len())
543 + self.simple_unordered_batch.overhead_size()
544 }
545
546 fn add_delete(&mut self, key: Vec<u8>) {
547 self.simple_unordered_batch.add_delete(key)
548 }
549
550 fn add_insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
551 self.simple_unordered_batch.add_insert(key, value)
552 }
553
554 async fn from_batch<S: DeletePrefixExpander>(
555 _store: S,
556 batch: Batch,
557 ) -> Result<Self, S::Error> {
558 Ok(batch.simplify())
559 }
560}
561
562impl BatchValueWriter<UnorderedBatch> for UnorderedBatchIter {
563 fn is_empty(&self) -> bool {
564 self.delete_prefix_iter.len() == 0 && self.insert_deletion_iter.is_empty()
565 }
566
567 fn write_next_value(
568 &mut self,
569 batch: &mut UnorderedBatch,
570 batch_size: &mut usize,
571 ) -> Result<bool, bcs::Error> {
572 if let Some(delete_prefix) = self.delete_prefix_iter.next() {
573 *batch_size += serialized_size(&delete_prefix)?;
574 batch.key_prefix_deletions.push(delete_prefix);
575 Ok(true)
576 } else {
577 self.insert_deletion_iter
578 .write_next_value(&mut batch.simple_unordered_batch, batch_size)
579 }
580 }
581
582 fn next_batch_size(
583 &mut self,
584 batch: &UnorderedBatch,
585 batch_size: usize,
586 ) -> Result<Option<usize>, bcs::Error> {
587 if let Some(delete_prefix) = self.delete_prefix_iter.peek() {
588 let next_size = serialized_size(&delete_prefix)?;
589 Ok(Some(
590 batch_size
591 + next_size
592 + get_uleb128_size(batch.key_prefix_deletions.len() + 1)
593 + batch.simple_unordered_batch.overhead_size(),
594 ))
595 } else {
596 let batch_size = batch_size + get_uleb128_size(batch.key_prefix_deletions.len());
597 self.insert_deletion_iter
598 .next_batch_size(&batch.simple_unordered_batch, batch_size)
599 }
600 }
601}
602
#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        context::{Context, MemoryContext},
        store::WritableKeyValueStore as _,
    };

    // A prefix deletion recorded after insertions discards the insertions it covers and
    // leaves the other ones untouched.
    #[test]
    fn test_simplify_batch1() {
        let mut batch = Batch::new();
        batch.put_key_value_bytes(vec![1, 2], vec![]);
        batch.put_key_value_bytes(vec![1, 3, 3], vec![33, 2]);
        batch.put_key_value_bytes(vec![1, 2, 3], vec![34, 2]);
        batch.delete_key_prefix(vec![1, 2]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert_eq!(
            unordered_batch.simple_unordered_batch.insertions,
            vec![(vec![1, 3, 3], vec![33, 2])]
        );
    }

    // Key deletions covered by a prefix deletion are dropped, whether they were recorded
    // before or after the prefix deletion.
    #[test]
    fn test_simplify_batch2() {
        let mut batch = Batch::new();
        batch.delete_key(vec![1, 2, 3]);
        batch.delete_key_prefix(vec![1, 2]);
        batch.delete_key(vec![1, 2, 4]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert!(unordered_batch.simple_unordered_batch.insertions.is_empty());
    }

    // A longer prefix deletion that is already covered by a shorter one is not recorded,
    // but it still discards the insertions under it.
    #[test]
    fn test_simplify_batch3() {
        let mut batch = Batch::new();
        batch.delete_key_prefix(vec![1, 2]);
        batch.put_key_value_bytes(vec![1, 2, 3, 4], vec![]);
        batch.delete_key_prefix(vec![1, 2, 3]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert!(unordered_batch.simple_unordered_batch.insertions.is_empty());
    }

    // Deleting a key that was inserted after a covering prefix deletion cancels the
    // insertion without recording a separate key deletion.
    #[test]
    fn test_simplify_batch4() {
        let mut batch = Batch::new();
        batch.delete_key_prefix(vec![1, 2]);
        batch.put_key_value_bytes(vec![1, 2, 3], vec![4, 5]);
        batch.delete_key(vec![1, 2, 3]);
        let unordered_batch = batch.simplify();
        assert_eq!(unordered_batch.key_prefix_deletions, vec![vec![1, 2]]);
        assert!(unordered_batch.simple_unordered_batch.deletions.is_empty());
        assert!(unordered_batch.simple_unordered_batch.insertions.is_empty());
    }

    // Expanding a prefix deletion against a store yields one key deletion per stored key
    // under the prefix and nothing for keys outside of it.
    #[tokio::test]
    async fn test_simplify_batch5() {
        let context = MemoryContext::new_for_testing(());
        let mut batch = Batch::new();
        batch.put_key_value_bytes(vec![1, 2, 3], vec![]);
        batch.put_key_value_bytes(vec![1, 2, 4], vec![]);
        batch.put_key_value_bytes(vec![1, 2, 5], vec![]);
        batch.put_key_value_bytes(vec![1, 3, 3], vec![]);
        context.store().write_batch(batch).await.unwrap();
        let mut batch = Batch::new();
        batch.delete_key_prefix(vec![1, 2]);
        let unordered_batch = batch.simplify();
        let simple_unordered_batch = unordered_batch
            .expand_delete_prefixes(&context)
            .await
            .unwrap();
        assert_eq!(
            simple_unordered_batch.deletions,
            vec![vec![1, 2, 3], vec![1, 2, 4], vec![1, 2, 5]]
        );
        assert!(simple_unordered_batch.insertions.is_empty());
    }
}