Skip to main content

linera_views/test_utils/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#![allow(clippy::cast_possible_truncation)]
5
6pub mod test_views;
7
8/// Functions for computing the performance of stores.
9#[cfg(not(target_arch = "wasm32"))]
10pub mod performance;
11
12use std::collections::{BTreeMap, BTreeSet, HashSet};
13
14use rand::{seq::SliceRandom, Rng};
15
16use crate::{
17    batch::{
18        Batch, WriteOperation,
19        WriteOperation::{Delete, Put},
20    },
21    random::{generate_test_namespace, make_deterministic_rng, make_nondeterministic_rng},
22    store::{
23        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore, TestKeyValueDatabase,
24        WritableKeyValueStore,
25    },
26};
27
/// Inclusive upper bound on each randomly generated byte when a deliberately
/// small key space is wanted in tests: such bytes are drawn from
/// `0..=SMALL_BYTE_UPPER_LIMIT`.
pub const SMALL_BYTE_UPPER_LIMIT: u8 = 3;
30
31/// Returns a random key prefix used for tests
32pub fn get_random_key_prefix() -> Vec<u8> {
33    let mut key_prefix = vec![0];
34    let value: usize = make_nondeterministic_rng().gen();
35    bcs::serialize_into(&mut key_prefix, &value).unwrap();
36    key_prefix
37}
38
39fn get_random_byte_vector_with_byte_upper_limit<R: Rng>(
40    rng: &mut R,
41    key_prefix: &[u8],
42    n: usize,
43    byte_upper_limit: u8,
44) -> Vec<u8> {
45    let mut v = key_prefix.to_vec();
46    for _ in 0..n {
47        let val = rng.gen_range(0..=byte_upper_limit);
48        v.push(val);
49    }
50    v
51}
52
53fn get_small_key_space<R: Rng>(rng: &mut R, key_prefix: &[u8], n: usize) -> Vec<u8> {
54    get_random_byte_vector_with_byte_upper_limit(rng, key_prefix, n, SMALL_BYTE_UPPER_LIMIT)
55}
56
57/// Takes a random number generator, a `key_prefix` and extends it by n random bytes.
58pub fn get_random_byte_vector<R: Rng>(rng: &mut R, key_prefix: &[u8], n: usize) -> Vec<u8> {
59    get_random_byte_vector_with_byte_upper_limit(rng, key_prefix, n, u8::MAX)
60}
61
62/// Builds a random k element subset of n
63pub fn get_random_kset<R: Rng>(rng: &mut R, n: usize, k: usize) -> Vec<usize> {
64    let mut values = Vec::new();
65    for u in 0..n {
66        values.push(u);
67    }
68    values.shuffle(rng);
69    values[..k].to_vec()
70}
71
72/// Takes a random number generator, a `key_prefix` and generates
73/// pairs `(key, value)` with key obtained by appending `len_key` random bytes to `key_prefix`
74/// and value obtained by creating a vector with `len_value` random bytes.
75/// We return n such `(key, value)` pairs which are all distinct.
76pub fn get_random_key_values_prefix<R: Rng>(
77    rng: &mut R,
78    key_prefix: &[u8],
79    len_key: usize,
80    len_value: usize,
81    num_entries: usize,
82    key_byte_upper_limit: u8,
83) -> Vec<(Vec<u8>, Vec<u8>)> {
84    let mut key_value_pairs = Vec::new();
85    let mut unique_keys = HashSet::new();
86    for _ in 0..num_entries {
87        let key = loop {
88            let key = get_random_byte_vector_with_byte_upper_limit(
89                rng,
90                key_prefix,
91                len_key,
92                key_byte_upper_limit,
93            );
94            if !unique_keys.contains(&key) {
95                unique_keys.insert(key.clone());
96                break key;
97            }
98        };
99        let value = get_random_byte_vector(rng, &[], len_value);
100        key_value_pairs.push((key, value));
101    }
102
103    key_value_pairs
104}
105
106/// Takes a random number generator `rng`, a number n and returns n random `(key, value)`
107/// which are all distinct with key and value being of length 8.
108pub fn get_random_key_values<R: Rng>(rng: &mut R, num_entries: usize) -> Vec<(Vec<u8>, Vec<u8>)> {
109    get_random_key_values_prefix(rng, &[], 8, 8, num_entries, u8::MAX)
110}
111
/// A list of `(key, value)` pairs to insert, together with a count `k`:
/// `span_random_reordering_put_delete` schedules a delete for (at most) the
/// first `k` pairs of the list.
type VectorPutDelete = (Vec<(Vec<u8>, Vec<u8>)>, usize);
113
114/// A bunch of puts and some deletes.
115pub fn get_random_key_value_operations<R: Rng>(
116    rng: &mut R,
117    num_entries: usize,
118    k: usize,
119) -> VectorPutDelete {
120    (get_random_key_values(rng, num_entries), k)
121}
122
123/// A random reordering of the puts and deletes.
124/// For something like `MapView` it should get us the same result whatever way we are calling.
125pub fn span_random_reordering_put_delete<R: Rng>(
126    rng: &mut R,
127    info_op: &VectorPutDelete,
128) -> Vec<WriteOperation> {
129    let n = info_op.0.len();
130    let k = info_op.1;
131    let mut indices = Vec::new();
132    for i in 0..n {
133        indices.push(i);
134    }
135    indices.shuffle(rng);
136    let mut indices_rev = vec![0; n];
137    for i in 0..n {
138        indices_rev[indices[i]] = i;
139    }
140    let mut pos_remove_vector = vec![Vec::new(); n];
141    for (i, pos) in indices_rev.iter().enumerate().take(k) {
142        let idx = rng.gen_range(*pos..n);
143        pos_remove_vector[idx].push(i);
144    }
145    let mut operations = Vec::new();
146    for i in 0..n {
147        let pos = indices[i];
148        let pair = info_op.0[pos].clone();
149        operations.push(Put {
150            key: pair.0,
151            value: pair.1,
152        });
153        for pos_remove in pos_remove_vector[i].clone() {
154            let key = info_op.0[pos_remove].0.clone();
155            operations.push(Delete { key });
156        }
157    }
158    operations
159}
160
161/// This test starts with a collection of key/values being inserted into the code
162/// which is then followed by a number of reading tests. The functionalities being
163/// tested are all the reading functionalities:
164/// * `read_value_bytes`
165/// * `read_multi_values_bytes`
166/// * `find_keys_by_prefix` / `find_key_values_by_prefix`
167/// * The ordering of keys returned by `find_keys_by_prefix` and `find_key_values_by_prefix`
168pub async fn run_reads<S: KeyValueStore>(store: S, key_values: Vec<(Vec<u8>, Vec<u8>)>) {
169    // We need a nontrivial key_prefix because some stores require a non-trivial prefix
170    let mut batch = Batch::new();
171    let mut keys = Vec::new();
172    let mut set_keys = HashSet::new();
173    for (key, value) in &key_values {
174        keys.push(&key[..]);
175        set_keys.insert(&key[..]);
176        batch.put_key_value_bytes(key.clone(), value.clone());
177    }
178    store.write_batch(batch).await.unwrap();
179    for key_prefix in keys
180        .iter()
181        .flat_map(|key| (0..=key.len()).map(|u| &key[..u]))
182    {
183        // Getting the find_keys_by_prefix / find_key_values_by_prefix
184        let len_prefix = key_prefix.len();
185        let keys_request = store.find_keys_by_prefix(key_prefix).await.unwrap();
186        let mut set_key_value1 = HashSet::new();
187        let mut keys_request_deriv = Vec::new();
188        let key_values_by_prefix = store.find_key_values_by_prefix(key_prefix).await.unwrap();
189        for (key, value) in key_values_by_prefix {
190            keys_request_deriv.push(key.clone());
191            set_key_value1.insert((key, value));
192        }
193        // Check find_keys / find_key_values
194        assert_eq!(keys_request, keys_request_deriv);
195        // Check key ordering
196        for i in 1..keys_request.len() {
197            assert!(keys_request[i - 1] < keys_request[i]);
198        }
199        // Check the obtained values
200        let mut set_key_value2 = HashSet::new();
201        for (key, value) in &key_values {
202            if key.starts_with(key_prefix) {
203                set_key_value2.insert((key[len_prefix..].to_vec(), value[..].to_vec()));
204            }
205        }
206        assert_eq!(set_key_value1, set_key_value2);
207    }
208    // Now checking the read_multi_values_bytes
209    let mut rng = make_deterministic_rng();
210    for _ in 0..3 {
211        let mut keys = Vec::new();
212        let mut values = Vec::new();
213        for (key, value) in &key_values {
214            if rng.gen() {
215                // Put a key that is already present
216                keys.push(key.clone());
217                values.push(Some(value.clone()));
218            } else {
219                // Put a missing key
220                let len = key.len();
221                let pos = rng.gen_range(0..len);
222                let byte = *key.get(pos).unwrap();
223                let new_byte: u8 = if byte < 255 { byte + 1 } else { byte - 1 };
224                let mut new_key = key.clone();
225                *new_key.get_mut(pos).unwrap() = new_byte;
226                if !set_keys.contains(&*new_key) {
227                    keys.push(new_key);
228                    values.push(None);
229                }
230            }
231        }
232        let mut test_exists = Vec::new();
233        let mut values_single_read = Vec::new();
234        for key in &keys {
235            test_exists.push(store.contains_key(key).await.unwrap());
236            values_single_read.push(store.read_value_bytes(key).await.unwrap());
237        }
238        let test_exists_direct = store.contains_keys(&keys).await.unwrap();
239        let values_read = store.read_multi_values_bytes(&keys).await.unwrap();
240        assert_eq!(values, values_read);
241        assert_eq!(values, values_single_read);
242        let values_read_stat = values_read.iter().map(|x| x.is_some()).collect::<Vec<_>>();
243        assert_eq!(values_read_stat, test_exists);
244        assert_eq!(values_read_stat, test_exists_direct);
245    }
246}
247
248/// Generates a list of random key-values with no duplicates
249pub fn get_random_key_values_with_sizes(
250    num_entries: usize,
251    len_key: usize,
252    len_value: usize,
253) -> Vec<(Vec<u8>, Vec<u8>)> {
254    let key_prefix = vec![0];
255    let mut rng = make_deterministic_rng();
256    get_random_key_values_prefix(
257        &mut rng,
258        &key_prefix,
259        len_key,
260        len_value,
261        num_entries,
262        u8::MAX,
263    )
264}
265
266fn get_random_key_values_with_small_keys(
267    num_entries: usize,
268    len_key: usize,
269    len_value: usize,
270) -> Vec<(Vec<u8>, Vec<u8>)> {
271    let key_prefix = vec![0];
272    let mut rng = make_deterministic_rng();
273    get_random_key_values_prefix(
274        &mut rng,
275        &key_prefix,
276        len_key,
277        len_value,
278        num_entries,
279        SMALL_BYTE_UPPER_LIMIT,
280    )
281}
282
/// Prepends `prefix` to the key of every entry of `key_values`, leaving the
/// values and the order of the entries untouched.
pub fn add_prefix(prefix: &[u8], key_values: Vec<(Vec<u8>, Vec<u8>)>) -> Vec<(Vec<u8>, Vec<u8>)> {
    let mut prefixed = Vec::with_capacity(key_values.len());
    for (key, value) in key_values {
        prefixed.push(([prefix, key.as_slice()].concat(), value));
    }
    prefixed
}
294
295/// We build a number of scenarios for testing the reads.
296pub fn get_random_test_scenarios() -> Vec<Vec<(Vec<u8>, Vec<u8>)>> {
297    vec![
298        get_random_key_values_with_sizes(7, 8, 3),
299        get_random_key_values_with_sizes(150, 8, 3),
300        get_random_key_values_with_sizes(30, 8, 10),
301        get_random_key_values_with_small_keys(30, 4, 10),
302        get_random_key_values_with_small_keys(30, 4, 100),
303    ]
304}
305
306fn generate_random_batch<R: Rng>(rng: &mut R, key_prefix: &[u8], batch_size: usize) -> Batch {
307    let mut batch = Batch::new();
308    // Fully random batch
309    for _ in 0..batch_size {
310        let choice = rng.gen_range(0..8);
311        // Inserting a key
312        if choice < 6 {
313            // Insert
314            let key = get_small_key_space(rng, key_prefix, 4);
315            let len_value = rng.gen_range(0..10); // Could need to be split
316            let value = get_random_byte_vector(rng, &[], len_value);
317            batch.put_key_value_bytes(key.clone(), value.clone());
318        }
319        if choice == 6 {
320            // key might be missing, no matter, it has to work
321            let key = get_small_key_space(rng, key_prefix, 4);
322            batch.delete_key(key);
323        }
324        if choice == 7 {
325            let len = rng.gen_range(1..4); // We want a non-trivial range
326            let delete_key_prefix = get_small_key_space(rng, key_prefix, len);
327            batch.delete_key_prefix(delete_key_prefix.clone());
328        }
329    }
330    batch
331}
332
/// Returns the key made of `key_prefix` followed by `key_suffix`.
fn get_key(key_prefix: &[u8], key_suffix: Vec<u8>) -> Vec<u8> {
    [key_prefix, key_suffix.as_slice()].concat()
}
338
339fn generate_specific_batch(key_prefix: &[u8], option: usize) -> Batch {
340    let mut batch = Batch::new();
341    if option == 0 {
342        let key = get_key(key_prefix, vec![34]);
343        batch.put_key_value_bytes(key.clone(), vec![]);
344        batch.delete_key(key);
345    }
346    if option == 1 {
347        let key1 = get_key(key_prefix, vec![12, 34]);
348        let key2 = get_key(key_prefix, vec![12, 33]);
349        let key3 = get_key(key_prefix, vec![13]);
350        batch.put_key_value_bytes(key1.clone(), vec![]);
351        batch.put_key_value_bytes(key2, vec![]);
352        batch.put_key_value_bytes(key3, vec![]);
353        batch.delete_key(key1);
354        let key_prefix = get_key(key_prefix, vec![12]);
355        batch.delete_key_prefix(key_prefix);
356    }
357    batch
358}
359
360fn update_state_from_batch(kv_state: &mut BTreeMap<Vec<u8>, Vec<u8>>, batch: &Batch) {
361    for operation in &batch.operations {
362        match operation {
363            WriteOperation::Put { key, value } => {
364                kv_state.insert(key.to_vec(), value.to_vec());
365            }
366            WriteOperation::Delete { key } => {
367                kv_state.remove(key);
368            }
369            WriteOperation::DeletePrefix { key_prefix } => {
370                kv_state.retain(|key, _| !key.starts_with(key_prefix));
371            }
372        }
373    }
374}
375
376fn realize_batch(batch: &Batch) -> BTreeMap<Vec<u8>, Vec<u8>> {
377    let mut kv_state = BTreeMap::new();
378    update_state_from_batch(&mut kv_state, batch);
379    kv_state
380}
381
382async fn read_keys_prefix<C: KeyValueStore>(
383    key_value_store: &C,
384    key_prefix: &[u8],
385) -> BTreeSet<Vec<u8>> {
386    let mut keys = BTreeSet::new();
387    for key_suffix in key_value_store
388        .find_keys_by_prefix(key_prefix)
389        .await
390        .unwrap()
391    {
392        let mut key = key_prefix.to_vec();
393        key.extend(key_suffix);
394        keys.insert(key);
395    }
396    keys
397}
398
399async fn read_key_values_prefix<C: KeyValueStore>(
400    key_value_store: &C,
401    key_prefix: &[u8],
402) -> BTreeMap<Vec<u8>, Vec<u8>> {
403    let mut key_values = BTreeMap::new();
404    for key_value in key_value_store
405        .find_key_values_by_prefix(key_prefix)
406        .await
407        .unwrap()
408    {
409        let (key_suffix, value) = key_value;
410        let mut key = key_prefix.to_vec();
411        key.extend(key_suffix);
412        key_values.insert(key, value.to_vec());
413    }
414    key_values
415}
416
417/// Writes and then reads data under a prefix, and verifies the result.
418pub async fn run_test_batch_from_blank<C: KeyValueStore>(
419    key_value_store: &C,
420    key_prefix: Vec<u8>,
421    batch: Batch,
422) {
423    let kv_state = realize_batch(&batch);
424    key_value_store.write_batch(batch).await.unwrap();
425    // Checking the consistency
426    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
427    assert_eq!(key_values, kv_state);
428}
429
430/// Run many operations on batches always starting from a blank state.
431pub async fn run_writes_from_blank<C: KeyValueStore>(key_value_store: &C) {
432    let mut rng = make_deterministic_rng();
433    let n_oper = 10;
434    let batch_size = 500;
435    // key space has size 4^4 = 256 so we necessarily encounter collisions
436    // because the number of generated keys is about batch_size * n_oper = 800 > 256.
437    for _ in 0..n_oper {
438        let key_prefix = get_random_key_prefix();
439        let batch = generate_random_batch(&mut rng, &key_prefix, batch_size);
440        run_test_batch_from_blank(key_value_store, key_prefix, batch).await;
441    }
442    for option in 0..2 {
443        let key_prefix = get_random_key_prefix();
444        let batch = generate_specific_batch(&key_prefix, option);
445        run_test_batch_from_blank(key_value_store, key_prefix, batch).await;
446    }
447}
448
449/// Reading many keys at a time could trigger an error. This needs to be tested.
450pub async fn big_read_multi_values<D>(config: D::Config, value_size: usize, n_entries: usize)
451where
452    D: KeyValueDatabase,
453    D::Store: KeyValueStore,
454{
455    let mut rng = make_deterministic_rng();
456    let namespace = generate_test_namespace();
457    let store = D::connect(&config, &namespace).await.unwrap();
458    let store = store.open_exclusive(&[]).unwrap();
459    let key_prefix = vec![42, 54];
460    let mut batch = Batch::new();
461    let mut keys = Vec::new();
462    let mut values = Vec::new();
463    for i in 0..n_entries {
464        let mut key = key_prefix.clone();
465        bcs::serialize_into(&mut key, &i).unwrap();
466        let value = get_random_byte_vector(&mut rng, &[], value_size);
467        batch.put_key_value_bytes(key.clone(), value.clone());
468        keys.push(key);
469        values.push(Some(value));
470    }
471    store.write_batch(batch).await.unwrap();
472    // We reconnect so that the read is not using the cache.
473    let store = D::connect(&config, &namespace).await.unwrap();
474    let store = store.open_exclusive(&[]).unwrap();
475    let values_read = store.read_multi_values_bytes(&keys).await.unwrap();
476    assert_eq!(values, values_read);
477}
478
479/// That test is especially challenging for ScyllaDB.
480/// In its default settings, Scylla has a limitation to 10000 tombstones.
481/// A tombstone is an indication that the data has been deleted. That
482/// is thus a trie data structure for checking whether a requested key
483/// is deleted or not.
484///
485/// In this test we insert 200000 keys into the database.
486/// Then we select half of them at random and delete them. By the random
487/// selection, Scylla is forced to introduce around 100000 tombstones
488/// which triggers the crash with the default settings.
489pub async fn tombstone_triggering_test<C: KeyValueStore>(key_value_store: C) {
490    use linera_base::time::Instant;
491    let t1 = Instant::now();
492    let mut rng = make_deterministic_rng();
493    let value_size = 100;
494    let n_entry = 200000;
495    // Putting the keys
496    let mut batch_insert = Batch::new();
497    let key_prefix = vec![0];
498    let mut batch_delete = Batch::new();
499    let mut remaining_key_values = BTreeMap::new();
500    let mut remaining_keys = BTreeSet::new();
501    for i in 0..n_entry {
502        let mut key = key_prefix.clone();
503        bcs::serialize_into(&mut key, &i).unwrap();
504        let value = get_random_byte_vector(&mut rng, &[], value_size);
505        batch_insert.put_key_value_bytes(key.clone(), value.clone());
506        let to_delete = rng.gen::<bool>();
507        if to_delete {
508            batch_delete.delete_key(key);
509        } else {
510            remaining_keys.insert(key.clone());
511            remaining_key_values.insert(key, value);
512        }
513    }
514    tracing::info!("Set up in {} ms", t1.elapsed().as_millis());
515
516    let t1 = Instant::now();
517    run_test_batch_from_blank(&key_value_store, key_prefix.clone(), batch_insert).await;
518    tracing::info!("run_test_batch in {} ms", t1.elapsed().as_millis());
519
520    // Deleting them all
521    let t1 = Instant::now();
522    key_value_store.write_batch(batch_delete).await.unwrap();
523    tracing::info!("batch_delete in {} ms", t1.elapsed().as_millis());
524
525    for iter in 0..5 {
526        // Reading everything and seeing that it is now cleaned.
527        let t1 = Instant::now();
528        let key_values = read_key_values_prefix(&key_value_store, &key_prefix).await;
529        assert_eq!(key_values, remaining_key_values);
530        tracing::info!(
531            "iter={} read_key_values_prefix in {} ms",
532            iter,
533            t1.elapsed().as_millis()
534        );
535
536        let t1 = Instant::now();
537        let keys = read_keys_prefix(&key_value_store, &key_prefix).await;
538        assert_eq!(keys, remaining_keys);
539        tracing::info!(
540            "iter={} read_keys_prefix after {} ms",
541            iter,
542            t1.elapsed().as_millis()
543        );
544    }
545}
546
547/// Some key-value stores impose limits (e.g. 1 MB for pagination, 4 MB for write).
548/// Let us go right past them at 20 MB of data with writing and then
549/// reading it. And 20 MB is not huge by any mean. All `KeyValueStore`
550/// must handle that.
551///
552/// The size of the value vary as each size has its own issues.
553pub async fn run_big_write_read<C: KeyValueStore>(
554    key_value_store: C,
555    target_size: usize,
556    value_sizes: Vec<usize>,
557) {
558    let mut rng = make_deterministic_rng();
559    for (pos, value_size) in value_sizes.into_iter().enumerate() {
560        let n_entry: usize = target_size / value_size;
561        let mut batch = Batch::new();
562        let key_prefix = vec![0, pos as u8];
563        for i in 0..n_entry {
564            let mut key = key_prefix.clone();
565            bcs::serialize_into(&mut key, &i).unwrap();
566            let value = get_random_byte_vector(&mut rng, &[], value_size);
567            batch.put_key_value_bytes(key, value);
568        }
569        run_test_batch_from_blank(&key_value_store, key_prefix, batch).await;
570    }
571}
572
/// An initial state, given as the list of `(key, value)` pairs to write first,
/// together with the batch to apply on top of that state.
type StateBatch = (Vec<(Vec<u8>, Vec<u8>)>, Batch);
574
575async fn run_test_batch_from_state<C: KeyValueStore>(
576    key_value_store: &C,
577    key_prefix: Vec<u8>,
578    state_and_batch: StateBatch,
579) {
580    let (key_values, batch) = state_and_batch;
581    let mut batch_insert = Batch::new();
582    let mut kv_state = BTreeMap::new();
583    for (key, value) in key_values {
584        kv_state.insert(key.clone(), value.clone());
585        batch_insert.put_key_value_bytes(key, value);
586    }
587    key_value_store.write_batch(batch_insert).await.unwrap();
588    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
589    assert_eq!(key_values, kv_state);
590
591    update_state_from_batch(&mut kv_state, &batch);
592    key_value_store.write_batch(batch).await.unwrap();
593    let key_values = read_key_values_prefix(key_value_store, &key_prefix).await;
594    assert_eq!(key_values, kv_state);
595}
596
597fn generate_specific_state_batch(key_prefix: &[u8], option: usize) -> StateBatch {
598    let mut key_values = Vec::new();
599    let mut batch = Batch::new();
600    if option == 0 {
601        // A DeletePrefix followed by an insertion that matches the DeletePrefix
602        let key1 = get_key(key_prefix, vec![1, 3]);
603        let key2 = get_key(key_prefix, vec![1, 4]);
604        let key3 = get_key(key_prefix, vec![1, 4, 5]);
605        key_values.push((key1.clone(), vec![34]));
606        key_values.push((key2.clone(), vec![45]));
607        batch.delete_key_prefix(key2);
608        batch.put_key_value_bytes(key3, vec![23]);
609    }
610    if option == 1 {
611        // Just a DeletePrefix
612        let key1 = get_key(key_prefix, vec![1, 3]);
613        let key2 = get_key(key_prefix, vec![1, 4]);
614        key_values.push((key1.clone(), vec![34]));
615        key_values.push((key2.clone(), vec![45]));
616        batch.delete_key_prefix(key2);
617    }
618    if option == 2 {
619        // A Put followed by a DeletePrefix that matches the Put
620        let key1 = get_key(key_prefix, vec![1, 3]);
621        let key2 = get_key(key_prefix, vec![1, 4]);
622        let key3 = get_key(key_prefix, vec![1, 4, 5]);
623        key_values.push((key1.clone(), vec![34]));
624        key_values.push((key2.clone(), vec![45]));
625        batch.put_key_value_bytes(key3, vec![23]);
626        batch.delete_key_prefix(key2);
627    }
628    if option == 3 {
629        // A Put followed by a Delete on the same value
630        let key1 = get_key(key_prefix, vec![1, 3]);
631        let key2 = get_key(key_prefix, vec![1, 4]);
632        let key3 = get_key(key_prefix, vec![1, 4, 5]);
633        key_values.push((key1.clone(), vec![34]));
634        key_values.push((key2.clone(), vec![45]));
635        batch.put_key_value_bytes(key3.clone(), vec![23]);
636        batch.delete_key(key3);
637    }
638    if option == 4 {
639        // A Delete Key followed by a Put on the same key
640        let key1 = get_key(key_prefix, vec![1, 3]);
641        let key2 = get_key(key_prefix, vec![1, 4]);
642        let key3 = get_key(key_prefix, vec![1, 4, 5]);
643        key_values.push((key1.clone(), vec![34]));
644        key_values.push((key2.clone(), vec![45]));
645        batch.delete_key(key3.clone());
646        batch.put_key_value_bytes(key3, vec![23]);
647    }
648    if option == 5 {
649        // A Delete Key followed by a Put on the same key
650        let key1 = get_key(key_prefix, vec![1, 3]);
651        let key2 = get_key(key_prefix, vec![1, 4]);
652        let key3 = get_key(key_prefix, vec![1, 4, 5]);
653        let key4 = get_key(key_prefix, vec![1, 5]);
654        key_values.push((key1.clone(), vec![34]));
655        key_values.push((key2.clone(), vec![45]));
656        batch.delete_key(key3.clone());
657        batch.put_key_value_bytes(key4, vec![23]);
658    }
659    if option == 6 {
660        let key1 = get_key(key_prefix, vec![0]);
661        let key2 = get_key(key_prefix, vec![]);
662        key_values.push((key1, vec![33]));
663        batch.delete_key_prefix(key2);
664    }
665    if option == 7 {
666        let key1 = get_key(key_prefix, vec![255, 255]);
667        let key2 = get_key(key_prefix, vec![255, 255, 1]);
668        key_values.push((key2.clone(), vec![]));
669        batch.delete_key_prefix(key1);
670        batch.put_key_value_bytes(key2, vec![]);
671    }
672    (key_values, batch)
673}
674
675/// Run some deterministic and random batches operation and check their
676/// correctness
677pub async fn run_writes_from_state<C: KeyValueStore>(key_value_store: &C) {
678    for option in 0..8 {
679        let key_prefix = if option >= 6 {
680            vec![255, 255, 255]
681        } else {
682            get_random_key_prefix()
683        };
684        let state_batch = generate_specific_state_batch(&key_prefix, option);
685        run_test_batch_from_state(key_value_store, key_prefix, state_batch).await;
686    }
687}
688
689async fn namespaces_with_prefix<D: KeyValueDatabase>(
690    config: &D::Config,
691    prefix: &str,
692) -> BTreeSet<String> {
693    let namespaces = D::list_all(config).await.expect("namespaces");
694    namespaces
695        .into_iter()
696        .filter(|x| x.starts_with(prefix))
697        .collect::<BTreeSet<_>>()
698}
699
700/// Exercises the namespace functionalities of the `KeyValueDatabase`.
701/// This tests everything except the `delete_all` which would
702/// interact with other namespaces.
703pub async fn namespace_admin_test<D: TestKeyValueDatabase>() {
704    let config = D::new_test_config().await.expect("config");
705    {
706        let namespace = generate_test_namespace();
707        D::create(&config, &namespace)
708            .await
709            .expect("first creation of a namespace");
710        // Creating a namespace two times should returns an error
711        assert!(D::create(&config, &namespace).await.is_err());
712    }
713    let prefix = generate_test_namespace();
714    let namespaces = namespaces_with_prefix::<D>(&config, &prefix).await;
715    assert_eq!(namespaces.len(), 0);
716    let mut rng = make_deterministic_rng();
717    let size = 9;
718    // Creating the initial list of namespaces
719    let mut working_namespaces = BTreeSet::new();
720    for i in 0..size {
721        let namespace = format!("{prefix}_{i}");
722        assert!(!D::exists(&config, &namespace).await.expect("test"));
723        working_namespaces.insert(namespace);
724    }
725    // Creating the namespaces
726    for namespace in &working_namespaces {
727        D::create(&config, namespace)
728            .await
729            .expect("creation of a namespace");
730        assert!(D::exists(&config, namespace).await.expect("test"));
731    }
732    // Connecting to all of them at once
733    {
734        let mut connections = Vec::new();
735        for namespace in &working_namespaces {
736            let connection = D::connect(&config, namespace)
737                .await
738                .expect("a connection to the namespace");
739            connections.push(connection);
740        }
741    }
742    // Listing all of them
743    let namespaces = namespaces_with_prefix::<D>(&config, &prefix).await;
744    assert_eq!(namespaces, working_namespaces);
745    // Selecting at random some for deletion
746    let mut kept_namespaces = BTreeSet::new();
747    for namespace in working_namespaces {
748        let delete = rng.gen::<bool>();
749        if delete {
750            D::delete(&config, &namespace)
751                .await
752                .expect("A successful deletion");
753            assert!(!D::exists(&config, &namespace).await.expect("test"));
754        } else {
755            kept_namespaces.insert(namespace);
756        }
757    }
758    for namespace in &kept_namespaces {
759        assert!(D::exists(&config, namespace).await.expect("test"));
760    }
761    let namespaces = namespaces_with_prefix::<D>(&config, &prefix).await;
762    assert_eq!(namespaces, kept_namespaces);
763    for namespace in kept_namespaces {
764        D::delete(&config, &namespace)
765            .await
766            .expect("A successful deletion");
767    }
768}
769
770/// Tests listing the root keys.
771pub async fn root_key_admin_test<D>()
772where
773    D: TestKeyValueDatabase,
774    D::Store: KeyValueStore,
775{
776    let config = D::new_test_config().await.expect("config");
777    let namespace = generate_test_namespace();
778    let mut root_keys = Vec::new();
779    let mut keys = BTreeSet::new();
780    D::create(&config, &namespace).await.expect("creation");
781    let prefix = vec![0];
782    {
783        let size = 3;
784        let mut rng = make_deterministic_rng();
785        let database = D::connect(&config, &namespace).await.expect("store");
786        let shared_store = database.open_shared(&[]).expect("shared store");
787        root_keys.push(vec![]);
788        let mut batch = Batch::new();
789        for _ in 0..2 {
790            let key = get_random_byte_vector(&mut rng, &prefix, 4);
791            batch.put_key_value_bytes(key.clone(), vec![]);
792            keys.insert((vec![], key));
793        }
794        shared_store.write_batch(batch).await.expect("write batch");
795
796        for _ in 0..20 {
797            let root_key = get_random_byte_vector(&mut rng, &[], 4);
798            let exclusive_store = database.open_exclusive(&root_key).expect("exclusive store");
799            assert_eq!(exclusive_store.root_key().unwrap(), root_key);
800            root_keys.push(root_key.clone());
801            let size_select = rng.gen_range(0..size);
802            let mut batch = Batch::new();
803            for _ in 0..size_select {
804                let key = get_random_byte_vector(&mut rng, &prefix, 4);
805                batch.put_key_value_bytes(key.clone(), vec![]);
806                keys.insert((root_key.clone(), key));
807            }
808            exclusive_store
809                .write_batch(batch)
810                .await
811                .expect("write batch");
812        }
813    }
814
815    let read_root_keys = {
816        let database = D::connect(&config, &namespace).await.expect("store");
817        database.list_root_keys().await.expect("read_root_keys")
818    };
819    let set_root_keys = root_keys.iter().cloned().collect::<HashSet<_>>();
820    for read_root_key in &read_root_keys {
821        assert!(set_root_keys.contains(read_root_key));
822    }
823
824    let mut read_keys = BTreeSet::new();
825    for root_key in read_root_keys {
826        let store = D::connect(&config, &namespace)
827            .await
828            .expect("database")
829            .open_exclusive(&root_key)
830            .expect("store");
831        let keys = store.find_keys_by_prefix(&prefix).await.expect("keys");
832        for key in keys {
833            let mut big_key = prefix.clone();
834            let key = key.to_vec();
835            big_key.extend(key);
836            read_keys.insert((root_key.clone(), big_key));
837        }
838    }
839    assert_eq!(keys, read_keys);
840
841    // Checking prefix freeness of the (root_key, key). This is a
842    // common problem that needs to be tested.
843    let database = D::connect_test_namespace().await.expect("database");
844    let store1 = database.open_shared(&[2, 3, 4, 5]).expect("store1");
845    let mut batch = Batch::new();
846    batch.put_key_value_bytes(vec![6, 7], vec![123, 135]);
847    store1.write_batch(batch).await.expect("write_batch");
848
849    let store2 = database.open_shared(&[]).expect("store2");
850    let key_values = store2
851        .find_key_values_by_prefix(&[2])
852        .await
853        .expect("key_values");
854    assert_eq!(key_values.len(), 0);
855}
856
857/// A store can be in exclusive access where it stores the absence of values
858/// or in shared access where only values are stored and (key, value) once
859/// written are never modified nor erased.
860///
861/// In case of no exclusive access the following scenario is checked
862/// * Store 1 deletes a key and does not mark it as missing in its cache.
863/// * Store 2 writes the key
864/// * Store 1 reads the key, but since it is not in the cache it can read
865///   it correctly.
866///
867/// In case of exclusive access. We have the following scenario:
868/// * Store 1 deletes a key and mark it as missing in its cache.
869/// * Store 2 writes the key (it should not be doing it)
870/// * Store 1 reads the key, see it as missing.
871pub async fn exclusive_access_admin_test<D>(exclusive_access: bool)
872where
873    D: TestKeyValueDatabase,
874    D::Store: KeyValueStore,
875{
876    let config = D::new_test_config().await.expect("config");
877    let namespace = generate_test_namespace();
878    D::create(&config, &namespace).await.expect("creation");
879    let key = vec![42];
880
881    let namespace = D::connect(&config, &namespace).await.expect("store");
882    let store1 = if exclusive_access {
883        namespace.open_exclusive(&[]).expect("store1")
884    } else {
885        namespace.open_shared(&[]).expect("store1")
886    };
887    let mut batch1 = Batch::new();
888    batch1.delete_key(key.clone());
889    store1.write_batch(batch1).await.expect("write batch1");
890
891    let store2 = if exclusive_access {
892        namespace.open_exclusive(&[]).expect("store2")
893    } else {
894        namespace.open_shared(&[]).expect("store2")
895    };
896    let mut batch2 = Batch::new();
897    batch2.put_key_value_bytes(key.clone(), vec![]);
898    store2.write_batch(batch2).await.expect("write batch2");
899
900    assert_eq!(store1.contains_key(&key).await.unwrap(), !exclusive_access);
901}
902
903/// Both checks together.
904pub async fn access_admin_test<D>()
905where
906    D: TestKeyValueDatabase,
907    D::Store: KeyValueStore,
908{
909    exclusive_access_admin_test::<D>(true).await;
910    exclusive_access_admin_test::<D>(false).await;
911}