1use std::{
7 collections::BTreeMap,
8 sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::store::TestKeyValueDatabase;
16use crate::{
17 batch::{Batch, WriteOperation},
18 common::get_interval,
19 store::{
20 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
21 WritableKeyValueStore,
22 },
23};
24
25#[derive(Debug, Clone, Deserialize, Serialize)]
27pub struct MemoryStoreConfig {
28 pub max_stream_queries: usize,
30 pub kill_on_drop: bool,
33}
34
/// Value of `max_stream_queries` used by the testing helpers of this file
/// (`MemoryStore::new_for_testing` and `new_test_config`).
#[cfg(with_testing)]
const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
38
/// Ordered map from byte-string keys to byte-string values holding the data of one store.
type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
41
42#[derive(Default)]
44struct MemoryDatabases {
45 databases: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
46}
47
/// Handle onto one namespace of the in-memory registry.
#[derive(Debug, Clone)]
pub struct MemoryDatabase {
    /// Name of the namespace this handle refers to.
    namespace: String,
    /// Stream-query limit handed to every store opened through this handle.
    max_stream_queries: usize,
    /// Whether dropping this handle removes the namespace from the registry.
    kill_on_drop: bool,
}
58
59impl MemoryDatabases {
60 fn sync_open(
61 &mut self,
62 namespace: &str,
63 max_stream_queries: usize,
64 root_key: &[u8],
65 ) -> Result<MemoryStore, MemoryStoreError> {
66 let Some(stores) = self.databases.get_mut(namespace) else {
67 return Err(MemoryStoreError::NamespaceNotFound);
68 };
69 let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
70 let map = MemoryStoreMap::new();
71 Arc::new(RwLock::new(map))
72 });
73 let map = store.clone();
74 Ok(MemoryStore {
75 map,
76 max_stream_queries,
77 })
78 }
79
80 fn sync_list_all(&self) -> Vec<String> {
81 self.databases.keys().cloned().collect::<Vec<_>>()
82 }
83
84 fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
85 match self.databases.get(namespace) {
86 None => Vec::new(),
87 Some(map) => map.keys().cloned().collect::<Vec<_>>(),
88 }
89 }
90
91 fn sync_exists(&self, namespace: &str) -> bool {
92 self.databases.contains_key(namespace)
93 }
94
95 fn sync_create(&mut self, namespace: &str) {
96 self.databases
97 .insert(namespace.to_string(), BTreeMap::new());
98 }
99
100 fn sync_delete(&mut self, namespace: &str) {
101 self.databases.remove(namespace);
102 }
103}
104
105static MEMORY_DATABASES: LazyLock<Mutex<MemoryDatabases>> =
107 LazyLock::new(|| Mutex::new(MemoryDatabases::default()));
108
109#[derive(Clone)]
111pub struct MemoryStore {
112 map: Arc<RwLock<MemoryStoreMap>>,
114 max_stream_queries: usize,
116}
117
118impl WithError for MemoryDatabase {
119 type Error = MemoryStoreError;
120}
121
122impl WithError for MemoryStore {
123 type Error = MemoryStoreError;
124}
125
126impl ReadableKeyValueStore for MemoryStore {
127 const MAX_KEY_SIZE: usize = usize::MAX;
128
129 fn max_stream_queries(&self) -> usize {
130 self.max_stream_queries
131 }
132
133 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
134 let map = self
135 .map
136 .read()
137 .expect("MemoryStore lock should not be poisoned");
138 Ok(map.get(key).cloned())
139 }
140
141 async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
142 let map = self
143 .map
144 .read()
145 .expect("MemoryStore lock should not be poisoned");
146 Ok(map.contains_key(key))
147 }
148
149 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
150 let map = self
151 .map
152 .read()
153 .expect("MemoryStore lock should not be poisoned");
154 Ok(keys
155 .into_iter()
156 .map(|key| map.contains_key(&key))
157 .collect::<Vec<_>>())
158 }
159
160 async fn read_multi_values_bytes(
161 &self,
162 keys: Vec<Vec<u8>>,
163 ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
164 let map = self
165 .map
166 .read()
167 .expect("MemoryStore lock should not be poisoned");
168 let mut result = Vec::new();
169 for key in keys {
170 result.push(map.get(&key).cloned());
171 }
172 Ok(result)
173 }
174
175 async fn find_keys_by_prefix(
176 &self,
177 key_prefix: &[u8],
178 ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
179 let map = self
180 .map
181 .read()
182 .expect("MemoryStore lock should not be poisoned");
183 let mut values = Vec::new();
184 let len = key_prefix.len();
185 for (key, _value) in map.range(get_interval(key_prefix.to_vec())) {
186 values.push(key[len..].to_vec())
187 }
188 Ok(values)
189 }
190
191 async fn find_key_values_by_prefix(
192 &self,
193 key_prefix: &[u8],
194 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
195 let map = self
196 .map
197 .read()
198 .expect("MemoryStore lock should not be poisoned");
199 let mut key_values = Vec::new();
200 let len = key_prefix.len();
201 for (key, value) in map.range(get_interval(key_prefix.to_vec())) {
202 let key_value = (key[len..].to_vec(), value.to_vec());
203 key_values.push(key_value);
204 }
205 Ok(key_values)
206 }
207}
208
209impl WritableKeyValueStore for MemoryStore {
210 const MAX_VALUE_SIZE: usize = usize::MAX;
211
212 async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
213 let mut map = self
214 .map
215 .write()
216 .expect("MemoryStore lock should not be poisoned");
217 for ent in batch.operations {
218 match ent {
219 WriteOperation::Put { key, value } => {
220 map.insert(key, value);
221 }
222 WriteOperation::Delete { key } => {
223 map.remove(&key);
224 }
225 WriteOperation::DeletePrefix { key_prefix } => {
226 let key_list = map
227 .range(get_interval(key_prefix))
228 .map(|x| x.0.to_vec())
229 .collect::<Vec<_>>();
230 for key in key_list {
231 map.remove(&key);
232 }
233 }
234 }
235 }
236 Ok(())
237 }
238
239 async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
240 Ok(())
241 }
242}
243
244impl MemoryStore {
245 #[cfg(with_testing)]
247 pub fn new_for_testing() -> Self {
248 Self {
249 map: Arc::default(),
250 max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
251 }
252 }
253}
254
255impl Drop for MemoryDatabase {
256 fn drop(&mut self) {
257 if self.kill_on_drop {
258 let mut databases = MEMORY_DATABASES
259 .lock()
260 .expect("MEMORY_DATABASES lock should not be poisoned");
261 databases.databases.remove(&self.namespace);
262 }
263 }
264}
265
266impl KeyValueDatabase for MemoryDatabase {
267 type Config = MemoryStoreConfig;
268
269 type Store = MemoryStore;
270
271 fn get_name() -> String {
272 "memory".to_string()
273 }
274
275 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
276 let databases = MEMORY_DATABASES
277 .lock()
278 .expect("MEMORY_DATABASES lock should not be poisoned");
279 if !databases.sync_exists(namespace) {
280 return Err(MemoryStoreError::NamespaceNotFound);
281 };
282 Ok(MemoryDatabase {
283 namespace: namespace.to_string(),
284 max_stream_queries: config.max_stream_queries,
285 kill_on_drop: config.kill_on_drop,
286 })
287 }
288
289 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
290 let mut databases = MEMORY_DATABASES
291 .lock()
292 .expect("MEMORY_DATABASES lock should not be poisoned");
293 databases.sync_open(&self.namespace, self.max_stream_queries, root_key)
294 }
295
296 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
297 self.open_shared(root_key)
298 }
299
300 async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
301 let databases = MEMORY_DATABASES
302 .lock()
303 .expect("MEMORY_DATABASES lock should not be poisoned");
304 Ok(databases.sync_list_all())
305 }
306
307 async fn list_root_keys(
308 _config: &Self::Config,
309 namespace: &str,
310 ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
311 let databases = MEMORY_DATABASES
312 .lock()
313 .expect("MEMORY_DATABASES lock should not be poisoned");
314 Ok(databases.sync_list_root_keys(namespace))
315 }
316
317 async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
318 let databases = MEMORY_DATABASES
319 .lock()
320 .expect("MEMORY_DATABASES lock should not be poisoned");
321 Ok(databases.sync_exists(namespace))
322 }
323
324 async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
325 let mut databases = MEMORY_DATABASES
326 .lock()
327 .expect("MEMORY_DATABASES lock should not be poisoned");
328 if databases.sync_exists(namespace) {
329 return Err(MemoryStoreError::StoreAlreadyExists);
330 }
331 databases.sync_create(namespace);
332 Ok(())
333 }
334
335 async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
336 let mut databases = MEMORY_DATABASES
337 .lock()
338 .expect("MEMORY_DATABASES lock should not be poisoned");
339 databases.sync_delete(namespace);
340 Ok(())
341 }
342}
343
#[cfg(with_testing)]
impl TestKeyValueDatabase for MemoryDatabase {
    /// Builds the configuration used by tests: the testing stream-query limit, with
    /// namespaces left in place when a database handle is dropped.
    async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
        let config = MemoryStoreConfig {
            max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
            kill_on_drop: false,
        };
        Ok(config)
    }
}
353
354#[derive(Error, Debug)]
356pub enum MemoryStoreError {
357 #[error("Store already exists during a create operation")]
359 StoreAlreadyExists,
360
361 #[error(transparent)]
363 BcsError(#[from] bcs::Error),
364
365 #[error("The namespace does not exist")]
367 NamespaceNotFound,
368}
369
370impl KeyValueStoreError for MemoryStoreError {
371 const BACKEND: &'static str = "memory";
372}