1use linera_base::ensure;
7use thiserror::Error;
8
9use crate::{
10 batch::{Batch, WriteOperation},
11 store::{
12 AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError,
13 WritableKeyValueStore,
14 },
15};
16#[cfg(with_testing)]
17use crate::{
18 memory::{MemoryStore, MemoryStoreError, TEST_MEMORY_MAX_STREAM_QUERIES},
19 random::generate_test_namespace,
20 store::TestKeyValueStore,
21};
22
23#[derive(Error, Debug)]
25pub enum ValueSplittingError<E> {
26 #[error(transparent)]
28 InnerStoreError(#[from] E),
29
30 #[error("the key is of length less than 4, so we cannot extract the first byte")]
32 TooShortKey,
33
34 #[error("value segment is missing from the database")]
36 MissingSegment,
37
38 #[error("no count of size u32 is available in the value")]
40 NoCountAvailable,
41}
42
43impl<E: KeyValueStoreError> From<bcs::Error> for ValueSplittingError<E> {
44 fn from(error: bcs::Error) -> Self {
45 let error = E::from(error);
46 ValueSplittingError::InnerStoreError(error)
47 }
48}
49
50impl<E: KeyValueStoreError + 'static> KeyValueStoreError for ValueSplittingError<E> {
51 const BACKEND: &'static str = "value splitting";
52}
53
54#[derive(Clone)]
61pub struct ValueSplittingStore<K> {
62 store: K,
64}
65
66impl<K> WithError for ValueSplittingStore<K>
67where
68 K: WithError,
69 K::Error: 'static,
70{
71 type Error = ValueSplittingError<K::Error>;
72}
73
74impl<K> ReadableKeyValueStore for ValueSplittingStore<K>
75where
76 K: ReadableKeyValueStore,
77 K::Error: 'static,
78{
79 const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE - 4;
80
81 fn max_stream_queries(&self) -> usize {
82 self.store.max_stream_queries()
83 }
84
85 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
86 let mut big_key = key.to_vec();
87 big_key.extend(&[0, 0, 0, 0]);
88 let value = self.store.read_value_bytes(&big_key).await?;
89 let Some(value) = value else {
90 return Ok(None);
91 };
92 let count = Self::read_count_from_value(&value)?;
93 let mut big_value = value[4..].to_vec();
94 if count == 1 {
95 return Ok(Some(big_value));
96 }
97 let mut big_keys = Vec::new();
98 for i in 1..count {
99 let big_key_segment = Self::get_segment_key(key, i)?;
100 big_keys.push(big_key_segment);
101 }
102 let segments = self.store.read_multi_values_bytes(big_keys).await?;
103 for segment in segments {
104 match segment {
105 None => {
106 return Err(ValueSplittingError::MissingSegment);
107 }
108 Some(segment) => {
109 big_value.extend(segment);
110 }
111 }
112 }
113 Ok(Some(big_value))
114 }
115
116 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
117 let mut big_key = key.to_vec();
118 big_key.extend(&[0, 0, 0, 0]);
119 Ok(self.store.contains_key(&big_key).await?)
120 }
121
122 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
123 let big_keys = keys
124 .into_iter()
125 .map(|key| {
126 let mut big_key = key.clone();
127 big_key.extend(&[0, 0, 0, 0]);
128 big_key
129 })
130 .collect::<Vec<_>>();
131 Ok(self.store.contains_keys(big_keys).await?)
132 }
133
134 async fn read_multi_values_bytes(
135 &self,
136 keys: Vec<Vec<u8>>,
137 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
138 let mut big_keys = Vec::new();
139 for key in &keys {
140 let mut big_key = key.clone();
141 big_key.extend(&[0, 0, 0, 0]);
142 big_keys.push(big_key);
143 }
144 let values = self.store.read_multi_values_bytes(big_keys).await?;
145 let mut big_values = Vec::<Option<Vec<u8>>>::new();
146 let mut keys_add = Vec::new();
147 let mut n_blocks = Vec::new();
148 for (key, value) in keys.iter().zip(values) {
149 match value {
150 None => {
151 n_blocks.push(0);
152 big_values.push(None);
153 }
154 Some(value) => {
155 let count = Self::read_count_from_value(&value)?;
156 let big_value = value[4..].to_vec();
157 for i in 1..count {
158 let big_key_segment = Self::get_segment_key(key, i)?;
159 keys_add.push(big_key_segment);
160 }
161 n_blocks.push(count);
162 big_values.push(Some(big_value));
163 }
164 }
165 }
166 if !keys_add.is_empty() {
167 let mut segments = self
168 .store
169 .read_multi_values_bytes(keys_add)
170 .await?
171 .into_iter();
172 for (idx, count) in n_blocks.iter().enumerate() {
173 if count > &1 {
174 let value = big_values.get_mut(idx).unwrap();
175 if let Some(ref mut value) = value {
176 for _ in 1..*count {
177 let segment = segments.next().unwrap().unwrap();
178 value.extend(segment);
179 }
180 }
181 }
182 }
183 }
184 Ok(big_values)
185 }
186
187 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
188 let mut keys = Vec::new();
189 for big_key in self.store.find_keys_by_prefix(key_prefix).await? {
190 let len = big_key.len();
191 if Self::read_index_from_key(&big_key)? == 0 {
192 let key = big_key[0..len - 4].to_vec();
193 keys.push(key);
194 }
195 }
196 Ok(keys)
197 }
198
199 async fn find_key_values_by_prefix(
200 &self,
201 key_prefix: &[u8],
202 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
203 let small_key_values = self.store.find_key_values_by_prefix(key_prefix).await?;
204 let mut small_kv_iterator = small_key_values.into_iter();
205 let mut key_values = Vec::new();
206 while let Some((mut big_key, value)) = small_kv_iterator.next() {
207 if Self::read_index_from_key(&big_key)? != 0 {
208 continue; }
210 big_key.truncate(big_key.len() - 4);
211 let key = big_key;
212 let count = Self::read_count_from_value(&value)?;
213 let mut big_value = value[4..].to_vec();
214 for idx in 1..count {
215 let (big_key, value) = small_kv_iterator
216 .next()
217 .ok_or(ValueSplittingError::MissingSegment)?;
218 ensure!(
219 Self::read_index_from_key(&big_key)? == idx
220 && big_key.starts_with(&key)
221 && big_key.len() == key.len() + 4,
222 ValueSplittingError::MissingSegment
223 );
224 big_value.extend(value);
225 }
226 key_values.push((key, big_value));
227 }
228 Ok(key_values)
229 }
230}
231
232impl<K> WritableKeyValueStore for ValueSplittingStore<K>
233where
234 K: WritableKeyValueStore,
235 K::Error: 'static,
236{
237 const MAX_VALUE_SIZE: usize = usize::MAX;
238
239 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
240 let mut batch_new = Batch::new();
241 for operation in batch.operations {
242 match operation {
243 WriteOperation::Delete { key } => {
244 let mut big_key = key.to_vec();
245 big_key.extend(&[0, 0, 0, 0]);
246 batch_new.delete_key(big_key);
247 }
248 WriteOperation::Put { key, mut value } => {
249 let big_key = Self::get_segment_key(&key, 0)?;
250 let mut count: u32 = 1;
251 let value_ext = if value.len() <= K::MAX_VALUE_SIZE - 4 {
252 Self::get_initial_count_first_chunk(count, &value)?
253 } else {
254 let remainder = value.split_off(K::MAX_VALUE_SIZE - 4);
255 for value_chunk in remainder.chunks(K::MAX_VALUE_SIZE) {
256 let big_key_segment = Self::get_segment_key(&key, count)?;
257 batch_new.put_key_value_bytes(big_key_segment, value_chunk.to_vec());
258 count += 1;
259 }
260 Self::get_initial_count_first_chunk(count, &value)?
261 };
262 batch_new.put_key_value_bytes(big_key, value_ext);
263 }
264 WriteOperation::DeletePrefix { key_prefix } => {
265 batch_new.delete_key_prefix(key_prefix);
266 }
267 }
268 }
269 Ok(self.store.write_batch(batch_new).await?)
270 }
271
272 async fn clear_journal(&self) -> Result<(), Self::Error> {
273 Ok(self.store.clear_journal().await?)
274 }
275}
276
277impl<K> AdminKeyValueStore for ValueSplittingStore<K>
278where
279 K: AdminKeyValueStore,
280 K::Error: 'static,
281{
282 type Config = K::Config;
283
284 fn get_name() -> String {
285 format!("value splitting {}", K::get_name())
286 }
287
288 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
289 let store = K::connect(config, namespace).await?;
290 Ok(Self { store })
291 }
292
293 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
294 let store = self.store.open_exclusive(root_key)?;
295 Ok(Self { store })
296 }
297
298 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
299 Ok(K::list_all(config).await?)
300 }
301
302 async fn list_root_keys(
303 config: &Self::Config,
304 namespace: &str,
305 ) -> Result<Vec<Vec<u8>>, Self::Error> {
306 Ok(K::list_root_keys(config, namespace).await?)
307 }
308
309 async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
310 Ok(K::delete_all(config).await?)
311 }
312
313 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
314 Ok(K::exists(config, namespace).await?)
315 }
316
317 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
318 Ok(K::create(config, namespace).await?)
319 }
320
321 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
322 Ok(K::delete(config, namespace).await?)
323 }
324}
325
#[cfg(with_testing)]
impl<K> TestKeyValueStore for ValueSplittingStore<K>
where
    K: TestKeyValueStore,
    K::Error: 'static,
{
    /// Reuses the inner store's test configuration unchanged.
    async fn new_test_config() -> Result<K::Config, Self::Error> {
        K::new_test_config()
            .await
            .map_err(ValueSplittingError::InnerStoreError)
    }
}
336
337impl<K> ValueSplittingStore<K>
338where
339 K: WithError,
340{
341 pub fn new(store: K) -> Self {
343 ValueSplittingStore { store }
344 }
345
346 fn get_segment_key(key: &[u8], index: u32) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
347 let mut big_key_segment = key.to_vec();
348 let mut bytes = bcs::to_bytes(&index)?;
349 bytes.reverse();
350 big_key_segment.extend(bytes);
351 Ok(big_key_segment)
352 }
353
354 fn get_initial_count_first_chunk(
355 count: u32,
356 first_chunk: &[u8],
357 ) -> Result<Vec<u8>, ValueSplittingError<K::Error>> {
358 let mut bytes = bcs::to_bytes(&count)?;
359 bytes.reverse();
360 let mut value_ext = Vec::new();
361 value_ext.extend(bytes);
362 value_ext.extend(first_chunk);
363 Ok(value_ext)
364 }
365
366 fn read_count_from_value(value: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
367 if value.len() < 4 {
368 return Err(ValueSplittingError::NoCountAvailable);
369 }
370 let mut bytes = value[0..4].to_vec();
371 bytes.reverse();
372 Ok(bcs::from_bytes::<u32>(&bytes)?)
373 }
374
375 fn read_index_from_key(key: &[u8]) -> Result<u32, ValueSplittingError<K::Error>> {
376 let len = key.len();
377 if len < 4 {
378 return Err(ValueSplittingError::TooShortKey);
379 }
380 let mut bytes = key[len - 4..len].to_vec();
381 bytes.reverse();
382 Ok(bcs::from_bytes::<u32>(&bytes)?)
383 }
384}
385
/// A test-only wrapper around a `MemoryStore` with a small `MAX_VALUE_SIZE`; its
/// `write_batch` panics when `Batch::check_value_size` rejects the batch. Used to
/// exercise value splitting.
#[derive(Clone)]
#[cfg(with_testing)]
pub struct LimitedTestMemoryStore {
    /// The in-memory store that receives every operation.
    store: MemoryStore,
}

