1use linera_base::ensure;
7use thiserror::Error;
8
9use crate::{
10 batch::{Batch, WriteOperation},
11 store::{
12 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
13 WritableKeyValueStore,
14 },
15};
16#[cfg(with_testing)]
17use crate::{
18 memory::{MemoryStore, MemoryStoreError},
19 store::TestKeyValueDatabase,
20};
21
/// A key-value database wrapper whose stores are [`ValueSplittingStore`]s.
///
/// Every store opened through this wrapper splits values that exceed the
/// inner store's value-size limit into several segments.
#[derive(Clone)]
pub struct ValueSplittingDatabase<D> {
    /// The wrapped database; all database-level operations are forwarded to it.
    database: D,
}
33
/// A key-value store wrapper that splits large values into segments.
///
/// A logical key is stored in the inner store as the key followed by a 4-byte
/// segment index. Segment `0` starts with a 4-byte segment count followed by
/// the first chunk of the value; further segments hold the remaining chunks.
#[derive(Clone)]
pub struct ValueSplittingStore<S> {
    /// The wrapped store that holds the individual segments.
    store: S,
}
40
/// The error type used by [`ValueSplittingStore`] and [`ValueSplittingDatabase`].
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// An error coming from the wrapped store or database, forwarded transparently.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// A key read back from the inner store has fewer than 4 bytes, so the
    /// trailing 4-byte segment index cannot be extracted from it.
    #[error("the key is of length less than 4, so we cannot extract the first byte")]
    TooShortKey,

    /// A segment announced by the count stored in segment `0` was not found.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// A stored value has fewer than 4 bytes, so it cannot contain the segment count.
    #[error("no count of size u32 is available in the value")]
    NoCountAvailable,
}
60
61impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
62 fn from(error: bcs::Error) -> Self {
63 let error = E::from(error);
64 ValueSplittingError::InnerStoreError(error)
65 }
66}
67
68impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
69 const BACKEND: &'static str = "value splitting";
70
71 fn must_reload_view(&self) -> bool {
72 match self {
73 ValueSplittingError::InnerStoreError(e) => e.must_reload_view(),
74 _ => false,
75 }
76 }
77}
78
79impl<S> WithError for ValueSplittingDatabase<S>
80where
81 S: WithError,
82 S::Error: 'static,
83{
84 type Error = ValueSplittingError<S::Error>;
85}
86
87impl<D> WithError for ValueSplittingStore<D>
88where
89 D: WithError,
90 D::Error: 'static,
91{
92 type Error = ValueSplittingError<D::Error>;
93}
94
95impl<S> ReadableKeyValueStore for ValueSplittingStore<S>
96where
97 S: ReadableKeyValueStore,
98 S::Error: 'static,
99{
100 const MAX_KEY_SIZE: usize = S::MAX_KEY_SIZE - 4;
101
102 fn max_stream_queries(&self) -> usize {
103 self.store.max_stream_queries()
104 }
105
106 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
107 Ok(self.store.root_key()?)
108 }
109
110 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
111 let mut big_key = key.to_vec();
112 big_key.extend(&[0, 0, 0, 0]);
113 let value = self.store.read_value_bytes(&big_key).await?;
114 let Some(value) = value else {
115 return Ok(None);
116 };
117 let count = Self::read_count_from_value(&value)?;
118 let mut big_value = value[4..].to_vec();
119 if count == 1 {
120 return Ok(Some(big_value));
121 }
122 let mut big_keys = Vec::new();
123 for i in 1..count {
124 let big_key_segment = Self::get_segment_key(key, i)?;
125 big_keys.push(big_key_segment);
126 }
127 let segments = self.store.read_multi_values_bytes(&big_keys).await?;
128 for segment in segments {
129 match segment {
130 None => {
131 return Err(ValueSplittingError::MissingSegment);
132 }
133 Some(segment) => {
134 big_value.extend(segment);
135 }
136 }
137 }
138 Ok(Some(big_value))
139 }
140
141 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
142 let mut big_key = key.to_vec();
143 big_key.extend(&[0, 0, 0, 0]);
144 Ok(self.store.contains_key(&big_key).await?)
145 }
146
147 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
148 let big_keys = keys
149 .iter()
150 .map(|key| {
151 let mut big_key = key.clone();
152 big_key.extend(&[0, 0, 0, 0]);
153 big_key
154 })
155 .collect::<Vec<_>>();
156 Ok(self.store.contains_keys(&big_keys).await?)
157 }
158
159 async fn read_multi_values_bytes(
160 &self,
161 keys: &[Vec<u8>],
162 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
163 let mut big_keys = Vec::new();
164 for key in keys {
165 let mut big_key = key.clone();
166 big_key.extend(&[0, 0, 0, 0]);
167 big_keys.push(big_key);
168 }
169 let values = self.store.read_multi_values_bytes(&big_keys).await?;
170 let mut big_values = Vec::<Option<Vec<u8>>>::new();
171 let mut keys_add = Vec::new();
172 let mut n_blocks = Vec::new();
173 for (key, value) in keys.iter().zip(values) {
174 match value {
175 None => {
176 n_blocks.push(0);
177 big_values.push(None);
178 }
179 Some(value) => {
180 let count = Self::read_count_from_value(&value)?;
181 let big_value = value[4..].to_vec();
182 for i in 1..count {
183 let big_key_segment = Self::get_segment_key(key, i)?;
184 keys_add.push(big_key_segment);
185 }
186 n_blocks.push(count);
187 big_values.push(Some(big_value));
188 }
189 }
190 }
191 if !keys_add.is_empty() {
192 let mut segments = self
193 .store
194 .read_multi_values_bytes(&keys_add)
195 .await?
196 .into_iter();
197 for (idx, count) in n_blocks.iter().enumerate() {
198 if count > &1 {
199 let value = big_values.get_mut(idx).unwrap();
200 if let Some(ref mut value) = value {
201 for _ in 1..*count {
202 let segment = segments.next().unwrap().unwrap();
203 value.extend(segment);
204 }
205 }
206 }
207 }
208 }
209 Ok(big_values)
210 }
211
212 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
213 let mut keys = Vec::new();
214 for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
215 let len = big_key.len();
216 if Self::read_index_from_key(&big_key)? == 0 {
217 let key = big_key[0..len - 4].to_vec();
218 keys.push(key);
219 }
220 }
221 Ok(keys)
222 }
223
224 async fn find_key_values_by_prefix(
225 &self,
226 key_prefix: &[u8],
227 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
228 let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
229 let mut small_kv_iterator = small_key_values.into_iter();
230 let mut key_values = Vec::new();
231 while let Some((mut big_key, value)) = small_kv_iterator.next() {
232 if Self::read_index_from_key(&big_key)? != 0 {
233 continue; }
235 big_key.truncate(big_key.len() - 4);
236 let key = big_key;
237 let count = Self::read_count_from_value(&value)?;
238 let mut big_value = value[4..].to_vec();
239 for idx in 1..count {
240 let (big_key, value) = small_kv_iterator
241 .next()
242 .ok_or(ValueSplittingError::MissingSegment)?;
243 ensure!(
244 Self::read_index_from_key(&big_key)? == idx
245 && big_key.starts_with(&key)
246 && big_key.len() == key.len() + 4,
247 ValueSplittingError::MissingSegment
248 );
249 big_value.extend(value);
250 }
251 key_values.push((key, big_value));
252 }
253 Ok(key_values)
254 }
255}
256
257impl<K> WritableKeyValueStore for ValueSplittingStore<K>
258where
259 K: WritableKeyValueStore,
260 K::Error: 'static,
261{
262 const MAX_VALUE_SIZE: usize = usize::MAX;
263
264 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
265 let mut batch_new = Batch::new();
266 for operation in batch.operations {
267 match operation {
268 WriteOperation::Delete { key } => {
269 let mut big_key = key.to_vec();
270 big_key.extend(&[0, 0, 0, 0]);
271 batch_new.delete_key(big_key);
272 }
273 WriteOperation::Put { key, mut value } => {
274 let big_key = Self::get_segment_key(&key, 0)?;
275 let mut count: u32 = 1;
276 let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
277 Self::get_initial_count_first_chunk(count, &value)?
278 } else {
279 let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
280 for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
281 let big_key_segment = Self::get_segment_key(&key, count)?;
282 batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
283 count += 1;
284 }
285 Self::get_initial_count_first_chunk(count, &value)?
286 };
287 batch_new.put_key_value_bytes(big_key, value_ext);
288 }
289 WriteOperation::DeletePrefix { key_prefix } => {
290 batch_new.delete_key_prefix(key_prefix);
291 }
292 }
293 }
294 Ok(self.store.write_batch(batch_new).await?)
295 }
296
297 async fn clear_journal(&self) -> Result<(), Self::Error> {
298 Ok(self.store.clear_journal().await?)
299 }
300}
301
302impl<D> KeyValueDatabase for ValueSplittingDatabase<D>
303where
304 D: KeyValueDatabase,
305 D::Error: 'static,
306{
307 type Config = D::Config;
308
309 type Store = ValueSplittingStore<D::Store>;
310
311 fn get_name() -> String {
312 format!("value splitting {}", D::get_name())
313 }
314
315 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
316 let database = D::connect(config, namespace).await?;
317 Ok(Self { database })
318 }
319
320 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
321 let store = self.database.open_shared(root_key)?;
322 Ok(ValueSplittingStore { store })
323 }
324
325 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
326 let store = self.database.open_exclusive(root_key)?;
327 Ok(ValueSplittingStore { store })
328 }
329
330 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
331 Ok(D::list_all(config).await?)
332 }
333
334 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
335 Ok(self.database.list_root_keys().await?)
336 }
337
338 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
339 Ok(D::delete_all(config).await?)
340 }
341
342 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
343 Ok(D::exists(config, namespace).await?)
344 }
345
346 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
347 Ok(D::create(config, namespace).await?)
348 }
349
350 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
351 Ok(D::delete(config, namespace).await?)
352 }
353}
354
/// The test configuration is the one of the inner database.
#[cfg(with_testing)]
impl<D> TestKeyValueDatabase for ValueSplittingDatabase<D>
where
    D: TestKeyValueDatabase,
    D::Error: 'static,
{
    async fn new_test_config() -> Result<D::Config, Self::Error> {
        D::new_test_config()
            .await
            .map_err(ValueSplittingError::InnerStoreError)
    }
}
365
366impl<D> ValueSplittingStore<D>
367where
368 D: WithError,
369{
370 pub fn new(store: D) -> Self {
372 ValueSplittingStore { store }
373 }
374
375 fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
376 let mut big_key_segment = key.to_vec();
377 let mut bytes = bcs::to_bytes(&index)?;
378 bytes.reverse();
379 big_key_segment.extend(bytes);
380 Ok(big_key_segment)
381 }
382
383 fn get_initial_count_first_chunk(
384 count: u32,
385 first_chunk: &[u8],
386 ) -> Result<Vec<u8>, ValueSplittingError<D::Error>> {
387 let mut bytes = bcs::to_bytes(&count)?;
388 bytes.reverse();
389 let mut value_ext = Vec::new();
390 value_ext.extend(bytes);
391 value_ext.extend(first_chunk);
392 Ok(value_ext)
393 }
394
395 fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
396 if value.len() < 4 {
397 return Err(ValueSplittingError::NoCountAvailable);
398 }
399 let mut bytes = value[0..4].to_vec();
400 bytes.reverse();
401 Ok(bcs::from_bytes::<u32>(&bytes)?)
402 }
403
404 fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<D::Error>> {
405 let len = key.len();
406 if len < 4 {
407 return Err(ValueSplittingError::TooShortKey);
408 }
409 let mut bytes = key[len - 4..len].to_vec();
410 bytes.reverse();
411 Ok(bcs::from_bytes::<u32>(&bytes)?)
412 }
413}
414
/// A memory store for tests whose `write_batch` asserts that every value of
/// the batch respects a small value-size limit.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    /// The memory store that actually holds the data.
    inner: MemoryStore,
}
421
/// The default store is the one built by [`LimitedTestMemoryStore::new`].
#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    fn default() -> Self {
        LimitedTestMemoryStore::new()
    }
}
428
/// The limited store reports the errors of the wrapped memory store unchanged.
#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    type Error = MemoryStoreError;
}
433
/// All read operations are forwarded unchanged to the wrapped memory store.
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        self.inner.max_stream_queries()
    }

    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
        self.inner.root_key()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        self.inner.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        self.inner.contains_key(key).await
    }

    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
        self.inner.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: &[Vec<u8>],
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        self.inner.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        self.inner.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        self.inner.find_key_values_by_prefix(key_prefix).await
    }
}
479
/// Writes are forwarded to the wrapped memory store after checking value sizes.
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    /// Deliberately small so that tests can exercise value splitting cheaply.
    const MAX_VALUE_SIZE: usize = 100;

    /// Writes `batch` to the wrapped store.
    ///
    /// # Panics
    ///
    /// Panics if `batch.check_value_size(Self::MAX_VALUE_SIZE)` is false.
    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let values_fit = batch.check_value_size(Self::MAX_VALUE_SIZE);
        assert!(values_fit, "The batch size is not adequate for this test");
        self.inner.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        self.inner.clear_journal().await
    }
}
498
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a `LimitedTestMemoryStore` on top of a fresh testing memory store.
    pub fn new() -> Self {
        Self {
            inner: MemoryStore::new_for_testing(),
        }
    }
}
507
/// Creates a `ValueSplittingStore` on top of a fresh `LimitedTestMemoryStore`.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    let limited_store = LimitedTestMemoryStore::new();
    ValueSplittingStore::new(limited_store)
}
513
#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    /// Overwriting a split value with a short one leaves the old extra segment
    /// in the underlying store.
    #[tokio::test]
    async fn test_value_splitting1_testing_leftovers() {
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        const _: () = assert!(MAX_LEN > 10);
        let store = LimitedTestMemoryStore::new();
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        // First a value that does not fit into a single segment.
        let long_value = vec![0; MAX_LEN + 1];
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), long_value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(long_value));

        // Then a short value under the same key.
        let short_value = vec![0, 1];
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), short_value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(short_value));

        // The underlying store still holds both segments.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        let expected_keys = vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]];
        assert_eq!(keys, expected_keys);
    }

    /// A value filling exactly two segments is read back intact, both through
    /// the splitting store and by concatenating the raw segments.
    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let store = LimitedTestMemoryStore::new();
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        // Segment `0` loses 4 bytes to the count, hence `2 * MAX_LEN - 4`.
        let mut rng = crate::random::make_deterministic_rng();
        let value: Vec<u8> = (0..2 * MAX_LEN - 4).map(|_| rng.gen::<u8>()).collect();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));

        // Rebuild the value from the two raw segments of the underlying store.
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut index_bytes = bcs::to_bytes(&index).unwrap();
            index_bytes.reverse();
            let mut segment_key = key.clone();
            segment_key.extend(index_bytes);
            let Some(segment) = store.read_value_bytes(&segment_key).await.unwrap() else {
                unreachable!(
                    "value_splitting test: segment key not found in underlying store right after a multi-segment write"
                )
            };
            // Segment `0` starts with the 4-byte count, which is not part of the value.
            let payload = if index == 0 {
                &segment[4..]
            } else {
                &segment[..]
            };
            value_concat.extend_from_slice(payload);
        }
        assert_eq!(value, value_concat);
    }

    /// Deleting a split value hides it from the splitting store but leaves the
    /// segments other than segment `0` in the underlying store.
    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let store = LimitedTestMemoryStore::new();
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        // Write a value filling exactly three segments.
        let mut rng = crate::random::make_deterministic_rng();
        let value: Vec<u8> = (0..3 * MAX_LEN - 4).map(|_| rng.gen::<u8>()).collect();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();

        // Delete it again.
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();

        // Nothing is visible through the splitting store...
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);

        // ...but segments 1 and 2 are still in the underlying store.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        let expected_keys = vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]];
        assert_eq!(keys, expected_keys);
    }
}