linera_views/backends/
memory.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] in memory.
5
6use std::{
7    collections::BTreeMap,
8    sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::random::generate_test_namespace;
16#[cfg(with_testing)]
17use crate::store::TestKeyValueStore;
18use crate::{
19    batch::{Batch, WriteOperation},
20    common::get_interval,
21    store::{
22        AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError,
23        WritableKeyValueStore,
24    },
25};
26
27/// The initial configuration of the system
28#[derive(Debug, Clone, Deserialize, Serialize)]
29pub struct MemoryStoreConfig {
30    /// Preferred buffer size for async streams.
31    pub max_stream_queries: usize,
32}
33
34/// The number of streams for the test
35pub const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
36
37/// The data is serialized in memory just like for RocksDB / DynamoDB
38/// The analog of the database is the `BTreeMap`
39type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
40
41/// The container for the `MemoryStoreMap`s by namespace and then root key
42#[derive(Default)]
43struct MemoryStores {
44    stores: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
45}
46
47impl MemoryStores {
48    fn sync_connect(
49        &mut self,
50        config: &MemoryStoreConfig,
51        namespace: &str,
52        root_key: &[u8],
53        kill_on_drop: bool,
54    ) -> Result<MemoryStore, MemoryStoreError> {
55        let max_stream_queries = config.max_stream_queries;
56        let Some(stores) = self.stores.get_mut(namespace) else {
57            return Err(MemoryStoreError::NamespaceNotFound);
58        };
59        let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
60            let map = MemoryStoreMap::new();
61            Arc::new(RwLock::new(map))
62        });
63        let map = store.clone();
64        let namespace = namespace.to_string();
65        let root_key = root_key.to_vec();
66        Ok(MemoryStore {
67            map,
68            max_stream_queries,
69            namespace,
70            root_key,
71            kill_on_drop,
72        })
73    }
74
75    fn sync_list_all(&self) -> Vec<String> {
76        self.stores.keys().cloned().collect::<Vec<_>>()
77    }
78
79    fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
80        match self.stores.get(namespace) {
81            None => Vec::new(),
82            Some(map) => map.keys().cloned().collect::<Vec<_>>(),
83        }
84    }
85
86    fn sync_exists(&self, namespace: &str) -> bool {
87        self.stores.contains_key(namespace)
88    }
89
90    fn sync_create(&mut self, namespace: &str) {
91        self.stores.insert(namespace.to_string(), BTreeMap::new());
92    }
93
94    fn sync_delete(&mut self, namespace: &str) {
95        self.stores.remove(namespace);
96    }
97}
98
99/// The global variables of the Namespace memory stores
100static MEMORY_STORES: LazyLock<Mutex<MemoryStores>> =
101    LazyLock::new(|| Mutex::new(MemoryStores::default()));
102
103/// A virtual DB client where data are persisted in memory.
104#[derive(Clone)]
105pub struct MemoryStore {
106    /// The map used for storing the data.
107    map: Arc<RwLock<MemoryStoreMap>>,
108    /// The maximum number of queries used for the stream.
109    max_stream_queries: usize,
110    /// The namespace of the store
111    namespace: String,
112    /// The root key of the store
113    root_key: Vec<u8>,
114    /// Whether to kill on drop or not the
115    kill_on_drop: bool,
116}
117
118impl Drop for MemoryStore {
119    fn drop(&mut self) {
120        if self.kill_on_drop {
121            let mut memory_stores = MEMORY_STORES
122                .lock()
123                .expect("MEMORY_STORES lock should not be poisoned");
124            let stores = memory_stores.stores.get_mut(&self.namespace).unwrap();
125            stores.remove(&self.root_key);
126        }
127    }
128}
129
130impl WithError for MemoryStore {
131    type Error = MemoryStoreError;
132}
133
134impl ReadableKeyValueStore for MemoryStore {
135    const MAX_KEY_SIZE: usize = usize::MAX;
136
137    fn max_stream_queries(&self) -> usize {
138        self.max_stream_queries
139    }
140
141    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
142        let map = self
143            .map
144            .read()
145            .expect("MemoryStore lock should not be poisoned");
146        Ok(map.get(key).cloned())
147    }
148
149    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
150        let map = self
151            .map
152            .read()
153            .expect("MemoryStore lock should not be poisoned");
154        Ok(map.contains_key(key))
155    }
156
157    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
158        let map = self
159            .map
160            .read()
161            .expect("MemoryStore lock should not be poisoned");
162        Ok(keys
163            .into_iter()
164            .map(|key| map.contains_key(&key))
165            .collect::<Vec<_>>())
166    }
167
168    async fn read_multi_values_bytes(
169        &self,
170        keys: Vec<Vec<u8>>,
171    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
172        let map = self
173            .map
174            .read()
175            .expect("MemoryStore lock should not be poisoned");
176        let mut result = Vec::new();
177        for key in keys {
178            result.push(map.get(&key).cloned());
179        }
180        Ok(result)
181    }
182
183    async fn find_keys_by_prefix(
184        &self,
185        key_prefix: &[u8],
186    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
187        let map = self
188            .map
189            .read()
190            .expect("MemoryStore lock should not be poisoned");
191        let mut values = Vec::new();
192        let len = key_prefix.len();
193        for (key, _value) in map.range(get_interval(key_prefix.to_vec())) {
194            values.push(key[len..].to_vec())
195        }
196        Ok(values)
197    }
198
199    async fn find_key_values_by_prefix(
200        &self,
201        key_prefix: &[u8],
202    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
203        let map = self
204            .map
205            .read()
206            .expect("MemoryStore lock should not be poisoned");
207        let mut key_values = Vec::new();
208        let len = key_prefix.len();
209        for (key, value) in map.range(get_interval(key_prefix.to_vec())) {
210            let key_value = (key[len..].to_vec(), value.to_vec());
211            key_values.push(key_value);
212        }
213        Ok(key_values)
214    }
215}
216
217impl WritableKeyValueStore for MemoryStore {
218    const MAX_VALUE_SIZE: usize = usize::MAX;
219
220    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
221        let mut map = self
222            .map
223            .write()
224            .expect("MemoryStore lock should not be poisoned");
225        for ent in batch.operations {
226            match ent {
227                WriteOperation::Put { key, value } => {
228                    map.insert(key, value);
229                }
230                WriteOperation::Delete { key } => {
231                    map.remove(&key);
232                }
233                WriteOperation::DeletePrefix { key_prefix } => {
234                    let key_list = map
235                        .range(get_interval(key_prefix))
236                        .map(|x| x.0.to_vec())
237                        .collect::<Vec<_>>();
238                    for key in key_list {
239                        map.remove(&key);
240                    }
241                }
242            }
243        }
244        Ok(())
245    }
246
247    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
248        Ok(())
249    }
250}
251
252impl MemoryStore {
253    /// Connects to a memory store. Creates it if it does not exist yet
254    fn sync_maybe_create_and_connect(
255        config: &MemoryStoreConfig,
256        namespace: &str,
257        kill_on_drop: bool,
258    ) -> Result<Self, MemoryStoreError> {
259        let mut memory_stores = MEMORY_STORES.lock().expect("lock should not be poisoned");
260        if !memory_stores.sync_exists(namespace) {
261            memory_stores.sync_create(namespace);
262        }
263        memory_stores.sync_connect(config, namespace, &[], kill_on_drop)
264    }
265
266    /// Creates a `MemoryStore` from a number of queries and a namespace.
267    pub fn new(max_stream_queries: usize, namespace: &str) -> Result<Self, MemoryStoreError> {
268        let config = MemoryStoreConfig { max_stream_queries };
269        MemoryStore::sync_maybe_create_and_connect(
270            &config, namespace, /* kill_on_drop */ false,
271        )
272    }
273
274    /// Creates a `MemoryStore` from a number of queries and a namespace for testing.
275    #[cfg(with_testing)]
276    pub fn new_for_testing(
277        max_stream_queries: usize,
278        namespace: &str,
279    ) -> Result<Self, MemoryStoreError> {
280        let config = MemoryStoreConfig { max_stream_queries };
281        MemoryStore::sync_maybe_create_and_connect(&config, namespace, /* kill_on_drop */ true)
282    }
283}
284
285impl AdminKeyValueStore for MemoryStore {
286    type Config = MemoryStoreConfig;
287
288    fn get_name() -> String {
289        "memory".to_string()
290    }
291
292    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
293        let mut memory_stores = MEMORY_STORES
294            .lock()
295            .expect("MEMORY_STORES lock should not be poisoned");
296        memory_stores.sync_connect(config, namespace, &[], /* kill_on_drop */ false)
297    }
298
299    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, MemoryStoreError> {
300        let max_stream_queries = self.max_stream_queries;
301        let config = MemoryStoreConfig { max_stream_queries };
302        let mut memory_stores = MEMORY_STORES
303            .lock()
304            .expect("MEMORY_STORES lock should not be poisoned");
305        memory_stores.sync_connect(&config, &self.namespace, root_key, self.kill_on_drop)
306    }
307
308    async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
309        let memory_stores = MEMORY_STORES
310            .lock()
311            .expect("MEMORY_STORES lock should not be poisoned");
312        Ok(memory_stores.sync_list_all())
313    }
314
315    async fn list_root_keys(
316        _config: &Self::Config,
317        namespace: &str,
318    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
319        let memory_stores = MEMORY_STORES
320            .lock()
321            .expect("MEMORY_STORES lock should not be poisoned");
322        Ok(memory_stores.sync_list_root_keys(namespace))
323    }
324
325    async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
326        let memory_stores = MEMORY_STORES
327            .lock()
328            .expect("MEMORY_STORES lock should not be poisoned");
329        Ok(memory_stores.sync_exists(namespace))
330    }
331
332    async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
333        let mut memory_stores = MEMORY_STORES
334            .lock()
335            .expect("MEMORY_STORES lock should not be poisoned");
336        if memory_stores.sync_exists(namespace) {
337            return Err(MemoryStoreError::StoreAlreadyExists);
338        }
339        memory_stores.sync_create(namespace);
340        Ok(())
341    }
342
343    async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
344        let mut memory_stores = MEMORY_STORES
345            .lock()
346            .expect("MEMORY_STORES lock should not be poisoned");
347        memory_stores.sync_delete(namespace);
348        Ok(())
349    }
350}
351
352#[cfg(with_testing)]
353impl TestKeyValueStore for MemoryStore {
354    async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
355        let max_stream_queries = TEST_MEMORY_MAX_STREAM_QUERIES;
356        Ok(MemoryStoreConfig { max_stream_queries })
357    }
358}
359
360/// Creates a test memory store for working.
361#[cfg(with_testing)]
362pub fn create_test_memory_store() -> MemoryStore {
363    let namespace = generate_test_namespace();
364    MemoryStore::new_for_testing(TEST_MEMORY_MAX_STREAM_QUERIES, &namespace).unwrap()
365}
366
367/// The error type for [`MemoryStore`].
368#[derive(Error, Debug)]
369pub enum MemoryStoreError {
370    /// Store already exists during a create operation
371    #[error("Store already exists during a create operation")]
372    StoreAlreadyExists,
373
374    /// Serialization error with BCS.
375    #[error(transparent)]
376    BcsError(#[from] bcs::Error),
377
378    /// The namespace does not exist
379    #[error("The namespace does not exist")]
380    NamespaceNotFound,
381}
382
383impl KeyValueStoreError for MemoryStoreError {
384    const BACKEND: &'static str = "memory";
385}