1use std::{
7 collections::BTreeMap,
8 sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::store::TestKeyValueDatabase;
16use crate::{
17 batch::{Batch, WriteOperation},
18 common::get_key_range_for_prefix,
19 store::{
20 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
21 WritableKeyValueStore,
22 },
23};
24
/// Configuration used when connecting to a [`MemoryDatabase`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MemoryStoreConfig {
    /// Value copied into every store opened through the database and reported by
    /// `MemoryStore::max_stream_queries`.
    pub max_stream_queries: usize,
    /// When `true`, dropping the [`MemoryDatabase`] removes its namespace from the
    /// global in-memory registry.
    pub kill_on_drop: bool,
}
34
/// The `max_stream_queries` value used by the testing constructor and the test
/// configuration of the memory backend.
#[cfg(with_testing)]
const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
38
/// The ordered map from byte keys to byte values that holds the contents of one store.
type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
41
/// The registry of all in-memory namespaces and the stores they contain.
#[derive(Default)]
struct MemoryDatabases {
    /// Maps a namespace name to its stores; within a namespace, each root key maps to
    /// the shared, lock-protected map backing the store opened under that root key.
    databases: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
}
47
/// A handle to one namespace of the global in-memory registry.
///
/// The handle stores only the namespace name and settings; the data itself lives in
/// `MEMORY_DATABASES`.
#[derive(Clone, Debug)]
pub struct MemoryDatabase {
    /// The name of the namespace this handle refers to.
    namespace: String,
    /// The value handed to every store opened through this handle.
    max_stream_queries: usize,
    /// Whether dropping this handle removes the namespace from the registry.
    kill_on_drop: bool,
}
58
59impl MemoryDatabases {
60 fn sync_open(
61 &mut self,
62 namespace: &str,
63 max_stream_queries: usize,
64 root_key: &[u8],
65 ) -> Result<MemoryStore, MemoryStoreError> {
66 let Some(stores) = self.databases.get_mut(namespace) else {
67 return Err(MemoryStoreError::NamespaceNotFound);
68 };
69 let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
70 let map = MemoryStoreMap::new();
71 Arc::new(RwLock::new(map))
72 });
73 let map = store.clone();
74 Ok(MemoryStore {
75 map,
76 root_key: root_key.to_vec(),
77 max_stream_queries,
78 })
79 }
80
81 fn sync_list_all(&self) -> Vec<String> {
82 self.databases.keys().cloned().collect::<Vec<_>>()
83 }
84
85 fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
86 match self.databases.get(namespace) {
87 None => Vec::new(),
88 Some(map) => map.keys().cloned().collect::<Vec<_>>(),
89 }
90 }
91
92 fn sync_exists(&self, namespace: &str) -> bool {
93 self.databases.contains_key(namespace)
94 }
95
96 fn sync_create(&mut self, namespace: &str) {
97 self.databases
98 .insert(namespace.to_string(), BTreeMap::new());
99 }
100
101 fn sync_delete(&mut self, namespace: &str) {
102 self.databases.remove(namespace);
103 }
104}
105
/// The process-wide registry of in-memory namespaces, created empty on first access
/// and protected by a mutex.
static MEMORY_DATABASES: LazyLock<Mutex<MemoryDatabases>> =
    LazyLock::new(|| Mutex::new(MemoryDatabases::default()));
