1use linera_base::ensure;
7use thiserror::Error;
8
9use crate::{
10 batch::{Batch, WriteOperation},
11 store::{
12 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
13 WritableKeyValueStore,
14 },
15};
16#[cfg(with_testing)]
17use crate::{
18 memory::{MemoryStore, MemoryStoreError},
19 store::TestKeyValueDatabase,
20};
21
/// A database wrapper whose stores split values across several inner entries.
///
/// The stores opened from it are [`ValueSplittingStore`]s wrapping the stores of
/// the inner database `D`.
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    /// The underlying database.
    database: D,
}
33
/// A key-value store wrapper that splits each value into segments that fit in
/// the inner store `S`.
///
/// A user key is stored under one or more inner keys obtained by appending a
/// 4-byte segment index to it. The segment with index 0 additionally starts
/// with a 4-byte count of the segments that make up the value.
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    /// The underlying store holding the individual segments.
    store: S,
}
40
41#[derive(Error, Debug)]
43pub enum ValueSplittingError<E> {
44 #[error(transparent)]
46 InnerStoreError(#[from] E),
47
48 #[error("the key is of length less than 4, so we cannot extract the first byte")]
50 TooShortKey,
51
52 #[error("value segment is missing from the database")]
54 MissingSegment,
55
56 #[error("no count of size u32 is available in the value")]
58 NoCountAvailable,
59}
60
61impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
62 fn from(error: bcs::Error) -> Self {
63 let error = E::from(error);
64 ValueSplittingError::InnerStoreError(error)
65 }
66}
67
68impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
69 const BACKEND: &'static str = "value splitting";
70
71 fn must_reload_view(&self) -> bool {
72 match self {
73 ValueSplittingError::InnerStoreError(e) => e.must_reload_view(),
74 _ => false,
75 }
76 }
77}
78
79impl<S> WithError for ValueSplittingDatabase<S>
80where
81 S: WithError,
82 S::Error: 'static,
83{
84 type Error = ValueSplittingError<S::Error>;
85}
86
87impl<D> WithError for ValueSplittingStore<D>
88where
89 D: WithError,
90 D::Error: 'static,
91{
92 type Error = ValueSplittingError<D::Error>;
93}
94
95impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
96where
97 S: ReadableKeyValueStore,
98 S::Error: 'static,
99{
100 const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;
101
102 fn max_stream_queries(&self) -> usize {
103 self.store.max_stream_queries()
104 }
105
106 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
107 Ok(self.store.root_key()?)
108 }
109
110 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
111 let mut big_key = key.to_vec();
112 big_key.extend(&[0, 0, 0, 0]);
113 let value = self.store.read_value_bytes(&big_key).await?;
114 let Some(value) = value else {
115 return Ok(None);
116 };
117 let count = Self::read_count_from_value(&value)?;
118 let mut big_value = value[4..].to_vec();
119 if count == 1 {
120 return Ok(Some(big_value));
121 }
122 let mut big_keys = Vec::new();
123 for i in 1..count {
124 let big_key_segment = Self::get_segment_key(key, i)?;
125 big_keys.push(big_key_segment);
126 }
127 let segments = self.store.read_multi_values_bytes(&big_keys).await?;
128 for segment in segments {
129 match segment {
130 None => {
131 return Err(ValueSplittingError::MissingSegment);
132 }
133 Some(segment) => {
134 big_value.extend(segment);
135 }
136 }
137 }
138 Ok(Some(big_value))
139 }
140
141 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
142 let mut big_key = key.to_vec();
143 big_key.extend(&[0, 0, 0, 0]);
144 Ok(self.store.contains_key(&big_key).await?)
145 }
146
147 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
148 let big_keys = keys
149 .iter()
150 .map(|key| {
151 let mut big_key = key.clone();
152 big_key.extend(&[0, 0, 0, 0]);
153 big_key
154 })
155 .collect::<Vec<_>>();
156 Ok(self.store.contains_keys(&big_keys).await?)
157 }
158
159 async fn read_multi_values_bytes(
160 &self,
161 keys: &[Vec<u8>],
162 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
163 let mut big_keys = Vec::new();
164 for key in keys {
165 let mut big_key = key.clone();
166 big_key.extend(&[0, 0, 0, 0]);
167 big_keys.push(big_key);
168 }
169 let values = self.store.read_multi_values_bytes(&big_keys).await?;
170 let mut big_values = Vec::<Option<Vec<u8>>>::new();
171 let mut keys_add = Vec::new();
172 let mut n_blocks = Vec::new();
173 for (key, value) in keys.iter().zip(values) {
174 match value {
175 None => {
176 n_blocks.push(0);
177 big_values.push(None);
178 }
179 Some(value) => {
180 let count = Self::read_count_from_value(&value)?;
181 let big_value = value[4..].to_vec();
182 for i in 1..count {
183 let big_key_segment = Self::get_segment_key(key, i)?;
184 keys_add.push(big_key_segment);
185 }
186 n_blocks.push(count);
187 big_values.push(Some(big_value));
188 }
189 }
190 }
191 if !keys_add.is_empty() {
192 let mut segments = self
193 .store
194 .read_multi_values_bytes(&keys_add)
195 .await?
196 .into_iter();
197 for (big_value, count) in big_values.iter_mut().zip(&n_blocks) {
198 if let Some(value) = big_value {
199 for _ in 1..*count {
200 let segment = segments.next().unwrap().unwrap();
201 value.extend(segment);
202 }
203 }
204 }
205 }
206 Ok(big_values)
207 }
208
209 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
210 let mut keys = Vec::new();
211 for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
212 let len = big_key.len();
213 if Self::read_index_from_key(&big_key)? == 0 {
214 let key = big_key[0..len - 4].to_vec();
215 keys.push(key);
216 }
217 }
218 Ok(keys)
219 }
220
221 async fn find_key_values_by_prefix(
222 &self,
223 key_prefix: &[u8],
224 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
225 let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
226 let mut small_kv_iterator = small_key_values.into_iter();
227 let mut key_values = Vec::new();
228 while let Some((mut big_key, value)) = small_kv_iterator.next() {
229 if Self::read_index_from_key(&big_key)? != 0 {
230 continue; }
232 big_key.truncate(big_key.len() - 4);
233 let key = big_key;
234 let count = Self::read_count_from_value(&value)?;
235 let mut big_value = value[4..].to_vec();
236 for idx in 1..count {
237 let (big_key, value) = small_kv_iterator
238 .next()
239 .ok_or(ValueSplittingError::MissingSegment)?;
240 ensure!(
241 Self::read_index_from_key(&big_key)? == idx
242 && big_key.starts_with(&key)
243 && big_key.len() == key.len() + 4,
244 ValueSplittingError::MissingSegment
245 );
246 big_value.extend(value);
247 }
248 key_values.push((key, big_value));
249 }
250 Ok(key_values)
251 }
252}
253
254impl<K> WritableKeyValueStore for ValueSplittingStore<K>
255where
256 K: WritableKeyValueStore,
257 K::Error: 'static,
258{
259 const MAX_VALUE_SIZE: usize = usize::MAX;
260
261 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
262 let mut batch_new = Batch::new();
263 for operation in batch.operations {
264 match operation {
265 WriteOperation::Delete { key } => {
266 let mut big_key = key.to_vec();
267 big_key.extend(&[0, 0, 0, 0]);
268 batch_new.delete_key(big_key);
269 }
270 WriteOperation::Put { key, mut value } => {
271 let big_key = Self::get_segment_key(&key, 0)?;
272 let mut count: u32 = 1;
273 let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
274 Self::get_initial_count_first_chunk(count, &value)?
275 } else {
276 let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
277 for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
278 let big_key_segment = Self::get_segment_key(&key, count)?;
279 batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
280 count += 1;
281 }
282 Self::get_initial_count_first_chunk(count, &value)?
283 };
284 batch_new.put_key_value_bytes(big_key, value_ext);
285 }
286 WriteOperation::DeletePrefix { key_prefix } => {
287 batch_new.delete_key_prefix(key_prefix);
288 }
289 }
290 }
291 Ok(self.store.write_batch(batch_new).await?)
292 }
293
294 async fn clear_journal(&self) -> Result<(), Self::Error> {
295 Ok(self.store.clear_journal().await?)
296 }
297}
298
299impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
300where
301 D: KeyValueDatabase,
302 D::Error: 'static,
303{
304 type Config = D::Config;
305
306 type Store = ValueSplittingStore<D::Store>;
307
308 fn get_name() -> String {
309 format!("value splitting {}", D::get_name())
310 }
311
312 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
313 let database = D::connect(config, namespace).await?;
314 Ok(Self { database })
315 }
316
317 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
318 let store = self.database.open_shared(root_key)?;
319 Ok(ValueSplittingStore { store })
320 }
321
322 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
323 let store = self.database.open_exclusive(root_key)?;
324 Ok(ValueSplittingStore { store })
325 }
326
327 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
328 Ok(D::list_all(config).await?)
329 }
330
331 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
332 Ok(self.database.list_root_keys().await?)
333 }
334
335 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
336 Ok(D::delete_all(config).await?)
337 }
338
339 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
340 Ok(D::exists(config, namespace).await?)
341 }
342
343 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
344 Ok(D::create(config, namespace).await?)
345 }
346
347 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
348 Ok(D::delete(config, namespace).await?)
349 }
350}
351
#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    /// Uses the test configuration of the inner database.
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config()
            .await
            .map_err(ValueSplittingError::InnerStoreError)
    }
}
362
#[cfg(with_testing)]
impl<D> crate::backends::DatabaseBackup for ValueSplittingDatabase<D>
where
    D: crate::backends::DatabaseBackup,
{
    /// Backs up the inner database to `dir`.
    fn backup_to(&self, dir: &std::path::Path) -> anyhow::Result<()> {
        D::backup_to(&self.database, dir)
    }
}
371
372impl<D> ValueSplittingStore<D>
373where
374 D: WithError,
375{
376 pub fn new(store: D) -> Self {
378 ValueSplittingStore { store }
379 }
380
381 fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
382 let mut big_key_segment = key.to_vec();
383 let mut bytes = bcs::to_bytes(&index)?;
384 bytes.reverse();
385 big_key_segment.extend(bytes);
386 Ok(big_key_segment)
387 }
388
389 fn get_initial_count_first_chunk(
390 count: u32,
391 first_chunk: &[u8],
392 ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
393 let mut bytes = bcs::to_bytes(&count)?;
394 bytes.reverse();
395 let mut value_ext = Vec::new();
396 value_ext.extend(bytes);
397 value_ext.extend(first_chunk);
398 Ok(value_ext)
399 }
400
401 fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
402 if value.len() < 4 {
403 return Err(ValueSplittingError::NoCountAvailable);
404 }
405 let mut bytes = value[0..4].to_vec();
406 bytes.reverse();
407 Ok(bcs::from_bytes::<u32>(&bytes)?)
408 }
409
410 fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
411 let len = key.len();
412 if len < 4 {
413 return Err(ValueSplittingError::TooShortKey);
414 }
415 let mut bytes = key[len - 4..len].to_vec();
416 bytes.reverse();
417 Ok(bcs::from_bytes::<u32>(&bytes)?)
418 }
419}
420
/// A wrapper around [`MemoryStore`] that only accepts small values, for
/// exercising value splitting in tests.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    /// The memory store that all operations are forwarded to.
    inner: MemoryStore,
}
427
#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    /// Same as [`LimitedTestMemoryStore::new`].
    fn default() -> Self {
        LimitedTestMemoryStore::new()
    }
}
434
#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    // The wrapper defines no error of its own: it reports the errors of the
    // inner `MemoryStore` as they are.
    type Error = MemoryStoreError;
}
439
/// Every read is forwarded unchanged to the inner memory store.
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
        self.inner.root_key()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        self.inner.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        self.inner.contains_key(key).await
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
        self.inner.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        self.inner.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}
