linera_views/backends/
memory.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueDatabase`] in memory.
5
6use std::{
7    collections::BTreeMap,
8    sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::store::TestKeyValueDatabase;
16use crate::{
17    batch::{Batch, WriteOperation},
18    common::get_key_range_for_prefix,
19    store::{
20        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
21        WritableKeyValueStore,
22    },
23};
24
25/// The initial configuration of the system
26#[derive(Debug, Clone, Deserialize, Serialize)]
27pub struct MemoryStoreConfig {
28    /// Preferred buffer size for async streams.
29    pub max_stream_queries: usize,
30    /// Whether a namespace should be immediately cleaned up from memory when the
31    /// connection object is dropped.
32    pub kill_on_drop: bool,
33}
34
/// Value of `max_stream_queries` used by the test configuration and by
/// stores created for testing.
#[cfg(with_testing)]
const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
38
/// The key-value pairs held by a single partition, ordered by key.
type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
41
42/// The container for the `MemoryStoreMap`s by namespace and then root key
43#[derive(Default)]
44struct MemoryDatabases {
45    databases: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
46}
47
/// A handle on one namespace of the in-memory key-value database.
#[derive(Debug, Clone)]
pub struct MemoryDatabase {
    /// Name of the namespace this handle refers to.
    namespace: String,
    /// The maximum number of queries used for a stream.
    max_stream_queries: usize,
    /// Whether the namespace is removed when this handle is dropped.
    kill_on_drop: bool,
}
58
59impl MemoryDatabases {
60    fn sync_open(
61        &mut self,
62        namespace: &str,
63        max_stream_queries: usize,
64        root_key: &[u8],
65    ) -> Result<MemoryStore, MemoryStoreError> {
66        let Some(stores) = self.databases.get_mut(namespace) else {
67            return Err(MemoryStoreError::NamespaceNotFound);
68        };
69        let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
70            let map = MemoryStoreMap::new();
71            Arc::new(RwLock::new(map))
72        });
73        let map = store.clone();
74        Ok(MemoryStore {
75            map,
76            root_key: root_key.to_vec(),
77            max_stream_queries,
78        })
79    }
80
81    fn sync_list_all(&self) -> Vec<String> {
82        self.databases.keys().cloned().collect::<Vec<_>>()
83    }
84
85    fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
86        match self.databases.get(namespace) {
87            None => Vec::new(),
88            Some(map) => map.keys().cloned().collect::<Vec<_>>(),
89        }
90    }
91
92    fn sync_exists(&self, namespace: &str) -> bool {
93        self.databases.contains_key(namespace)
94    }
95
96    fn sync_create(&mut self, namespace: &str) {
97        self.databases
98            .insert(namespace.to_string(), BTreeMap::new());
99    }
100
101    fn sync_delete(&mut self, namespace: &str) {
102        self.databases.remove(namespace);
103    }
104}
105
106/// The global table of namespaces.
107static MEMORY_DATABASES: LazyLock<Mutex<MemoryDatabases>> =
108    LazyLock::new(|| Mutex::new(MemoryDatabases::default()));
109
110/// A virtual DB client where data are persisted in memory.
111#[derive(Clone)]
112pub struct MemoryStore {
113    /// The map used for storing the data.
114    map: Arc<RwLock<MemoryStoreMap>>,
115    /// The root key.
116    root_key: Vec<u8>,
117    /// The maximum number of queries used for a stream.
118    max_stream_queries: usize,
119}
120
121impl WithError for MemoryDatabase {
122    type Error = MemoryStoreError;
123}
124
125impl WithError for MemoryStore {
126    type Error = MemoryStoreError;
127}
128
129impl ReadableKeyValueStore for MemoryStore {
130    const MAX_KEY_SIZE: usize = usize::MAX;
131
132    fn max_stream_queries(&self) -> usize {
133        self.max_stream_queries
134    }
135
136    fn root_key(&self) -> Result<Vec<u8>, MemoryStoreError> {
137        Ok(self.root_key.clone())
138    }
139
140    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
141        let map = self
142            .map
143            .read()
144            .expect("MemoryStore lock should not be poisoned");
145        Ok(map.get(key).cloned())
146    }
147
148    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
149        let map = self
150            .map
151            .read()
152            .expect("MemoryStore lock should not be poisoned");
153        Ok(map.contains_key(key))
154    }
155
156    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, MemoryStoreError> {
157        let map = self
158            .map
159            .read()
160            .expect("MemoryStore lock should not be poisoned");
161        Ok(keys
162            .iter()
163            .map(|key| map.contains_key(key))
164            .collect::<Vec<_>>())
165    }
166
167    async fn read_multi_values_bytes(
168        &self,
169        keys: &[Vec<u8>],
170    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
171        let map = self
172            .map
173            .read()
174            .expect("MemoryStore lock should not be poisoned");
175        let mut result = Vec::new();
176        for key in keys {
177            result.push(map.get(key).cloned());
178        }
179        Ok(result)
180    }
181
182    async fn find_keys_by_prefix(
183        &self,
184        key_prefix: &[u8],
185    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
186        let map = self
187            .map
188            .read()
189            .expect("MemoryStore lock should not be poisoned");
190        let mut values = Vec::new();
191        let len = key_prefix.len();
192        for (key, _value) in map.range(get_key_range_for_prefix(key_prefix.to_vec())) {
193            values.push(key[len..].to_vec())
194        }
195        Ok(values)
196    }
197
198    async fn find_key_values_by_prefix(
199        &self,
200        key_prefix: &[u8],
201    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
202        let map = self
203            .map
204            .read()
205            .expect("MemoryStore lock should not be poisoned");
206        let mut key_values = Vec::new();
207        let len = key_prefix.len();
208        for (key, value) in map.range(get_key_range_for_prefix(key_prefix.to_vec())) {
209            let key_value = (key[len..].to_vec(), value.to_vec());
210            key_values.push(key_value);
211        }
212        Ok(key_values)
213    }
214}
215
216impl WritableKeyValueStore for MemoryStore {
217    const MAX_VALUE_SIZE: usize = usize::MAX;
218
219    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
220        let mut map = self
221            .map
222            .write()
223            .expect("MemoryStore lock should not be poisoned");
224        for ent in batch.operations {
225            match ent {
226                WriteOperation::Put { key, value } => {
227                    map.insert(key, value);
228                }
229                WriteOperation::Delete { key } => {
230                    map.remove(&key);
231                }
232                WriteOperation::DeletePrefix { key_prefix } => {
233                    let key_list = map
234                        .range(get_key_range_for_prefix(key_prefix))
235                        .map(|x| x.0.to_vec())
236                        .collect::<Vec<_>>();
237                    for key in key_list {
238                        map.remove(&key);
239                    }
240                }
241            }
242        }
243        Ok(())
244    }
245
246    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
247        Ok(())
248    }
249}
250
251impl MemoryStore {
252    /// Creates a `MemoryStore` that doesn't belong to any registered namespace.
253    #[cfg(with_testing)]
254    pub fn new_for_testing() -> Self {
255        Self {
256            map: Arc::default(),
257            root_key: Vec::new(),
258            max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
259        }
260    }
261}
262
263impl Drop for MemoryDatabase {
264    fn drop(&mut self) {
265        if self.kill_on_drop {
266            let mut databases = MEMORY_DATABASES
267                .lock()
268                .expect("MEMORY_DATABASES lock should not be poisoned");
269            databases.databases.remove(&self.namespace);
270        }
271    }
272}
273
274impl KeyValueDatabase for MemoryDatabase {
275    type Config = MemoryStoreConfig;
276
277    type Store = MemoryStore;
278
279    fn get_name() -> String {
280        "memory".to_string()
281    }
282
283    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
284        let databases = MEMORY_DATABASES
285            .lock()
286            .expect("MEMORY_DATABASES lock should not be poisoned");
287        if !databases.sync_exists(namespace) {
288            return Err(MemoryStoreError::NamespaceNotFound);
289        };
290        Ok(MemoryDatabase {
291            namespace: namespace.to_string(),
292            max_stream_queries: config.max_stream_queries,
293            kill_on_drop: config.kill_on_drop,
294        })
295    }
296
297    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
298        let mut databases = MEMORY_DATABASES
299            .lock()
300            .expect("MEMORY_DATABASES lock should not be poisoned");
301        databases.sync_open(&self.namespace, self.max_stream_queries, root_key)
302    }
303
304    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
305        self.open_shared(root_key)
306    }
307
308    async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
309        let databases = MEMORY_DATABASES
310            .lock()
311            .expect("MEMORY_DATABASES lock should not be poisoned");
312        Ok(databases.sync_list_all())
313    }
314
315    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
316        let databases = MEMORY_DATABASES
317            .lock()
318            .expect("MEMORY_DATABASES lock should not be poisoned");
319        Ok(databases.sync_list_root_keys(&self.namespace))
320    }
321
322    async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
323        let databases = MEMORY_DATABASES
324            .lock()
325            .expect("MEMORY_DATABASES lock should not be poisoned");
326        Ok(databases.sync_exists(namespace))
327    }
328
329    async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
330        let mut databases = MEMORY_DATABASES
331            .lock()
332            .expect("MEMORY_DATABASES lock should not be poisoned");
333        if databases.sync_exists(namespace) {
334            return Err(MemoryStoreError::StoreAlreadyExists);
335        }
336        databases.sync_create(namespace);
337        Ok(())
338    }
339
340    async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
341        let mut databases = MEMORY_DATABASES
342            .lock()
343            .expect("MEMORY_DATABASES lock should not be poisoned");
344        databases.sync_delete(namespace);
345        Ok(())
346    }
347}
348
#[cfg(with_testing)]
impl TestKeyValueDatabase for MemoryDatabase {
    /// Returns the configuration used by tests: the test stream-query limit,
    /// with namespaces kept in memory when a handle is dropped.
    async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
        let config = MemoryStoreConfig {
            max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
            kill_on_drop: false,
        };
        Ok(config)
    }
}
358
359/// The error type for [`MemoryStore`].
360#[derive(Error, Debug)]
361pub enum MemoryStoreError {
362    /// Store already exists during a create operation
363    #[error("Store already exists during a create operation")]
364    StoreAlreadyExists,
365
366    /// Serialization error with BCS.
367    #[error(transparent)]
368    BcsError(#[from] bcs::Error),
369
370    /// The namespace does not exist
371    #[error("The namespace does not exist")]
372    NamespaceNotFound,
373}
374
375impl KeyValueStoreError for MemoryStoreError {
376    const BACKEND: &'static str = "memory";
377}