1use std::{
7 collections::BTreeMap,
8 sync::{Arc, LazyLock, Mutex, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[cfg(with_testing)]
15use crate::random::generate_test_namespace;
16#[cfg(with_testing)]
17use crate::store::TestKeyValueStore;
18use crate::{
19 batch::{Batch, WriteOperation},
20 common::get_interval,
21 store::{
22 AdminKeyValueStore, CommonStoreInternalConfig, KeyValueStoreError, ReadableKeyValueStore,
23 WithError, WritableKeyValueStore,
24 },
25};
26
27#[derive(Debug, Clone, Deserialize, Serialize)]
29pub struct MemoryStoreConfig {
30 pub common_config: CommonStoreInternalConfig,
32}
33
34impl MemoryStoreConfig {
35 pub fn new(max_stream_queries: usize) -> Self {
37 let common_config = CommonStoreInternalConfig {
38 max_concurrent_queries: None,
39 max_stream_queries,
40 replication_factor: 1,
41 };
42 Self { common_config }
43 }
44}
45
46pub const TEST_MEMORY_MAX_STREAM_QUERIES: usize = 10;
48
49type MemoryStoreMap = BTreeMap<Vec<u8>, Vec<u8>>;
52
53#[derive(Default)]
55struct MemoryStores {
56 stores: BTreeMap<String, BTreeMap<Vec<u8>, Arc<RwLock<MemoryStoreMap>>>>,
57}
58
59impl MemoryStores {
60 fn sync_connect(
61 &mut self,
62 config: &MemoryStoreConfig,
63 namespace: &str,
64 root_key: &[u8],
65 kill_on_drop: bool,
66 ) -> Result<MemoryStore, MemoryStoreError> {
67 let max_stream_queries = config.common_config.max_stream_queries;
68 let Some(stores) = self.stores.get_mut(namespace) else {
69 return Err(MemoryStoreError::NamespaceNotFound);
70 };
71 let store = stores.entry(root_key.to_vec()).or_insert_with(|| {
72 let map = MemoryStoreMap::new();
73 Arc::new(RwLock::new(map))
74 });
75 let map = store.clone();
76 let namespace = namespace.to_string();
77 let root_key = root_key.to_vec();
78 Ok(MemoryStore {
79 map,
80 max_stream_queries,
81 namespace,
82 root_key,
83 kill_on_drop,
84 })
85 }
86
87 fn sync_list_all(&self) -> Vec<String> {
88 self.stores.keys().cloned().collect::<Vec<_>>()
89 }
90
91 fn sync_list_root_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
92 match self.stores.get(namespace) {
93 None => Vec::new(),
94 Some(map) => map.keys().cloned().collect::<Vec<_>>(),
95 }
96 }
97
98 fn sync_exists(&self, namespace: &str) -> bool {
99 self.stores.contains_key(namespace)
100 }
101
102 fn sync_create(&mut self, namespace: &str) {
103 self.stores.insert(namespace.to_string(), BTreeMap::new());
104 }
105
106 fn sync_delete(&mut self, namespace: &str) {
107 self.stores.remove(namespace);
108 }
109}
110
111static MEMORY_STORES: LazyLock<Mutex<MemoryStores>> =
113 LazyLock::new(|| Mutex::new(MemoryStores::default()));
114
115#[derive(Clone)]
117pub struct MemoryStore {
118 map: Arc<RwLock<MemoryStoreMap>>,
120 max_stream_queries: usize,
122 namespace: String,
124 root_key: Vec<u8>,
126 kill_on_drop: bool,
128}
129
130impl Drop for MemoryStore {
131 fn drop(&mut self) {
132 if self.kill_on_drop {
133 let mut memory_stores = MEMORY_STORES
134 .lock()
135 .expect("MEMORY_STORES lock should not be poisoned");
136 let stores = memory_stores.stores.get_mut(&self.namespace).unwrap();
137 stores.remove(&self.root_key);
138 }
139 }
140}
141
142impl WithError for MemoryStore {
143 type Error = MemoryStoreError;
144}
145
146impl ReadableKeyValueStore for MemoryStore {
147 const MAX_KEY_SIZE: usize = usize::MAX;
148 type Keys = Vec<Vec<u8>>;
149 type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
150
151 fn max_stream_queries(&self) -> usize {
152 self.max_stream_queries
153 }
154
155 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, MemoryStoreError> {
156 let map = self
157 .map
158 .read()
159 .expect("MemoryStore lock should not be poisoned");
160 Ok(map.get(key).cloned())
161 }
162
163 async fn contains_key(&self, key: &[u8]) -> Result<bool, MemoryStoreError> {
164 let map = self
165 .map
166 .read()
167 .expect("MemoryStore lock should not be poisoned");
168 Ok(map.contains_key(key))
169 }
170
171 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, MemoryStoreError> {
172 let map = self
173 .map
174 .read()
175 .expect("MemoryStore lock should not be poisoned");
176 Ok(keys
177 .into_iter()
178 .map(|key| map.contains_key(&key))
179 .collect::<Vec<_>>())
180 }
181
182 async fn read_multi_values_bytes(
183 &self,
184 keys: Vec<Vec<u8>>,
185 ) -> Result<Vec<Option<Vec<u8>>>, MemoryStoreError> {
186 let map = self
187 .map
188 .read()
189 .expect("MemoryStore lock should not be poisoned");
190 let mut result = Vec::new();
191 for key in keys {
192 result.push(map.get(&key).cloned());
193 }
194 Ok(result)
195 }
196
197 async fn find_keys_by_prefix(
198 &self,
199 key_prefix: &[u8],
200 ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
201 let map = self
202 .map
203 .read()
204 .expect("MemoryStore lock should not be poisoned");
205 let mut values = Vec::new();
206 let len = key_prefix.len();
207 for (key, _value) in map.range(get_interval(key_prefix.to_vec())) {
208 values.push(key[len..].to_vec())
209 }
210 Ok(values)
211 }
212
213 async fn find_key_values_by_prefix(
214 &self,
215 key_prefix: &[u8],
216 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, MemoryStoreError> {
217 let map = self
218 .map
219 .read()
220 .expect("MemoryStore lock should not be poisoned");
221 let mut key_values = Vec::new();
222 let len = key_prefix.len();
223 for (key, value) in map.range(get_interval(key_prefix.to_vec())) {
224 let key_value = (key[len..].to_vec(), value.to_vec());
225 key_values.push(key_value);
226 }
227 Ok(key_values)
228 }
229}
230
231impl WritableKeyValueStore for MemoryStore {
232 const MAX_VALUE_SIZE: usize = usize::MAX;
233
234 async fn write_batch(&self, batch: Batch) -> Result<(), MemoryStoreError> {
235 let mut map = self
236 .map
237 .write()
238 .expect("MemoryStore lock should not be poisoned");
239 for ent in batch.operations {
240 match ent {
241 WriteOperation::Put { key, value } => {
242 map.insert(key, value);
243 }
244 WriteOperation::Delete { key } => {
245 map.remove(&key);
246 }
247 WriteOperation::DeletePrefix { key_prefix } => {
248 let key_list = map
249 .range(get_interval(key_prefix))
250 .map(|x| x.0.to_vec())
251 .collect::<Vec<_>>();
252 for key in key_list {
253 map.remove(&key);
254 }
255 }
256 }
257 }
258 Ok(())
259 }
260
261 async fn clear_journal(&self) -> Result<(), MemoryStoreError> {
262 Ok(())
263 }
264}
265
266impl MemoryStore {
267 fn sync_maybe_create_and_connect(
269 config: &MemoryStoreConfig,
270 namespace: &str,
271 kill_on_drop: bool,
272 ) -> Result<Self, MemoryStoreError> {
273 let mut memory_stores = MEMORY_STORES.lock().expect("lock should not be poisoned");
274 if !memory_stores.sync_exists(namespace) {
275 memory_stores.sync_create(namespace);
276 }
277 memory_stores.sync_connect(config, namespace, &[], kill_on_drop)
278 }
279
280 pub fn new(max_stream_queries: usize, namespace: &str) -> Result<Self, MemoryStoreError> {
282 let common_config = CommonStoreInternalConfig {
283 max_concurrent_queries: None,
284 max_stream_queries,
285 replication_factor: 1,
286 };
287 let config = MemoryStoreConfig { common_config };
288 let kill_on_drop = false;
289 MemoryStore::sync_maybe_create_and_connect(&config, namespace, kill_on_drop)
290 }
291
292 #[cfg(with_testing)]
294 pub fn new_for_testing(
295 max_stream_queries: usize,
296 namespace: &str,
297 ) -> Result<Self, MemoryStoreError> {
298 let common_config = CommonStoreInternalConfig {
299 max_concurrent_queries: None,
300 max_stream_queries,
301 replication_factor: 1,
302 };
303 let config = MemoryStoreConfig { common_config };
304 let kill_on_drop = true;
305 MemoryStore::sync_maybe_create_and_connect(&config, namespace, kill_on_drop)
306 }
307}
308
309impl AdminKeyValueStore for MemoryStore {
310 type Config = MemoryStoreConfig;
311
312 fn get_name() -> String {
313 "memory".to_string()
314 }
315
316 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, MemoryStoreError> {
317 let mut memory_stores = MEMORY_STORES
318 .lock()
319 .expect("MEMORY_STORES lock should not be poisoned");
320 let kill_on_drop = false;
321 memory_stores.sync_connect(config, namespace, &[], kill_on_drop)
322 }
323
324 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, MemoryStoreError> {
325 let max_stream_queries = self.max_stream_queries;
326 let common_config = CommonStoreInternalConfig {
327 max_concurrent_queries: None,
328 max_stream_queries,
329 replication_factor: 1,
330 };
331 let config = MemoryStoreConfig { common_config };
332 let mut memory_stores = MEMORY_STORES
333 .lock()
334 .expect("MEMORY_STORES lock should not be poisoned");
335 let kill_on_drop = self.kill_on_drop;
336 let namespace = &self.namespace;
337 memory_stores.sync_connect(&config, namespace, root_key, kill_on_drop)
338 }
339
340 async fn list_all(_config: &Self::Config) -> Result<Vec<String>, MemoryStoreError> {
341 let memory_stores = MEMORY_STORES
342 .lock()
343 .expect("MEMORY_STORES lock should not be poisoned");
344 Ok(memory_stores.sync_list_all())
345 }
346
347 async fn list_root_keys(
348 _config: &Self::Config,
349 namespace: &str,
350 ) -> Result<Vec<Vec<u8>>, MemoryStoreError> {
351 let memory_stores = MEMORY_STORES
352 .lock()
353 .expect("MEMORY_STORES lock should not be poisoned");
354 Ok(memory_stores.sync_list_root_keys(namespace))
355 }
356
357 async fn exists(_config: &Self::Config, namespace: &str) -> Result<bool, MemoryStoreError> {
358 let memory_stores = MEMORY_STORES
359 .lock()
360 .expect("MEMORY_STORES lock should not be poisoned");
361 Ok(memory_stores.sync_exists(namespace))
362 }
363
364 async fn create(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
365 let mut memory_stores = MEMORY_STORES
366 .lock()
367 .expect("MEMORY_STORES lock should not be poisoned");
368 if memory_stores.sync_exists(namespace) {
369 return Err(MemoryStoreError::StoreAlreadyExists);
370 }
371 memory_stores.sync_create(namespace);
372 Ok(())
373 }
374
375 async fn delete(_config: &Self::Config, namespace: &str) -> Result<(), MemoryStoreError> {
376 let mut memory_stores = MEMORY_STORES
377 .lock()
378 .expect("MEMORY_STORES lock should not be poisoned");
379 memory_stores.sync_delete(namespace);
380 Ok(())
381 }
382}
383
384#[cfg(with_testing)]
385impl TestKeyValueStore for MemoryStore {
386 async fn new_test_config() -> Result<MemoryStoreConfig, MemoryStoreError> {
387 let max_stream_queries = TEST_MEMORY_MAX_STREAM_QUERIES;
388 let common_config = CommonStoreInternalConfig {
389 max_concurrent_queries: None,
390 max_stream_queries,
391 replication_factor: 1,
392 };
393 Ok(MemoryStoreConfig { common_config })
394 }
395}
396
397#[cfg(with_testing)]
399pub fn create_test_memory_store() -> MemoryStore {
400 let namespace = generate_test_namespace();
401 MemoryStore::new_for_testing(TEST_MEMORY_MAX_STREAM_QUERIES, &namespace).unwrap()
402}
403
404#[derive(Error, Debug)]
406pub enum MemoryStoreError {
407 #[error("Store already exists during a create operation")]
409 StoreAlreadyExists,
410
411 #[error(transparent)]
413 BcsError(#[from] bcs::Error),
414
415 #[error("The namespace does not exist")]
417 NamespaceNotFound,
418}
419
420impl KeyValueStoreError for MemoryStoreError {
421 const BACKEND: &'static str = "memory";
422}