485
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    /// A small limit, so that tests can trigger splitting with short values.
    const MAX_VALUE_SIZE: usize = 100;

    /// Writes `batch` to the inner store.
    ///
    /// # Panics
    ///
    /// Panics if `batch.check_value_size(Self::MAX_VALUE_SIZE)` is false.
    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let values_fit = batch.check_value_size(Self::MAX_VALUE_SIZE);
        assert!(values_fit, "The batch size is not adequate for this test");
        self.inner.write_batch(batch).await
    }

    /// Forwards to the inner store.
    async fn clear_journal(&self) -> Result<(), Self::Error> {
        self.inner.clear_journal().await
    }
}
504
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore` backed by a fresh testing memory store.
    pub fn new() -> Self {
        Self {
            inner: MemoryStore::new_for_testing(),
        }
    }
}
513
/// Creates a [`ValueSplittingStore`] on top of a new [`LimitedTestMemoryStore`].
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    let limited_store = LimitedTestMemoryStore::new();
    ValueSplittingStore::new(limited_store)
}
519
#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    /// Overwriting a two-segment value with a short one leaves the second
    /// segment behind in the underlying store.
    #[tokio::test]
    async fn test_value_splitting1_testing_leftovers() {
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        const _: () = assert!(MAX_LEN > 10);
        let store = LimitedTestMemoryStore::new();
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        // A value one byte over the limit: it needs two segments.
        let long_value = vec![0; MAX_LEN + 1];
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), long_value.clone());
        big_store.write_batch(batch).await.unwrap();
        let read_back = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(read_back, Some(long_value));

        // A short value under the same key: it needs a single segment.
        let short_value = vec![0, 1];
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), short_value.clone());
        big_store.write_batch(batch).await.unwrap();
        let read_back = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(read_back, Some(short_value));

        // Both segment keys are still present in the underlying store.
        let inner_keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(inner_keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    /// A value that fills exactly two segments is stored as the expected two
    /// entries of the underlying store.
    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let store = LimitedTestMemoryStore::new();
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        let mut rng = crate::random::make_deterministic_rng();
        let value: Vec<u8> = (0..2 * MAX_LEN - 4).map(|_| rng.gen::<u8>()).collect();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let read_back = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(read_back, Some(value.clone()));

        // Rebuild the value by hand from the segments in the underlying store.
        let mut value_concat = Vec::<u8>::new();
        for index in 0u32..2 {
            let mut index_bytes = bcs::to_bytes(&index).unwrap();
            index_bytes.reverse();
            let mut segment_key = key.clone();
            segment_key.extend(index_bytes);
            let Some(segment) = store.read_value_bytes(&segment_key).await.unwrap() else {
                unreachable!(
                    "value_splitting test: segment key not found in underlying store right after a multi-segment write"
                )
            };
            // The first segment starts with the 4-byte segment count.
            let payload = if index == 0 { &segment[4..] } else { &segment[..] };
            value_concat.extend_from_slice(payload);
        }
        assert_eq!(value, value_concat);
    }

    /// Deleting a three-segment value hides it from the splitting store but
    /// leaves segments 1 and 2 in the underlying store.
    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let store = LimitedTestMemoryStore::new();
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        let mut rng = crate::random::make_deterministic_rng();
        let value: Vec<u8> = (0..3 * MAX_LEN - 4).map(|_| rng.gen::<u8>()).collect();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();

        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();

        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);

        let inner_keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(inner_keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}