linera_views/test_utils/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod test_views;
5
6/// Functions for computing the performance of stores.
7#[cfg(not(target_arch = "wasm32"))]
8pub mod performance;
9
10use std::collections::{BTreeMap, BTreeSet, HashSet};
11
12use rand::{seq::SliceRandom, Rng};
13
14use crate::{
15    batch::{
16        Batch, WriteOperation,
17        WriteOperation::{Delete, Put},
18    },
19    random::{generate_test_namespace, make_deterministic_rng, make_nondeterministic_rng},
20    store::{
21        KeyIterable, KeyValueIterable, KeyValueStore, RestrictedKeyValueStore, TestKeyValueStore,
22    },
23};
24
/// The inclusive upper limit of each random byte appended when generating
/// keys in the "small" key space used for tests.
pub const SMALL_BYTE_UPPER_LIMIT: u8 = 3;
27
28/// Returns a random key prefix used for tests
29pub fn get_random_key_prefix() -> Vec<u8> {
30    let mut key_prefix = vec![0];
31    let value: usize = make_nondeterministic_rng().gen();
32    bcs::serialize_into(&mut key_prefix, &value).unwrap();
33    key_prefix
34}
35
36fn get_random_byte_vector_with_byte_upper_limit<R: Rng>(
37    rng: &mut R,
38    key_prefix: &[u8],
39    n: usize,
40    byte_upper_limit: u8,
41) -> Vec<u8> {
42    let mut v = key_prefix.to_vec();
43    for _ in 0..n {
44        let val = rng.gen_range(0..=byte_upper_limit);
45        v.push(val);
46    }
47    v
48}
49
50fn get_small_key_space<R: Rng>(rng: &mut R, key_prefix: &[u8], n: usize) -> Vec<u8> {
51    get_random_byte_vector_with_byte_upper_limit(rng, key_prefix, n, SMALL_BYTE_UPPER_LIMIT)
52}
53
54/// Takes a random number generator, a `key_prefix` and extends it by n random bytes.
55pub fn get_random_byte_vector<R: Rng>(rng: &mut R, key_prefix: &[u8], n: usize) -> Vec<u8> {
56    get_random_byte_vector_with_byte_upper_limit(rng, key_prefix, n, u8::MAX)
57}
58
59/// Builds a random k element subset of n
60pub fn get_random_kset<R: Rng>(rng: &mut R, n: usize, k: usize) -> Vec<usize> {
61    let mut values = Vec::new();
62    for u in 0..n {
63        values.push(u);
64    }
65    values.shuffle(rng);
66    values[..k].to_vec()
67}
68
69/// Takes a random number generator, a `key_prefix` and generates
70/// pairs `(key, value)` with key obtained by appending `len_key` random bytes to `key_prefix`
71/// and value obtained by creating a vector with `len_value` random bytes.
72/// We return n such `(key, value)` pairs which are all distinct.
73pub fn get_random_key_values_prefix<R: Rng>(
74    rng: &mut R,
75    key_prefix: Vec<u8>,
76    len_key: usize,
77    len_value: usize,
78    num_entries: usize,
79    key_byte_upper_limit: u8,
80) -> Vec<(Vec<u8>, Vec<u8>)> {
81    let mut key_value_pairs = Vec::new();
82    let mut unique_keys = HashSet::new();
83    for _ in 0..num_entries {
84        let key = loop {
85            let key = get_random_byte_vector_with_byte_upper_limit(
86                rng,
87                &key_prefix,
88                len_key,
89                key_byte_upper_limit,
90            );
91            if !unique_keys.contains(&key) {
92                unique_keys.insert(key.clone());
93                break key;
94            }
95        };
96        let value = get_random_byte_vector(rng, &Vec::new(), len_value);
97        key_value_pairs.push((key, value));
98    }
99
100    key_value_pairs
101}
102
103/// Takes a random number generator `rng`, a number n and returns n random `(key, value)`
104/// which are all distinct with key and value being of length 8.
105pub fn get_random_key_values<R: Rng>(rng: &mut R, num_entries: usize) -> Vec<(Vec<u8>, Vec<u8>)> {
106    get_random_key_values_prefix(rng, Vec::new(), 8, 8, num_entries, u8::MAX)
107}
108
/// A list of `(key, value)` pairs to be put, together with a count `k`.
/// `span_random_reordering_put_delete` emits a delete for (at most) the
/// first `k` pairs of the list.
type VectorPutDelete = (Vec<(Vec<u8>, Vec<u8>)>, usize);
110
111/// A bunch of puts and some deletes.
112pub fn get_random_key_value_operations<R: Rng>(
113    rng: &mut R,
114    num_entries: usize,
115    k: usize,
116) -> VectorPutDelete {
117    (get_random_key_values(rng, num_entries), k)
118}
119
120/// A random reordering of the puts and deletes.
121/// For something like `MapView` it should get us the same result whatever way we are calling.
122pub fn span_random_reordering_put_delete<R: Rng>(
123    rng: &mut R,
124    info_op: VectorPutDelete,
125) -> Vec<WriteOperation> {
126    let n = info_op.0.len();
127    let k = info_op.1;
128    let mut indices = Vec::new();
129    for i in 0..n {
130        indices.push(i);
131    }
132    indices.shuffle(rng);
133    let mut indices_rev = vec![0; n];
134    for i in 0..n {
135        indices_rev[indices[i]] = i;
136    }
137    let mut pos_remove_vector = vec![Vec::new(); n];
138    for (i, pos) in indices_rev.iter().enumerate().take(k) {
139        let idx = rng.gen_range(*pos..n);
140        pos_remove_vector[idx].push(i);
141    }
142    let mut operations = Vec::new();
143    for i in 0..n {
144        let pos = indices[i];
145        let pair = info_op.0[pos].clone();
146        operations.push(Put {
147            key: pair.0,
148            value: pair.1,
149        });
150        for pos_remove in pos_remove_vector[i].clone() {
151            let key = info_op.0[pos_remove].0.clone();
152            operations.push(Delete { key });
153        }
154    }
155    operations
156}
157
/// This test starts with a collection of key/values being inserted into the store
/// in a single batch, which is then followed by a number of reading tests. The
/// functionalities being tested are all the reading functionalities:
/// * `read_value_bytes`
/// * `read_multi_values_bytes`
/// * `contains_key` / `contains_keys`
/// * `find_keys_by_prefix` / `find_key_values_by_prefix`
/// * The ordering of keys returned by `find_keys_by_prefix` and `find_key_values_by_prefix`
///
/// The function panics as soon as one of the checks fails or a store operation
/// returns an error.
pub async fn run_reads<S: RestrictedKeyValueStore>(store: S, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
    // We need a nontrivial key_prefix because dynamo requires a non-trivial prefix
    let mut batch = Batch::new();
    let mut keys = Vec::new();
    let mut set_keys = HashSet::new();
    for (key, value) in &key_values {
        keys.push(&key[..]);
        set_keys.insert(&key[..]);
        batch.put_key_value_bytes(key.clone(), value.clone());
    }
    store.write_batch(batch).await.unwrap();
    // Every non-empty leading slice of every written key is used as a search prefix.
    for key_prefix in keys
        .iter()
        .flat_map(|key| (0..key.len()).map(|u| &key[..=u]))
    {
        // Getting the find_keys_by_prefix / find_key_values_by_prefix
        let len_prefix = key_prefix.len();
        let keys_by_prefix = store.find_keys_by_prefix(key_prefix).await.unwrap();
        let keys_request = keys_by_prefix
            .iterator()
            .map(Result::unwrap)
            .collect::<Vec<_>>();
        let mut set_key_value1 = HashSet::new();
        let mut keys_request_deriv = Vec::new();
        let key_values_by_prefix = store.find_key_values_by_prefix(key_prefix).await.unwrap();
        for (key, value) in key_values_by_prefix.iterator().map(Result::unwrap) {
            set_key_value1.insert((key, value));
            keys_request_deriv.push(key);
        }
        // Check find_keys / find_key_values: same keys, in the same order
        assert_eq!(keys_request, keys_request_deriv);
        // Check key ordering: the returned keys must be strictly increasing
        for i in 1..keys_request.len() {
            assert!(keys_request[i - 1] < keys_request[i]);
        }
        // Check the obtained values against the written entries matching the
        // prefix, with the prefix stripped from the expected keys
        let mut set_key_value2 = HashSet::new();
        for (key, value) in &key_values {
            if key.starts_with(key_prefix) {
                set_key_value2.insert((&key[len_prefix..], &value[..]));
            }
        }
        assert_eq!(set_key_value1, set_key_value2);
    }
    // Now checking the read_multi_values_bytes, together with the single-key
    // reads and the existence checks, on a random mix of present and missing keys
    let mut rng = make_deterministic_rng();
    for _ in 0..3 {
        let mut keys = Vec::new();
        let mut values = Vec::new();
        for (key, value) in &key_values {
            if rng.gen() {
                // Put a key that is already present
                keys.push(key.clone());
                values.push(Some(value.clone()));
            } else {
                // Put a missing key, obtained by changing one byte of a present key
                let len = key.len();
                let pos = rng.gen_range(0..len);
                let byte = *key.get(pos).unwrap();
                let new_byte: u8 = if byte < 255 { byte + 1 } else { byte - 1 };
                let mut new_key = key.clone();
                *new_key.get_mut(pos).unwrap() = new_byte;
                // The modified key is only used if it was not written itself
                if !set_keys.contains(&*new_key) {
                    keys.push(new_key);
                    values.push(None);
                }
            }
        }
        let mut test_exists = Vec::new();
        let mut values_single_read = Vec::new();
        for key in &keys {
            test_exists.push(store.contains_key(key).await.unwrap());
            values_single_read.push(store.read_value_bytes(key).await.unwrap());
        }
        let test_exists_direct = store.contains_keys(keys.clone()).await.unwrap();
        let values_read = store.read_multi_values_bytes(keys).await.unwrap();
        // The multi-key read, the single-key reads and both existence checks
        // must all agree with the expected values
        assert_eq!(values, values_read);
        assert_eq!(values, values_single_read);
        let values_read_stat = values_read.iter().map(|x| x.is_some()).collect::<Vec<_>>();
        assert_eq!(values_read_stat, test_exists);
        assert_eq!(values_read_stat, test_exists_direct);
    }
}
248
249/// Generates a list of random key-values with no duplicates
250pub fn get_random_key_values_with_sizes(
251    num_entries: usize,
252    len_key: usize,
253    len_value: usize,
254) -> Vec<(Vec<u8>, Vec<u8>)> {
255    let key_prefix = vec![0];
256    let mut rng = make_deterministic_rng();
257    get_random_key_values_prefix(
258        &mut rng,
259        key_prefix,
260        len_key,
261        len_value,
262        num_entries,
263        u8::MAX,
264    )
265}
266
267fn get_random_key_values_with_small_keys(
268    num_entries: usize,
269    len_key: usize,
270    len_value: usize,
271) -> Vec<(Vec<u8>, Vec<u8>)> {
272    let key_prefix = vec![0];
273    let mut rng = make_deterministic_rng();
274    get_random_key_values_prefix(
275        &mut rng,
276        key_prefix,
277        len_key,
278        len_value,
279        num_entries,
280        SMALL_BYTE_UPPER_LIMIT,
281    )
282}
283
/// Prepends `prefix` to the key of every entry of `key_values`.
/// The values and the order of the entries are left unchanged.
pub fn add_prefix(prefix: &[u8], key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Vec<(Vec<u8>, Vec<u8>)> {
    let mut prefixed = Vec::with_capacity(key_values.len());
    for (key, value) in key_values {
        let full_key = [prefix, key.as_slice()].concat();
        prefixed.push((full_key, value));
    }
    prefixed
}
295
296/// We build a number of scenarios for testing the reads.
297pub fn get_random_test_scenarios() -> Vec<Vec<(Vec<u8>, Vec<u8>)>> {
298    vec![
299        get_random_key_values_with_sizes(7, 8, 3),
300        get_random_key_values_with_sizes(150, 8, 3),
301        get_random_key_values_with_sizes(30, 8, 10),
302        get_random_key_values_with_small_keys(30, 4, 10),
303        get_random_key_values_with_small_keys(30, 4, 100),
304    ]
305}
306
307fn generate_random_batch<R: Rng>(rng: &mut R, key_prefix: &[u8], batch_size: usize) -> Batch {
308    let mut batch = Batch::new();
309    // Fully random batch
310    for _ in 0..batch_size {
311        let choice = rng.gen_range(0..8);
312        // Inserting a key
313        if choice < 6 {
314            // Insert
315            let key = get_small_key_space(rng, key_prefix, 4);
316            let len_value = rng.gen_range(0..10); // Could need to be split
317            let value = get_random_byte_vector(rng, &[], len_value);
318            batch.put_key_value_bytes(key.clone(), value.clone());
319        }
320        if choice == 6 {
321            // key might be missing, no matter, it has to work
322            let key = get_small_key_space(rng, key_prefix, 4);
323            batch.delete_key(key);
324        }
325        if choice == 7 {
326            let len = rng.gen_range(1..4); // We want a non-trivial range
327            let delete_key_prefix = get_small_key_space(rng, key_prefix, len);
328            batch.delete_key_prefix(delete_key_prefix.clone());
329        }
330    }
331    batch
332}
333
/// Returns the key made of `key_prefix` followed by `key_suffix`.
fn get_key(key_prefix: &[u8], key_suffix: Vec<u8>) -> Vec<u8> {
    [key_prefix, key_suffix.as_slice()].concat()
}
339
340fn generate_specific_batch(key_prefix: &[u8], option: usize) -> Batch {
341    let mut batch = Batch::new();
342    if option == 0 {
343        let key = get_key(key_prefix, vec![34]);
344        batch.put_key_value_bytes(key.clone(), vec![]);
345        batch.delete_key(key);
346    }
347    if option == 1 {
348        let key1 = get_key(key_prefix, vec![12, 34]);
349        let key2 = get_key(key_prefix, vec![12, 33]);
350        let key3 = get_key(key_prefix, vec![13]);
351        batch.put_key_value_bytes(key1.clone(), vec![]);
352        batch.put_key_value_bytes(key2, vec![]);
353        batch.put_key_value_bytes(key3, vec![]);
354        batch.delete_key(key1);
355        let key_prefix = get_key(key_prefix, vec![12]);
356        batch.delete_key_prefix(key_prefix);
357    }
358    batch
359}
360
361fn update_state_from_batch(kv_state: &mut BTreeMap<Vec<u8>, Vec<u8>>, batch: &Batch) {
362    for operation in &batch.operations {
363        match operation {
364            WriteOperation::Put { key, value } => {
365                kv_state.insert(key.to_vec(), value.to_vec());
366            }
367            WriteOperation::Delete { key } => {
368                kv_state.remove(key);
369            }
370            WriteOperation::DeletePrefix { key_prefix } => {
371                kv_state.retain(|key, _| !key.starts_with(key_prefix));
372            }
373        }
374    }
375}
376
377fn realize_batch(batch: &Batch) -> BTreeMap<Vec<u8>, Vec<u8>> {
378    let mut kv_state = BTreeMap::new();
379    update_state_from_batch(&mut kv_state, batch);
380    kv_state
381}
382
383async fn read_keys_prefix<C: RestrictedKeyValueStore>(
384    key_value_store: &C,
385    key_prefix: &[u8],
386) -> BTreeSet<Vec<u8>> {
387    let mut keys = BTreeSet::new();
388    for key in key_value_store
389        .find_keys_by_prefix(key_prefix)
390        .await
391        .unwrap()
392        .iterator()
393    {
394        let key_suffix = key.unwrap();
395        let mut key = key_prefix.to_vec();
396        key.extend(key_suffix);
397        keys.insert(key);
398    }
399    keys
400}
401
402async fn read_key_values_prefix<C: RestrictedKeyValueStore>(
403    key_value_store: &C,
404    key_prefix: &[u8],
405) -> BTreeMap<Vec<u8>, Vec<u8>> {
406    let mut key_values = BTreeMap::new();
407    for key_value in key_value_store
408        .find_key_values_by_prefix(key_prefix)
409        .await
410        .unwrap()
411        .iterator()
412    {
413        let (key_suffix, value) = key_value.unwrap();
414        let mut key = key_prefix.to_vec();
415        key.extend(key_suffix);
416        key_values.insert(key, value.to_vec());
417    }
418    key_values
419}
420
421/// Writes and then reads data under a prefix, and verifies the result.
422pub async fn run_test_batch_from_blank<C: RestrictedKeyValueStore>(
423    key_value_store: &C,
424    key_prefix: Vec<u8>,
425    batch: Batch,
426) {
427    let kv_state = realize_batch(&batch);
428    key_value_store.write_batch(batch).await.unwrap();
429    // Checking the consistency
430    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
431    assert_eq!(key_values, kv_state);
432}
433
/// Run many operations on batches always starting from a blank state.
///
/// Ten random batches are written and checked first, each one under its own
/// random key prefix, followed by the two hand-written batches of
/// `generate_specific_batch`.
pub async fn run_writes_from_blank<C: RestrictedKeyValueStore>(key_value_store: &C) {
    let mut rng = make_deterministic_rng();
    let n_oper = 10;
    let batch_size = 500;
    // Under a given prefix the key space has size 4^4 = 256, while every batch
    // contains batch_size = 500 operations, so collisions between the keys of
    // a batch are expected.
    for _ in 0..n_oper {
        let key_prefix = get_random_key_prefix();
        let batch = generate_random_batch(&mut rng, &key_prefix, batch_size);
        run_test_batch_from_blank(key_value_store, key_prefix, batch).await;
    }
    for option in 0..2 {
        let key_prefix = get_random_key_prefix();
        let batch = generate_specific_batch(&key_prefix, option);
        run_test_batch_from_blank(key_value_store, key_prefix, batch).await;
    }
}
452
453/// Reading many keys at a time could trigger an error. This needs to be tested.
454pub async fn big_read_multi_values<C: KeyValueStore>(
455    config: C::Config,
456    value_size: usize,
457    n_entries: usize,
458) {
459    let mut rng = make_deterministic_rng();
460    let namespace = generate_test_namespace();
461    let store = C::recreate_and_connect(&config, &namespace).await.unwrap();
462    let store = store.open_exclusive(&[]).unwrap();
463    let key_prefix = vec![42, 54];
464    let mut batch = Batch::new();
465    let mut keys = Vec::new();
466    let mut values = Vec::new();
467    for i in 0..n_entries {
468        let mut key = key_prefix.clone();
469        bcs::serialize_into(&mut key, &i).unwrap();
470        let value = get_random_byte_vector(&mut rng, &[], value_size);
471        batch.put_key_value_bytes(key.clone(), value.clone());
472        keys.push(key);
473        values.push(Some(value));
474    }
475    store.write_batch(batch).await.unwrap();
476    // We reconnect so that the read is not using the cache.
477    let store = C::connect(&config, &namespace).await.unwrap();
478    let store = store.open_exclusive(&[]).unwrap();
479    let values_read = store.read_multi_values_bytes(keys).await.unwrap();
480    assert_eq!(values, values_read);
481}
482
483/// That test is especially challenging for ScyllaDB.
484/// In its default settings, Scylla has a limitation to 10000 tombstones.
485/// A tombstone is an indication that the data has been deleted. That
486/// is thus a trie data structure for checking whether a requested key
487/// is deleted or not.
488///
489/// In this test we insert 200000 keys into the database.
490/// Then we select half of them at random and delete them. By the random
491/// selection, Scylla is forced to introduce around 100000 tombstones
492/// which triggers the crash with the default settings.
493pub async fn tombstone_triggering_test<C: RestrictedKeyValueStore>(key_value_store: C) {
494    use std::time::Instant;
495    let t1 = Instant::now();
496    let mut rng = make_deterministic_rng();
497    let value_size = 100;
498    let n_entry = 200000;
499    // Putting the keys
500    let mut batch_insert = Batch::new();
501    let key_prefix = vec![0];
502    let mut batch_delete = Batch::new();
503    let mut remaining_key_values = BTreeMap::new();
504    let mut remaining_keys = BTreeSet::new();
505    for i in 0..n_entry {
506        let mut key = key_prefix.clone();
507        bcs::serialize_into(&mut key, &i).unwrap();
508        let value = get_random_byte_vector(&mut rng, &[], value_size);
509        batch_insert.put_key_value_bytes(key.clone(), value.clone());
510        let to_delete = rng.gen::<bool>();
511        if to_delete {
512            batch_delete.delete_key(key);
513        } else {
514            remaining_keys.insert(key.clone());
515            remaining_key_values.insert(key, value);
516        }
517    }
518    tracing::info!("Set up in {} ms", t1.elapsed().as_millis());
519
520    let t1 = Instant::now();
521    run_test_batch_from_blank(&key_value_store, key_prefix.clone(), batch_insert).await;
522    tracing::info!("run_test_batch in {} ms", t1.elapsed().as_millis());
523
524    // Deleting them all
525    let t1 = Instant::now();
526    key_value_store.write_batch(batch_delete).await.unwrap();
527    tracing::info!("batch_delete in {} ms", t1.elapsed().as_millis());
528
529    for iter in 0..5 {
530        // Reading everything and seeing that it is now cleaned.
531        let t1 = Instant::now();
532        let key_values = read_key_values_prefix(&key_value_store, &key_prefix).await;
533        assert_eq!(key_values, remaining_key_values);
534        tracing::info!(
535            "iter={} read_key_values_prefix in {} ms",
536            iter,
537            t1.elapsed().as_millis()
538        );
539
540        let t1 = Instant::now();
541        let keys = read_keys_prefix(&key_value_store, &key_prefix).await;
542        assert_eq!(keys, remaining_keys);
543        tracing::info!(
544            "iter={} read_keys_prefix after {} ms",
545            iter,
546            t1.elapsed().as_millis()
547        );
548    }
549}
550
551/// DynamoDB has limits at 1 MB (for pagination), 4 MB (for write)
552/// Let us go right past them at 20 MB of data with writing and then
553/// reading it. And 20 MB is not huge by any mean. All `KeyValueStore`
554/// must handle that.
555///
556/// The size of the value vary as each size has its own issues.
557pub async fn run_big_write_read<C: RestrictedKeyValueStore>(
558    key_value_store: C,
559    target_size: usize,
560    value_sizes: Vec<usize>,
561) {
562    let mut rng = make_deterministic_rng();
563    for (pos, value_size) in value_sizes.into_iter().enumerate() {
564        let n_entry: usize = target_size / value_size;
565        let mut batch = Batch::new();
566        let key_prefix = vec![0, pos as u8];
567        for i in 0..n_entry {
568            let mut key = key_prefix.clone();
569            bcs::serialize_into(&mut key, &i).unwrap();
570            let value = get_random_byte_vector(&mut rng, &[], value_size);
571            batch.put_key_value_bytes(key, value);
572        }
573        run_test_batch_from_blank(&key_value_store, key_prefix, batch).await;
574    }
575}
576
/// An initial list of `(key, value)` pairs together with a batch;
/// `run_test_batch_from_state` first writes the pairs and then applies the batch.
type StateBatch = (Vec<(Vec<u8>, Vec<u8>)>, Batch);
578
579async fn run_test_batch_from_state<C: RestrictedKeyValueStore>(
580    key_value_store: &C,
581    key_prefix: Vec<u8>,
582    state_and_batch: StateBatch,
583) {
584    let (key_values, batch) = state_and_batch;
585    let mut batch_insert = Batch::new();
586    let mut kv_state = BTreeMap::new();
587    for (key, value) in key_values {
588        kv_state.insert(key.clone(), value.clone());
589        batch_insert.put_key_value_bytes(key, value);
590    }
591    key_value_store.write_batch(batch_insert).await.unwrap();
592    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
593    assert_eq!(key_values, kv_state);
594
595    update_state_from_batch(&mut kv_state, &batch);
596    key_value_store.write_batch(batch).await.unwrap();
597    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
598    assert_eq!(key_values, kv_state);
599}
600
601fn generate_specific_state_batch(key_prefix: &[u8], option: usize) -> StateBatch {
602    let mut key_values = Vec::new();
603    let mut batch = Batch::new();
604    if option == 0 {
605        // A DeletePrefix followed by an insertion that matches the DeletePrefix
606        let key1 = get_key(key_prefix, vec![1, 3]);
607        let key2 = get_key(key_prefix, vec![1, 4]);
608        let key3 = get_key(key_prefix, vec![1, 4, 5]);
609        key_values.push((key1.clone(), vec![34]));
610        key_values.push((key2.clone(), vec![45]));
611        batch.delete_key_prefix(key2);
612        batch.put_key_value_bytes(key3, vec![23]);
613    }
614    if option == 1 {
615        // Just a DeletePrefix
616        let key1 = get_key(key_prefix, vec![1, 3]);
617        let key2 = get_key(key_prefix, vec![1, 4]);
618        key_values.push((key1.clone(), vec![34]));
619        key_values.push((key2.clone(), vec![45]));
620        batch.delete_key_prefix(key2);
621    }
622    if option == 2 {
623        // A Put followed by a DeletePrefix that matches the Put
624        let key1 = get_key(key_prefix, vec![1, 3]);
625        let key2 = get_key(key_prefix, vec![1, 4]);
626        let key3 = get_key(key_prefix, vec![1, 4, 5]);
627        key_values.push((key1.clone(), vec![34]));
628        key_values.push((key2.clone(), vec![45]));
629        batch.put_key_value_bytes(key3, vec![23]);
630        batch.delete_key_prefix(key2);
631    }
632    if option == 3 {
633        // A Put followed by a Delete on the same value
634        let key1 = get_key(key_prefix, vec![1, 3]);
635        let key2 = get_key(key_prefix, vec![1, 4]);
636        let key3 = get_key(key_prefix, vec![1, 4, 5]);
637        key_values.push((key1.clone(), vec![34]));
638        key_values.push((key2.clone(), vec![45]));
639        batch.put_key_value_bytes(key3.clone(), vec![23]);
640        batch.delete_key(key3);
641    }
642    if option == 4 {
643        // A Delete Key followed by a Put on the same key
644        let key1 = get_key(key_prefix, vec![1, 3]);
645        let key2 = get_key(key_prefix, vec![1, 4]);
646        let key3 = get_key(key_prefix, vec![1, 4, 5]);
647        key_values.push((key1.clone(), vec![34]));
648        key_values.push((key2.clone(), vec![45]));
649        batch.delete_key(key3.clone());
650        batch.put_key_value_bytes(key3, vec![23]);
651    }
652    if option == 5 {
653        // A Delete Key followed by a Put on the same key
654        let key1 = get_key(key_prefix, vec![1, 3]);
655        let key2 = get_key(key_prefix, vec![1, 4]);
656        let key3 = get_key(key_prefix, vec![1, 4, 5]);
657        let key4 = get_key(key_prefix, vec![1, 5]);
658        key_values.push((key1.clone(), vec![34]));
659        key_values.push((key2.clone(), vec![45]));
660        batch.delete_key(key3.clone());
661        batch.put_key_value_bytes(key4, vec![23]);
662    }
663    if option == 6 {
664        let key1 = get_key(key_prefix, vec![0]);
665        let key2 = get_key(key_prefix, vec![]);
666        key_values.push((key1, vec![33]));
667        batch.delete_key_prefix(key2);
668    }
669    if option == 7 {
670        let key1 = get_key(key_prefix, vec![255, 255]);
671        let key2 = get_key(key_prefix, vec![255, 255, 1]);
672        key_values.push((key2.clone(), vec![]));
673        batch.delete_key_prefix(key1);
674        batch.put_key_value_bytes(key2, vec![]);
675    }
676    (key_values, batch)
677}
678
679/// Run some deterministic and random batches operation and check their
680/// correctness
681pub async fn run_writes_from_state<C: RestrictedKeyValueStore>(key_value_store: &C) {
682    for option in 0..8 {
683        let key_prefix = if option >= 6 {
684            vec![255, 255, 255]
685        } else {
686            get_random_key_prefix()
687        };
688        let state_batch = generate_specific_state_batch(&key_prefix, option);
689        run_test_batch_from_state(key_value_store, key_prefix, state_batch).await;
690    }
691}
692
693async fn namespaces_with_prefix<S: KeyValueStore>(
694    config: &S::Config,
695    prefix: &str,
696) -> BTreeSet<String> {
697    let namespaces = S::list_all(config).await.expect("namespaces");
698    namespaces
699        .into_iter()
700        .filter(|x| x.starts_with(prefix))
701        .collect::<BTreeSet<_>>()
702}
703
704/// Exercises the namespace functionalities of the `AdminKeyValueStore`.
705/// This tests everything except the `delete_all` which would
706/// interact with other namespaces.
707pub async fn namespace_admin_test<S: TestKeyValueStore>() {
708    let config = S::new_test_config().await.expect("config");
709    {
710        let namespace = generate_test_namespace();
711        S::create(&config, &namespace)
712            .await
713            .expect("first creation of a namespace");
714        // Creating a namespace two times should returns an error
715        assert!(S::create(&config, &namespace).await.is_err());
716    }
717    let prefix = generate_test_namespace();
718    let namespaces = namespaces_with_prefix::<S>(&config, &prefix).await;
719    assert_eq!(namespaces.len(), 0);
720    let mut rng = make_deterministic_rng();
721    let size = 9;
722    // Creating the initial list of namespaces
723    let mut working_namespaces = BTreeSet::new();
724    for i in 0..size {
725        let namespace = format!("{}_{}", prefix, i);
726        assert!(!S::exists(&config, &namespace).await.expect("test"));
727        working_namespaces.insert(namespace);
728    }
729    // Creating the namespaces
730    for namespace in &working_namespaces {
731        S::create(&config, namespace)
732            .await
733            .expect("creation of a namespace");
734        assert!(S::exists(&config, namespace).await.expect("test"));
735    }
736    // Connecting to all of them at once
737    {
738        let mut connections = Vec::new();
739        for namespace in &working_namespaces {
740            let connection = S::connect(&config, namespace)
741                .await
742                .expect("a connection to the namespace");
743            connections.push(connection);
744        }
745    }
746    // Listing all of them
747    let namespaces = namespaces_with_prefix::<S>(&config, &prefix).await;
748    assert_eq!(namespaces, working_namespaces);
749    // Selecting at random some for deletion
750    let mut kept_namespaces = BTreeSet::new();
751    for namespace in working_namespaces {
752        let delete = rng.gen::<bool>();
753        if delete {
754            S::delete(&config, &namespace)
755                .await
756                .expect("A successful deletion");
757            assert!(!S::exists(&config, &namespace).await.expect("test"));
758        } else {
759            kept_namespaces.insert(namespace);
760        }
761    }
762    for namespace in &kept_namespaces {
763        assert!(S::exists(&config, namespace).await.expect("test"));
764    }
765    let namespaces = namespaces_with_prefix::<S>(&config, &prefix).await;
766    assert_eq!(namespaces, kept_namespaces);
767    for namespace in kept_namespaces {
768        S::delete(&config, &namespace)
769            .await
770            .expect("A successful deletion");
771    }
772}
773
774/// Tests listing the root keys.
775pub async fn root_key_admin_test<S: TestKeyValueStore>() {
776    let config = S::new_test_config().await.expect("config");
777    let namespace = generate_test_namespace();
778    let mut root_keys = Vec::new();
779    let mut keys = BTreeSet::new();
780    S::create(&config, &namespace).await.expect("creation");
781    let prefix = vec![0];
782    {
783        let size = 3;
784        let mut rng = make_deterministic_rng();
785        let store = S::connect(&config, &namespace).await.expect("store");
786        root_keys.push(vec![]);
787        let mut batch = Batch::new();
788        for _ in 0..2 {
789            let key = get_random_byte_vector(&mut rng, &prefix, 4);
790            batch.put_key_value_bytes(key.clone(), vec![]);
791            keys.insert((vec![], key));
792        }
793        store.write_batch(batch).await.expect("write batch");
794
795        for _ in 0..20 {
796            let root_key = get_random_byte_vector(&mut rng, &[], 4);
797            let cloned_store = store.open_exclusive(&root_key).expect("cloned store");
798            root_keys.push(root_key.clone());
799            let size_select = rng.gen_range(0..size);
800            let mut batch = Batch::new();
801            for _ in 0..size_select {
802                let key = get_random_byte_vector(&mut rng, &prefix, 4);
803                batch.put_key_value_bytes(key.clone(), vec![]);
804                keys.insert((root_key.clone(), key));
805            }
806            cloned_store.write_batch(batch).await.expect("write batch");
807        }
808    }
809
810    let read_root_keys = S::list_root_keys(&config, &namespace)
811        .await
812        .expect("read_root_keys");
813    let set_root_keys = root_keys.iter().cloned().collect::<HashSet<_>>();
814    for read_root_key in &read_root_keys {
815        assert!(set_root_keys.contains(read_root_key));
816    }
817
818    let mut read_keys = BTreeSet::new();
819    for root_key in read_root_keys {
820        let store = S::connect(&config, &namespace)
821            .await
822            .expect("store")
823            .open_exclusive(&root_key)
824            .expect("open_exclusive");
825        let keys = store.find_keys_by_prefix(&prefix).await.expect("keys");
826        for key in keys.iterator() {
827            let key = key.expect("key");
828            let mut big_key = prefix.clone();
829            let key = key.to_vec();
830            big_key.extend(key);
831            read_keys.insert((root_key.clone(), big_key));
832        }
833    }
834    assert_eq!(keys, read_keys);
835}
836
837/// A store can be in exclusive access where it stores the absence of values
838/// or in shared access where only values are stored and (key, value) once
839/// written are never modified nor erased.
840///
841/// In case of no exclusive access the following scenario is checked
842/// * Store 1 deletes a key and does not mark it as missing in its cache.
843/// * Store 2 writes the key
844/// * Store 1 reads the key, but since it is not in the cache it can read
845///   it correctly.
846///
847/// In case of exclusive access. We have the following scenario:
848/// * Store 1 deletes a key and mark it as missing in its cache.
849/// * Store 2 writes the key (it should not be doing it)
850/// * Store 1 reads the key, see it as missing.
851pub async fn exclusive_access_admin_test<S: TestKeyValueStore>(exclusive_access: bool) {
852    let config = S::new_test_config().await.expect("config");
853    let namespace = generate_test_namespace();
854    S::create(&config, &namespace).await.expect("creation");
855    let key = vec![42];
856
857    let mut store1 = S::connect(&config, &namespace).await.expect("store");
858    if exclusive_access {
859        store1 = store1.open_exclusive(&[]).expect("store1");
860    }
861    let mut batch1 = Batch::new();
862    batch1.delete_key(key.clone());
863    store1.write_batch(batch1).await.expect("write batch1");
864
865    let mut store2 = S::connect(&config, &namespace).await.expect("store");
866    if exclusive_access {
867        store2 = store2.open_exclusive(&[]).expect("store2");
868    }
869    let mut batch2 = Batch::new();
870    batch2.put_key_value_bytes(key.clone(), vec![]);
871    store2.write_batch(batch2).await.expect("write batch2");
872
873    assert_eq!(store1.contains_key(&key).await.unwrap(), !exclusive_access);
874}
875
876/// Both checks together.
877pub async fn access_admin_test<S: TestKeyValueStore>() {
878    exclusive_access_admin_test::<S>(true).await;
879    exclusive_access_admin_test::<S>(false).await;
880}