#[cfg(with_testing)]
impl Default for LimitedTestMemoryStore {
    /// Equivalent to `LimitedTestMemoryStore::new()`.
    fn default() -> Self {
        LimitedTestMemoryStore::new()
    }
}

#[cfg(with_testing)]
impl WithError for LimitedTestMemoryStore {
    type Error = MemoryStoreError;
}
404
#[cfg(with_testing)]
impl ReadableKeyValueStore for LimitedTestMemoryStore {
    // Only value sizes are limited by this test store; keys are unrestricted.
    const MAX_KEY_SIZE: usize = usize::MAX;

    fn max_stream_queries(&self) -> usize {
        TEST_MEMORY_MAX_STREAM_QUERIES
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
        let Self { store } = self;
        store.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
        let Self { store } = self;
        store.contains_key(key).await
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
        let Self { store } = self;
        store.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
        let Self { store } = self;
        store.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
        let Self { store } = self;
        store.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
        let Self { store } = self;
        store.find_key_values_by_prefix(key_prefix).await
    }
}
446
#[cfg(with_testing)]
impl WritableKeyValueStore for LimitedTestMemoryStore {
    // A small limit, so that values written through a `ValueSplittingStore` in tests
    // get split into several segments.
    const MAX_VALUE_SIZE: usize = 100;

