linera_views/backends/value_splitting.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Adds support for large values to a given store by splitting them across several keys.

use linera_base::ensure;
use thiserror::Error;

use crate::{
    batch::{Batch, WriteOperation},
    store::{
        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
        WritableKeyValueStore,
    },
};
#[cfg(with_testing)]
use crate::{
    memory::{MemoryStore, MemoryStoreError},
    store::TestKeyValueDatabase,
};

/// A key-value database with no size limit for values.
///
/// It wraps a key-value store, potentially _with_ a size limit, and automatically
/// splits up large values into smaller ones. A single logical key-value pair is
/// stored as multiple smaller key-value pairs in the wrapped store.
/// See the `README.md` for additional details.
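///
/// Illustratively (this describes the internal layout used by this module, not a
/// public API guarantee): a logical pair `(key, value)` split into `count` chunks is
/// stored under the physical keys `key || index.to_be_bytes()` for `index` in
/// `0..count`, and the physical value at index 0 starts with `count` in big-endian
/// form, followed by the first chunk.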
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    /// The underlying database.
    database: D,
}

/// A key-value store with no size limit for values.
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    /// The underlying store.
    store: S,
}

/// The composed error type built from the inner error type.
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// The inner store error.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is shorter than 4 bytes, so the segment index cannot be extracted.
    #[error("the key is shorter than 4 bytes, so the segment index cannot be extracted")]
    TooShortKey,

    /// A value segment is missing from the database.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// The value does not contain a `u32` segment count.
    #[error("the value does not contain a u32 segment count")]
    NoCountAvailable,
}

impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
    fn from(error: bcs::Error) -> Self {
        let error = E::from(error);
        ValueSplittingError::InnerStoreError(error)
    }
}

impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
    const BACKEND: &'static str = "value splitting";
}

impl<D> WithError for ValueSplittingDatabase<D>
where
    D: WithError,
    D::Error: 'static,
{
    type Error = ValueSplittingError<D::Error>;
}

impl<S> WithError for ValueSplittingStore<S>
where
    S: WithError,
    S::Error: 'static,
{
    type Error = ValueSplittingError<S::Error>;
}

impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
where
    S: ReadableKeyValueStore,
    S::Error: 'static,
{
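    // Four bytes of every physical key are reserved for the big-endian segment index,
    // so logical keys must be that much shorter than what the wrapped store allows.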
    const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
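        // The head segment lives under `key || [0, 0, 0, 0]`; its first four bytes
        // encode the total number of segments (see `read_count_from_value`).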
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        let value = self.store.read_value_bytes(&big_key).await?;
        let Some(value) = value else {
            return Ok(None);
        };
        let count = Self::read_count_from_value(&value)?;
        let mut big_value = value[4..].to_vec();
        if count == 1 {
            return Ok(Some(big_value));
        }
        let mut big_keys = Vec::new();
        for i in 1..count {
            let big_key_segment = Self::get_segment_key(key, i)?;
            big_keys.push(big_key_segment);
        }
        let segments = self.store.read_multi_values_bytes(big_keys).await?;
        for segment in segments {
            match segment {
                None => {
                    return Err(ValueSplittingError::MissingSegment);
                }
                Some(segment) => {
                    big_value.extend(segment);
                }
            }
        }
        Ok(Some(big_value))
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        Ok(self.store.contains_key(&big_key).await?)
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        let big_keys = keys
            .into_iter()
            .map(|key| {
                let mut big_key = key.clone();
                big_key.extend(&[0, 0, 0, 0]);
                big_key
            })
            .collect::<Vec<_>>();
        Ok(self.store.contains_keys(big_keys).await?)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
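        // First fetch the head segment of every key, then batch-read any remaining
        // segments and append them to the corresponding values in order.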
        let mut big_keys = Vec::new();
        for key in &keys {
            let mut big_key = key.clone();
            big_key.extend(&[0, 0, 0, 0]);
            big_keys.push(big_key);
        }
        let values = self.store.read_multi_values_bytes(big_keys).await?;
        let mut big_values = Vec::<Option<Vec<u8>>>::new();
        let mut keys_add = Vec::new();
        let mut n_blocks = Vec::new();
        for (key, value) in keys.iter().zip(values) {
            match value {
                None => {
                    n_blocks.push(0);
                    big_values.push(None);
                }
                Some(value) => {
                    let count = Self::read_count_from_value(&value)?;
                    let big_value = value[4..].to_vec();
                    for i in 1..count {
                        let big_key_segment = Self::get_segment_key(key, i)?;
                        keys_add.push(big_key_segment);
                    }
                    n_blocks.push(count);
                    big_values.push(Some(big_value));
                }
            }
        }
        if !keys_add.is_empty() {
            let mut segments = self
                .store
                .read_multi_values_bytes(keys_add)
                .await?
                .into_iter();
            for (idx, count) in n_blocks.iter().enumerate() {
                if *count > 1 {
                    let value = big_values.get_mut(idx).unwrap();
                    if let Some(ref mut value) = value {
                        for _ in 1..*count {
                            let segment = segments.next().unwrap().unwrap();
                            value.extend(segment);
                        }
                    }
                }
            }
        }
        Ok(big_values)
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
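        // Only keys with segment index 0 correspond to logical keys; their trailing
        // four index bytes are stripped before being returned.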
        let mut keys = Vec::new();
        for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
            let len = big_key.len();
            if Self::read_index_from_key(&big_key)? == 0 {
                let key = big_key[0..len - 4].to_vec();
                keys.push(key);
            }
        }
        Ok(keys)
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
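        // The code below relies on prefix scans returning entries in lexicographic key
        // order, so the segments of a value follow its head entry (index 0)
        // consecutively; the `ensure!` below checks this.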
        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
        let mut small_kv_iterator = small_key_values.into_iter();
        let mut key_values = Vec::new();
        while let Some((mut big_key, value)) = small_kv_iterator.next() {
            if Self::read_index_from_key(&big_key)? != 0 {
                continue; // Leftover segment from an earlier value.
            }
            big_key.truncate(big_key.len() - 4);
            let key = big_key;
            let count = Self::read_count_from_value(&value)?;
            let mut big_value = value[4..].to_vec();
            for idx in 1..count {
                let (big_key, value) = small_kv_iterator
                    .next()
                    .ok_or(ValueSplittingError::MissingSegment)?;
                ensure!(
                    Self::read_index_from_key(&big_key)? == idx
                        && big_key.starts_with(&key)
                        && big_key.len() == key.len() + 4,
                    ValueSplittingError::MissingSegment
                );
                big_value.extend(value);
            }
            key_values.push((key, big_value));
        }
        Ok(key_values)
    }
}

impl<K> WritableKeyValueStore for ValueSplittingStore<K>
where
    K: WritableKeyValueStore,
    K::Error: 'static,
{
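    // Since values are split across segments, the store itself imposes no upper bound
    // on the size of a logical value.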
    const MAX_VALUE_SIZE: usize = usize::MAX;

    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let mut batch_new = Batch::new();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    let mut big_key = key.to_vec();
                    big_key.extend(&[0, 0, 0, 0]);
                    batch_new.delete_key(big_key);
                }
                WriteOperation::Put { key, mut value } => {
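                    // The head segment (index 0) stores a 4-byte big-endian count
                    // followed by the first chunk, so the first chunk holds at most
                    // `K::MAX_VALUE_SIZE - 4` bytes; any remainder is written to
                    // segment keys 1, 2, ... in chunks of `K::MAX_VALUE_SIZE`.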
                    let big_key = Self::get_segment_key(&key, 0)?;
                    let mut count: u32 = 1;
                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
                        Self::get_initial_count_first_chunk(count, &value)?
                    } else {
                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
                            let big_key_segment = Self::get_segment_key(&key, count)?;
                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
                            count += 1;
                        }
                        Self::get_initial_count_first_chunk(count, &value)?
                    };
                    batch_new.put_key_value_bytes(big_key, value_ext);
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    batch_new.delete_key_prefix(key_prefix);
                }
            }
        }
        Ok(self.store.write_batch(batch_new).await?)
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        Ok(self.store.clear_journal().await?)
    }
}

impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
where
    D: KeyValueDatabase,
    D::Error: 'static,
{
    type Config = D::Config;

    type Store = ValueSplittingStore<D::Store>;

    fn get_name() -> String {
        format!("value splitting {}", D::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let database = D::connect(config, namespace).await?;
        Ok(Self { database })
    }

    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_shared(root_key)?;
        Ok(ValueSplittingStore { store })
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_exclusive(root_key)?;
        Ok(ValueSplittingStore { store })
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        Ok(D::list_all(config).await?)
    }

    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, Self::Error> {
        Ok(D::list_root_keys(config, namespace).await?)
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        Ok(D::delete_all(config).await?)
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        Ok(D::exists(config, namespace).await?)
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(D::create(config, namespace).await?)
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(D::delete(config, namespace).await?)
    }
}

#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        Ok(D::new_test_config().await?)
    }
}

impl<D> ValueSplittingStore<D>
where
    D: WithError,
{
    /// Creates a new store that deals with big values from one that does not.
    pub fn new(store: D) -> Self {
        ValueSplittingStore { store }
    }

    /// Builds the physical key of segment `index`: the logical key followed by the
    /// index encoded in big-endian form, so that segment keys sort in index order
    /// under prefix scans.
    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
        let mut big_key_segment = key.to_vec();
        let mut bytes = bcs::to_bytes(&index)?;
        bytes.reverse();
        big_key_segment.extend(bytes);
        Ok(big_key_segment)
    }

    /// Prepends the big-endian segment `count` to the first chunk of the value,
    /// producing the bytes stored under segment key 0.
    fn get_initial_count_first_chunk(
        count: u32,
        first_chunk: &[u8],
    ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
        let mut bytes = bcs::to_bytes(&count)?;
        bytes.reverse();
        let mut value_ext = Vec::new();
        value_ext.extend(bytes);
        value_ext.extend(first_chunk);
        Ok(value_ext)
    }

    /// Reads the big-endian segment count from the first four bytes of a head value.
    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
        if value.len() < 4 {
            return Err(ValueSplittingError::NoCountAvailable);
        }
        let mut bytes = value[0..4].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }

    /// Reads the big-endian segment index from the last four bytes of a physical key.
    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
        let len = key.len();
        if len < 4 {
            return Err(ValueSplittingError::TooShortKey);
        }
        let mut bytes = key[len - 4..len].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }
}

/// A memory store whose values are limited to 100 bytes; it can be used for tests.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    inner: MemoryStore,
}

#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    type Error = MemoryStoreError;
}

#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.inner.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.inner.contains_key(key).await
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
        self.inner.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.inner.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}

#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // We set `MAX_VALUE_SIZE` to the artificially low value of 100
    // purely for testing purposes.
    const MAX_VALUE_SIZE: usize = 100;

    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch contains a value that is too large for this test"
        );
        self.inner.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.inner.clear_journal().await
    }
}

#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore`.
    pub fn new() -> Self {
        let inner = MemoryStore::new_for_testing();
        LimitedTestMemoryStore { inner }
    }
}

/// Provides a `ValueSplittingStore<LimitedTestMemoryStore>` that can be used for tests.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    ValueSplittingStore::new(LimitedTestMemoryStore::new())
}

#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    // The value splitting means that when a key is overwritten,
    // some previous segments may still be present.
    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a key with a long value
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Write a key with a smaller value
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Two segments are present even though only one is used
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reading the segments and checking
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!()
            };
            if index == 0 {
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // Deleting it
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // Reading through the big store yields nothing, even though leftover segments remain
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // Two segments remain
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}