// linera_views/backends/value_splitting.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Adds support for large values to a given store by splitting them between several keys.
use linera_base::ensure;
use thiserror::Error;

use crate::{
    batch::{Batch, WriteOperation},
    store::{
        AdminKeyValueStore, KeyIterable, KeyValueIterable, KeyValueStoreError,
        ReadableKeyValueStore, WithError, WritableKeyValueStore,
    },
};
#[cfg(with_testing)]
use crate::{
    memory::{MemoryStore, MemoryStoreError, TEST_MEMORY_MAX_STREAM_QUERIES},
    random::generate_test_namespace,
    store::TestKeyValueStore,
};

/// The composed error type built from the inner error type.
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// Inner store error, forwarded transparently from the wrapped store.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is of length less than 4, so we cannot extract the segment index suffix.
    #[error("the key is of length less than 4, so we cannot extract the first byte")]
    TooShortKey,

    /// A value segment (index >= 1) announced by the count is missing from the database.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// The stored value is shorter than 4 bytes, so no `u32` segment count can be read.
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}
42
43impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
44    fn from(error: bcs::Error) -> Self {
45        let error = E::from(error);
46        ValueSplittingError::InnerStoreError(error)
47    }
48}
49
impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
    // Backend name reported in error messages for this wrapper.
    const BACKEND: &'static str = "value splitting";
}
53
/// A key-value store with no size limit for values.
///
/// It wraps a key-value store `K`, potentially _with_ a size limit, and automatically
/// splits up large values into smaller ones. A single logical key-value pair is
/// stored as multiple smaller key-value pairs in the wrapped store.
/// See the `README.md` for additional details.
#[derive(Clone)]
pub struct ValueSplittingStore<K> {
    /// The underlying store of the transformed store.
    store: K,
}
65
impl<K> WithError for ValueSplittingStore<K>
where
    K: WithError,
    K::Error: 'static,
{
    /// Errors of the wrapped store are wrapped in [`ValueSplittingError`].
    type Error = ValueSplittingError<K::Error>;
}
73
74impl<K> ReadableKeyValueStore for ValueSplittingStore<K>
75where
76    K: ReadableKeyValueStore,
77    K::Error: 'static,
78{
79    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE - 4;
80    type Keys = Vec<Vec<u8>>;
81    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
82
83    fn max_stream_queries(&self) -> usize {
84        self.store.max_stream_queries()
85    }
86
87    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
88        let mut big_key = key.to_vec();
89        big_key.extend(&[0, 0, 0, 0]);
90        let value = self.store.read_value_bytes(&big_key).await?;
91        let Some(value) = value else {
92            return Ok(None);
93        };
94        let count = Self::read_count_from_value(&value)?;
95        let mut big_value = value[4..].to_vec();
96        if count == 1 {
97            return Ok(Some(big_value));
98        }
99        let mut big_keys = Vec::new();
100        for i in 1..count {
101            let big_key_segment = Self::get_segment_key(key, i)?;
102            big_keys.push(big_key_segment);
103        }
104        let segments = self.store.read_multi_values_bytes(big_keys).await?;
105        for segment in segments {
106            match segment {
107                None => {
108                    return Err(ValueSplittingError::MissingSegment);
109                }
110                Some(segment) => {
111                    big_value.extend(segment);
112                }
113            }
114        }
115        Ok(Some(big_value))
116    }
117
118    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
119        let mut big_key = key.to_vec();
120        big_key.extend(&[0, 0, 0, 0]);
121        Ok(self.store.contains_key(&big_key).await?)
122    }
123
124    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
125        let big_keys = keys
126            .into_iter()
127            .map(|key| {
128                let mut big_key = key.clone();
129                big_key.extend(&[0, 0, 0, 0]);
130                big_key
131            })
132            .collect::<Vec<_>>();
133        Ok(self.store.contains_keys(big_keys).await?)
134    }
135
136    async fn read_multi_values_bytes(
137        &self,
138        keys: Vec<Vec<u8>>,
139    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
140        let mut big_keys = Vec::new();
141        for key in &keys {
142            let mut big_key = key.clone();
143            big_key.extend(&[0, 0, 0, 0]);
144            big_keys.push(big_key);
145        }
146        let values = self.store.read_multi_values_bytes(big_keys).await?;
147        let mut big_values = Vec::<Option<Vec<u8>>>::new();
148        let mut keys_add = Vec::new();
149        let mut n_blocks = Vec::new();
150        for (key, value) in keys.iter().zip(values) {
151            match value {
152                None => {
153                    n_blocks.push(0);
154                    big_values.push(None);
155                }
156                Some(value) => {
157                    let count = Self::read_count_from_value(&value)?;
158                    let big_value = value[4..].to_vec();
159                    for i in 1..count {
160                        let big_key_segment = Self::get_segment_key(key, i)?;
161                        keys_add.push(big_key_segment);
162                    }
163                    n_blocks.push(count);
164                    big_values.push(Some(big_value));
165                }
166            }
167        }
168        if !keys_add.is_empty() {
169            let mut segments = self
170                .store
171                .read_multi_values_bytes(keys_add)
172                .await?
173                .into_iter();
174            for (idx, count) in n_blocks.iter().enumerate() {
175                if count > &1 {
176                    let value = big_values.get_mut(idx).unwrap();
177                    if let Some(ref mut value) = value {
178                        for _ in 1..*count {
179                            let segment = segments.next().unwrap().unwrap();
180                            value.extend(segment);
181                        }
182                    }
183                }
184            }
185        }
186        Ok(big_values)
187    }
188
189    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
190        let mut keys = Vec::new();
191        for big_key in self.store.find_keys_by_prefix(key_prefix).await?.iterator() {
192            let big_key = big_key?;
193            let len = big_key.len();
194            if Self::read_index_from_key(big_key)? == 0 {
195                let key = big_key[0..len - 4].to_vec();
196                keys.push(key);
197            }
198        }
199        Ok(keys)
200    }
201
202    async fn find_key_values_by_prefix(
203        &self,
204        key_prefix: &[u8],
205    ) -> Result<Self::KeyValues, Self::Error> {
206        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
207        let mut small_kv_iterator = small_key_values.into_iterator_owned();
208        let mut key_values = Vec::new();
209        while let Some(result) = small_kv_iterator.next() {
210            let (mut big_key, value) = result?;
211            if Self::read_index_from_key(&big_key)? != 0 {
212                continue; // Leftover segment from an earlier value.
213            }
214            big_key.truncate(big_key.len() - 4);
215            let key = big_key;
216            let count = Self::read_count_from_value(&value)?;
217            let mut big_value = value[4..].to_vec();
218            for idx in 1..count {
219                let (big_key, value) = small_kv_iterator
220                    .next()
221                    .ok_or(ValueSplittingError::MissingSegment)??;
222                ensure!(
223                    Self::read_index_from_key(&big_key)? == idx
224                        && big_key.starts_with(&key)
225                        && big_key.len() == key.len() + 4,
226                    ValueSplittingError::MissingSegment
227                );
228                big_value.extend(value);
229            }
230            key_values.push((key, big_value));
231        }
232        Ok(key_values)
233    }
234}
235
impl<K> WritableKeyValueStore for ValueSplittingStore<K>
where
    K: WritableKeyValueStore,
    K::Error: 'static,
{
    // Values are split across several physical entries, so there is no
    // logical size limit.
    const MAX_VALUE_SIZE: usize = usize::MAX;

    /// Translates a logical batch into a physical one:
    /// each `Put` becomes an index-0 entry (segment count + first chunk)
    /// plus full-size chunk entries under segment keys `1..count`.
    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let mut batch_new = Batch::new();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    // Only the index-0 entry is deleted; stale segments with
                    // index >= 1 may remain but are ignored by reads, which
                    // start from the (now absent) index-0 entry.
                    let mut big_key = key.to_vec();
                    big_key.extend(&[0, 0, 0, 0]);
                    batch_new.delete_key(big_key);
                }
                WriteOperation::Put { key, mut value } => {
                    let big_key = Self::get_segment_key(&key, 0)?;
                    let mut count: u32 = 1;
                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
                        // Small value: fits in the index-0 entry together with
                        // the 4-byte count prefix.
                        Self::get_initial_count_first_chunk(count, &value)?
                    } else {
                        // Large value: keep the first `MAX_VALUE_SIZE - 4` bytes
                        // for the index-0 entry and write the remainder in
                        // full-size chunks under segment keys 1..count.
                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
                            let big_key_segment = Self::get_segment_key(&key, count)?;
                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
                            count += 1;
                        }
                        Self::get_initial_count_first_chunk(count, &value)?
                    };
                    batch_new.put_key_value_bytes(big_key, value_ext);
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    // Prefix deletion removes index-0 entries and all their
                    // segment keys, since segments share the logical prefix.
                    batch_new.delete_key_prefix(key_prefix);
                }
            }
        }
        Ok(self.store.write_batch(batch_new).await?)
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        Ok(self.store.clear_journal().await?)
    }
}
280
281impl<K> AdminKeyValueStore for ValueSplittingStore<K>
282where
283    K: AdminKeyValueStore,
284    K::Error: 'static,
285{
286    type Config = K::Config;
287
288    fn get_name() -> String {
289        format!("value splitting {}", K::get_name())
290    }
291
292    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
293        let store = K::connect(config, namespace).await?;
294        Ok(Self { store })
295    }
296
297    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
298        let store = self.store.open_exclusive(root_key)?;
299        Ok(Self { store })
300    }
301
302    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
303        Ok(K::list_all(config).await?)
304    }
305
306    async fn list_root_keys(
307        config: &Self::Config,
308        namespace: &str,
309    ) -> Result<Vec<Vec<u8>>, Self::Error> {
310        Ok(K::list_root_keys(config, namespace).await?)
311    }
312
313    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
314        Ok(K::delete_all(config).await?)
315    }
316
317    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
318        Ok(K::exists(config, namespace).await?)
319    }
320
321    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
322        Ok(K::create(config, namespace).await?)
323    }
324
325    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
326        Ok(K::delete(config, namespace).await?)
327    }
328}
329
#[cfg(with_testing)]
impl<K> TestKeyValueStore for ValueSplittingStore<K>
where
    K: TestKeyValueStore,
    K::Error: 'static,
{
    /// Delegates test-configuration creation to the wrapped store.
    async fn new_test_config() -> Result<K::Config, Self::Error> {
        Ok(K::new_test_config().await?)
    }
}
340
341impl<K> ValueSplittingStore<K>
342where
343    K: WithError,
344{
345    /// Creates a new store that deals with big values from one that does not.
346    pub fn new(store: K) -> Self {
347        ValueSplittingStore { store }
348    }
349
350    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
351        let mut big_key_segment = key.to_vec();
352        let mut bytes = bcs::to_bytes(&index)?;
353        bytes.reverse();
354        big_key_segment.extend(bytes);
355        Ok(big_key_segment)
356    }
357
358    fn get_initial_count_first_chunk(
359        count: u32,
360        first_chunk: &[u8],
361    ) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
362        let mut bytes = bcs::to_bytes(&count)?;
363        bytes.reverse();
364        let mut value_ext = Vec::new();
365        value_ext.extend(bytes);
366        value_ext.extend(first_chunk);
367        Ok(value_ext)
368    }
369
370    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
371        if value.len() < 4 {
372            return Err(ValueSplittingError::NoCountAvailable);
373        }
374        let mut bytes = value[0..4].to_vec();
375        bytes.reverse();
376        Ok(bcs::from_bytes::<u32>(&bytes)?)
377    }
378
379    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
380        let len = key.len();
381        if len < 4 {
382            return Err(ValueSplittingError::TooShortKey);
383        }
384        let mut bytes = key[len - 4..len].to_vec();
385        bytes.reverse();
386        Ok(bcs::from_bytes::<u32>(&bytes)?)
387    }
388}
389
/// A memory store for which the values are limited to 100 bytes and can be used for tests.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    /// The wrapped in-memory store; the 100-byte value limit is enforced in
    /// `write_batch`, not by the store itself.
    store: MemoryStore,
}
396
#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    /// Equivalent to `Self::new()`: a fresh store under a test namespace.
    fn default() -> Self {
        Self::new()
    }
}
403
#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    // Errors are those of the wrapped `MemoryStore`.
    type Error = MemoryStoreError;
}
408
// All read operations delegate unchanged to the wrapped `MemoryStore`;
// only writes are size-limited (see the `WritableKeyValueStore` impl).
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    // Keys are not restricted; only value sizes are limited for this test store.
    const MAX_KEY_SIZE: usize = usize::MAX;
    type Keys = Vec<Vec<u8>>;
    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;

    fn max_stream_queries(&self) -> usize {
        TEST_MEMORY_MAX_STREAM_QUERIES
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.store.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.store.contains_key(key).await
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
        self.store.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.store.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, MemoryStoreError> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::KeyValues, MemoryStoreError> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}