    /// Forwards `batch` to the memory store.
    ///
    /// # Panics
    /// Panics if `Batch::check_value_size` rejects the batch for `MAX_VALUE_SIZE`.
    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
        assert!(
            batch.check_value_size(Self::MAX_VALUE_SIZE),
            "The batch size is not adequate for this test"
        );
        self.store.write_batch(batch).await
    }

    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
        self.store.clear_journal().await
    }
}
464
#[cfg(with_testing)]
impl LimitedTestMemoryStore {
    /// Creates a store backed by a fresh `MemoryStore` in a newly generated test namespace.
    ///
    /// # Panics
    /// Panics if the memory store cannot be created.
    pub fn new() -> Self {
        let store = MemoryStore::new_for_testing(
            TEST_MEMORY_MAX_STREAM_QUERIES,
            &generate_test_namespace(),
        )
        .unwrap();
        Self { store }
    }
}
475
/// Returns a `ValueSplittingStore` layered on a fresh `LimitedTestMemoryStore`, so that
/// values above the limited store's value size are transparently split.
#[cfg(with_testing)]
pub fn create_value_splitting_memory_store() -> ValueSplittingStore<LimitedTestMemoryStore> {
    let limited_store = LimitedTestMemoryStore::new();
    ValueSplittingStore::new(limited_store)
}
481
#[cfg(test)]
mod tests {
    use linera_views::{
        batch::Batch,
        store::{ReadableKeyValueStore, WritableKeyValueStore},
        value_splitting::{LimitedTestMemoryStore, ValueSplittingStore},
    };
    use rand::Rng;

