Skip to main content

linera_views/backends/
value_splitting.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Adds support for large values to a given store by splitting them between several keys.
5
6use linera_base::ensure;
7use thiserror::Error;
8
9use crate::{
10    batch::{Batch, WriteOperation},
11    store::{
12        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
13        WritableKeyValueStore,
14    },
15};
16#[cfg(with_testing)]
17use crate::{
18    memory::{MemoryStore, MemoryStoreError},
19    store::TestKeyValueDatabase,
20};
21
/// A key-value database with no size limit for values.
///
/// It wraps a key-value store, potentially _with_ a size limit, and automatically
/// splits up large values into smaller ones. A single logical key-value pair is
/// stored as multiple smaller key-value pairs in the wrapped store.
/// See the `README.md` for additional details.
///
/// Stores opened from this database are [`ValueSplittingStore`]s wrapping the
/// stores opened from the underlying database.
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    /// The underlying database.
    database: D,
}
33
/// A key-value store with no size limit for values.
///
/// Every logical key is stored in the wrapped store as `key ++ index`, where
/// `index` is a 4-byte big-endian segment number. Segment `0` holds a 4-byte
/// big-endian segment count followed by the first chunk of the value; segments
/// `1..count` hold the remaining chunks of the value.
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    /// The underlying store.
    store: S,
}
40
/// The composed error type built from the inner error type.
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// An error reported by the wrapped store, forwarded transparently.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is shorter than 4 bytes, so the trailing 4-byte segment index
    /// cannot be extracted from it.
    // NOTE(review): the message below says "first byte" although the code reads the
    // last four bytes of the key; left unchanged because it is a runtime string.
    #[error("the key is of length less than 4, so we cannot extract the first byte")]
    TooShortKey,

    /// A value segment announced by the segment count is missing from the database.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// The stored value is shorter than 4 bytes, so no `u32` segment count can be
    /// read from it.
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}
60
61impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
62    fn from(error: bcs::Error) -> Self {
63        let error = E::from(error);
64        ValueSplittingError::InnerStoreError(error)
65    }
66}
67
68impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
69    const BACKEND: &'static str = "value splitting";
70
71    fn must_reload_view(&self) -> bool {
72        match self {
73            ValueSplittingError::InnerStoreError(e) => e.must_reload_view(),
74            _ => false,
75        }
76    }
77}
78
79impl<S> WithError for ValueSplittingDatabase<S>
80where
81    S: WithError,
82    S::Error: 'static,
83{
84    type Error = ValueSplittingError<S::Error>;
85}
86
87impl<D> WithError for ValueSplittingStore<D>
88where
89    D: WithError,
90    D::Error: 'static,
91{
92    type Error = ValueSplittingError<D::Error>;
93}
94
95impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
96where
97    S: ReadableKeyValueStore,
98    S::Error: 'static,
99{
100    const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;
101
102    fn max_stream_queries(&self) -> usize {
103        self.store.max_stream_queries()
104    }
105
106    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
107        Ok(self.store.root_key()?)
108    }
109
110    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
111        let mut big_key = key.to_vec();
112        big_key.extend(&[0, 0, 0, 0]);
113        let value = self.store.read_value_bytes(&big_key).await?;
114        let Some(value) = value else {
115            return Ok(None);
116        };
117        let count = Self::read_count_from_value(&value)?;
118        let mut big_value = value[4..].to_vec();
119        if count == 1 {
120            return Ok(Some(big_value));
121        }
122        let mut big_keys = Vec::new();
123        for i in 1..count {
124            let big_key_segment = Self::get_segment_key(key, i)?;
125            big_keys.push(big_key_segment);
126        }
127        let segments = self.store.read_multi_values_bytes(&big_keys).await?;
128        for segment in segments {
129            match segment {
130                None => {
131                    return Err(ValueSplittingError::MissingSegment);
132                }
133                Some(segment) => {
134                    big_value.extend(segment);
135                }
136            }
137        }
138        Ok(Some(big_value))
139    }
140
141    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
142        let mut big_key = key.to_vec();
143        big_key.extend(&[0, 0, 0, 0]);
144        Ok(self.store.contains_key(&big_key).await?)
145    }
146
147    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
148        let big_keys = keys
149            .iter()
150            .map(|key| {
151                let mut big_key = key.clone();
152                big_key.extend(&[0, 0, 0, 0]);
153                big_key
154            })
155            .collect::<Vec<_>>();
156        Ok(self.store.contains_keys(&big_keys).await?)
157    }
158
159    async fn read_multi_values_bytes(
160        &self,
161        keys: &[Vec<u8>],
162    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
163        let mut big_keys = Vec::new();
164        for key in keys {
165            let mut big_key = key.clone();
166            big_key.extend(&[0, 0, 0, 0]);
167            big_keys.push(big_key);
168        }
169        let values = self.store.read_multi_values_bytes(&big_keys).await?;
170        let mut big_values = Vec::<Option<Vec<u8>>>::new();
171        let mut keys_add = Vec::new();
172        let mut n_blocks = Vec::new();
173        for (key, value) in keys.iter().zip(values) {
174            match value {
175                None => {
176                    n_blocks.push(0);
177                    big_values.push(None);
178                }
179                Some(value) => {
180                    let count = Self::read_count_from_value(&value)?;
181                    let big_value = value[4..].to_vec();
182                    for i in 1..count {
183                        let big_key_segment = Self::get_segment_key(key, i)?;
184                        keys_add.push(big_key_segment);
185                    }
186                    n_blocks.push(count);
187                    big_values.push(Some(big_value));
188                }
189            }
190        }
191        if !keys_add.is_empty() {
192            let mut segments = self
193                .store
194                .read_multi_values_bytes(&keys_add)
195                .await?
196                .into_iter();
197            for (big_value, count) in big_values.iter_mut().zip(&n_blocks) {
198                if let Some(value) = big_value {
199                    for _ in 1..*count {
200                        let segment = segments.next().unwrap().unwrap();
201                        value.extend(segment);
202                    }
203                }
204            }
205        }
206        Ok(big_values)
207    }
208
209    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
210        let mut keys = Vec::new();
211        for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
212            let len = big_key.len();
213            if Self::read_index_from_key(&big_key)? == 0 {
214                let key = big_key[0..len - 4].to_vec();
215                keys.push(key);
216            }
217        }
218        Ok(keys)
219    }
220
221    async fn find_key_values_by_prefix(
222        &self,
223        key_prefix: &[u8],
224    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
225        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
226        let mut small_kv_iterator = small_key_values.into_iter();
227        let mut key_values = Vec::new();
228        while let Some((mut big_key, value)) = small_kv_iterator.next() {
229            if Self::read_index_from_key(&big_key)? != 0 {
230                continue; // Leftover segment from an earlier value.
231            }
232            big_key.truncate(big_key.len() - 4);
233            let key = big_key;
234            let count = Self::read_count_from_value(&value)?;
235            let mut big_value = value[4..].to_vec();
236            for idx in 1..count {
237                let (big_key, value) = small_kv_iterator
238                    .next()
239                    .ok_or(ValueSplittingError::MissingSegment)?;
240                ensure!(
241                    Self::read_index_from_key(&big_key)? == idx
242                        && big_key.starts_with(&key)
243                        && big_key.len() == key.len() + 4,
244                    ValueSplittingError::MissingSegment
245                );
246                big_value.extend(value);
247            }
248            key_values.push((key, big_value));
249        }
250        Ok(key_values)
251    }
252}
253
254impl<K> WritableKeyValueStore for ValueSplittingStore<K>
255where
256    K: WritableKeyValueStore,
257    K::Error: 'static,
258{
259    const MAX_VALUE_SIZE: usize = usize::MAX;
260
261    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
262        let mut batch_new = Batch::new();
263        for operation in batch.operations {
264            match operation {
265                WriteOperation::Delete { key } => {
266                    let mut big_key = key.to_vec();
267                    big_key.extend(&[0, 0, 0, 0]);
268                    batch_new.delete_key(big_key);
269                }
270                WriteOperation::Put { key, mut value } => {
271                    let big_key = Self::get_segment_key(&key, 0)?;
272                    let mut count: u32 = 1;
273                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
274                        Self::get_initial_count_first_chunk(count, &value)?
275                    } else {
276                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
277                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
278                            let big_key_segment = Self::get_segment_key(&key, count)?;
279                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
280                            count += 1;
281                        }
282                        Self::get_initial_count_first_chunk(count, &value)?
283                    };
284                    batch_new.put_key_value_bytes(big_key, value_ext);
285                }
286                WriteOperation::DeletePrefix { key_prefix } => {
287                    batch_new.delete_key_prefix(key_prefix);
288                }
289            }
290        }
291        Ok(self.store.write_batch(batch_new).await?)
292    }
293
294    async fn clear_journal(&self) -> Result<(), Self::Error> {
295        Ok(self.store.clear_journal().await?)
296    }
297}
298
299impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
300where
301    D: KeyValueDatabase,
302    D::Error: 'static,
303{
304    type Config = D::Config;
305
306    type Store = ValueSplittingStore<D::Store>;
307
308    fn get_name() -> String {
309        format!("value splitting {}", D::get_name())
310    }
311
312    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
313        let database = D::connect(config, namespace).await?;
314        Ok(Self { database })
315    }
316
317    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
318        let store = self.database.open_shared(root_key)?;
319        Ok(ValueSplittingStore { store })
320    }
321
322    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
323        let store = self.database.open_exclusive(root_key)?;
324        Ok(ValueSplittingStore { store })
325    }
326
327    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
328        Ok(D::list_all(config).await?)
329    }
330
331    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
332        Ok(self.database.list_root_keys().await?)
333    }
334
335    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
336        Ok(D::delete_all(config).await?)
337    }
338
339    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
340        Ok(D::exists(config, namespace).await?)
341    }
342
343    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
344        Ok(D::create(config, namespace).await?)
345    }
346
347    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
348        Ok(D::delete(config, namespace).await?)
349    }
350}
351
#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    /// Uses the test configuration of the wrapped database.
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config()
            .await
            .map_err(ValueSplittingError::InnerStoreError)
    }
}
362
#[cfg(with_testing)]
impl<D: crate::backends::DatabaseBackup> crate::backends::DatabaseBackup
    for ValueSplittingDatabase<D>
{
    /// Backs up the wrapped database into `dir`.
    fn backup_to(&self, dir: &std::path::Path) -> anyhow::Result<()> {
        let Self { database } = self;
        database.backup_to(dir)
    }
}
371
372impl<D> ValueSplittingStore<D>
373where
374    D: WithError,
375{
376    /// Creates a new store that deals with big values from one that does not.
377    pub fn new(store: D) -> Self {
378        ValueSplittingStore { store }
379    }
380
381    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
382        let mut big_key_segment = key.to_vec();
383        let mut bytes = bcs::to_bytes(&index)?;
384        bytes.reverse();
385        big_key_segment.extend(bytes);
386        Ok(big_key_segment)
387    }
388
389    fn get_initial_count_first_chunk(
390        count: u32,
391        first_chunk: &[u8],
392    ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
393        let mut bytes = bcs::to_bytes(&count)?;
394        bytes.reverse();
395        let mut value_ext = Vec::new();
396        value_ext.extend(bytes);
397        value_ext.extend(first_chunk);
398        Ok(value_ext)
399    }
400
401    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
402        if value.len() < 4 {
403            return Err(ValueSplittingError::NoCountAvailable);
404        }
405        let mut bytes = value[0..4].to_vec();
406        bytes.reverse();
407        Ok(bcs::from_bytes::<u32>(&bytes)?)
408    }
409
410    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
411        let len = key.len();
412        if len < 4 {
413            return Err(ValueSplittingError::TooShortKey);
414        }
415        let mut bytes = key[len - 4..len].to_vec();
416        bytes.reverse();
417        Ok(bcs::from_bytes::<u32>(&bytes)?)
418    }
419}
420
/// A memory store for which the values are limited to 100 bytes and can be used for tests.
///
/// The limit is checked by an assertion when a batch is written; reads are
/// forwarded to the inner [`MemoryStore`] unchanged.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    /// The unrestricted memory store holding the data.
    inner: MemoryStore,
}
427
#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    /// Equivalent to [`LimitedTestMemoryStore::new`].
    fn default() -> Self {
        LimitedTestMemoryStore::new()
    }
}
434
#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    /// Errors are those of the inner memory store.
    type Error = MemoryStoreError;
}
439
/// All read operations are forwarded unchanged to the inner memory store.
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    /// No limit is placed on the key size.
    const MAX_KEY_SIZE: usize = usize::MAX;

    /// Forwards to the inner store.
    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    /// Forwards to the inner store.
    fn root_key(&self) -> Result<Vec<u8>, MemoryStoreError> {
        self.inner.root_key()
    }

    /// Forwards to the inner store.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.inner.read_value_bytes(key).await
    }

    /// Forwards to the inner store.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.inner.contains_key(key).await
    }

    /// Forwards to the inner store.
    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, MemoryStoreError> {
        self.inner.contains_keys(keys).await
    }

    /// Forwards to the inner store.
    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.inner.read_multi_values_bytes(keys).await
    }

    /// Forwards to the inner store.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    /// Forwards to the inner store.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}
