1use linera_base::ensure;
7use thiserror::Error;
8
9use crate::{
10 batch::{Batch, WriteOperation},
11 store::{
12 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
13 WritableKeyValueStore,
14 },
15};
16#[cfg(with_testing)]
17use crate::{
18 memory::{MemoryStore, MemoryStoreError},
19 store::TestKeyValueDatabase,
20};
21
/// A key-value database wrapper whose stores are [`ValueSplittingStore`]s built on top of the
/// stores of the wrapped database `D`.
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    /// The wrapped database; all database-level operations are forwarded to it.
    database: D,
}
33
/// A key-value store wrapper that splits values too large for the wrapped store `S` into
/// several segments, each stored under the user key followed by a 4-byte segment index.
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    /// The wrapped store that holds the individual segments.
    store: S,
}
40
/// The error type of the value-splitting wrappers: either an error of the wrapped store or an
/// inconsistency detected while reassembling a split value.
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// An error of the wrapped store. BCS errors are also carried here after being converted
    /// into `E`.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// A key read back from the wrapped store has fewer than 4 bytes, so the trailing 4-byte
    /// segment index cannot be extracted.
    // NOTE(review): the message below mentions "the first byte", but the code reads the last
    // four bytes of the key; the text is left unchanged because callers may match on it.
    #[error("the key is of length less than 4, so we cannot extract the first byte")]
    TooShortKey,

    /// A segment announced by the count stored in the first segment was not found.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// A stored first segment has fewer than 4 bytes, so it cannot contain the segment count.
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}
60
61impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
62 fn from(error: bcs::Error) -> Self {
63 let error = E::from(error);
64 ValueSplittingError::InnerStoreError(error)
65 }
66}
67
/// Marks [`ValueSplittingError`] as a store error, provided the inner error type is one.
impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
    /// Name reported for this backend.
    const BACKEND: &'static str = "value splitting";
}
71
/// The database wrapper reports the wrapped database's errors wrapped in
/// [`ValueSplittingError`].
impl<S> WithError for ValueSplittingDatabase<S>
where
    S: WithError,
    S::Error: 'static,
{
    type Error = ValueSplittingError<S::Error>;
}
79
/// The store wrapper reports the wrapped store's errors wrapped in [`ValueSplittingError`].
impl<D> WithError for ValueSplittingStore<D>
where
    D: WithError,
    D::Error: 'static,
{
    type Error = ValueSplittingError<D::Error>;
}
87
88impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
89where
90 S: ReadableKeyValueStore,
91 S::Error: 'static,
92{
93 const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;
94
95 fn max_stream_queries(&self) -> usize {
96 self.store.max_stream_queries()
97 }
98
99 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
100 let mut big_key = key.to_vec();
101 big_key.extend(&[0, 0, 0, 0]);
102 let value = self.store.read_value_bytes(&big_key).await?;
103 let Some(value) = value else {
104 return Ok(None);
105 };
106 let count = Self::read_count_from_value(&value)?;
107 let mut big_value = value[4..].to_vec();
108 if count == 1 {
109 return Ok(Some(big_value));
110 }
111 let mut big_keys = Vec::new();
112 for i in 1..count {
113 let big_key_segment = Self::get_segment_key(key, i)?;
114 big_keys.push(big_key_segment);
115 }
116 let segments = self.store.read_multi_values_bytes(big_keys).await?;
117 for segment in segments {
118 match segment {
119 None => {
120 return Err(ValueSplittingError::MissingSegment);
121 }
122 Some(segment) => {
123 big_value.extend(segment);
124 }
125 }
126 }
127 Ok(Some(big_value))
128 }
129
130 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
131 let mut big_key = key.to_vec();
132 big_key.extend(&[0, 0, 0, 0]);
133 Ok(self.store.contains_key(&big_key).await?)
134 }
135
136 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
137 let big_keys = keys
138 .into_iter()
139 .map(|key| {
140 let mut big_key = key.clone();
141 big_key.extend(&[0, 0, 0, 0]);
142 big_key
143 })
144 .collect::<Vec<_>>();
145 Ok(self.store.contains_keys(big_keys).await?)
146 }
147
148 async fn read_multi_values_bytes(
149 &self,
150 keys: Vec<Vec<u8>>,
151 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
152 let mut big_keys = Vec::new();
153 for key in &keys {
154 let mut big_key = key.clone();
155 big_key.extend(&[0, 0, 0, 0]);
156 big_keys.push(big_key);
157 }
158 let values = self.store.read_multi_values_bytes(big_keys).await?;
159 let mut big_values = Vec::<Option<Vec<u8>>>::new();
160 let mut keys_add = Vec::new();
161 let mut n_blocks = Vec::new();
162 for (key, value) in keys.iter().zip(values) {
163 match value {
164 None => {
165 n_blocks.push(0);
166 big_values.push(None);
167 }
168 Some(value) => {
169 let count = Self::read_count_from_value(&value)?;
170 let big_value = value[4..].to_vec();
171 for i in 1..count {
172 let big_key_segment = Self::get_segment_key(key, i)?;
173 keys_add.push(big_key_segment);
174 }
175 n_blocks.push(count);
176 big_values.push(Some(big_value));
177 }
178 }
179 }
180 if !keys_add.is_empty() {
181 let mut segments = self
182 .store
183 .read_multi_values_bytes(keys_add)
184 .await?
185 .into_iter();
186 for (idx, count) in n_blocks.iter().enumerate() {
187 if count > &1 {
188 let value = big_values.get_mut(idx).unwrap();
189 if let Some(ref mut value) = value {
190 for _ in 1..*count {
191 let segment = segments.next().unwrap().unwrap();
192 value.extend(segment);
193 }
194 }
195 }
196 }
197 }
198 Ok(big_values)
199 }
200
201 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
202 let mut keys = Vec::new();
203 for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
204 let len = big_key.len();
205 if Self::read_index_from_key(&big_key)? == 0 {
206 let key = big_key[0..len - 4].to_vec();
207 keys.push(key);
208 }
209 }
210 Ok(keys)
211 }
212
213 async fn find_key_values_by_prefix(
214 &self,
215 key_prefix: &[u8],
216 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
217 let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
218 let mut small_kv_iterator = small_key_values.into_iter();
219 let mut key_values = Vec::new();
220 while let Some((mut big_key, value)) = small_kv_iterator.next() {
221 if Self::read_index_from_key(&big_key)? != 0 {
222 continue; }
224 big_key.truncate(big_key.len() - 4);
225 let key = big_key;
226 let count = Self::read_count_from_value(&value)?;
227 let mut big_value = value[4..].to_vec();
228 for idx in 1..count {
229 let (big_key, value) = small_kv_iterator
230 .next()
231 .ok_or(ValueSplittingError::MissingSegment)?;
232 ensure!(
233 Self::read_index_from_key(&big_key)? == idx
234 && big_key.starts_with(&key)
235 && big_key.len() == key.len() + 4,
236 ValueSplittingError::MissingSegment
237 );
238 big_value.extend(value);
239 }
240 key_values.push((key, big_value));
241 }
242 Ok(key_values)
243 }
244}
245
246impl<K> WritableKeyValueStore for ValueSplittingStore<K>
247where
248 K: WritableKeyValueStore,
249 K::Error: 'static,
250{
251 const MAX_VALUE_SIZE: usize = usize::MAX;
252
253 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
254 let mut batch_new = Batch::new();
255 for operation in batch.operations {
256 match operation {
257 WriteOperation::Delete { key } => {
258 let mut big_key = key.to_vec();
259 big_key.extend(&[0, 0, 0, 0]);
260 batch_new.delete_key(big_key);
261 }
262 WriteOperation::Put { key, mut value } => {
263 let big_key = Self::get_segment_key(&key, 0)?;
264 let mut count: u32 = 1;
265 let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
266 Self::get_initial_count_first_chunk(count, &value)?
267 } else {
268 let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
269 for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
270 let big_key_segment = Self::get_segment_key(&key, count)?;
271 batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
272 count += 1;
273 }
274 Self::get_initial_count_first_chunk(count, &value)?
275 };
276 batch_new.put_key_value_bytes(big_key, value_ext);
277 }
278 WriteOperation::DeletePrefix { key_prefix } => {
279 batch_new.delete_key_prefix(key_prefix);
280 }
281 }
282 }
283 Ok(self.store.write_batch(batch_new).await?)
284 }
285
286 async fn clear_journal(&self) -> Result<(), Self::Error> {
287 Ok(self.store.clear_journal().await?)
288 }
289}
290
291impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
292where
293 D: KeyValueDatabase,
294 D::Error: 'static,
295{
296 type Config = D::Config;
297
298 type Store = ValueSplittingStore<D::Store>;
299
300 fn get_name() -> String {
301 format!("value splitting {}", D::get_name())
302 }
303
304 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
305 let database = D::connect(config, namespace).await?;
306 Ok(Self { database })
307 }
308
309 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
310 let store = self.database.open_shared(root_key)?;
311 Ok(ValueSplittingStore { store })
312 }
313
314 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
315 let store = self.database.open_exclusive(root_key)?;
316 Ok(ValueSplittingStore { store })
317 }
318
319 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
320 Ok(D::list_all(config).await?)
321 }
322
323 async fn list_root_keys(
324 config: &Self::Config,
325 namespace: &str,
326 ) -> Result<Vec<Vec<u8>>, Self::Error> {
327 Ok(D::list_root_keys(config, namespace).await?)
328 }
329
330 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
331 Ok(D::delete_all(config).await?)
332 }
333
334 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
335 Ok(D::exists(config, namespace).await?)
336 }
337
338 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
339 Ok(D::create(config, namespace).await?)
340 }
341
342 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
343 Ok(D::delete(config, namespace).await?)
344 }
345}
346
/// The test configuration is the one of the wrapped database.
#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config()
            .await
            .map_err(ValueSplittingError::InnerStoreError)
    }
}
357
358impl<D> ValueSplittingStore<D>
359where
360 D: WithError,
361{
362 pub fn new(store: D) -> Self {
364 ValueSplittingStore { store }
365 }
366
367 fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
368 let mut big_key_segment = key.to_vec();
369 let mut bytes = bcs::to_bytes(&index)?;
370 bytes.reverse();
371 big_key_segment.extend(bytes);
372 Ok(big_key_segment)
373 }
374
375 fn get_initial_count_first_chunk(
376 count: u32,
377 first_chunk: &[u8],
378 ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
379 let mut bytes = bcs::to_bytes(&count)?;
380 bytes.reverse();
381 let mut value_ext = Vec::new();
382 value_ext.extend(bytes);
383 value_ext.extend(first_chunk);
384 Ok(value_ext)
385 }
386
387 fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
388 if value.len() < 4 {
389 return Err(ValueSplittingError::NoCountAvailable);
390 }
391 let mut bytes = value[0..4].to_vec();
392 bytes.reverse();
393 Ok(bcs::from_bytes::<u32>(&bytes)?)
394 }
395
396 fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
397 let len = key.len();
398 if len < 4 {
399 return Err(ValueSplittingError::TooShortKey);
400 }
401 let mut bytes = key[len - 4..len].to_vec();
402 bytes.reverse();
403 Ok(bcs::from_bytes::<u32>(&bytes)?)
404 }
405}
406
/// A wrapper around [`MemoryStore`] for tests; its `WritableKeyValueStore` implementation
/// declares a small maximum value size and asserts that written batches respect it.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    /// The memory store that actually holds the data.
    inner: MemoryStore,
}
413
/// The default store is the one built by [`LimitedTestMemoryStore::new`].
#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    fn default() -> Self {
        Self::new()
    }
}
420
/// Errors are those of the underlying memory store.
#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    type Error = MemoryStoreError;
}
425
/// Read access: every method forwards unchanged to the inner [`MemoryStore`].
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    /// No limit is declared on the key size.
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.inner.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.inner.contains_key(key).await
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
        self.inner.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.inner.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}
