use linera_base::ensure;
use thiserror::Error;

use crate::{
    batch::{Batch, WriteOperation},
    store::{
        AdminKeyValueStore, KeyIterable, KeyValueIterable, KeyValueStoreError,
        ReadableKeyValueStore, WithError, WritableKeyValueStore,
    },
};
#[cfg(with_testing)]
use crate::{
    memory::{MemoryStore, MemoryStoreError, TEST_MEMORY_MAX_STREAM_QUERIES},
    random::generate_test_namespace,
    store::TestKeyValueStore,
};

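/// The error type for [`ValueSplittingStore`], wrapping the errors of the inner store
/// together with failures specific to reassembling split values.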
#[derive(Error, Debug)]
pub enum ValueSplittingError<E> {
    /// An error from the inner store.
    #[error(transparent)]
    InnerStoreError(#[from] E),

    /// The key is shorter than 4 bytes, so the segment index cannot be extracted.
    #[error("the key is shorter than 4 bytes, so the segment index cannot be extracted")]
    TooShortKey,

    /// A value segment is missing from the database.
    #[error("value segment is missing from the database")]
    MissingSegment,

    /// The value is shorter than 4 bytes, so it does not contain a u32 segment count.
    #[error("the value does not contain a u32 segment count")]
    NoCountAvailable,
}

impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
    fn from(error: bcs::Error) -> Self {
        let error = E::from(error);
        ValueSplittingError::InnerStoreError(error)
    }
}

impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
    const BACKEND: &'static str = "value splitting";
}

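/// A key-value store with no limit on the size of its values, built by wrapping a store
/// `K` that may have such a limit.
///
/// Each logical key-value pair is stored as one or more entries of the inner store. The
/// logical key is suffixed with a 4-byte big-endian segment index. The entry at index 0
/// holds a 4-byte big-endian segment count followed by the first chunk of the value
/// (at most `K::MAX_VALUE_SIZE - 4` bytes); entries at indices `1..count` hold the
/// remaining chunks (at most `K::MAX_VALUE_SIZE` bytes each).
///
/// For example, with an inner `MAX_VALUE_SIZE` of 100, a 150-byte value stored under the
/// key `[0, 1]` becomes two inner entries: `[0, 1, 0, 0, 0, 0]` mapping to the count `2`
/// followed by the first 96 bytes, and `[0, 1, 0, 0, 0, 1]` mapping to the remaining
/// 54 bytes.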
#[derive(Clone)]
pub struct ValueSplittingStore<K> {
    /// The underlying store.
    store: K,
}

impl<K> WithError for ValueSplittingStore<K>
where
    K: WithError,
    K::Error: 'static,
{
    type Error = ValueSplittingError<K::Error>;
}

impl<K> ReadableKeyValueStore for ValueSplittingStore<K>
where
    K: ReadableKeyValueStore,
    K::Error: 'static,
{
    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE - 4;
    type Keys = Vec<Vec<u8>>;
    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;

    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        // Segment 0 is stored under the key suffixed with the index `[0, 0, 0, 0]`.
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        let value = self.store.read_value_bytes(&big_key).await?;
        let Some(value) = value else {
            return Ok(None);
        };
        // The first 4 bytes of segment 0 hold the total number of segments.
        let count = Self::read_count_from_value(&value)?;
        let mut big_value = value[4..].to_vec();
        if count == 1 {
            return Ok(Some(big_value));
        }
        // Read the remaining segments and concatenate them.
        let mut big_keys = Vec::new();
        for i in 1..count {
            let big_key_segment = Self::get_segment_key(key, i)?;
            big_keys.push(big_key_segment);
        }
        let segments = self.store.read_multi_values_bytes(big_keys).await?;
        for segment in segments {
            match segment {
                None => {
                    return Err(ValueSplittingError::MissingSegment);
                }
                Some(segment) => {
                    big_value.extend(segment);
                }
            }
        }
        Ok(Some(big_value))
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        let mut big_key = key.to_vec();
        big_key.extend(&[0, 0, 0, 0]);
        Ok(self.store.contains_key(&big_key).await?)
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        let big_keys = keys
            .into_iter()
            .map(|key| {
                let mut big_key = key.clone();
                big_key.extend(&[0, 0, 0, 0]);
                big_key
            })
            .collect::<Vec<_>>();
        Ok(self.store.contains_keys(big_keys).await?)
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        // First read segment 0 of every key to learn each segment count.
        let mut big_keys = Vec::new();
        for key in &keys {
            let mut big_key = key.clone();
            big_key.extend(&[0, 0, 0, 0]);
            big_keys.push(big_key);
        }
        let values = self.store.read_multi_values_bytes(big_keys).await?;
        let mut big_values = Vec::<Option<Vec<u8>>>::new();
        let mut keys_add = Vec::new();
        let mut n_blocks = Vec::new();
        for (key, value) in keys.iter().zip(values) {
            match value {
                None => {
                    n_blocks.push(0);
                    big_values.push(None);
                }
                Some(value) => {
                    let count = Self::read_count_from_value(&value)?;
                    let big_value = value[4..].to_vec();
                    for i in 1..count {
                        let big_key_segment = Self::get_segment_key(key, i)?;
                        keys_add.push(big_key_segment);
                    }
                    n_blocks.push(count);
                    big_values.push(Some(big_value));
                }
            }
        }
        // Then read all remaining segments in one query and append them to their values.
        if !keys_add.is_empty() {
            let mut segments = self
                .store
                .read_multi_values_bytes(keys_add)
                .await?
                .into_iter();
            for (idx, count) in n_blocks.iter().enumerate() {
                if count > &1 {
                    let value = big_values.get_mut(idx).unwrap();
                    if let Some(ref mut value) = value {
                        for _ in 1..*count {
                            let segment = segments.next().unwrap().unwrap();
                            value.extend(segment);
                        }
                    }
                }
            }
        }
        Ok(big_values)
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
        let mut keys = Vec::new();
        for big_key in self.store.find_keys_by_prefix(key_prefix).await?.iterator() {
            let big_key = big_key?;
            let len = big_key.len();
            // Only segment 0 represents a logical key; strip its 4-byte index suffix.
            if Self::read_index_from_key(big_key)? == 0 {
                let key = big_key[0..len - 4].to_vec();
                keys.push(key);
            }
        }
        Ok(keys)
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::KeyValues, Self::Error> {
        let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
        let mut small_kv_iterator = small_key_values.into_iterator_owned();
        let mut key_values = Vec::new();
        while let Some(result) = small_kv_iterator.next() {
            let (mut big_key, value) = result?;
            if Self::read_index_from_key(&big_key)? != 0 {
                // Only entries for segment 0 start a logical key-value pair.
                continue;
            }
            big_key.truncate(big_key.len() - 4);
            let key = big_key;
            let count = Self::read_count_from_value(&value)?;
            let mut big_value = value[4..].to_vec();
            // The remaining segments follow immediately in lexicographic key order.
            for idx in 1..count {
                let (big_key, value) = small_kv_iterator
                    .next()
                    .ok_or(ValueSplittingError::MissingSegment)??;
                ensure!(
                    Self::read_index_from_key(&big_key)? == idx
                        && big_key.starts_with(&key)
                        && big_key.len() == key.len() + 4,
                    ValueSplittingError::MissingSegment
                );
                big_value.extend(value);
            }
            key_values.push((key, big_value));
        }
        Ok(key_values)
    }
}

impl<K> WritableKeyValueStore for ValueSplittingStore<K>
where
    K: WritableKeyValueStore,
    K::Error: 'static,
{
    const MAX_VALUE_SIZE: usize = usize::MAX;

    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let mut batch_new = Batch::new();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    // Only segment 0 is deleted here; stale higher segments are
                    // ignored by the read path because segment 0 is absent.
                    let mut big_key = key.to_vec();
                    big_key.extend(&[0, 0, 0, 0]);
                    batch_new.delete_key(big_key);
                }
                WriteOperation::Put { key, mut value } => {
                    let big_key = Self::get_segment_key(&key, 0)?;
                    let mut count: u32 = 1;
                    let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
                        // The value fits into segment 0 together with the 4-byte count.
                        Self::get_initial_count_first_chunk(count, &value)?
                    } else {
                        // Write the overflow as additional segments of at most
                        // `K::MAX_VALUE_SIZE` bytes each, then prefix the first chunk
                        // with the total segment count.
                        let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
                        for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
                            let big_key_segment = Self::get_segment_key(&key, count)?;
                            batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
                            count += 1;
                        }
                        Self::get_initial_count_first_chunk(count, &value)?
                    };
                    batch_new.put_key_value_bytes(big_key, value_ext);
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    batch_new.delete_key_prefix(key_prefix);
                }
            }
        }
        Ok(self.store.write_batch(batch_new).await?)
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        Ok(self.store.clear_journal().await?)
    }
}

impl<K> AdminKeyValueStore for ValueSplittingStore<K>
where
    K: AdminKeyValueStore,
    K::Error: 'static,
{
    type Config = K::Config;

    fn get_name() -> String {
        format!("value splitting {}", K::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let store = K::connect(config, namespace).await?;
        Ok(Self { store })
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
        let store = self.store.open_exclusive(root_key)?;
        Ok(Self { store })
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        Ok(K::list_all(config).await?)
    }

    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, Self::Error> {
        Ok(K::list_root_keys(config, namespace).await?)
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        Ok(K::delete_all(config).await?)
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        Ok(K::exists(config, namespace).await?)
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(K::create(config, namespace).await?)
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        Ok(K::delete(config, namespace).await?)
    }
}

#[cfg(with_testing)]
impl<K> TestKeyValueStore for ValueSplittingStore<K>
where
    K: TestKeyValueStore,
    K::Error: 'static,
{
    async fn new_test_config() -> Result<K::Config, Self::Error> {
        Ok(K::new_test_config().await?)
    }
}

impl<K> ValueSplittingStore<K>
where
    K: WithError,
{
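    /// Creates a [`ValueSplittingStore`] from the inner store `store`.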
    pub fn new(store: K) -> Self {
        ValueSplittingStore { store }
    }

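    /// Returns the inner-store key for segment `index` of `key`, i.e. `key` followed by
    /// the index as 4 big-endian bytes.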
    fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
        let mut big_key_segment = key.to_vec();
        let mut bytes = bcs::to_bytes(&index)?;
        bytes.reverse();
        big_key_segment.extend(bytes);
        Ok(big_key_segment)
    }

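    /// Builds the value stored under segment 0: the segment `count` as 4 big-endian
    /// bytes, followed by the first chunk of the value.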
    fn get_initial_count_first_chunk(
        count: u32,
        first_chunk: &[u8],
    ) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
        let mut bytes = bcs::to_bytes(&count)?;
        bytes.reverse();
        let mut value_ext = Vec::new();
        value_ext.extend(bytes);
        value_ext.extend(first_chunk);
        Ok(value_ext)
    }

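    /// Reads the segment count from the first 4 bytes of a segment-0 value.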
    fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
        if value.len() < 4 {
            return Err(ValueSplittingError::NoCountAvailable);
        }
        let mut bytes = value[0..4].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }

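    /// Reads the segment index from the last 4 bytes of an inner-store key.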
    fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
        let len = key.len();
        if len < 4 {
            return Err(ValueSplittingError::TooShortKey);
        }
        let mut bytes = key[len - 4..len].to_vec();
        bytes.reverse();
        Ok(bcs::from_bytes::<u32>(&bytes)?)
    }
}

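/// A memory store for testing with a deliberately small `MAX_VALUE_SIZE`, so that value
/// splitting is actually exercised.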
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    store: MemoryStore,
}

#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    type Error = MemoryStoreError;
}

#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    const MAX_KEY_SIZE: usize = usize::MAX;
    type Keys = Vec<Vec<u8>>;
    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;

    fn max_stream_queries(&self) -> usize {
        TEST_MEMORY_MAX_STREAM_QUERIES
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        self.store.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        self.store.contains_key(key).await
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
        self.store.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        self.store.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, MemoryStoreError> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::KeyValues, MemoryStoreError> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}

#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    const MAX_VALUE_SIZE: usize = 100;

    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        if !batch.check_value_size(Self::MAX_VALUE_SIZE) {
            panic!("A value in the batch exceeds MAX_VALUE_SIZE");
        }
        self.store.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.store.clear_journal().await
    }
}

#[cfg(with_testing)]
impl LimitedTestMemoryStore {
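    /// Creates a [`LimitedTestMemoryStore`] backed by a fresh test namespace.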
    pub fn new() -> Self {
        let namespace = generate_test_namespace();
        let store =
            MemoryStore::new_for_testing(TEST_MEMORY_MAX_STREAM_QUERIES, &namespace).unwrap();
        LimitedTestMemoryStore { store }
    }
}

#[cfg(with_testing)]
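/// Creates a [`ValueSplittingStore`] on top of a [`LimitedTestMemoryStore`], for tests
/// that need value splitting to actually happen.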
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    ValueSplittingStore::new(LimitedTestMemoryStore::new())
}

#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a value that is too large for the inner store, so it is split in two.
        let mut batch = Batch::new();
        let value = Vec::from([0; MAX_LEN + 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // Overwrite it with a small value; only segment 0 is rewritten.
        let mut batch = Batch::new();
        let value = Vec::from([0, 1]);
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value));
        // The stale segment 1 is still present in the inner store.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a random value that splits into exactly two full segments.
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..2 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let value_read = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(value_read, Some(value.clone()));
        // Reassemble the value by hand from the two segments in the inner store.
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let mut segment_key = key.clone();
            let mut bytes = bcs::to_bytes(&index).unwrap();
            bytes.reverse();
            segment_key.extend(bytes);
            let value_read = store.read_value_bytes(&segment_key).await.unwrap();
            let Some(value_read) = value_read else {
                unreachable!()
            };
            if index == 0 {
                value_concat.extend(&value_read[4..]);
            } else {
                value_concat.extend(&value_read);
            }
        }
        assert_eq!(value, value_concat);
    }

    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];
        // Write a random value that splits into three segments.
        let mut batch = Batch::new();
        let mut value = Vec::new();
        let mut rng = crate::random::make_deterministic_rng();
        for _ in 0..3 * MAX_LEN - 4 {
            value.push(rng.gen::<u8>());
        }
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        // Deleting the logical key only removes segment 0.
        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();
        // The value is gone as far as the splitting store is concerned...
        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert_eq!(key_values.len(), 0);
        // ...but segments 1 and 2 are left behind in the inner store.
        let keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}