449
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // Deliberately tiny limit (for testing only) so that the value-splitting
    // logic is actually exercised.
    const MAX_VALUE_SIZE: usize = 100;

    /// Writes the batch after checking that no value exceeds the test limit;
    /// an oversized value is a bug in the test setup and panics.
    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch size is not adequate for this test"
        );
        self.store.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.store.clear_journal().await
    }
}
467
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore` backed by a fresh test namespace.
    pub fn new() -> Self {
        let store = MemoryStore::new_for_testing(
            TEST_MEMORY_MAX_STREAM_QUERIES,
            &generate_test_namespace(),
        )
        .unwrap();
        Self { store }
    }
}
478
/// Provides a `ValueSplittingStore<LimitedTestMemoryStore>` that can be used for tests.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    let inner = LimitedTestMemoryStore::new();
    ValueSplittingStore::new(inner)
}
484
#[cfg(test)]
mod tests {
    use rand::Rng;

    // Use crate-relative paths consistently: the test bodies already refer to
    // `crate::random`, while the previous imports went through the external
    // crate name `linera_views`.
    use super::{LimitedTestMemoryStore, ValueSplittingStore};
    use crate::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
    };

    // The key splitting means that when a key is overwritten
    // some previous segments may still be present.
    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a key with a long value
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Write a key with a smaller value
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Two segments are present even though only one is used
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Writing a big value
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reading the segments and checking
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!()
            };
            if index == 0 {
                // The index-0 entry starts with the 4-byte segment count.
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // writing a big key
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // deleting it
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // reading everything (there are leftover keys)
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // Two segments remain
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}