467
/// Write access: forwards to the inner [`MemoryStore`] after checking the value sizes.
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    /// A deliberately small limit, so that tests can exercise value splitting with small
    /// inputs.
    const MAX_VALUE_SIZE: usize = 100;

    /// Writes `batch` to the inner store.
    ///
    /// # Panics
    ///
    /// Panics if `batch.check_value_size(Self::MAX_VALUE_SIZE)` returns `false`.
    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch size is not adequate for this test"
        );
        self.inner.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.inner.clear_journal().await
    }
}
486
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore` backed by `MemoryStore::new_for_testing()`.
    pub fn new() -> Self {
        Self {
            inner: MemoryStore::new_for_testing(),
        }
    }
}
495
/// Creates a [`ValueSplittingStore`] on top of a fresh [`LimitedTestMemoryStore`], for tests.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    ValueSplittingStore::new(LimitedTestMemoryStore::new())
}
501
#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    // Overwriting a two-segment value with a one-segment value must read back correctly, and
    // the second segment of the old value stays behind in the underlying store.
    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a value of MAX_LEN + 1 bytes, which needs two segments.
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Overwrite it with a two-byte value, which needs a single segment.
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Both segment keys are still present in the underlying store.
        // NOTE(review): the expected keys have 5 bytes although the inner keys are `key`
        // plus 4 bytes; presumably `find_keys_by_prefix` returns the keys with the 1-byte
        // prefix removed. Confirm against the store documentation.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    // A value of 2 * MAX_LEN - 4 bytes is stored as exactly two segments in the underlying
    // store, and concatenating them by hand gives back the value.
    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a random value through the splitting store.
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Read the two segments directly from the underlying store.
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!()
            };
            if index == 0 {
                // The first segment starts with the 4-byte segment count.
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    // Deleting a three-segment value removes only its first segment: the splitting store no
    // longer sees the key, while the other two segments remain in the underlying store.
    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a value of 3 * MAX_LEN - 4 bytes, which needs three segments.
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // Delete the key through the splitting store.
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // The splitting store reports no key-value pair...
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // ...but segments 1 and 2 are still present in the underlying store.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}