use linera_base::ensure;
use thiserror::Error;

use crate::{
    batch::{Batch, WriteOperation},
    store::{
        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
        WritableKeyValueStore,
    },
};
#[cfg(with_testing)]
use crate::{
    memory::{MemoryStore, MemoryStoreError},
    store::TestKeyValueDatabase,
};

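/// A [`KeyValueDatabase`] that handles arbitrarily large values on top of a database
/// with a bounded value size, by splitting large values across several keys.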
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    database: D,
}

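/// A key-value store that handles arbitrarily large values on top of a store with a
/// bounded value size, by splitting large values across several keys.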
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    store: S,
}

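/// The error type of [`ValueSplittingStore`] and [`ValueSplittingDatabase`].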
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// An error from the inner store.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is too short to contain a segment index.
    #[error("the key is of length less than 4, so we cannot extract the segment index")]
    TooShortKey,

    /// A value segment is missing from the database.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// The value does not contain a u32 segment count.
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}

impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
    fn from(error: bcs::Error) -> Self {
        let error = E::from(error);
        ValueSplittingError::InnerStoreError(error)
    }
}

impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
    const BACKEND: &'static str = "value splitting";
}

impl<S> WithError for ValueSplittingDatabase<S>
where
    S: WithError,
    S::Error: 'static,
{
    type Error = ValueSplittingError<S::Error>;
}

impl<D> WithError for ValueSplittingStore<D>
where
    D: WithError,
    D::Error: 'static,
{
    type Error = ValueSplittingError<D::Error>;
}

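// Layout used by the splitting scheme: a user key `key` is stored under the inner-store
// keys `key || index`, where `index` is a 4-byte big-endian segment number. Segment 0
// holds a 4-byte big-endian segment count followed by the first chunk of the value;
// segments 1..count hold the remaining chunks.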
impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
where
    S: ReadableKeyValueStore,
    S::Error: 'static,
{
    const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
        Ok(self.store.root_key()?)
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        let value = self.store.read_value_bytes(&big_key).await?;
        let Some(value) = value else {
            return Ok(None);
        };
        // Segment 0 contains the segment count followed by the first chunk.
        let count = Self::read_count_from_value(&value)?;
        let mut big_value = value[4..].to_vec();
        if count == 1 {
            return Ok(Some(big_value));
        }
        // Fetch the remaining segments and append them in order.
        let mut big_keys = Vec::new();
        for i in 1..count {
            let big_key_segment = Self::get_segment_key(key, i)?;
            big_keys.push(big_key_segment);
        }
        let segments = self.store.read_multi_values_bytes(&big_keys).await?;
        for segment in segments {
            match segment {
                None => {
                    return Err(ValueSplittingError::MissingSegment);
                }
                Some(segment) => {
                    big_value.extend(segment);
                }
            }
        }
        Ok(Some(big_value))
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        Ok(self.store.contains_key(&big_key).await?)
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
        let big_keys = keys
            .iter()
            .map(|key| {
                let mut big_key = key.clone();
                big_key.extend(&[0, 0, 0, 0]);
                big_key
            })
            .collect::<Vec<_>>();
        Ok(self.store.contains_keys(&big_keys).await?)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        // Fetch segment 0 of every key; it contains the segment count and the first chunk.
        let mut big_keys = Vec::new();
        for key in keys {
            let mut big_key = key.clone();
            big_key.extend(&[0, 0, 0, 0]);
            big_keys.push(big_key);
        }
        let values = self.store.read_multi_values_bytes(&big_keys).await?;
        // Record the segment count of each value and collect the keys of the remaining segments.
        let mut big_values = Vec::<Option<Vec<u8>>>::new();
        let mut keys_add = Vec::new();
        let mut n_blocks = Vec::new();
        for (key, value) in keys.iter().zip(values) {
            match value {
                None => {
                    n_blocks.push(0);
                    big_values.push(None);
                }
                Some(value) => {
                    let count = Self::read_count_from_value(&value)?;
                    let big_value = value[4..].to_vec();
                    for i in 1..count {
                        let big_key_segment = Self::get_segment_key(key, i)?;
                        keys_add.push(big_key_segment);
                    }
                    n_blocks.push(count);
                    big_values.push(Some(big_value));
                }
            }
        }
        // Fetch the remaining segments in a single query and append them to their values.
        if !keys_add.is_empty() {
            let mut segments = self
                .store
                .read_multi_values_bytes(&keys_add)
                .await?
                .into_iter();
            for (idx, count) in n_blocks.iter().enumerate() {
                if count > &1 {
                    let value = big_values.get_mut(idx).unwrap();
                    if let Some(ref mut value) = value {
                        for _ in 1..*count {
                            let segment = segments.next().unwrap().unwrap();
                            value.extend(segment);
                        }
                    }
                }
            }
        }
        Ok(big_values)
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        let mut keys = Vec::new();
        for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
            let len = big_key.len();
            // Only segment 0 corresponds to a user key; drop its 4-byte index suffix.
            if Self::read_index_from_key(&big_key)? == 0 {
                let key = big_key[0..len - 4].to_vec();
                keys.push(key);
            }
        }
        Ok(keys)
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
        let mut small_kv_iterator = small_key_values.into_iter();
        let mut key_values = Vec::new();
        while let Some((mut big_key, value)) = small_kv_iterator.next() {
            // Skip leftover segments that do not start a value.
            if Self::read_index_from_key(&big_key)? != 0 {
                continue;
            }
            big_key.truncate(big_key.len() - 4);
            let key = big_key;
            let count = Self::read_count_from_value(&value)?;
            let mut big_value = value[4..].to_vec();
            // The remaining segments of this value follow in order.
            for idx in 1..count {
                let (big_key, value) = small_kv_iterator
                    .next()
                    .ok_or(ValueSplittingError::MissingSegment)?;
                ensure!(
                    Self::read_index_from_key(&big_key)? == idx
                        && big_key.starts_with(&key)
                        && big_key.len() == key.len() + 4,
                    ValueSplittingError::MissingSegment
                );
                big_value.extend(value);
            }
            key_values.push((key, big_value));
        }
        Ok(key_values)
    }
}

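// Note on writes: each `Put` becomes a segment-0 entry plus as many additional segment
// entries as needed, while `Delete` only removes segment 0. Stale higher segments may
// remain in the inner store, but the read path ignores them because reassembly is always
// bounded by the segment count stored in segment 0 (see the tests below).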
impl<K> WritableKeyValueStore for ValueSplittingStore<K>
where
    K: WritableKeyValueStore,
    K::Error: 'static,
{
    const MAX_VALUE_SIZE: usize = usize::MAX;

    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let mut batch_new = Batch::new();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    let mut big_key = key.to_vec();
                    big_key.extend(&[0, 0, 0, 0]);
                    batch_new.delete_key(big_key);
                }
                WriteOperation::Put { key, mut value } => {
                    let big_key = Self::get_segment_key(&key, 0)?;
                    let mut count: u32 = 1;
                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
                        // The value fits into segment 0 together with its 4-byte count.
                        Self::get_initial_count_first_chunk(count, &value)?
                    } else {
                        // Segment 0 keeps the first `MAX_VALUE_SIZE - 4` bytes; the rest
                        // is written as full-size segments 1, 2, ...
                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
                            let big_key_segment = Self::get_segment_key(&key, count)?;
                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
                            count += 1;
                        }
                        Self::get_initial_count_first_chunk(count, &value)?
                    };
                    batch_new.put_key_value_bytes(big_key, value_ext);
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    batch_new.delete_key_prefix(key_prefix);
                }
            }
        }
        Ok(self.store.write_batch(batch_new).await?)
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        Ok(self.store.clear_journal().await?)
    }
}

impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
where
    D: KeyValueDatabase,
    D::Error: 'static,
{
    type Config = D::Config;

    type Store = ValueSplittingStore<D::Store>;

    fn get_name() -> String {
        format!("value splitting {}", D::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let database = D::connect(config, namespace).await?;
        Ok(Self { database })
    }

    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_shared(root_key)?;
        Ok(ValueSplittingStore { store })
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
        let store = self.database.open_exclusive(root_key)?;
        Ok(ValueSplittingStore { store })
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        Ok(D::list_all(config).await?)
    }

    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
        Ok(self.database.list_root_keys().await?)
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        Ok(D::delete_all(config).await?)
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        Ok(D::exists(config, namespace).await?)
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(D::create(config, namespace).await?)
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(D::delete(config, namespace).await?)
    }
}

#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        Ok(D::new_test_config().await?)
    }
}

impl<D> ValueSplittingStore<D>
where
    D: WithError,
{
    /// Creates a new [`ValueSplittingStore`] wrapping the given store.
    pub fn new(store: D) -> Self {
        ValueSplittingStore { store }
    }

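    /// Returns the inner-store key of segment `index`: the user key followed by the
    /// index encoded as 4 big-endian bytes.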
    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
        let mut big_key_segment = key.to_vec();
        // BCS encodes `u32` in little-endian order; reversing yields big-endian bytes.
        let mut bytes = bcs::to_bytes(&index)?;
        bytes.reverse();
        big_key_segment.extend(bytes);
        Ok(big_key_segment)
    }

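    /// Builds the value stored under segment 0: the segment count encoded as 4
    /// big-endian bytes, followed by the first chunk of the user value.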
    fn get_initial_count_first_chunk(
        count: u32,
        first_chunk: &[u8],
    ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
        let mut bytes = bcs::to_bytes(&count)?;
        bytes.reverse();
        let mut value_ext = Vec::new();
        value_ext.extend(bytes);
        value_ext.extend(first_chunk);
        Ok(value_ext)
    }

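    /// Reads the segment count from the first 4 bytes of a segment-0 value.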
    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
        if value.len() < 4 {
            return Err(ValueSplittingError::NoCountAvailable);
        }
        let mut bytes = value[0..4].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }

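    /// Reads the segment index from the last 4 bytes of an inner-store key.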
    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
        let len = key.len();
        if len < 4 {
            return Err(ValueSplittingError::TooShortKey);
        }
        let mut bytes = key[len - 4..len].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }
}

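/// A memory store with an artificially small `MAX_VALUE_SIZE` (100 bytes), used to
/// exercise the value-splitting logic in tests.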
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    inner: MemoryStore,
}

#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    type Error = MemoryStoreError;
}

#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    fn root_key(&self) -> Result<Vec<u8>, MemoryStoreError> {
        self.inner.root_key()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.inner.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.inner.contains_key(key).await
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, MemoryStoreError> {
        self.inner.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.inner.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}

#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // Artificially low so that the tests actually exercise the splitting logic.
    const MAX_VALUE_SIZE: usize = 100;

    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch size is not adequate for this test"
        );
        self.inner.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.inner.clear_journal().await
    }
}

#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a new [`LimitedTestMemoryStore`].
    pub fn new() -> Self {
        let inner = MemoryStore::new_for_testing();
        LimitedTestMemoryStore { inner }
    }
}

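/// Creates a [`ValueSplittingStore`] backed by a [`LimitedTestMemoryStore`], for tests.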
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    ValueSplittingStore::new(LimitedTestMemoryStore::new())
}

#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

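    // Writes a value that needs two segments, overwrites it with a short one, and checks
    // that the leftover segment in the inner store is ignored by reads.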
    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a value that is too large for the inner store and so is split in two.
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Overwrite it with a small value that fits into a single segment.
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // The leftover segment 1 is still present in the inner store.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

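    // Writes a value spanning two segments and checks how it is laid out in the inner
    // store.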
    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a random value that fills exactly two segments.
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reconstruct the value from the raw segments of the inner store.
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!()
            };
            if index == 0 {
                // Segment 0 starts with the 4-byte segment count.
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

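    // Writes a value spanning three segments, deletes the key, and checks that only
    // segment 0 is removed from the inner store.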
    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a random value that fills exactly three segments.
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // Delete the key again.
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // The value is gone from the splitting store...
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // ...but segments 1 and 2 remain as leftovers in the inner store.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}