linera_views/backends/value_splitting.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Adds support for large values to a given store by splitting them between several keys.
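//!
//! A logical key-value pair `(key, value)` is stored in the wrapped store as follows: the
//! key `key || [0, 0, 0, 0]` holds the number of segments as a big-endian `u32`, followed
//! by the first chunk of `value`; each additional segment `i` (for `1 <= i < count`) is
//! stored under `key || i`, with `i` also encoded as a big-endian `u32`.
//!
//! For example, with a wrapped store whose `MAX_VALUE_SIZE` is 100, a 150-byte value under
//! the key `[0, 0]` is stored as two entries: the key `[0, 0, 0, 0, 0, 0]` holds the count
//! `2` followed by the first 96 bytes, and `[0, 0, 0, 0, 0, 1]` holds the remaining 54 bytes.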

use linera_base::ensure;
use thiserror::Error;

use crate::{
    batch::{Batch, WriteOperation},
    store::{
        AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError,
        WritableKeyValueStore,
    },
};
#[cfg(with_testing)]
use crate::{
    memory::{MemoryStore, MemoryStoreError, TEST_MEMORY_MAX_STREAM_QUERIES},
    random::generate_test_namespace,
    store::TestKeyValueStore,
};

/// The composed error type built from the inner error type.
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// The error of the inner store.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is less than 4 bytes long, so the segment index cannot be extracted.
    #[error("the key is less than 4 bytes long, so the segment index cannot be extracted")]
    TooShortKey,

    /// A value segment is missing from the database.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// No count of size `u32` is available in the value.
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}

impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
    fn from(error: bcs::Error) -> Self {
        let error = E::from(error);
        ValueSplittingError::InnerStoreError(error)
    }
}

impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
    const BACKEND: &'static str = "value splitting";
}

/// A key-value store with no size limit for values.
///
/// It wraps a key-value store, potentially _with_ a size limit, and automatically
/// splits up large values into smaller ones. A single logical key-value pair is
/// stored as multiple smaller key-value pairs in the wrapped store.
/// See the `README.md` for additional details.
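///
/// # Example
///
/// A minimal sketch of wrapping a store (using the test-only `LimitedTestMemoryStore`
/// defined below, so this assumes the `with_testing` feature):
///
/// ```ignore
/// let store = LimitedTestMemoryStore::new();
/// let big_store = ValueSplittingStore::new(store);
/// ```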
#[derive(Clone)]
pub struct ValueSplittingStore<K> {
    /// The underlying store of the transformed store.
    store: K,
}

impl<K> WithError for ValueSplittingStore<K>
where
    K: WithError,
    K::Error: 'static,
{
    type Error = ValueSplittingError<K::Error>;
}

impl<K> ReadableKeyValueStore for ValueSplittingStore<K>
where
    K: ReadableKeyValueStore,
    K::Error: 'static,
{
    // Four bytes of every key in the wrapped store are reserved for the segment index.
    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE - 4;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        // The key with index 0 holds the segment count and the first chunk.
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        let value = self.store.read_value_bytes(&big_key).await?;
        let Some(value) = value else {
            return Ok(None);
        };
        let count = Self::read_count_from_value(&value)?;
        let mut big_value = value[4..].to_vec();
        if count == 1 {
            return Ok(Some(big_value));
        }
        // Fetch the remaining segments and concatenate them in index order.
        let mut big_keys = Vec::new();
        for i in 1..count {
            let big_key_segment = Self::get_segment_key(key, i)?;
            big_keys.push(big_key_segment);
        }
        let segments = self.store.read_multi_values_bytes(big_keys).await?;
        for segment in segments {
            match segment {
                None => {
                    return Err(ValueSplittingError::MissingSegment);
                }
                Some(segment) => {
                    big_value.extend(segment);
                }
            }
        }
        Ok(Some(big_value))
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        Ok(self.store.contains_key(&big_key).await?)
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        let big_keys = keys
            .into_iter()
            .map(|key| {
                let mut big_key = key.clone();
                big_key.extend(&[0, 0, 0, 0]);
                big_key
            })
            .collect::<Vec<_>>();
        Ok(self.store.contains_keys(big_keys).await?)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        // First read the index-0 entries, which give the first chunk and the segment count
        // of each value.
        let mut big_keys = Vec::new();
        for key in &keys {
            let mut big_key = key.clone();
            big_key.extend(&[0, 0, 0, 0]);
            big_keys.push(big_key);
        }
        let values = self.store.read_multi_values_bytes(big_keys).await?;
        let mut big_values = Vec::<Option<Vec<u8>>>::new();
        let mut keys_add = Vec::new();
        let mut n_blocks = Vec::new();
        for (key, value) in keys.iter().zip(values) {
            match value {
                None => {
                    n_blocks.push(0);
                    big_values.push(None);
                }
                Some(value) => {
                    let count = Self::read_count_from_value(&value)?;
                    let big_value = value[4..].to_vec();
                    for i in 1..count {
                        let big_key_segment = Self::get_segment_key(key, i)?;
                        keys_add.push(big_key_segment);
                    }
                    n_blocks.push(count);
                    big_values.push(Some(big_value));
                }
            }
        }
        // Then read all missing segments in one query and append them to their values.
        if !keys_add.is_empty() {
            let mut segments = self
                .store
                .read_multi_values_bytes(keys_add)
                .await?
                .into_iter();
            for (idx, count) in n_blocks.iter().enumerate() {
                if count > &1 {
                    let value = big_values.get_mut(idx).unwrap();
                    if let Some(ref mut value) = value {
                        for _ in 1..*count {
                            let segment = segments.next().unwrap().unwrap();
                            value.extend(segment);
                        }
                    }
                }
            }
        }
        Ok(big_values)
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        let mut keys = Vec::new();
        for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
            let len = big_key.len();
            // Only the keys with segment index 0 correspond to logical keys.
            if Self::read_index_from_key(&big_key)? == 0 {
                let key = big_key[0..len - 4].to_vec();
                keys.push(key);
            }
        }
        Ok(keys)
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
        let mut small_kv_iterator = small_key_values.into_iter();
        let mut key_values = Vec::new();
        while let Some((mut big_key, value)) = small_kv_iterator.next() {
            if Self::read_index_from_key(&big_key)? != 0 {
                continue; // Leftover segment from an earlier value.
            }
            big_key.truncate(big_key.len() - 4);
            let key = big_key;
            let count = Self::read_count_from_value(&value)?;
            let mut big_value = value[4..].to_vec();
            for idx in 1..count {
                let (big_key, value) = small_kv_iterator
                    .next()
                    .ok_or(ValueSplittingError::MissingSegment)?;
                ensure!(
                    Self::read_index_from_key(&big_key)? == idx
                        && big_key.starts_with(&key)
                        && big_key.len() == key.len() + 4,
                    ValueSplittingError::MissingSegment
                );
                big_value.extend(value);
            }
            key_values.push((key, big_value));
        }
        Ok(key_values)
    }
}

impl<K> WritableKeyValueStore for ValueSplittingStore<K>
where
    K: WritableKeyValueStore,
    K::Error: 'static,
{
    // Splitting lifts the value-size limit of the wrapped store.
    const MAX_VALUE_SIZE: usize = usize::MAX;

    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let mut batch_new = Batch::new();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    // Only the index-0 key is deleted; leftover segments are ignored by reads.
                    let mut big_key = key.to_vec();
                    big_key.extend(&[0, 0, 0, 0]);
                    batch_new.delete_key(big_key);
                }
                WriteOperation::Put { key, mut value } => {
                    // The first chunk, prefixed with the segment count, goes under index 0;
                    // the remainder is written in chunks of `K::MAX_VALUE_SIZE` under
                    // indices 1, 2, ...
                    let big_key = Self::get_segment_key(&key, 0)?;
                    let mut count: u32 = 1;
                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
                        Self::get_initial_count_first_chunk(count, &value)?
                    } else {
                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
                            let big_key_segment = Self::get_segment_key(&key, count)?;
                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
                            count += 1;
                        }
                        Self::get_initial_count_first_chunk(count, &value)?
                    };
                    batch_new.put_key_value_bytes(big_key, value_ext);
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    batch_new.delete_key_prefix(key_prefix);
                }
            }
        }
        Ok(self.store.write_batch(batch_new).await?)
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        Ok(self.store.clear_journal().await?)
    }
}

impl<K> AdminKeyValueStore for ValueSplittingStore<K>
where
    K: AdminKeyValueStore,
    K::Error: 'static,
{
    type Config = K::Config;

    fn get_name() -> String {
        format!("value splitting {}", K::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let store = K::connect(config, namespace).await?;
        Ok(Self { store })
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
        let store = self.store.open_exclusive(root_key)?;
        Ok(Self { store })
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        Ok(K::list_all(config).await?)
    }

    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, Self::Error> {
        Ok(K::list_root_keys(config, namespace).await?)
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        Ok(K::delete_all(config).await?)
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        Ok(K::exists(config, namespace).await?)
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(K::create(config, namespace).await?)
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(K::delete(config, namespace).await?)
    }
}

#[cfg(with_testing)]
impl<K> TestKeyValueStore for ValueSplittingStore<K>
where
    K: TestKeyValueStore,
    K::Error: 'static,
{
    async fn new_test_config() -> Result<K::Config, Self::Error> {
        Ok(K::new_test_config().await?)
    }
}

impl<K> ValueSplittingStore<K>
where
    K: WithError,
{
    /// Creates a new store that deals with big values from one that does not.
    pub fn new(store: K) -> Self {
        ValueSplittingStore { store }
    }

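    // The segment index is appended in big-endian order (the little-endian BCS bytes are
    // reversed) so that segment keys of the same logical key sort by index, which
    // `find_key_values_by_prefix` relies on when reassembling values.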
    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
        let mut big_key_segment = key.to_vec();
        let mut bytes = bcs::to_bytes(&index)?;
        bytes.reverse();
        big_key_segment.extend(bytes);
        Ok(big_key_segment)
    }

    fn get_initial_count_first_chunk(
        count: u32,
        first_chunk: &[u8],
    ) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
        let mut bytes = bcs::to_bytes(&count)?;
        bytes.reverse();
        let mut value_ext = Vec::new();
        value_ext.extend(bytes);
        value_ext.extend(first_chunk);
        Ok(value_ext)
    }

    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
        if value.len() < 4 {
            return Err(ValueSplittingError::NoCountAvailable);
        }
        let mut bytes = value[0..4].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }

    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
        let len = key.len();
        if len < 4 {
            return Err(ValueSplittingError::TooShortKey);
        }
        let mut bytes = key[len - 4..len].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }
}

/// A memory store whose values are limited to 100 bytes, for use in tests.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    store: MemoryStore,
}

#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    type Error = MemoryStoreError;
}

#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        TEST_MEMORY_MAX_STREAM_QUERIES
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.store.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.store.contains_key(key).await
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
        self.store.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.store.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}

#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // We set `MAX_VALUE_SIZE` to the artificially low value of 100
    // purely for testing purposes.
    const MAX_VALUE_SIZE: usize = 100;

    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        if !batch.check_value_size(Self::MAX_VALUE_SIZE) {
            panic!("The batch size is not adequate for this test");
        }
        self.store.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.store.clear_journal().await
    }
}

#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore`.
    pub fn new() -> Self {
        let namespace = generate_test_namespace();
        let store =
            MemoryStore::new_for_testing(TEST_MEMORY_MAX_STREAM_QUERIES, &namespace).unwrap();
        LimitedTestMemoryStore { store }
    }
}

/// Provides a `ValueSplittingStore<LimitedTestMemoryStore>` that can be used for tests.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    ValueSplittingStore::new(LimitedTestMemoryStore::new())
}

#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    // Because of the value splitting, when a key is overwritten,
    // some segments of the previous value may still be present.
    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a key with a long value.
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Write the same key with a smaller value.
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Two segments are present in the inner store even though only one is used.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reading the segments and checking
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!()
            };
            if index == 0 {
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // Deleting it
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // Reading everything back (the logical key is gone, despite leftover segments)
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // Two leftover segments remain in the inner store
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}