linera_views/backends/
memory.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] in memory.
5
6use std::{
7    collections::BTreeMap,
8    sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::random::generate_test_namespace;
16#[cfg(with_testing)]
17use crate::store::TestKeyValueStore;
18use crate::{
19    batch::{Batch, WriteOperation},
20    common::get_interval,
21    store::{
22        AdminKeyValueStore, CommonStoreInternalConfig, KeyValueStoreError, ReadableKeyValueStore,
23        WithError, WritableKeyValueStore,
24    },
25};
26
27/// The initial configuration of the system
28#[derive(Debug, Clone, Deserialize, Serialize)]
29pub struct MemoryStoreConfig {
30    /// The common configuration of the key value store
31    pub common_config: CommonStoreInternalConfig,
32}
33
34impl MemoryStoreConfig {
35    /// Creates a `MemoryStoreConfig`. `max_concurrent_queries`, `cache_size` and `replication_factor` are not used.
36    pub fn new(max_stream_queries: usize) -> Self {
37        let common_config = CommonStoreInternalConfig {
38            max_concurrent_queries: None,
39            max_stream_queries,
40            replication_factor: 1,
41        };
42        Self { common_config }
43    }
44}
45
/// The `max_stream_queries` value used when building memory stores for tests.
pub const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
48
/// Keys and values are kept as serialized bytes in memory, just like for
/// RocksDB / DynamoDB; the `BTreeMap` plays the role of the database.
type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
52
53/// The container for the `MemoryStoreMap`s by namespace and then root key
54#[derive(Default)]
55struct MemoryStores {
56    stores: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
57}
58
59impl MemoryStores {
60    fn sync_connect(
61        &mut self,
62        config: &MemoryStoreConfig,
63        namespace: &str,
64        root_key: &[u8],
65        kill_on_drop: bool,
66    ) -> Result<MemoryStore, MemoryStoreError> {
67        let max_stream_queries = config.common_config.max_stream_queries;
68        let Some(stores) = self.stores.get_mut(namespace) else {
69            return Err(MemoryStoreError::NamespaceNotFound);
70        };
71        let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
72            let map = MemoryStoreMap::new();
73            Arc::new(RwLock::new(map))
74        });
75        let map = store.clone();
76        let namespace = namespace.to_string();
77        let root_key = root_key.to_vec();
78        Ok(MemoryStore {
79            map,
80            max_stream_queries,
81            namespace,
82            root_key,
83            kill_on_drop,
84        })
85    }
86
87    fn sync_list_all(&self) -> Vec<String> {
88        self.stores.keys().cloned().collect::<Vec<_>>()
89    }
90
91    fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
92        match self.stores.get(namespace) {
93            None => Vec::new(),
94            Some(map) => map.keys().cloned().collect::<Vec<_>>(),
95        }
96    }
97
98    fn sync_exists(&self, namespace: &str) -> bool {
99        self.stores.contains_key(namespace)
100    }
101
102    fn sync_create(&mut self, namespace: &str) {
103        self.stores.insert(namespace.to_string(), BTreeMap::new());
104    }
105
106    fn sync_delete(&mut self, namespace: &str) {
107        self.stores.remove(namespace);
108    }
109}
110
111/// The global variables of the Namespace memory stores
112static MEMORY_STORES: LazyLock<Mutex<MemoryStores>> =
113    LazyLock::new(|| Mutex::new(MemoryStores::default()));
114
115/// A virtual DB client where data are persisted in memory.
116#[derive(Clone)]
117pub struct MemoryStore {
118    /// The map used for storing the data.
119    map: Arc<RwLock<MemoryStoreMap>>,
120    /// The maximum number of queries used for the stream.
121    max_stream_queries: usize,
122    /// The namespace of the store
123    namespace: String,
124    /// The root key of the store
125    root_key: Vec<u8>,
126    /// Whether to kill on drop or not the
127    kill_on_drop: bool,
128}
129
130impl Drop for MemoryStore {
131    fn drop(&mut self) {
132        if self.kill_on_drop {
133            let mut memory_stores = MEMORY_STORES
134                .lock()
135                .expect("MEMORY_STORES lock should not be poisoned");
136            let stores = memory_stores.stores.get_mut(&self.namespace).unwrap();
137            stores.remove(&self.root_key);
138        }
139    }
140}
141
142impl WithError for MemoryStore {
143    type Error = MemoryStoreError;
144}
145
146impl ReadableKeyValueStore for MemoryStore {
147    const MAX_KEY_SIZE: usize = usize::MAX;
148    type Keys = Vec<Vec<u8>>;
149    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
150
151    fn max_stream_queries(&self) -> usize {
152        self.max_stream_queries
153    }
154
155    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
156        let map = self
157            .map
158            .read()
159            .expect("MemoryStore lock should not be poisoned");
160        Ok(map.get(key).cloned())
161    }
162
163    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
164        let map = self
165            .map
166            .read()
167            .expect("MemoryStore lock should not be poisoned");
168        Ok(map.contains_key(key))
169    }
170
171    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
172        let map = self
173            .map
174            .read()
175            .expect("MemoryStore lock should not be poisoned");
176        Ok(keys
177            .into_iter()
178            .map(|key| map.contains_key(&key))
179            .collect::<Vec<_>>())
180    }
181
182    async fn read_multi_values_bytes(
183        &self,
184        keys: Vec<Vec<u8>>,
185    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
186        let map = self
187            .map
188            .read()
189            .expect("MemoryStore lock should not be poisoned");
190        let mut result = Vec::new();
191        for key in keys {
192            result.push(map.get(&key).cloned());
193        }
194        Ok(result)
195    }
196
197    async fn find_keys_by_prefix(
198        &self,
199        key_prefix: &[u8],
200    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
201        let map = self
202            .map
203            .read()
204            .expect("MemoryStore lock should not be poisoned");
205        let mut values = Vec::new();
206        let len = key_prefix.len();
207        for (key, _value) in map.range(get_interval(key_prefix.to_vec())) {
208            values.push(key[len..].to_vec())
209        }
210        Ok(values)
211    }
212
213    async fn find_key_values_by_prefix(
214        &self,
215        key_prefix: &[u8],
216    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
217        let map = self
218            .map
219            .read()
220            .expect("MemoryStore lock should not be poisoned");
221        let mut key_values = Vec::new();
222        let len = key_prefix.len();
223        for (key, value) in map.range(get_interval(key_prefix.to_vec())) {
224            let key_value = (key[len..].to_vec(), value.to_vec());
225            key_values.push(key_value);
226        }
227        Ok(key_values)
228    }
229}
230
231impl WritableKeyValueStore for MemoryStore {
232    const MAX_VALUE_SIZE: usize = usize::MAX;
233
234    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
235        let mut map = self
236            .map
237            .write()
238            .expect("MemoryStore lock should not be poisoned");
239        for ent in batch.operations {
240            match ent {
241                WriteOperation::Put { key, value } => {
242                    map.insert(key, value);
243                }
244                WriteOperation::Delete { key } => {
245                    map.remove(&key);
246                }
247                WriteOperation::DeletePrefix { key_prefix } => {
248                    let key_list = map
249                        .range(get_interval(key_prefix))
250                        .map(|x| x.0.to_vec())
251                        .collect::<Vec<_>>();
252                    for key in key_list {
253                        map.remove(&key);
254                    }
255                }
256            }
257        }
258        Ok(())
259    }
260
261    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
262        Ok(())
263    }
264}
265
266impl MemoryStore {
267    /// Connects to a memory store. Creates it if it does not exist yet
268    fn sync_maybe_create_and_connect(
269        config: &MemoryStoreConfig,
270        namespace: &str,
271        kill_on_drop: bool,
272    ) -> Result<Self, MemoryStoreError> {
273        let mut memory_stores = MEMORY_STORES.lock().expect("lock should not be poisoned");
274        if !memory_stores.sync_exists(namespace) {
275            memory_stores.sync_create(namespace);
276        }
277        memory_stores.sync_connect(config, namespace, &[], kill_on_drop)
278    }
279
280    /// Creates a `MemoryStore` from a number of queries and a namespace.
281    pub fn new(max_stream_queries: usize, namespace: &str) -> Result<Self, MemoryStoreError> {
282        let common_config = CommonStoreInternalConfig {
283            max_concurrent_queries: None,
284            max_stream_queries,
285            replication_factor: 1,
286        };
287        let config = MemoryStoreConfig { common_config };
288        let kill_on_drop = false;
289        MemoryStore::sync_maybe_create_and_connect(&config, namespace, kill_on_drop)
290    }
291
292    /// Creates a `MemoryStore` from a number of queries and a namespace for testing.
293    #[cfg(with_testing)]
294    pub fn new_for_testing(
295        max_stream_queries: usize,
296        namespace: &str,
297    ) -> Result<Self, MemoryStoreError> {
298        let common_config = CommonStoreInternalConfig {
299            max_concurrent_queries: None,
300            max_stream_queries,
301            replication_factor: 1,
302        };
303        let config = MemoryStoreConfig { common_config };
304        let kill_on_drop = true;
305        MemoryStore::sync_maybe_create_and_connect(&config, namespace, kill_on_drop)
306    }
307}
308
309impl AdminKeyValueStore for MemoryStore {
310    type Config = MemoryStoreConfig;
311
312    fn get_name() -> String {
313        "memory".to_string()
314    }
315
316    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
317        let mut memory_stores = MEMORY_STORES
318            .lock()
319            .expect("MEMORY_STORES lock should not be poisoned");
320        let kill_on_drop = false;
321        memory_stores.sync_connect(config, namespace, &[], kill_on_drop)
322    }
323
324    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, MemoryStoreError> {
325        let max_stream_queries = self.max_stream_queries;
326        let common_config = CommonStoreInternalConfig {
327            max_concurrent_queries: None,
328            max_stream_queries,
329            replication_factor: 1,
330        };
331        let config = MemoryStoreConfig { common_config };
332        let mut memory_stores = MEMORY_STORES
333            .lock()
334            .expect("MEMORY_STORES lock should not be poisoned");
335        let kill_on_drop = self.kill_on_drop;
336        let namespace = &self.namespace;
337        memory_stores.sync_connect(&config, namespace, root_key, kill_on_drop)
338    }
339
340    async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
341        let memory_stores = MEMORY_STORES
342            .lock()
343            .expect("MEMORY_STORES lock should not be poisoned");
344        Ok(memory_stores.sync_list_all())
345    }
346
347    async fn list_root_keys(
348        _config: &Self::Config,
349        namespace: &str,
350    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
351        let memory_stores = MEMORY_STORES
352            .lock()
353            .expect("MEMORY_STORES lock should not be poisoned");
354        Ok(memory_stores.sync_list_root_keys(namespace))
355    }
356
357    async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
358        let memory_stores = MEMORY_STORES
359            .lock()
360            .expect("MEMORY_STORES lock should not be poisoned");
361        Ok(memory_stores.sync_exists(namespace))
362    }
363
364    async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
365        let mut memory_stores = MEMORY_STORES
366            .lock()
367            .expect("MEMORY_STORES lock should not be poisoned");
368        if memory_stores.sync_exists(namespace) {
369            return Err(MemoryStoreError::StoreAlreadyExists);
370        }
371        memory_stores.sync_create(namespace);
372        Ok(())
373    }
374
375    async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
376        let mut memory_stores = MEMORY_STORES
377            .lock()
378            .expect("MEMORY_STORES lock should not be poisoned");
379        memory_stores.sync_delete(namespace);
380        Ok(())
381    }
382}
383
#[cfg(with_testing)]
impl TestKeyValueStore for MemoryStore {
    /// Returns a configuration using `TEST_MEMORY_MAX_STREAM_QUERIES` stream queries.
    async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
        Ok(MemoryStoreConfig::new(TEST_MEMORY_MAX_STREAM_QUERIES))
    }
}
396
/// Creates a memory store for tests, in a namespace obtained from
/// `generate_test_namespace`.
///
/// # Panics
///
/// Panics if the store cannot be created.
#[cfg(with_testing)]
pub fn create_test_memory_store() -> MemoryStore {
    let test_namespace = generate_test_namespace();
    let store = MemoryStore::new_for_testing(TEST_MEMORY_MAX_STREAM_QUERIES, &test_namespace);
    store.unwrap()
}
403
404/// The error type for [`MemoryStore`].
405#[derive(Error, Debug)]
406pub enum MemoryStoreError {
407    /// Store already exists during a create operation
408    #[error("Store already exists during a create operation")]
409    StoreAlreadyExists,
410
411    /// Serialization error with BCS.
412    #[error(transparent)]
413    BcsError(#[from] bcs::Error),
414
415    /// The namespace does not exist
416    #[error("The namespace does not exist")]
417    NamespaceNotFound,
418}
419
420impl KeyValueStoreError for MemoryStoreError {
421    const BACKEND: &'static str = "memory";
422}