1use std::{
7 collections::BTreeMap,
8 sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::random::generate_test_namespace;
16#[cfg(with_testing)]
17use crate::store::TestKeyValueStore;
18use crate::{
19 batch::{Batch, WriteOperation},
20 common::get_interval,
21 store::{
22 AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError,
23 WritableKeyValueStore,
24 },
25};
26
27#[derive(Debug, Clone, Deserialize, Serialize)]
29pub struct MemoryStoreConfig {
30 pub max_stream_queries: usize,
32}
33
34pub const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
36
37type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
40
41#[derive(Default)]
43struct MemoryStores {
44 stores: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
45}
46
47impl MemoryStores {
48 fn sync_connect(
49 &mut self,
50 config: &MemoryStoreConfig,
51 namespace: &str,
52 root_key: &[u8],
53 kill_on_drop: bool,
54 ) -> Result<MemoryStore, MemoryStoreError> {
55 let max_stream_queries = config.max_stream_queries;
56 let Some(stores) = self.stores.get_mut(namespace) else {
57 return Err(MemoryStoreError::NamespaceNotFound);
58 };
59 let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
60 let map = MemoryStoreMap::new();
61 Arc::new(RwLock::new(map))
62 });
63 let map = store.clone();
64 let namespace = namespace.to_string();
65 let root_key = root_key.to_vec();
66 Ok(MemoryStore {
67 map,
68 max_stream_queries,
69 namespace,
70 root_key,
71 kill_on_drop,
72 })
73 }
74
75 fn sync_list_all(&self) -> Vec<String> {
76 self.stores.keys().cloned().collect::<Vec<_>>()
77 }
78
79 fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
80 match self.stores.get(namespace) {
81 None => Vec::new(),
82 Some(map) => map.keys().cloned().collect::<Vec<_>>(),
83 }
84 }
85
86 fn sync_exists(&self, namespace: &str) -> bool {
87 self.stores.contains_key(namespace)
88 }
89
90 fn sync_create(&mut self, namespace: &str) {
91 self.stores.insert(namespace.to_string(), BTreeMap::new());
92 }
93
94 fn sync_delete(&mut self, namespace: &str) {
95 self.stores.remove(namespace);
96 }
97}
98
99static MEMORY_STORES: LazyLock<Mutex<MemoryStores>> =
101 LazyLock::new(|| Mutex::new(MemoryStores::default()));
102
103#[derive(Clone)]
105pub struct MemoryStore {
106 map: Arc<RwLock<MemoryStoreMap>>,
108 max_stream_queries: usize,
110 namespace: String,
112 root_key: Vec<u8>,
114 kill_on_drop: bool,
116}
117
118impl Drop for MemoryStore {
119 fn drop(&mut self) {
120 if self.kill_on_drop {
121 let mut memory_stores = MEMORY_STORES
122 .lock()
123 .expect("MEMORY_STORES lock should not be poisoned");
124 let stores = memory_stores.stores.get_mut(&self.namespace).unwrap();
125 stores.remove(&self.root_key);
126 }
127 }
128}
129
130impl WithError for MemoryStore {
131 type Error = MemoryStoreError;
132}
133
134impl ReadableKeyValueStore for MemoryStore {
135 const MAX_KEY_SIZE: usize = usize::MAX;
136
137 fn max_stream_queries(&self) -> usize {
138 self.max_stream_queries
139 }
140
141 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
142 let map = self
143 .map
144 .read()
145 .expect("MemoryStore lock should not be poisoned");
146 Ok(map.get(key).cloned())
147 }
148
149 async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
150 let map = self
151 .map
152 .read()
153 .expect("MemoryStore lock should not be poisoned");
154 Ok(map.contains_key(key))
155 }
156
157 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
158 let map = self
159 .map
160 .read()
161 .expect("MemoryStore lock should not be poisoned");
162 Ok(keys
163 .into_iter()
164 .map(|key| map.contains_key(&key))
165 .collect::<Vec<_>>())
166 }
167
168 async fn read_multi_values_bytes(
169 &self,
170 keys: Vec<Vec<u8>>,
171 ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
172 let map = self
173 .map
174 .read()
175 .expect("MemoryStore lock should not be poisoned");
176 let mut result = Vec::new();
177 for key in keys {
178 result.push(map.get(&key).cloned());
179 }
180 Ok(result)
181 }
182
183 async fn find_keys_by_prefix(
184 &self,
185 key_prefix: &[u8],
186 ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
187 let map = self
188 .map
189 .read()
190 .expect("MemoryStore lock should not be poisoned");
191 let mut values = Vec::new();
192 let len = key_prefix.len();
193 for (key, _value) in map.range(get_interval(key_prefix.to_vec())) {
194 values.push(key[len..].to_vec())
195 }
196 Ok(values)
197 }
198
199 async fn find_key_values_by_prefix(
200 &self,
201 key_prefix: &[u8],
202 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
203 let map = self
204 .map
205 .read()
206 .expect("MemoryStore lock should not be poisoned");
207 let mut key_values = Vec::new();
208 let len = key_prefix.len();
209 for (key, value) in map.range(get_interval(key_prefix.to_vec())) {
210 let key_value = (key[len..].to_vec(), value.to_vec());
211 key_values.push(key_value);
212 }
213 Ok(key_values)
214 }
215}
216
217impl WritableKeyValueStore for MemoryStore {
218 const MAX_VALUE_SIZE: usize = usize::MAX;
219
220 async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
221 let mut map = self
222 .map
223 .write()
224 .expect("MemoryStore lock should not be poisoned");
225 for ent in batch.operations {
226 match ent {
227 WriteOperation::Put { key, value } => {
228 map.insert(key, value);
229 }
230 WriteOperation::Delete { key } => {
231 map.remove(&key);
232 }
233 WriteOperation::DeletePrefix { key_prefix } => {
234 let key_list = map
235 .range(get_interval(key_prefix))
236 .map(|x| x.0.to_vec())
237 .collect::<Vec<_>>();
238 for key in key_list {
239 map.remove(&key);
240 }
241 }
242 }
243 }
244 Ok(())
245 }
246
247 async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
248 Ok(())
249 }
250}
251
252impl MemoryStore {
253 fn sync_maybe_create_and_connect(
255 config: &MemoryStoreConfig,
256 namespace: &str,
257 kill_on_drop: bool,
258 ) -> Result<Self, MemoryStoreError> {
259 let mut memory_stores = MEMORY_STORES.lock().expect("lock should not be poisoned");
260 if !memory_stores.sync_exists(namespace) {
261 memory_stores.sync_create(namespace);
262 }
263 memory_stores.sync_connect(config, namespace, &[], kill_on_drop)
264 }
265
266 pub fn new(max_stream_queries: usize, namespace: &str) -> Result<Self, MemoryStoreError> {
268 let config = MemoryStoreConfig { max_stream_queries };
269 MemoryStore::sync_maybe_create_and_connect(
270 &config, namespace, false,
271 )
272 }
273
274 #[cfg(with_testing)]
276 pub fn new_for_testing(
277 max_stream_queries: usize,
278 namespace: &str,
279 ) -> Result<Self, MemoryStoreError> {
280 let config = MemoryStoreConfig { max_stream_queries };
281 MemoryStore::sync_maybe_create_and_connect(&config, namespace, true)
282 }
283}
284
285impl AdminKeyValueStore for MemoryStore {
286 type Config = MemoryStoreConfig;
287
288 fn get_name() -> String {
289 "memory".to_string()
290 }
291
292 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
293 let mut memory_stores = MEMORY_STORES
294 .lock()
295 .expect("MEMORY_STORES lock should not be poisoned");
296 memory_stores.sync_connect(config, namespace, &[], false)
297 }
298
299 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, MemoryStoreError> {
300 let max_stream_queries = self.max_stream_queries;
301 let config = MemoryStoreConfig { max_stream_queries };
302 let mut memory_stores = MEMORY_STORES
303 .lock()
304 .expect("MEMORY_STORES lock should not be poisoned");
305 memory_stores.sync_connect(&config, &self.namespace, root_key, self.kill_on_drop)
306 }
307
308 async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
309 let memory_stores = MEMORY_STORES
310 .lock()
311 .expect("MEMORY_STORES lock should not be poisoned");
312 Ok(memory_stores.sync_list_all())
313 }
314
315 async fn list_root_keys(
316 _config: &Self::Config,
317 namespace: &str,
318 ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
319 let memory_stores = MEMORY_STORES
320 .lock()
321 .expect("MEMORY_STORES lock should not be poisoned");
322 Ok(memory_stores.sync_list_root_keys(namespace))
323 }
324
325 async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
326 let memory_stores = MEMORY_STORES
327 .lock()
328 .expect("MEMORY_STORES lock should not be poisoned");
329 Ok(memory_stores.sync_exists(namespace))
330 }
331
332 async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
333 let mut memory_stores = MEMORY_STORES
334 .lock()
335 .expect("MEMORY_STORES lock should not be poisoned");
336 if memory_stores.sync_exists(namespace) {
337 return Err(MemoryStoreError::StoreAlreadyExists);
338 }
339 memory_stores.sync_create(namespace);
340 Ok(())
341 }
342
343 async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
344 let mut memory_stores = MEMORY_STORES
345 .lock()
346 .expect("MEMORY_STORES lock should not be poisoned");
347 memory_stores.sync_delete(namespace);
348 Ok(())
349 }
350}
351
352#[cfg(with_testing)]
353impl TestKeyValueStore for MemoryStore {
354 async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
355 let max_stream_queries = TEST_MEMORY_MAX_STREAM_QUERIES;
356 Ok(MemoryStoreConfig { max_stream_queries })
357 }
358}
359
360#[cfg(with_testing)]
362pub fn create_test_memory_store() -> MemoryStore {
363 let namespace = generate_test_namespace();
364 MemoryStore::new_for_testing(TEST_MEMORY_MAX_STREAM_QUERIES, &namespace).unwrap()
365}
366
367#[derive(Error, Debug)]
369pub enum MemoryStoreError {
370 #[error("Store already exists during a create operation")]
372 StoreAlreadyExists,
373
374 #[error(transparent)]
376 BcsError(#[from] bcs::Error),
377
378 #[error("The namespace does not exist")]
380 NamespaceNotFound,
381}
382
383impl KeyValueStoreError for MemoryStoreError {
384 const BACKEND: &'static str = "memory";
385}