linera_views/backends/
value_splitting.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Adds support for large values to a given store by splitting them between several keys.
5
6use linera_base::ensure;
7use thiserror::Error;
8
9use crate::{
10    batch::{Batch, WriteOperation},
11    store::{
12        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
13        WritableKeyValueStore,
14    },
15};
16#[cfg(with_testing)]
17use crate::{
18    memory::{MemoryStore, MemoryStoreError},
19    store::TestKeyValueDatabase,
20};
21
/// A key-value database with no size limit for values.
///
/// It wraps a key-value store, potentially _with_ a size limit, and automatically
/// splits up large values into smaller ones. A single logical key-value pair is
/// stored as multiple smaller key-value pairs in the wrapped store.
/// See the `README.md` for additional details.
///
/// Opening a store on this database yields a [`ValueSplittingStore`] wrapping
/// the corresponding store of the inner database.
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    /// The underlying database.
    database: D,
}
33
/// A key-value store with no size limit for values.
///
/// A logical key-value pair is stored as one or more "segments" in the wrapped
/// store: each underlying key is the logical key followed by a 4-byte segment
/// index, and the value of segment 0 begins with the total segment count.
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    /// The underlying store.
    store: S,
}
40
/// The composed error type built from the inner error type.
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// An error from the inner store.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is of length less than 4, so we cannot extract the trailing
    /// 4-byte segment index.
    #[error("the key is of length less than 4, so we cannot extract the first byte")]
    TooShortKey,

    /// Value segment is missing from the database (corrupted or partially
    /// written entry).
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// No count of size `u32` is available in the value (segment 0 is shorter
    /// than the 4-byte count header).
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}
60
61impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
62    fn from(error: bcs::Error) -> Self {
63        let error = E::from(error);
64        ValueSplittingError::InnerStoreError(error)
65    }
66}
67
68impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
69    const BACKEND: &'static str = "value splitting";
70
71    fn must_reload_view(&self) -> bool {
72        match self {
73            ValueSplittingError::InnerStoreError(e) => e.must_reload_view(),
74            _ => false,
75        }
76    }
77}
78
79impl<S> WithError for ValueSplittingDatabase<S>
80where
81    S: WithError,
82    S::Error: 'static,
83{
84    type Error = ValueSplittingError<S::Error>;
85}
86
87impl<D> WithError for ValueSplittingStore<D>
88where
89    D: WithError,
90    D::Error: 'static,
91{
92    type Error = ValueSplittingError<D::Error>;
93}
94
95impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
96where
97    S: ReadableKeyValueStore,
98    S::Error: 'static,
99{
100    const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;
101
102    fn max_stream_queries(&self) -> usize {
103        self.store.max_stream_queries()
104    }
105
106    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
107        Ok(self.store.root_key()?)
108    }
109
110    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
111        let mut big_key = key.to_vec();
112        big_key.extend(&[0, 0, 0, 0]);
113        let value = self.store.read_value_bytes(&big_key).await?;
114        let Some(value) = value else {
115            return Ok(None);
116        };
117        let count = Self::read_count_from_value(&value)?;
118        let mut big_value = value[4..].to_vec();
119        if count == 1 {
120            return Ok(Some(big_value));
121        }
122        let mut big_keys = Vec::new();
123        for i in 1..count {
124            let big_key_segment = Self::get_segment_key(key, i)?;
125            big_keys.push(big_key_segment);
126        }
127        let segments = self.store.read_multi_values_bytes(&big_keys).await?;
128        for segment in segments {
129            match segment {
130                None => {
131                    return Err(ValueSplittingError::MissingSegment);
132                }
133                Some(segment) => {
134                    big_value.extend(segment);
135                }
136            }
137        }
138        Ok(Some(big_value))
139    }
140
141    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
142        let mut big_key = key.to_vec();
143        big_key.extend(&[0, 0, 0, 0]);
144        Ok(self.store.contains_key(&big_key).await?)
145    }
146
147    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
148        let big_keys = keys
149            .iter()
150            .map(|key| {
151                let mut big_key = key.clone();
152                big_key.extend(&[0, 0, 0, 0]);
153                big_key
154            })
155            .collect::<Vec<_>>();
156        Ok(self.store.contains_keys(&big_keys).await?)
157    }
158
159    async fn read_multi_values_bytes(
160        &self,
161        keys: &[Vec<u8>],
162    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
163        let mut big_keys = Vec::new();
164        for key in keys {
165            let mut big_key = key.clone();
166            big_key.extend(&[0, 0, 0, 0]);
167            big_keys.push(big_key);
168        }
169        let values = self.store.read_multi_values_bytes(&big_keys).await?;
170        let mut big_values = Vec::<Option<Vec<u8>>>::new();
171        let mut keys_add = Vec::new();
172        let mut n_blocks = Vec::new();
173        for (key, value) in keys.iter().zip(values) {
174            match value {
175                None => {
176                    n_blocks.push(0);
177                    big_values.push(None);
178                }
179                Some(value) => {
180                    let count = Self::read_count_from_value(&value)?;
181                    let big_value = value[4..].to_vec();
182                    for i in 1..count {
183                        let big_key_segment = Self::get_segment_key(key, i)?;
184                        keys_add.push(big_key_segment);
185                    }
186                    n_blocks.push(count);
187                    big_values.push(Some(big_value));
188                }
189            }
190        }
191        if !keys_add.is_empty() {
192            let mut segments = self
193                .store
194                .read_multi_values_bytes(&keys_add)
195                .await?
196                .into_iter();
197            for (idx, count) in n_blocks.iter().enumerate() {
198                if count > &1 {
199                    let value = big_values.get_mut(idx).unwrap();
200                    if let Some(ref mut value) = value {
201                        for _ in 1..*count {
202                            let segment = segments.next().unwrap().unwrap();
203                            value.extend(segment);
204                        }
205                    }
206                }
207            }
208        }
209        Ok(big_values)
210    }
211
212    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
213        let mut keys = Vec::new();
214        for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
215            let len = big_key.len();
216            if Self::read_index_from_key(&big_key)? == 0 {
217                let key = big_key[0..len - 4].to_vec();
218                keys.push(key);
219            }
220        }
221        Ok(keys)
222    }
223
224    async fn find_key_values_by_prefix(
225        &self,
226        key_prefix: &[u8],
227    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
228        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
229        let mut small_kv_iterator = small_key_values.into_iter();
230        let mut key_values = Vec::new();
231        while let Some((mut big_key, value)) = small_kv_iterator.next() {
232            if Self::read_index_from_key(&big_key)? != 0 {
233                continue; // Leftover segment from an earlier value.
234            }
235            big_key.truncate(big_key.len() - 4);
236            let key = big_key;
237            let count = Self::read_count_from_value(&value)?;
238            let mut big_value = value[4..].to_vec();
239            for idx in 1..count {
240                let (big_key, value) = small_kv_iterator
241                    .next()
242                    .ok_or(ValueSplittingError::MissingSegment)?;
243                ensure!(
244                    Self::read_index_from_key(&big_key)? == idx
245                        && big_key.starts_with(&key)
246                        && big_key.len() == key.len() + 4,
247                    ValueSplittingError::MissingSegment
248                );
249                big_value.extend(value);
250            }
251            key_values.push((key, big_value));
252        }
253        Ok(key_values)
254    }
255}
256
257impl<K> WritableKeyValueStore for ValueSplittingStore<K>
258where
259    K: WritableKeyValueStore,
260    K::Error: 'static,
261{
262    const MAX_VALUE_SIZE: usize = usize::MAX;
263
264    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
265        let mut batch_new = Batch::new();
266        for operation in batch.operations {
267            match operation {
268                WriteOperation::Delete { key } => {
269                    let mut big_key = key.to_vec();
270                    big_key.extend(&[0, 0, 0, 0]);
271                    batch_new.delete_key(big_key);
272                }
273                WriteOperation::Put { key, mut value } => {
274                    let big_key = Self::get_segment_key(&key, 0)?;
275                    let mut count: u32 = 1;
276                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
277                        Self::get_initial_count_first_chunk(count, &value)?
278                    } else {
279                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
280                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
281                            let big_key_segment = Self::get_segment_key(&key, count)?;
282                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
283                            count += 1;
284                        }
285                        Self::get_initial_count_first_chunk(count, &value)?
286                    };
287                    batch_new.put_key_value_bytes(big_key, value_ext);
288                }
289                WriteOperation::DeletePrefix { key_prefix } => {
290                    batch_new.delete_key_prefix(key_prefix);
291                }
292            }
293        }
294        Ok(self.store.write_batch(batch_new).await?)
295    }
296
297    async fn clear_journal(&self) -> Result<(), Self::Error> {
298        Ok(self.store.clear_journal().await?)
299    }
300}
301
// All database-level operations delegate to the wrapped database; the
// value-splitting logic lives entirely in the stores opened from it.
impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
where
    D: KeyValueDatabase,
    D::Error: 'static,
{
    type Config = D::Config;

    // Stores opened from this database wrap the inner database's stores.
    type Store = ValueSplittingStore<D::Store>;

    fn get_name() -> String {
        format!("value splitting {}", D::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let database = D::connect(config, namespace).await?;
        Ok(Self { database })
    }

    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_shared(root_key)?;
        Ok(ValueSplittingStore { store })
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_exclusive(root_key)?;
        Ok(ValueSplittingStore { store })
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        Ok(D::list_all(config).await?)
    }

    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
        Ok(self.database.list_root_keys().await?)
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        Ok(D::delete_all(config).await?)
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        Ok(D::exists(config, namespace).await?)
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(D::create(config, namespace).await?)
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(D::delete(config, namespace).await?)
    }
}
354
#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    // Test configuration comes straight from the wrapped database.
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        Ok(D::new_test_config().await?)
    }
}
365
366impl<D> ValueSplittingStore<D>
367where
368    D: WithError,
369{
370    /// Creates a new store that deals with big values from one that does not.
371    pub fn new(store: D) -> Self {
372        ValueSplittingStore { store }
373    }
374
375    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
376        let mut big_key_segment = key.to_vec();
377        let mut bytes = bcs::to_bytes(&index)?;
378        bytes.reverse();
379        big_key_segment.extend(bytes);
380        Ok(big_key_segment)
381    }
382
383    fn get_initial_count_first_chunk(
384        count: u32,
385        first_chunk: &[u8],
386    ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
387        let mut bytes = bcs::to_bytes(&count)?;
388        bytes.reverse();
389        let mut value_ext = Vec::new();
390        value_ext.extend(bytes);
391        value_ext.extend(first_chunk);
392        Ok(value_ext)
393    }
394
395    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
396        if value.len() < 4 {
397            return Err(ValueSplittingError::NoCountAvailable);
398        }
399        let mut bytes = value[0..4].to_vec();
400        bytes.reverse();
401        Ok(bcs::from_bytes::<u32>(&bytes)?)
402    }
403
404    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
405        let len = key.len();
406        if len < 4 {
407            return Err(ValueSplittingError::TooShortKey);
408        }
409        let mut bytes = key[len - 4..len].to_vec();
410        bytes.reverse();
411        Ok(bcs::from_bytes::<u32>(&bytes)?)
412    }
413}
414
/// A memory store for which the values are limited to 100 bytes and can be used for tests.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    /// The wrapped in-memory store; only the value-size limit is added here.
    inner: MemoryStore,
}
421
#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    // Equivalent to `LimitedTestMemoryStore::new()`.
    fn default() -> Self {
        Self::new()
    }
}
428
#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    // Errors from the wrapped `MemoryStore` are surfaced unchanged.
    type Error = MemoryStoreError;
}
433
// All read operations delegate to the wrapped `MemoryStore`; only writes are
// restricted (see the `WritableKeyValueStore` impl below in this file).
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    fn root_key(&self) -> Result<Vec<u8>, MemoryStoreError> {
        self.inner.root_key()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.inner.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.inner.contains_key(key).await
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, MemoryStoreError> {
        self.inner.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.inner.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}
479
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // We set up the MAX_VALUE_SIZE to the artificially low value of 100
    // purely for testing purposes.
    const MAX_VALUE_SIZE: usize = 100;

    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        // Panics (rather than returning an error) on oversized values: this
        // store exists only to exercise the splitting layer in tests, where an
        // oversized value reaching this point is a test bug.
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch size is not adequate for this test"
        );
        self.inner.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.inner.clear_journal().await
    }
}
498
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore` backed by a fresh in-memory store.
    pub fn new() -> Self {
        Self {
            inner: MemoryStore::new_for_testing(),
        }
    }
}
507
/// Provides a `ValueSplittingStore<LimitedTestMemoryStore>` that can be used for tests.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    let limited_store = LimitedTestMemoryStore::new();
    ValueSplittingStore::new(limited_store)
}
513
#[cfg(test)]
mod tests {
    use rand::Rng;

    // This is a unit-test module inside the crate itself (see the
    // `crate::random` call below), so items are imported through
    // `super`/`crate` rather than through the external `linera_views` name,
    // for consistency.
    use super::{LimitedTestMemoryStore, ValueSplittingStore};
    use crate::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
    };

    // The key splitting means that when a key is overwritten
    // some previous segments may still be present.
    #[tokio::test]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        const _: () = assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a key with a long value
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Write a key with a smaller value
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Two segments are present even though only one is used
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reading the segments and checking
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!(
                    "value_splitting test: segment key not found in underlying store right after a multi-segment write"
                )
            };
            if index == 0 {
                // Segment 0 starts with the 4-byte segment count.
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // writing a big key
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // deleting it
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // reading everything (there are leftover keys)
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // Two segments remain
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}