linera_views/backends/
memory.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueDatabase`] in memory.
5
6use std::{
7    collections::BTreeMap,
8    sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::store::TestKeyValueDatabase;
16use crate::{
17    batch::{Batch, WriteOperation},
18    common::get_interval,
19    store::{
20        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
21        WritableKeyValueStore,
22    },
23};
24
/// The initial configuration of the in-memory store.
///
/// Both fields are copied verbatim into the [`MemoryDatabase`] returned by
/// `connect`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MemoryStoreConfig {
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
    /// Whether a namespace should be immediately cleaned up from memory when the
    /// connection object is dropped.
    pub kill_on_drop: bool,
}
34
/// The `max_stream_queries` value used by the test configuration and by stores
/// created with `MemoryStore::new_for_testing`.
#[cfg(with_testing)]
const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
38
/// The values in a partition: an ordered map from key bytes to value bytes.
type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
41
/// The container for the `MemoryStoreMap`s by namespace and then root key.
///
/// Each partition is stored behind an `Arc<RwLock<_>>`; `sync_open` hands out a
/// clone of that `Arc`, so every store opened on the same namespace and root key
/// shares one map.
#[derive(Default)]
struct MemoryDatabases {
    // Outer key: namespace name. Inner key: root key of a partition.
    databases: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
}
47
/// A connection to a namespace of key-values in memory.
///
/// The connection holds no data itself: it only records the namespace name and
/// settings, and the data is looked up in the global `MEMORY_DATABASES` table.
///
/// NOTE(review): this type is `Clone` and its `Drop` implementation removes the
/// namespace when `kill_on_drop` is set, so dropping any one clone removes the
/// namespace for all of them. Confirm that this is intended.
#[derive(Clone, Debug)]
pub struct MemoryDatabase {
    /// The current namespace.
    namespace: String,
    /// The maximum number of queries used for a stream.
    max_stream_queries: usize,
    /// Whether to remove the namespace on drop.
    kill_on_drop: bool,
}
58
59impl MemoryDatabases {
60    fn sync_open(
61        &mut self,
62        namespace: &str,
63        max_stream_queries: usize,
64        root_key: &[u8],
65    ) -> Result<MemoryStore, MemoryStoreError> {
66        let Some(stores) = self.databases.get_mut(namespace) else {
67            return Err(MemoryStoreError::NamespaceNotFound);
68        };
69        let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
70            let map = MemoryStoreMap::new();
71            Arc::new(RwLock::new(map))
72        });
73        let map = store.clone();
74        Ok(MemoryStore {
75            map,
76            max_stream_queries,
77        })
78    }
79
80    fn sync_list_all(&self) -> Vec<String> {
81        self.databases.keys().cloned().collect::<Vec<_>>()
82    }
83
84    fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
85        match self.databases.get(namespace) {
86            None => Vec::new(),
87            Some(map) => map.keys().cloned().collect::<Vec<_>>(),
88        }
89    }
90
91    fn sync_exists(&self, namespace: &str) -> bool {
92        self.databases.contains_key(namespace)
93    }
94
95    fn sync_create(&mut self, namespace: &str) {
96        self.databases
97            .insert(namespace.to_string(), BTreeMap::new());
98    }
99
100    fn sync_delete(&mut self, namespace: &str) {
101        self.databases.remove(namespace);
102    }
103}
104
/// The global table of namespaces.
///
/// It is initialized lazily to an empty table on first access. The
/// `MemoryDatabase` operations in this file lock it for the duration of each call.
static MEMORY_DATABASES: LazyLock<Mutex<MemoryDatabases>> =
    LazyLock::new(|| Mutex::new(MemoryDatabases::default()));
108
/// A virtual DB client where data are persisted in memory.
///
/// Cloning a `MemoryStore` clones the `Arc`, so all clones read and write the
/// same underlying map.
#[derive(Clone)]
pub struct MemoryStore {
    /// The map used for storing the data.
    map: Arc<RwLock<MemoryStoreMap>>,
    /// The maximum number of queries used for a stream.
    max_stream_queries: usize,
}
117
// Database-level operations (connect, create, delete, ...) fail with
// `MemoryStoreError`.
impl WithError for MemoryDatabase {
    type Error = MemoryStoreError;
}
121
// Store-level reads and writes use the same error type as the database.
impl WithError for MemoryStore {
    type Error = MemoryStoreError;
}
125
126impl ReadableKeyValueStore for MemoryStore {
127    const MAX_KEY_SIZE: usize = usize::MAX;
128
129    fn max_stream_queries(&self) -> usize {
130        self.max_stream_queries
131    }
132
133    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
134        let map = self
135            .map
136            .read()
137            .expect("MemoryStore lock should not be poisoned");
138        Ok(map.get(key).cloned())
139    }
140
141    async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
142        let map = self
143            .map
144            .read()
145            .expect("MemoryStore lock should not be poisoned");
146        Ok(map.contains_key(key))
147    }
148
149    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
150        let map = self
151            .map
152            .read()
153            .expect("MemoryStore lock should not be poisoned");
154        Ok(keys
155            .into_iter()
156            .map(|key| map.contains_key(&key))
157            .collect::<Vec<_>>())
158    }
159
160    async fn read_multi_values_bytes(
161        &self,
162        keys: Vec<Vec<u8>>,
163    ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
164        let map = self
165            .map
166            .read()
167            .expect("MemoryStore lock should not be poisoned");
168        let mut result = Vec::new();
169        for key in keys {
170            result.push(map.get(&key).cloned());
171        }
172        Ok(result)
173    }
174
175    async fn find_keys_by_prefix(
176        &self,
177        key_prefix: &[u8],
178    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
179        let map = self
180            .map
181            .read()
182            .expect("MemoryStore lock should not be poisoned");
183        let mut values = Vec::new();
184        let len = key_prefix.len();
185        for (key, _value) in map.range(get_interval(key_prefix.to_vec())) {
186            values.push(key[len..].to_vec())
187        }
188        Ok(values)
189    }
190
191    async fn find_key_values_by_prefix(
192        &self,
193        key_prefix: &[u8],
194    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
195        let map = self
196            .map
197            .read()
198            .expect("MemoryStore lock should not be poisoned");
199        let mut key_values = Vec::new();
200        let len = key_prefix.len();
201        for (key, value) in map.range(get_interval(key_prefix.to_vec())) {
202            let key_value = (key[len..].to_vec(), value.to_vec());
203            key_values.push(key_value);
204        }
205        Ok(key_values)
206    }
207}
208
209impl WritableKeyValueStore for MemoryStore {
210    const MAX_VALUE_SIZE: usize = usize::MAX;
211
212    async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
213        let mut map = self
214            .map
215            .write()
216            .expect("MemoryStore lock should not be poisoned");
217        for ent in batch.operations {
218            match ent {
219                WriteOperation::Put { key, value } => {
220                    map.insert(key, value);
221                }
222                WriteOperation::Delete { key } => {
223                    map.remove(&key);
224                }
225                WriteOperation::DeletePrefix { key_prefix } => {
226                    let key_list = map
227                        .range(get_interval(key_prefix))
228                        .map(|x| x.0.to_vec())
229                        .collect::<Vec<_>>();
230                    for key in key_list {
231                        map.remove(&key);
232                    }
233                }
234            }
235        }
236        Ok(())
237    }
238
239    async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
240        Ok(())
241    }
242}
243
244impl MemoryStore {
245    /// Creates a `MemoryStore` that doesn't belong to any registered namespace.
246    #[cfg(with_testing)]
247    pub fn new_for_testing() -> Self {
248        Self {
249            map: Arc::default(),
250            max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
251        }
252    }
253}
254
255impl Drop for MemoryDatabase {
256    fn drop(&mut self) {
257        if self.kill_on_drop {
258            let mut databases = MEMORY_DATABASES
259                .lock()
260                .expect("MEMORY_DATABASES lock should not be poisoned");
261            databases.databases.remove(&self.namespace);
262        }
263    }
264}
265
266impl KeyValueDatabase for MemoryDatabase {
267    type Config = MemoryStoreConfig;
268
269    type Store = MemoryStore;
270
271    fn get_name() -> String {
272        "memory".to_string()
273    }
274
275    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
276        let databases = MEMORY_DATABASES
277            .lock()
278            .expect("MEMORY_DATABASES lock should not be poisoned");
279        if !databases.sync_exists(namespace) {
280            return Err(MemoryStoreError::NamespaceNotFound);
281        };
282        Ok(MemoryDatabase {
283            namespace: namespace.to_string(),
284            max_stream_queries: config.max_stream_queries,
285            kill_on_drop: config.kill_on_drop,
286        })
287    }
288
289    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
290        let mut databases = MEMORY_DATABASES
291            .lock()
292            .expect("MEMORY_DATABASES lock should not be poisoned");
293        databases.sync_open(&self.namespace, self.max_stream_queries, root_key)
294    }
295
296    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
297        self.open_shared(root_key)
298    }
299
300    async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
301        let databases = MEMORY_DATABASES
302            .lock()
303            .expect("MEMORY_DATABASES lock should not be poisoned");
304        Ok(databases.sync_list_all())
305    }
306
307    async fn list_root_keys(
308        _config: &Self::Config,
309        namespace: &str,
310    ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
311        let databases = MEMORY_DATABASES
312            .lock()
313            .expect("MEMORY_DATABASES lock should not be poisoned");
314        Ok(databases.sync_list_root_keys(namespace))
315    }
316
317    async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
318        let databases = MEMORY_DATABASES
319            .lock()
320            .expect("MEMORY_DATABASES lock should not be poisoned");
321        Ok(databases.sync_exists(namespace))
322    }
323
324    async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
325        let mut databases = MEMORY_DATABASES
326            .lock()
327            .expect("MEMORY_DATABASES lock should not be poisoned");
328        if databases.sync_exists(namespace) {
329            return Err(MemoryStoreError::StoreAlreadyExists);
330        }
331        databases.sync_create(namespace);
332        Ok(())
333    }
334
335    async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
336        let mut databases = MEMORY_DATABASES
337            .lock()
338            .expect("MEMORY_DATABASES lock should not be poisoned");
339        databases.sync_delete(namespace);
340        Ok(())
341    }
342}
343
#[cfg(with_testing)]
impl TestKeyValueDatabase for MemoryDatabase {
    /// Builds the configuration used in tests: the test stream-query limit, and
    /// namespaces that are kept when a connection is dropped.
    async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
        let config = MemoryStoreConfig {
            max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
            kill_on_drop: false,
        };
        Ok(config)
    }
}
353
/// The error type for [`MemoryStore`] and [`MemoryDatabase`].
#[derive(Error, Debug)]
pub enum MemoryStoreError {
    /// Store already exists during a create operation.
    ///
    /// Returned by `create` when the namespace is already present.
    #[error("Store already exists during a create operation")]
    StoreAlreadyExists,

    /// Serialization error with BCS.
    ///
    /// Converted from `bcs::Error` via `#[from]`; none of the code visible in this
    /// file constructs it directly.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The namespace does not exist.
    ///
    /// Returned by `connect` and when opening a store on a missing namespace.
    #[error("The namespace does not exist")]
    NamespaceNotFound,
}
369
// Identifies this backend to the `KeyValueStoreError` trait; the value is the
// same string that `MemoryDatabase::get_name` returns.
impl KeyValueStoreError for MemoryStoreError {
    const BACKEND: &'static str = "memory";
}