    /// Returns the raw key of segment `index` of `key` in the underlying store:
    /// `key` followed by `index` as 4 big-endian bytes.
    fn raw_segment_key(key: &[u8], index: u32) -> Vec<u8> {
        let mut raw_key = key.to_vec();
        raw_key.extend_from_slice(&index.to_be_bytes());
        raw_key
    }

    /// Overwriting a two-segment value with a one-segment value leaves the stale
    /// second segment in the underlying store without affecting reads.
    #[tokio::test]
    #[expect(clippy::assertions_on_constants)]
    async fn test_value_splitting1_testing_leftovers() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        assert!(MAX_LEN > 10);
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        // One byte more than fits into a single entry: two segments are needed.
        let long_value = vec![0; MAX_LEN + 1];
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), long_value.clone());
        big_store.write_batch(batch).await.unwrap();
        let read_back = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(read_back, Some(long_value));

        // A short value only rewrites segment 0.
        let short_value = vec![0, 1];
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), short_value.clone());
        big_store.write_batch(batch).await.unwrap();
        let read_back = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(read_back, Some(short_value));

        // Raw keys are reported relative to the prefix `[0]`; segment 1 is a leftover.
        let raw_keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(raw_keys, vec![vec![0, 0, 0, 0, 0], vec![0, 0, 0, 0, 1]]);
    }

    /// A value filling exactly two segments is stored as segment 0 (count + data)
    /// and segment 1 (data), and reads reassemble it.
    #[tokio::test]
    async fn test_value_splitting2_testing_splitting() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        // Segment 0 loses 4 bytes to the count, hence `2 * MAX_LEN - 4` data bytes.
        let mut rng = crate::random::make_deterministic_rng();
        let value: Vec<u8> = (0..2 * MAX_LEN - 4).map(|_| rng.gen::<u8>()).collect();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), value.clone());
        big_store.write_batch(batch).await.unwrap();
        let read_back = big_store.read_value_bytes(&key).await.unwrap();
        assert_eq!(read_back, Some(value.clone()));

        // Reassemble the value by hand from the raw segments, skipping the count.
        let mut value_concat = Vec::<u8>::new();
        for index in 0..2 {
            let Some(segment) = store
                .read_value_bytes(&raw_segment_key(&key, index))
                .await
                .unwrap()
            else {
                unreachable!()
            };
            let payload = if index == 0 { &segment[4..] } else { &segment[..] };
            value_concat.extend_from_slice(payload);
        }
        assert_eq!(value, value_concat);
    }

    /// Deleting a three-segment value removes only segment 0; the other segments stay
    /// behind in the underlying store but the key is no longer visible.
    #[tokio::test]
    async fn test_value_splitting3_write_and_delete() {
        let store = LimitedTestMemoryStore::new();
        const MAX_LEN: usize = LimitedTestMemoryStore::MAX_VALUE_SIZE;
        let big_store = ValueSplittingStore::new(store.clone());
        let key = vec![0, 0];

        let mut rng = crate::random::make_deterministic_rng();
        let value: Vec<u8> = (0..3 * MAX_LEN - 4).map(|_| rng.gen::<u8>()).collect();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(key.clone(), value);
        big_store.write_batch(batch).await.unwrap();

        let mut batch = Batch::new();
        batch.delete_key(key.clone());
        big_store.write_batch(batch).await.unwrap();

        let key_values = big_store.find_key_values_by_prefix(&[0]).await.unwrap();
        assert!(key_values.is_empty());
        let raw_keys = store.find_keys_by_prefix(&[0]).await.unwrap();
        assert_eq!(raw_keys, vec![vec![0, 0, 0, 0, 1], vec![0, 0, 0, 0, 2]]);
    }
}