linera_views/backends/
value_splitting.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Adds support for large values to a given store by splitting them between several keys.
5
6use linera_base::ensure;
7use thiserror::Error;
8
9use crate::{
10    batch::{Batch, WriteOperation},
11    store::{
12        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
13        WritableKeyValueStore,
14    },
15};
16#[cfg(with_testing)]
17use crate::{
18    memory::{MemoryStore, MemoryStoreError},
19    store::TestKeyValueDatabase,
20};
21
/// A key-value database with no size limit for values.
///
/// It wraps a key-value store, potentially _with_ a size limit, and automatically
/// splits up large values into smaller ones. A single logical key-value pair is
/// stored as multiple smaller key-value pairs in the wrapped store.
/// See the `README.md` for additional details.
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    /// The underlying database; the stores it opens may limit value sizes.
    database: D,
}
33
/// A key-value store with no size limit for values.
///
/// A logical key `k` is stored as one or more inner keys, each `k` suffixed with a
/// 4-byte big-endian segment index. Segment 0 starts with the total segment count
/// (4 bytes) followed by the first chunk of the value; segments `1..count` hold the
/// remaining chunks.
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    /// The underlying store, which may enforce a maximum value size.
    store: S,
}
40
/// The composed error type built from the inner error type.
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// An error reported by the inner (wrapped) store.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is of length less than 4, so we cannot extract the 4-byte segment index.
    #[error("the key is of length less than 4, so we cannot extract the first byte")]
    TooShortKey,

    /// A value segment announced by the segment count is missing from the database.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// No count of size `u32` is available in the value (it is shorter than 4 bytes).
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}
60
61impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
62    fn from(error: bcs::Error) -> Self {
63        let error = E::from(error);
64        ValueSplittingError::InnerStoreError(error)
65    }
66}
67
// The composed error is itself a store error, so the wrapper can be used
// anywhere a `KeyValueStoreError` is expected.
impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
    const BACKEND: &'static str = "value splitting";
}
71
72impl<S> WithError for ValueSplittingDatabase<S>
73where
74    S: WithError,
75    S::Error: 'static,
76{
77    type Error = ValueSplittingError<S::Error>;
78}
79
80impl<D> WithError for ValueSplittingStore<D>
81where
82    D: WithError,
83    D::Error: 'static,
84{
85    type Error = ValueSplittingError<D::Error>;
86}
87
impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
where
    S: ReadableKeyValueStore,
    S::Error: 'static,
{
    // Four bytes of every inner key are reserved for the segment index suffix.
    const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
        Ok(self.store.root_key()?)
    }

    /// Reads a logical value, reassembling it from its segments when it was split.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        // Segment 0 (suffix `[0, 0, 0, 0]`) holds the segment count followed by
        // the first chunk of the value.
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        let value = self.store.read_value_bytes(&big_key).await?;
        let Some(value) = value else {
            return Ok(None);
        };
        let count = Self::read_count_from_value(&value)?;
        let mut big_value = value[4..].to_vec();
        if count == 1 {
            // Common case: the value fit into a single segment.
            return Ok(Some(big_value));
        }
        // Fetch segments 1..count in a single multi-read and concatenate in order.
        let mut big_keys = Vec::new();
        for i in 1..count {
            let big_key_segment = Self::get_segment_key(key, i)?;
            big_keys.push(big_key_segment);
        }
        let segments = self.store.read_multi_values_bytes(&big_keys).await?;
        for segment in segments {
            match segment {
                None => {
                    // Segment 0 promised `count` segments; a hole means corruption.
                    return Err(ValueSplittingError::MissingSegment);
                }
                Some(segment) => {
                    big_value.extend(segment);
                }
            }
        }
        Ok(Some(big_value))
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        // Only segment 0 needs to exist for the logical key to exist.
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        Ok(self.store.contains_key(&big_key).await?)
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
        // Map each logical key to its segment-0 key and delegate.
        let big_keys = keys
            .iter()
            .map(|key| {
                let mut big_key = key.clone();
                big_key.extend(&[0, 0, 0, 0]);
                big_key
            })
            .collect::<Vec<_>>();
        Ok(self.store.contains_keys(&big_keys).await?)
    }

    /// Reads several logical values, issuing at most two multi-reads on the
    /// inner store: one for all segment-0 entries, one for all extra segments.
    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        let mut big_keys = Vec::new();
        for key in keys {
            let mut big_key = key.clone();
            big_key.extend(&[0, 0, 0, 0]);
            big_keys.push(big_key);
        }
        let values = self.store.read_multi_values_bytes(&big_keys).await?;
        // `big_values[i]` starts as the first chunk (or `None`); `n_blocks[i]`
        // records how many segments entry `i` has so the follow-up read can be
        // attributed back to the right entries.
        let mut big_values = Vec::<Option<Vec<u8>>>::new();
        let mut keys_add = Vec::new();
        let mut n_blocks = Vec::new();
        for (key, value) in keys.iter().zip(values) {
            match value {
                None => {
                    n_blocks.push(0);
                    big_values.push(None);
                }
                Some(value) => {
                    let count = Self::read_count_from_value(&value)?;
                    let big_value = value[4..].to_vec();
                    for i in 1..count {
                        let big_key_segment = Self::get_segment_key(key, i)?;
                        keys_add.push(big_key_segment);
                    }
                    n_blocks.push(count);
                    big_values.push(Some(big_value));
                }
            }
        }
        if !keys_add.is_empty() {
            // `keys_add` was filled in entry order, so consuming the segment
            // iterator in the same order re-associates segments with entries.
            let mut segments = self
                .store
                .read_multi_values_bytes(&keys_add)
                .await?
                .into_iter();
            for (idx, count) in n_blocks.iter().enumerate() {
                if count > &1 {
                    let value = big_values.get_mut(idx).unwrap();
                    if let Some(ref mut value) = value {
                        for _ in 1..*count {
                            let segment = segments.next().unwrap().unwrap();
                            value.extend(segment);
                        }
                    }
                }
            }
        }
        Ok(big_values)
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        let mut keys = Vec::new();
        for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
            let len = big_key.len();
            // Each logical key is reported once, via its segment-0 entry;
            // higher segment indices (including stale leftovers) are skipped.
            if Self::read_index_from_key(&big_key)? == 0 {
                let key = big_key[0..len - 4].to_vec();
                keys.push(key);
            }
        }
        Ok(keys)
    }

    /// Finds key-value pairs under a prefix, reassembling split values from the
    /// inner store's ordered iteration (segments of a key sort contiguously).
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
        let mut small_kv_iterator = small_key_values.into_iter();
        let mut key_values = Vec::new();
        while let Some((mut big_key, value)) = small_kv_iterator.next() {
            if Self::read_index_from_key(&big_key)? != 0 {
                continue; // Leftover segment from an earlier value.
            }
            // Strip the 4-byte index suffix to recover the logical key.
            big_key.truncate(big_key.len() - 4);
            let key = big_key;
            let count = Self::read_count_from_value(&value)?;
            let mut big_value = value[4..].to_vec();
            for idx in 1..count {
                // The next entries must be segments 1..count of the same key,
                // in order; anything else means a segment is missing.
                let (big_key, value) = small_kv_iterator
                    .next()
                    .ok_or(ValueSplittingError::MissingSegment)?;
                ensure!(
                    Self::read_index_from_key(&big_key)? == idx
                        && big_key.starts_with(&key)
                        && big_key.len() == key.len() + 4,
                    ValueSplittingError::MissingSegment
                );
                big_value.extend(value);
            }
            key_values.push((key, big_value));
        }
        Ok(key_values)
    }
}
249
250impl<K> WritableKeyValueStore for ValueSplittingStore<K>
251where
252    K: WritableKeyValueStore,
253    K::Error: 'static,
254{
255    const MAX_VALUE_SIZE: usize = usize::MAX;
256
257    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
258        let mut batch_new = Batch::new();
259        for operation in batch.operations {
260            match operation {
261                WriteOperation::Delete { key } => {
262                    let mut big_key = key.to_vec();
263                    big_key.extend(&[0, 0, 0, 0]);
264                    batch_new.delete_key(big_key);
265                }
266                WriteOperation::Put { key, mut value } => {
267                    let big_key = Self::get_segment_key(&key, 0)?;
268                    let mut count: u32 = 1;
269                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
270                        Self::get_initial_count_first_chunk(count, &value)?
271                    } else {
272                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
273                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
274                            let big_key_segment = Self::get_segment_key(&key, count)?;
275                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
276                            count += 1;
277                        }
278                        Self::get_initial_count_first_chunk(count, &value)?
279                    };
280                    batch_new.put_key_value_bytes(big_key, value_ext);
281                }
282                WriteOperation::DeletePrefix { key_prefix } => {
283                    batch_new.delete_key_prefix(key_prefix);
284                }
285            }
286        }
287        Ok(self.store.write_batch(batch_new).await?)
288    }
289
290    async fn clear_journal(&self) -> Result<(), Self::Error> {
291        Ok(self.store.clear_journal().await?)
292    }
293}
294
295impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
296where
297    D: KeyValueDatabase,
298    D::Error: 'static,
299{
300    type Config = D::Config;
301
302    type Store = ValueSplittingStore<D::Store>;
303
304    fn get_name() -> String {
305        format!("value splitting {}", D::get_name())
306    }
307
308    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
309        let database = D::connect(config, namespace).await?;
310        Ok(Self { database })
311    }
312
313    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
314        let store = self.database.open_shared(root_key)?;
315        Ok(ValueSplittingStore { store })
316    }
317
318    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
319        let store = self.database.open_exclusive(root_key)?;
320        Ok(ValueSplittingStore { store })
321    }
322
323    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
324        Ok(D::list_all(config).await?)
325    }
326
327    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
328        Ok(self.database.list_root_keys().await?)
329    }
330
331    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
332        Ok(D::delete_all(config).await?)
333    }
334
335    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
336        Ok(D::exists(config, namespace).await?)
337    }
338
339    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
340        Ok(D::create(config, namespace).await?)
341    }
342
343    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
344        Ok(D::delete(config, namespace).await?)
345    }
346}
347
#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    /// Builds a test configuration by delegating to the wrapped database.
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config().await.map_err(Into::into)
    }
}
358
359impl<D> ValueSplittingStore<D>
360where
361    D: WithError,
362{
363    /// Creates a new store that deals with big values from one that does not.
364    pub fn new(store: D) -> Self {
365        ValueSplittingStore { store }
366    }
367
368    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
369        let mut big_key_segment = key.to_vec();
370        let mut bytes = bcs::to_bytes(&index)?;
371        bytes.reverse();
372        big_key_segment.extend(bytes);
373        Ok(big_key_segment)
374    }
375
376    fn get_initial_count_first_chunk(
377        count: u32,
378        first_chunk: &[u8],
379    ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
380        let mut bytes = bcs::to_bytes(&count)?;
381        bytes.reverse();
382        let mut value_ext = Vec::new();
383        value_ext.extend(bytes);
384        value_ext.extend(first_chunk);
385        Ok(value_ext)
386    }
387
388    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
389        if value.len() < 4 {
390            return Err(ValueSplittingError::NoCountAvailable);
391        }
392        let mut bytes = value[0..4].to_vec();
393        bytes.reverse();
394        Ok(bcs::from_bytes::<u32>(&bytes)?)
395    }
396
397    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
398        let len = key.len();
399        if len < 4 {
400            return Err(ValueSplittingError::TooShortKey);
401        }
402        let mut bytes = key[len - 4..len].to_vec();
403        bytes.reverse();
404        Ok(bcs::from_bytes::<u32>(&bytes)?)
405    }
406}
407
/// A memory store for which the values are limited to 100 bytes and can be used for tests.
#[cfg(with_testing)]
#[derive(Clone)]
pub struct LimitedTestMemoryStore {
    /// The wrapped in-memory store; the limit is enforced in `write_batch`.
    inner: MemoryStore,
}
414
#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    /// Equivalent to [`LimitedTestMemoryStore::new`].
    fn default() -> Self {
        LimitedTestMemoryStore::new()
    }
}
421
#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    // Errors surface unchanged from the wrapped `MemoryStore`.
    type Error = MemoryStoreError;
}
426
// All reads are forwarded unchanged to the inner `MemoryStore`; only writes
// are size-checked (see the `WritableKeyValueStore` impl below).
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    // Keys are not limited; only values are.
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    fn root_key(&self) -> Result<Vec<u8>, MemoryStoreError> {
        self.inner.root_key()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.inner.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.inner.contains_key(key).await
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, MemoryStoreError> {
        self.inner.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.inner.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}
472
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // We set up the MAX_VALUE_SIZE to the artificially low value of 100
    // purely for testing purposes.
    const MAX_VALUE_SIZE: usize = 100;

    /// Writes the batch after asserting that no value exceeds the artificial limit.
    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        // Panic rather than error so that a test writing oversized values
        // directly to this store fails loudly.
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch size is not adequate for this test"
        );
        self.inner.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.inner.clear_journal().await
    }
}
491
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore` backed by a fresh testing memory store.
    pub fn new() -> Self {
        LimitedTestMemoryStore {
            inner: MemoryStore::new_for_testing(),
        }
    }
}
500
/// Provides a `ValueSplittingStore<LimitedTestMemoryStore>` that can be used for tests.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    ValueSplittingStore::new(LimitedTestMemoryStore::new())
}
506
#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    // The key splitting means that when a key is overwritten
    // some previous segments may still be present.
    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a key with a long value
        let mut batch = Batch::new();
        // One byte over the limit, so the value is split into two segments.
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Write a key with a smaller value
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Two segments are present even though only one is used
        // (inner keys are the logical key plus a 4-byte big-endian index).
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        // Sized so that (value + 4-byte count header) fills exactly two segments.
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reading the segments and checking
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            // Reproduce the segment-key suffix: bcs little-endian bytes, reversed.
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!()
            };
            if index == 0 {
                // Segment 0 starts with the 4-byte segment count; skip it.
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // writing a big key
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        // Three segments' worth of data (minus the 4-byte count header).
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // deleting it
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // reading everything (there are leftover keys)
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // Two segments remain: deletion only removes segment 0, so the
        // stale segment keys with indices 1 and 2 stay behind.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}