485
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // We set up the MAX_VALUE_SIZE to the artificially low value of 100
    // purely for testing purposes.
    const MAX_VALUE_SIZE: usize = 100;

    /// Writes `batch` to the inner store.
    ///
    /// # Panics
    ///
    /// Panics if `batch.check_value_size(Self::MAX_VALUE_SIZE)` returns `false`,
    /// so that a test fails loudly when the size limit is not respected.
    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch size is not adequate for this test"
        );
        self.inner.write_batch(batch).await
    }

    /// Forwards to the inner store.
    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.inner.clear_journal().await
    }
}
504
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore` backed by a fresh testing `MemoryStore`.
    pub fn new() -> Self {
        Self {
            inner: MemoryStore::new_for_testing(),
        }
    }
}
513
/// Provides a `ValueSplittingStore<LimitedTestMemoryStore>` that can be used for tests.
///
/// The returned store accepts values of any size and splits them over a fresh
/// `LimitedTestMemoryStore`.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    ValueSplittingStore::new(LimitedTestMemoryStore::new())
}
519
#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    // The value splitting means that when a key is overwritten with a shorter
    // value, some segments of the previous value may still be present in the
    // underlying store.
    #[tokio::test]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        const _: () = assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a key with a long value (it needs two segments)
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Write a key with a smaller value (it fits in a single segment)
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Two segments are present in the underlying store even though only one is used
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    // A value of `2 * MAX_LEN - 4` bytes exactly fills two segments: the first one
    // also holds the 4-byte segment count.
    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reading the segments directly from the underlying store and checking
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            // Segment keys are the key followed by the big-endian segment index.
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!(
                    "value_splitting test: segment key not found in underlying store right after a multi-segment write"
                )
            };
            if index == 0 {
                // Skip the 4-byte segment count stored at the start of segment 0.
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    // Deleting a key only removes its first segment from the underlying store.
    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value (it needs three segments)
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // Deleting it
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // Reading everything through the splitting store: the leftover segments are skipped
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // Two segments remain in the underlying store
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}
622}