109
/// A key-value store kept entirely in memory.
///
/// Cloning the store clones the `Arc`, so all clones read and write the same map.
#[derive(Clone)]
pub struct MemoryStore {
    /// The shared, lock-protected map holding the key-value pairs.
    map: Arc<RwLock<MemoryStoreMap>>,
    /// The root key this store was opened with.
    root_key: Vec<u8>,
    /// The value returned by `max_stream_queries`.
    max_stream_queries: usize,
}
120
impl WithError for MemoryDatabase {
    // Operations on the database handle report failures as `MemoryStoreError`.
    type Error = MemoryStoreError;
}
124
impl WithError for MemoryStore {
    // Operations on a store report failures as `MemoryStoreError`.
    type Error = MemoryStoreError;
}
128
129impl ReadableKeyValueStore for MemoryStore {
130 const MAX_KEY_SIZE: usize = usize::MAX;
131
132 fn max_stream_queries(&self) -> usize {
133 self.max_stream_queries
134 }
135
136 fn root_key(&self) -> Result<Vec<u8>, MemoryStoreError> {
137 Ok(self.root_key.clone())
138 }
139
140 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
141 let map = self
142 .map
143 .read()
144 .expect("MemoryStore lock should not be poisoned");
145 Ok(map.get(key).cloned())
146 }
147
148 async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
149 let map = self
150 .map
151 .read()
152 .expect("MemoryStore lock should not be poisoned");
153 Ok(map.contains_key(key))
154 }
155
156 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, MemoryStoreError> {
157 let map = self
158 .map
159 .read()
160 .expect("MemoryStore lock should not be poisoned");
161 Ok(keys
162 .iter()
163 .map(|key| map.contains_key(key))
164 .collect::<Vec<_>>())
165 }
166
167 async fn read_multi_values_bytes(
168 &self,
169 keys: &[Vec<u8>],
170 ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
171 let map = self
172 .map
173 .read()
174 .expect("MemoryStore lock should not be poisoned");
175 let mut result = Vec::new();
176 for key in keys {
177 result.push(map.get(key).cloned());
178 }
179 Ok(result)
180 }
181
182 async fn find_keys_by_prefix(
183 &self,
184 key_prefix: &[u8],
185 ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
186 let map = self
187 .map
188 .read()
189 .expect("MemoryStore lock should not be poisoned");
190 let mut values = Vec::new();
191 let len = key_prefix.len();
192 for (key, _value) in map.range(get_key_range_for_prefix(key_prefix.to_vec())) {
193 values.push(key[len..].to_vec())
194 }
195 Ok(values)
196 }
197
198 async fn find_key_values_by_prefix(
199 &self,
200 key_prefix: &[u8],
201 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
202 let map = self
203 .map
204 .read()
205 .expect("MemoryStore lock should not be poisoned");
206 let mut key_values = Vec::new();
207 let len = key_prefix.len();
208 for (key, value) in map.range(get_key_range_for_prefix(key_prefix.to_vec())) {
209 let key_value = (key[len..].to_vec(), value.to_vec());
210 key_values.push(key_value);
211 }
212 Ok(key_values)
213 }
214}
215
216impl WritableKeyValueStore for MemoryStore {
217 const MAX_VALUE_SIZE: usize = usize::MAX;
218
219 async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
220 let mut map = self
221 .map
222 .write()
223 .expect("MemoryStore lock should not be poisoned");
224 for ent in batch.operations {
225 match ent {
226 WriteOperation::Put { key, value } => {
227 map.insert(key, value);
228 }
229 WriteOperation::Delete { key } => {
230 map.remove(&key);
231 }
232 WriteOperation::DeletePrefix { key_prefix } => {
233 let key_list = map
234 .range(get_key_range_for_prefix(key_prefix))
235 .map(|x| x.0.to_vec())
236 .collect::<Vec<_>>();
237 for key in key_list {
238 map.remove(&key);
239 }
240 }
241 }
242 }
243 Ok(())
244 }
245
246 async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
247 Ok(())
248 }
249}
250
251impl MemoryStore {
252 #[cfg(with_testing)]
254 pub fn new_for_testing() -> Self {
255 Self {
256 map: Arc::default(),
257 root_key: Vec::new(),
258 max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
259 }
260 }
261}
262
263impl Drop for MemoryDatabase {
264 fn drop(&mut self) {
265 if self.kill_on_drop {
266 let mut databases = MEMORY_DATABASES
267 .lock()
268 .expect("MEMORY_DATABASES lock should not be poisoned");
269 databases.databases.remove(&self.namespace);
270 }
271 }
272}
273
274impl KeyValueDatabase for MemoryDatabase {
275 type Config = MemoryStoreConfig;
276
277 type Store = MemoryStore;
278
279 fn get_name() -> String {
280 "memory".to_string()
281 }
282
283 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
284 let databases = MEMORY_DATABASES
285 .lock()
286 .expect("MEMORY_DATABASES lock should not be poisoned");
287 if !databases.sync_exists(namespace) {
288 return Err(MemoryStoreError::NamespaceNotFound);
289 };
290 Ok(MemoryDatabase {
291 namespace: namespace.to_string(),
292 max_stream_queries: config.max_stream_queries,
293 kill_on_drop: config.kill_on_drop,
294 })
295 }
296
297 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
298 let mut databases = MEMORY_DATABASES
299 .lock()
300 .expect("MEMORY_DATABASES lock should not be poisoned");
301 databases.sync_open(&self.namespace, self.max_stream_queries, root_key)
302 }
303
304 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, MemoryStoreError> {
305 self.open_shared(root_key)
306 }
307
308 async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
309 let databases = MEMORY_DATABASES
310 .lock()
311 .expect("MEMORY_DATABASES lock should not be poisoned");
312 Ok(databases.sync_list_all())
313 }
314
315 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
316 let databases = MEMORY_DATABASES
317 .lock()
318 .expect("MEMORY_DATABASES lock should not be poisoned");
319 Ok(databases.sync_list_root_keys(&self.namespace))
320 }
321
322 async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
323 let databases = MEMORY_DATABASES
324 .lock()
325 .expect("MEMORY_DATABASES lock should not be poisoned");
326 Ok(databases.sync_exists(namespace))
327 }
328
329 async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
330 let mut databases = MEMORY_DATABASES
331 .lock()
332 .expect("MEMORY_DATABASES lock should not be poisoned");
333 if databases.sync_exists(namespace) {
334 return Err(MemoryStoreError::StoreAlreadyExists);
335 }
336 databases.sync_create(namespace);
337 Ok(())
338 }
339
340 async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
341 let mut databases = MEMORY_DATABASES
342 .lock()
343 .expect("MEMORY_DATABASES lock should not be poisoned");
344 databases.sync_delete(namespace);
345 Ok(())
346 }
347}
348
#[cfg(with_testing)]
impl TestKeyValueDatabase for MemoryDatabase {
    /// Builds the configuration used in tests: the test stream-query limit, and
    /// namespaces are kept when the database handle is dropped.
    async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
        let config = MemoryStoreConfig {
            max_stream_queries: TEST_MEMORY_MAX_STREAM_QUERIES,
            kill_on_drop: false,
        };
        Ok(config)
    }
}
358
/// The errors reported by the in-memory store and database.
#[derive(Error, Debug)]
pub enum MemoryStoreError {
    /// `create` was called for a namespace that is already registered.
    #[error("Store already exists during a create operation")]
    StoreAlreadyExists,

    /// An error coming from the `bcs` crate, forwarded transparently.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The requested namespace is not registered.
    #[error("The namespace does not exist")]
    NamespaceNotFound,
}
374
impl KeyValueStoreError for MemoryStoreError {
    // Backend label; the same string that `MemoryDatabase::get_name` returns.
    const BACKEND: &'static str = "memory";
}