linera_views/test_utils/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod test_views;
5
6/// Functions for computing the performance of stores.
7#[cfg(not(target_arch = "wasm32"))]
8pub mod performance;
9
10use std::collections::{BTreeMap, BTreeSet, HashSet};
11
12use rand::{seq::SliceRandom, Rng};
13
14use crate::{
15    batch::{
16        Batch, WriteOperation,
17        WriteOperation::{Delete, Put},
18    },
19    random::{generate_test_namespace, make_deterministic_rng, make_nondeterministic_rng},
20    store::{
21        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore, TestKeyValueDatabase,
22        WritableKeyValueStore,
23    },
24};
25
26/// The size of the small value used for tests.
27pub const SMALL_BYTE_UPPER_LIMIT: u8 = 3;
28
29/// Returns a random key prefix used for tests
30pub fn get_random_key_prefix() -> Vec<u8> {
31    let mut key_prefix = vec![0];
32    let value: usize = make_nondeterministic_rng().gen();
33    bcs::serialize_into(&mut key_prefix, &value).unwrap();
34    key_prefix
35}
36
37fn get_random_byte_vector_with_byte_upper_limit<R: Rng>(
38    rng: &mut R,
39    key_prefix: &[u8],
40    n: usize,
41    byte_upper_limit: u8,
42) -> Vec<u8> {
43    let mut v = key_prefix.to_vec();
44    for _ in 0..n {
45        let val = rng.gen_range(0..=byte_upper_limit);
46        v.push(val);
47    }
48    v
49}
50
51fn get_small_key_space<R: Rng>(rng: &mut R, key_prefix: &[u8], n: usize) -> Vec<u8> {
52    get_random_byte_vector_with_byte_upper_limit(rng, key_prefix, n, SMALL_BYTE_UPPER_LIMIT)
53}
54
55/// Takes a random number generator, a `key_prefix` and extends it by n random bytes.
56pub fn get_random_byte_vector<R: Rng>(rng: &mut R, key_prefix: &[u8], n: usize) -> Vec<u8> {
57    get_random_byte_vector_with_byte_upper_limit(rng, key_prefix, n, u8::MAX)
58}
59
60/// Builds a random k element subset of n
61pub fn get_random_kset<R: Rng>(rng: &mut R, n: usize, k: usize) -> Vec<usize> {
62    let mut values = Vec::new();
63    for u in 0..n {
64        values.push(u);
65    }
66    values.shuffle(rng);
67    values[..k].to_vec()
68}
69
70/// Takes a random number generator, a `key_prefix` and generates
71/// pairs `(key, value)` with key obtained by appending `len_key` random bytes to `key_prefix`
72/// and value obtained by creating a vector with `len_value` random bytes.
73/// We return n such `(key, value)` pairs which are all distinct.
74pub fn get_random_key_values_prefix<R: Rng>(
75    rng: &mut R,
76    key_prefix: Vec<u8>,
77    len_key: usize,
78    len_value: usize,
79    num_entries: usize,
80    key_byte_upper_limit: u8,
81) -> Vec<(Vec<u8>, Vec<u8>)> {
82    let mut key_value_pairs = Vec::new();
83    let mut unique_keys = HashSet::new();
84    for _ in 0..num_entries {
85        let key = loop {
86            let key = get_random_byte_vector_with_byte_upper_limit(
87                rng,
88                &key_prefix,
89                len_key,
90                key_byte_upper_limit,
91            );
92            if !unique_keys.contains(&key) {
93                unique_keys.insert(key.clone());
94                break key;
95            }
96        };
97        let value = get_random_byte_vector(rng, &Vec::new(), len_value);
98        key_value_pairs.push((key, value));
99    }
100
101    key_value_pairs
102}
103
104/// Takes a random number generator `rng`, a number n and returns n random `(key, value)`
105/// which are all distinct with key and value being of length 8.
106pub fn get_random_key_values<R: Rng>(rng: &mut R, num_entries: usize) -> Vec<(Vec<u8>, Vec<u8>)> {
107    get_random_key_values_prefix(rng, Vec::new(), 8, 8, num_entries, u8::MAX)
108}
109
110type VectorPutDelete = (Vec<(Vec<u8>, Vec<u8>)>, usize);
111
112/// A bunch of puts and some deletes.
113pub fn get_random_key_value_operations<R: Rng>(
114    rng: &mut R,
115    num_entries: usize,
116    k: usize,
117) -> VectorPutDelete {
118    (get_random_key_values(rng, num_entries), k)
119}
120
121/// A random reordering of the puts and deletes.
122/// For something like `MapView` it should get us the same result whatever way we are calling.
123pub fn span_random_reordering_put_delete<R: Rng>(
124    rng: &mut R,
125    info_op: VectorPutDelete,
126) -> Vec<WriteOperation> {
127    let n = info_op.0.len();
128    let k = info_op.1;
129    let mut indices = Vec::new();
130    for i in 0..n {
131        indices.push(i);
132    }
133    indices.shuffle(rng);
134    let mut indices_rev = vec![0; n];
135    for i in 0..n {
136        indices_rev[indices[i]] = i;
137    }
138    let mut pos_remove_vector = vec![Vec::new(); n];
139    for (i, pos) in indices_rev.iter().enumerate().take(k) {
140        let idx = rng.gen_range(*pos..n);
141        pos_remove_vector[idx].push(i);
142    }
143    let mut operations = Vec::new();
144    for i in 0..n {
145        let pos = indices[i];
146        let pair = info_op.0[pos].clone();
147        operations.push(Put {
148            key: pair.0,
149            value: pair.1,
150        });
151        for pos_remove in pos_remove_vector[i].clone() {
152            let key = info_op.0[pos_remove].0.clone();
153            operations.push(Delete { key });
154        }
155    }
156    operations
157}
158
159/// This test starts with a collection of key/values being inserted into the code
160/// which is then followed by a number of reading tests. The functionalities being
161/// tested are all the reading functionalities:
162/// * `read_value_bytes`
163/// * `read_multi_values_bytes`
164/// * `find_keys_by_prefix` / `find_key_values_by_prefix`
165/// * The ordering of keys returned by `find_keys_by_prefix` and `find_key_values_by_prefix`
166pub async fn run_reads<S: KeyValueStore>(store: S, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
167    // We need a nontrivial key_prefix because dynamo requires a non-trivial prefix
168    let mut batch = Batch::new();
169    let mut keys = Vec::new();
170    let mut set_keys = HashSet::new();
171    for (key, value) in &key_values {
172        keys.push(&key[..]);
173        set_keys.insert(&key[..]);
174        batch.put_key_value_bytes(key.clone(), value.clone());
175    }
176    store.write_batch(batch).await.unwrap();
177    for key_prefix in keys
178        .iter()
179        .flat_map(|key| (0..key.len()).map(|u| &key[..=u]))
180    {
181        // Getting the find_keys_by_prefix / find_key_values_by_prefix
182        let len_prefix = key_prefix.len();
183        let keys_request = store.find_keys_by_prefix(key_prefix).await.unwrap();
184        let mut set_key_value1 = HashSet::new();
185        let mut keys_request_deriv = Vec::new();
186        let key_values_by_prefix = store.find_key_values_by_prefix(key_prefix).await.unwrap();
187        for (key, value) in key_values_by_prefix {
188            keys_request_deriv.push(key.clone());
189            set_key_value1.insert((key, value));
190        }
191        // Check find_keys / find_key_values
192        assert_eq!(keys_request, keys_request_deriv);
193        // Check key ordering
194        for i in 1..keys_request.len() {
195            assert!(keys_request[i - 1] < keys_request[i]);
196        }
197        // Check the obtained values
198        let mut set_key_value2 = HashSet::new();
199        for (key, value) in &key_values {
200            if key.starts_with(key_prefix) {
201                set_key_value2.insert((key[len_prefix..].to_vec(), value[..].to_vec()));
202            }
203        }
204        assert_eq!(set_key_value1, set_key_value2);
205    }
206    // Now checking the read_multi_values_bytes
207    let mut rng = make_deterministic_rng();
208    for _ in 0..3 {
209        let mut keys = Vec::new();
210        let mut values = Vec::new();
211        for (key, value) in &key_values {
212            if rng.gen() {
213                // Put a key that is already present
214                keys.push(key.clone());
215                values.push(Some(value.clone()));
216            } else {
217                // Put a missing key
218                let len = key.len();
219                let pos = rng.gen_range(0..len);
220                let byte = *key.get(pos).unwrap();
221                let new_byte: u8 = if byte < 255 { byte + 1 } else { byte - 1 };
222                let mut new_key = key.clone();
223                *new_key.get_mut(pos).unwrap() = new_byte;
224                if !set_keys.contains(&*new_key) {
225                    keys.push(new_key);
226                    values.push(None);
227                }
228            }
229        }
230        let mut test_exists = Vec::new();
231        let mut values_single_read = Vec::new();
232        for key in &keys {
233            test_exists.push(store.contains_key(key).await.unwrap());
234            values_single_read.push(store.read_value_bytes(key).await.unwrap());
235        }
236        let test_exists_direct = store.contains_keys(keys.clone()).await.unwrap();
237        let values_read = store.read_multi_values_bytes(keys).await.unwrap();
238        assert_eq!(values, values_read);
239        assert_eq!(values, values_single_read);
240        let values_read_stat = values_read.iter().map(|x| x.is_some()).collect::<Vec<_>>();
241        assert_eq!(values_read_stat, test_exists);
242        assert_eq!(values_read_stat, test_exists_direct);
243    }
244}
245
246/// Generates a list of random key-values with no duplicates
247pub fn get_random_key_values_with_sizes(
248    num_entries: usize,
249    len_key: usize,
250    len_value: usize,
251) -> Vec<(Vec<u8>, Vec<u8>)> {
252    let key_prefix = vec![0];
253    let mut rng = make_deterministic_rng();
254    get_random_key_values_prefix(
255        &mut rng,
256        key_prefix,
257        len_key,
258        len_value,
259        num_entries,
260        u8::MAX,
261    )
262}
263
264fn get_random_key_values_with_small_keys(
265    num_entries: usize,
266    len_key: usize,
267    len_value: usize,
268) -> Vec<(Vec<u8>, Vec<u8>)> {
269    let key_prefix = vec![0];
270    let mut rng = make_deterministic_rng();
271    get_random_key_values_prefix(
272        &mut rng,
273        key_prefix,
274        len_key,
275        len_value,
276        num_entries,
277        SMALL_BYTE_UPPER_LIMIT,
278    )
279}
280
281/// Adds a prefix to a list of key-values
282pub fn add_prefix(prefix: &[u8], key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Vec<(Vec<u8>, Vec<u8>)> {
283    key_values
284        .into_iter()
285        .map(|(key, value)| {
286            let mut big_key = prefix.to_vec();
287            big_key.extend(key);
288            (big_key, value)
289        })
290        .collect()
291}
292
293/// We build a number of scenarios for testing the reads.
294pub fn get_random_test_scenarios() -> Vec<Vec<(Vec<u8>, Vec<u8>)>> {
295    vec![
296        get_random_key_values_with_sizes(7, 8, 3),
297        get_random_key_values_with_sizes(150, 8, 3),
298        get_random_key_values_with_sizes(30, 8, 10),
299        get_random_key_values_with_small_keys(30, 4, 10),
300        get_random_key_values_with_small_keys(30, 4, 100),
301    ]
302}
303
304fn generate_random_batch<R: Rng>(rng: &mut R, key_prefix: &[u8], batch_size: usize) -> Batch {
305    let mut batch = Batch::new();
306    // Fully random batch
307    for _ in 0..batch_size {
308        let choice = rng.gen_range(0..8);
309        // Inserting a key
310        if choice < 6 {
311            // Insert
312            let key = get_small_key_space(rng, key_prefix, 4);
313            let len_value = rng.gen_range(0..10); // Could need to be split
314            let value = get_random_byte_vector(rng, &[], len_value);
315            batch.put_key_value_bytes(key.clone(), value.clone());
316        }
317        if choice == 6 {
318            // key might be missing, no matter, it has to work
319            let key = get_small_key_space(rng, key_prefix, 4);
320            batch.delete_key(key);
321        }
322        if choice == 7 {
323            let len = rng.gen_range(1..4); // We want a non-trivial range
324            let delete_key_prefix = get_small_key_space(rng, key_prefix, len);
325            batch.delete_key_prefix(delete_key_prefix.clone());
326        }
327    }
328    batch
329}
330
331fn get_key(key_prefix: &[u8], key_suffix: Vec<u8>) -> Vec<u8> {
332    let mut key = key_prefix.to_vec();
333    key.extend(key_suffix);
334    key
335}
336
337fn generate_specific_batch(key_prefix: &[u8], option: usize) -> Batch {
338    let mut batch = Batch::new();
339    if option == 0 {
340        let key = get_key(key_prefix, vec![34]);
341        batch.put_key_value_bytes(key.clone(), vec![]);
342        batch.delete_key(key);
343    }
344    if option == 1 {
345        let key1 = get_key(key_prefix, vec![12, 34]);
346        let key2 = get_key(key_prefix, vec![12, 33]);
347        let key3 = get_key(key_prefix, vec![13]);
348        batch.put_key_value_bytes(key1.clone(), vec![]);
349        batch.put_key_value_bytes(key2, vec![]);
350        batch.put_key_value_bytes(key3, vec![]);
351        batch.delete_key(key1);
352        let key_prefix = get_key(key_prefix, vec![12]);
353        batch.delete_key_prefix(key_prefix);
354    }
355    batch
356}
357
358fn update_state_from_batch(kv_state: &mut BTreeMap<Vec<u8>, Vec<u8>>, batch: &Batch) {
359    for operation in &batch.operations {
360        match operation {
361            WriteOperation::Put { key, value } => {
362                kv_state.insert(key.to_vec(), value.to_vec());
363            }
364            WriteOperation::Delete { key } => {
365                kv_state.remove(key);
366            }
367            WriteOperation::DeletePrefix { key_prefix } => {
368                kv_state.retain(|key, _| !key.starts_with(key_prefix));
369            }
370        }
371    }
372}
373
374fn realize_batch(batch: &Batch) -> BTreeMap<Vec<u8>, Vec<u8>> {
375    let mut kv_state = BTreeMap::new();
376    update_state_from_batch(&mut kv_state, batch);
377    kv_state
378}
379
380async fn read_keys_prefix<C: KeyValueStore>(
381    key_value_store: &C,
382    key_prefix: &[u8],
383) -> BTreeSet<Vec<u8>> {
384    let mut keys = BTreeSet::new();
385    for key_suffix in key_value_store
386        .find_keys_by_prefix(key_prefix)
387        .await
388        .unwrap()
389    {
390        let mut key = key_prefix.to_vec();
391        key.extend(key_suffix);
392        keys.insert(key);
393    }
394    keys
395}
396
397async fn read_key_values_prefix<C: KeyValueStore>(
398    key_value_store: &C,
399    key_prefix: &[u8],
400) -> BTreeMap<Vec<u8>, Vec<u8>> {
401    let mut key_values = BTreeMap::new();
402    for key_value in key_value_store
403        .find_key_values_by_prefix(key_prefix)
404        .await
405        .unwrap()
406    {
407        let (key_suffix, value) = key_value;
408        let mut key = key_prefix.to_vec();
409        key.extend(key_suffix);
410        key_values.insert(key, value.to_vec());
411    }
412    key_values
413}
414
415/// Writes and then reads data under a prefix, and verifies the result.
416pub async fn run_test_batch_from_blank<C: KeyValueStore>(
417    key_value_store: &C,
418    key_prefix: Vec<u8>,
419    batch: Batch,
420) {
421    let kv_state = realize_batch(&batch);
422    key_value_store.write_batch(batch).await.unwrap();
423    // Checking the consistency
424    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
425    assert_eq!(key_values, kv_state);
426}
427
428/// Run many operations on batches always starting from a blank state.
429pub async fn run_writes_from_blank<C: KeyValueStore>(key_value_store: &C) {
430    let mut rng = make_deterministic_rng();
431    let n_oper = 10;
432    let batch_size = 500;
433    // key space has size 4^4 = 256 so we necessarily encounter collisions
434    // because the number of generated keys is about batch_size * n_oper = 800 > 256.
435    for _ in 0..n_oper {
436        let key_prefix = get_random_key_prefix();
437        let batch = generate_random_batch(&mut rng, &key_prefix, batch_size);
438        run_test_batch_from_blank(key_value_store, key_prefix, batch).await;
439    }
440    for option in 0..2 {
441        let key_prefix = get_random_key_prefix();
442        let batch = generate_specific_batch(&key_prefix, option);
443        run_test_batch_from_blank(key_value_store, key_prefix, batch).await;
444    }
445}
446
447/// Reading many keys at a time could trigger an error. This needs to be tested.
448pub async fn big_read_multi_values<D>(config: D::Config, value_size: usize, n_entries: usize)
449where
450    D: KeyValueDatabase,
451    D::Store: KeyValueStore,
452{
453    let mut rng = make_deterministic_rng();
454    let namespace = generate_test_namespace();
455    let store = D::connect(&config, &namespace).await.unwrap();
456    let store = store.open_exclusive(&[]).unwrap();
457    let key_prefix = vec![42, 54];
458    let mut batch = Batch::new();
459    let mut keys = Vec::new();
460    let mut values = Vec::new();
461    for i in 0..n_entries {
462        let mut key = key_prefix.clone();
463        bcs::serialize_into(&mut key, &i).unwrap();
464        let value = get_random_byte_vector(&mut rng, &[], value_size);
465        batch.put_key_value_bytes(key.clone(), value.clone());
466        keys.push(key);
467        values.push(Some(value));
468    }
469    store.write_batch(batch).await.unwrap();
470    // We reconnect so that the read is not using the cache.
471    let store = D::connect(&config, &namespace).await.unwrap();
472    let store = store.open_exclusive(&[]).unwrap();
473    let values_read = store.read_multi_values_bytes(keys).await.unwrap();
474    assert_eq!(values, values_read);
475}
476
477/// That test is especially challenging for ScyllaDB.
478/// In its default settings, Scylla has a limitation to 10000 tombstones.
479/// A tombstone is an indication that the data has been deleted. That
480/// is thus a trie data structure for checking whether a requested key
481/// is deleted or not.
482///
483/// In this test we insert 200000 keys into the database.
484/// Then we select half of them at random and delete them. By the random
485/// selection, Scylla is forced to introduce around 100000 tombstones
486/// which triggers the crash with the default settings.
487pub async fn tombstone_triggering_test<C: KeyValueStore>(key_value_store: C) {
488    use std::time::Instant;
489    let t1 = Instant::now();
490    let mut rng = make_deterministic_rng();
491    let value_size = 100;
492    let n_entry = 200000;
493    // Putting the keys
494    let mut batch_insert = Batch::new();
495    let key_prefix = vec![0];
496    let mut batch_delete = Batch::new();
497    let mut remaining_key_values = BTreeMap::new();
498    let mut remaining_keys = BTreeSet::new();
499    for i in 0..n_entry {
500        let mut key = key_prefix.clone();
501        bcs::serialize_into(&mut key, &i).unwrap();
502        let value = get_random_byte_vector(&mut rng, &[], value_size);
503        batch_insert.put_key_value_bytes(key.clone(), value.clone());
504        let to_delete = rng.gen::<bool>();
505        if to_delete {
506            batch_delete.delete_key(key);
507        } else {
508            remaining_keys.insert(key.clone());
509            remaining_key_values.insert(key, value);
510        }
511    }
512    tracing::info!("Set up in {} ms", t1.elapsed().as_millis());
513
514    let t1 = Instant::now();
515    run_test_batch_from_blank(&key_value_store, key_prefix.clone(), batch_insert).await;
516    tracing::info!("run_test_batch in {} ms", t1.elapsed().as_millis());
517
518    // Deleting them all
519    let t1 = Instant::now();
520    key_value_store.write_batch(batch_delete).await.unwrap();
521    tracing::info!("batch_delete in {} ms", t1.elapsed().as_millis());
522
523    for iter in 0..5 {
524        // Reading everything and seeing that it is now cleaned.
525        let t1 = Instant::now();
526        let key_values = read_key_values_prefix(&key_value_store, &key_prefix).await;
527        assert_eq!(key_values, remaining_key_values);
528        tracing::info!(
529            "iter={} read_key_values_prefix in {} ms",
530            iter,
531            t1.elapsed().as_millis()
532        );
533
534        let t1 = Instant::now();
535        let keys = read_keys_prefix(&key_value_store, &key_prefix).await;
536        assert_eq!(keys, remaining_keys);
537        tracing::info!(
538            "iter={} read_keys_prefix after {} ms",
539            iter,
540            t1.elapsed().as_millis()
541        );
542    }
543}
544
545/// DynamoDB has limits at 1 MB (for pagination), 4 MB (for write)
546/// Let us go right past them at 20 MB of data with writing and then
547/// reading it. And 20 MB is not huge by any mean. All `KeyValueStore`
548/// must handle that.
549///
550/// The size of the value vary as each size has its own issues.
551pub async fn run_big_write_read<C: KeyValueStore>(
552    key_value_store: C,
553    target_size: usize,
554    value_sizes: Vec<usize>,
555) {
556    let mut rng = make_deterministic_rng();
557    for (pos, value_size) in value_sizes.into_iter().enumerate() {
558        let n_entry: usize = target_size / value_size;
559        let mut batch = Batch::new();
560        let key_prefix = vec![0, pos as u8];
561        for i in 0..n_entry {
562            let mut key = key_prefix.clone();
563            bcs::serialize_into(&mut key, &i).unwrap();
564            let value = get_random_byte_vector(&mut rng, &[], value_size);
565            batch.put_key_value_bytes(key, value);
566        }
567        run_test_batch_from_blank(&key_value_store, key_prefix, batch).await;
568    }
569}
570
571type StateBatch = (Vec<(Vec<u8>, Vec<u8>)>, Batch);
572
573async fn run_test_batch_from_state<C: KeyValueStore>(
574    key_value_store: &C,
575    key_prefix: Vec<u8>,
576    state_and_batch: StateBatch,
577) {
578    let (key_values, batch) = state_and_batch;
579    let mut batch_insert = Batch::new();
580    let mut kv_state = BTreeMap::new();
581    for (key, value) in key_values {
582        kv_state.insert(key.clone(), value.clone());
583        batch_insert.put_key_value_bytes(key, value);
584    }
585    key_value_store.write_batch(batch_insert).await.unwrap();
586    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
587    assert_eq!(key_values, kv_state);
588
589    update_state_from_batch(&mut kv_state, &batch);
590    key_value_store.write_batch(batch).await.unwrap();
591    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
592    assert_eq!(key_values, kv_state);
593}
594
595fn generate_specific_state_batch(key_prefix: &[u8], option: usize) -> StateBatch {
596    let mut key_values = Vec::new();
597    let mut batch = Batch::new();
598    if option == 0 {
599        // A DeletePrefix followed by an insertion that matches the DeletePrefix
600        let key1 = get_key(key_prefix, vec![1, 3]);
601        let key2 = get_key(key_prefix, vec![1, 4]);
602        let key3 = get_key(key_prefix, vec![1, 4, 5]);
603        key_values.push((key1.clone(), vec![34]));
604        key_values.push((key2.clone(), vec![45]));
605        batch.delete_key_prefix(key2);
606        batch.put_key_value_bytes(key3, vec![23]);
607    }
608    if option == 1 {
609        // Just a DeletePrefix
610        let key1 = get_key(key_prefix, vec![1, 3]);
611        let key2 = get_key(key_prefix, vec![1, 4]);
612        key_values.push((key1.clone(), vec![34]));
613        key_values.push((key2.clone(), vec![45]));
614        batch.delete_key_prefix(key2);
615    }
616    if option == 2 {
617        // A Put followed by a DeletePrefix that matches the Put
618        let key1 = get_key(key_prefix, vec![1, 3]);
619        let key2 = get_key(key_prefix, vec![1, 4]);
620        let key3 = get_key(key_prefix, vec![1, 4, 5]);
621        key_values.push((key1.clone(), vec![34]));
622        key_values.push((key2.clone(), vec![45]));
623        batch.put_key_value_bytes(key3, vec![23]);
624        batch.delete_key_prefix(key2);
625    }
626    if option == 3 {
627        // A Put followed by a Delete on the same value
628        let key1 = get_key(key_prefix, vec![1, 3]);
629        let key2 = get_key(key_prefix, vec![1, 4]);
630        let key3 = get_key(key_prefix, vec![1, 4, 5]);
631        key_values.push((key1.clone(), vec![34]));
632        key_values.push((key2.clone(), vec![45]));
633        batch.put_key_value_bytes(key3.clone(), vec![23]);
634        batch.delete_key(key3);
635    }
636    if option == 4 {
637        // A Delete Key followed by a Put on the same key
638        let key1 = get_key(key_prefix, vec![1, 3]);
639        let key2 = get_key(key_prefix, vec![1, 4]);
640        let key3 = get_key(key_prefix, vec![1, 4, 5]);
641        key_values.push((key1.clone(), vec![34]));
642        key_values.push((key2.clone(), vec![45]));
643        batch.delete_key(key3.clone());
644        batch.put_key_value_bytes(key3, vec![23]);
645    }
646    if option == 5 {
647        // A Delete Key followed by a Put on the same key
648        let key1 = get_key(key_prefix, vec![1, 3]);
649        let key2 = get_key(key_prefix, vec![1, 4]);
650        let key3 = get_key(key_prefix, vec![1, 4, 5]);
651        let key4 = get_key(key_prefix, vec![1, 5]);
652        key_values.push((key1.clone(), vec![34]));
653        key_values.push((key2.clone(), vec![45]));
654        batch.delete_key(key3.clone());
655        batch.put_key_value_bytes(key4, vec![23]);
656    }
657    if option == 6 {
658        let key1 = get_key(key_prefix, vec![0]);
659        let key2 = get_key(key_prefix, vec![]);
660        key_values.push((key1, vec![33]));
661        batch.delete_key_prefix(key2);
662    }
663    if option == 7 {
664        let key1 = get_key(key_prefix, vec![255, 255]);
665        let key2 = get_key(key_prefix, vec![255, 255, 1]);
666        key_values.push((key2.clone(), vec![]));
667        batch.delete_key_prefix(key1);
668        batch.put_key_value_bytes(key2, vec![]);
669    }
670    (key_values, batch)
671}
672
673/// Run some deterministic and random batches operation and check their
674/// correctness
675pub async fn run_writes_from_state<C: KeyValueStore>(key_value_store: &C) {
676    for option in 0..8 {
677        let key_prefix = if option >= 6 {
678            vec![255, 255, 255]
679        } else {
680            get_random_key_prefix()
681        };
682        let state_batch = generate_specific_state_batch(&key_prefix, option);
683        run_test_batch_from_state(key_value_store, key_prefix, state_batch).await;
684    }
685}
686
687async fn namespaces_with_prefix<D: KeyValueDatabase>(
688    config: &D::Config,
689    prefix: &str,
690) -> BTreeSet<String> {
691    let namespaces = D::list_all(config).await.expect("namespaces");
692    namespaces
693        .into_iter()
694        .filter(|x| x.starts_with(prefix))
695        .collect::<BTreeSet<_>>()
696}
697
698/// Exercises the namespace functionalities of the `KeyValueDatabase`.
699/// This tests everything except the `delete_all` which would
700/// interact with other namespaces.
701pub async fn namespace_admin_test<D: TestKeyValueDatabase>() {
702    let config = D::new_test_config().await.expect("config");
703    {
704        let namespace = generate_test_namespace();
705        D::create(&config, &namespace)
706            .await
707            .expect("first creation of a namespace");
708        // Creating a namespace two times should returns an error
709        assert!(D::create(&config, &namespace).await.is_err());
710    }
711    let prefix = generate_test_namespace();
712    let namespaces = namespaces_with_prefix::<D>(&config, &prefix).await;
713    assert_eq!(namespaces.len(), 0);
714    let mut rng = make_deterministic_rng();
715    let size = 9;
716    // Creating the initial list of namespaces
717    let mut working_namespaces = BTreeSet::new();
718    for i in 0..size {
719        let namespace = format!("{}_{}", prefix, i);
720        assert!(!D::exists(&config, &namespace).await.expect("test"));
721        working_namespaces.insert(namespace);
722    }
723    // Creating the namespaces
724    for namespace in &working_namespaces {
725        D::create(&config, namespace)
726            .await
727            .expect("creation of a namespace");
728        assert!(D::exists(&config, namespace).await.expect("test"));
729    }
730    // Connecting to all of them at once
731    {
732        let mut connections = Vec::new();
733        for namespace in &working_namespaces {
734            let connection = D::connect(&config, namespace)
735                .await
736                .expect("a connection to the namespace");
737            connections.push(connection);
738        }
739    }
740    // Listing all of them
741    let namespaces = namespaces_with_prefix::<D>(&config, &prefix).await;
742    assert_eq!(namespaces, working_namespaces);
743    // Selecting at random some for deletion
744    let mut kept_namespaces = BTreeSet::new();
745    for namespace in working_namespaces {
746        let delete = rng.gen::<bool>();
747        if delete {
748            D::delete(&config, &namespace)
749                .await
750                .expect("A successful deletion");
751            assert!(!D::exists(&config, &namespace).await.expect("test"));
752        } else {
753            kept_namespaces.insert(namespace);
754        }
755    }
756    for namespace in &kept_namespaces {
757        assert!(D::exists(&config, namespace).await.expect("test"));
758    }
759    let namespaces = namespaces_with_prefix::<D>(&config, &prefix).await;
760    assert_eq!(namespaces, kept_namespaces);
761    for namespace in kept_namespaces {
762        D::delete(&config, &namespace)
763            .await
764            .expect("A successful deletion");
765    }
766}
767
768/// Tests listing the root keys.
769pub async fn root_key_admin_test<D>()
770where
771    D: TestKeyValueDatabase,
772    D::Store: KeyValueStore,
773{
774    let config = D::new_test_config().await.expect("config");
775    let namespace = generate_test_namespace();
776    let mut root_keys = Vec::new();
777    let mut keys = BTreeSet::new();
778    D::create(&config, &namespace).await.expect("creation");
779    let prefix = vec![0];
780    {
781        let size = 3;
782        let mut rng = make_deterministic_rng();
783        let store = D::connect(&config, &namespace).await.expect("store");
784        let shared_store = store.open_shared(&[]).expect("shared store");
785        root_keys.push(vec![]);
786        let mut batch = Batch::new();
787        for _ in 0..2 {
788            let key = get_random_byte_vector(&mut rng, &prefix, 4);
789            batch.put_key_value_bytes(key.clone(), vec![]);
790            keys.insert((vec![], key));
791        }
792        shared_store.write_batch(batch).await.expect("write batch");
793
794        for _ in 0..20 {
795            let root_key = get_random_byte_vector(&mut rng, &[], 4);
796            let exclusive_store = store.open_exclusive(&root_key).expect("exclusive store");
797            root_keys.push(root_key.clone());
798            let size_select = rng.gen_range(0..size);
799            let mut batch = Batch::new();
800            for _ in 0..size_select {
801                let key = get_random_byte_vector(&mut rng, &prefix, 4);
802                batch.put_key_value_bytes(key.clone(), vec![]);
803                keys.insert((root_key.clone(), key));
804            }
805            exclusive_store
806                .write_batch(batch)
807                .await
808                .expect("write batch");
809        }
810    }
811
812    let read_root_keys = D::list_root_keys(&config, &namespace)
813        .await
814        .expect("read_root_keys");
815    let set_root_keys = root_keys.iter().cloned().collect::<HashSet<_>>();
816    for read_root_key in &read_root_keys {
817        assert!(set_root_keys.contains(read_root_key));
818    }
819
820    let mut read_keys = BTreeSet::new();
821    for root_key in read_root_keys {
822        let store = D::connect(&config, &namespace)
823            .await
824            .expect("store")
825            .open_exclusive(&root_key)
826            .expect("open_exclusive");
827        let keys = store.find_keys_by_prefix(&prefix).await.expect("keys");
828        for key in keys {
829            let mut big_key = prefix.clone();
830            let key = key.to_vec();
831            big_key.extend(key);
832            read_keys.insert((root_key.clone(), big_key));
833        }
834    }
835    assert_eq!(keys, read_keys);
836}
837
838/// A store can be in exclusive access where it stores the absence of values
839/// or in shared access where only values are stored and (key, value) once
840/// written are never modified nor erased.
841///
842/// In case of no exclusive access the following scenario is checked
843/// * Store 1 deletes a key and does not mark it as missing in its cache.
844/// * Store 2 writes the key
845/// * Store 1 reads the key, but since it is not in the cache it can read
846///   it correctly.
847///
848/// In case of exclusive access. We have the following scenario:
849/// * Store 1 deletes a key and mark it as missing in its cache.
850/// * Store 2 writes the key (it should not be doing it)
851/// * Store 1 reads the key, see it as missing.
852pub async fn exclusive_access_admin_test<D>(exclusive_access: bool)
853where
854    D: TestKeyValueDatabase,
855    D::Store: KeyValueStore,
856{
857    let config = D::new_test_config().await.expect("config");
858    let namespace = generate_test_namespace();
859    D::create(&config, &namespace).await.expect("creation");
860    let key = vec![42];
861
862    let namespace = D::connect(&config, &namespace).await.expect("store");
863    let store1 = if exclusive_access {
864        namespace.open_exclusive(&[]).expect("store1")
865    } else {
866        namespace.open_shared(&[]).expect("store1")
867    };
868    let mut batch1 = Batch::new();
869    batch1.delete_key(key.clone());
870    store1.write_batch(batch1).await.expect("write batch1");
871
872    let store2 = if exclusive_access {
873        namespace.open_exclusive(&[]).expect("store2")
874    } else {
875        namespace.open_shared(&[]).expect("store2")
876    };
877    let mut batch2 = Batch::new();
878    batch2.put_key_value_bytes(key.clone(), vec![]);
879    store2.write_batch(batch2).await.expect("write batch2");
880
881    assert_eq!(store1.contains_key(&key).await.unwrap(), !exclusive_access);
882}
883
884/// Both checks together.
885pub async fn access_admin_test<D>()
886where
887    D: TestKeyValueDatabase,
888    D::Store: KeyValueStore,
889{
890    exclusive_access_admin_test::<D>(true).await;
891    exclusive_access_admin_test::<D>(false).await;
892}