1use std::{fmt::Debug, future::Future};
7
8use serde::{de::DeserializeOwned, Serialize};
9
10#[cfg(with_testing)]
11use crate::random::generate_test_namespace;
12use crate::{
13 batch::{Batch, SimplifiedBatch},
14 common::from_bytes_option,
15 ViewError,
16};
17
18pub trait KeyValueStoreError:
20 std::error::Error + From<bcs::Error> + Debug + Send + Sync + 'static
21{
22 const BACKEND: &'static str;
24}
25
26impl<E: KeyValueStoreError> From<E> for ViewError {
27 fn from(error: E) -> Self {
28 Self::StoreError {
29 backend: E::BACKEND,
30 error: Box::new(error),
31 }
32 }
33}
34
35pub trait WithError {
37 type Error: KeyValueStoreError;
39}
40
41#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
43pub trait ReadableKeyValueStore: WithError {
44 const MAX_KEY_SIZE: usize;
46
47 fn max_stream_queries(&self) -> usize;
49
50 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
52
53 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error>;
55
56 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error>;
58
59 async fn read_multi_values_bytes(
61 &self,
62 keys: Vec<Vec<u8>>,
63 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;
64
65 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
67
68 async fn find_key_values_by_prefix(
70 &self,
71 key_prefix: &[u8],
72 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error>;
73
74 fn read_value<V: DeserializeOwned>(
80 &self,
81 key: &[u8],
82 ) -> impl Future<Output = Result<Option<V>, Self::Error>> {
83 async { Ok(from_bytes_option(&self.read_value_bytes(key).await?)?) }
84 }
85
86 fn read_multi_values<V: DeserializeOwned + Send + Sync>(
88 &self,
89 keys: Vec<Vec<u8>>,
90 ) -> impl Future<Output = Result<Vec<Option<V>>, Self::Error>> {
91 async {
92 let mut values = Vec::with_capacity(keys.len());
93 for entry in self.read_multi_values_bytes(keys).await? {
94 values.push(from_bytes_option(&entry)?);
95 }
96 Ok(values)
97 }
98 }
99}
100
101#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
103pub trait WritableKeyValueStore: WithError {
104 const MAX_VALUE_SIZE: usize;
106
107 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error>;
109
110 async fn clear_journal(&self) -> Result<(), Self::Error>;
113}
114
115#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
120pub trait DirectWritableKeyValueStore: WithError {
121 const MAX_BATCH_SIZE: usize;
123
124 const MAX_BATCH_TOTAL_SIZE: usize;
126
127 const MAX_VALUE_SIZE: usize;
129
130 type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;
132
133 async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
135}
136
137#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
139pub trait KeyValueDatabase: WithError + Sized {
140 type Config: Send + Sync;
142
143 type Store;
145
146 fn get_name() -> String;
148
149 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error>;
151
152 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
155
156 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
162
163 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;
165
166 async fn list_root_keys(
169 config: &Self::Config,
170 namespace: &str,
171 ) -> Result<Vec<Vec<u8>>, Self::Error>;
172
173 fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
175 async {
176 let namespaces = Self::list_all(config).await?;
177 for namespace in namespaces {
178 Self::delete(config, &namespace).await?;
179 }
180 Ok(())
181 }
182 }
183
184 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error>;
186
187 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
189
190 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
192
193 fn maybe_create_and_connect(
195 config: &Self::Config,
196 namespace: &str,
197 ) -> impl Future<Output = Result<Self, Self::Error>> {
198 async {
199 if !Self::exists(config, namespace).await? {
200 Self::create(config, namespace).await?;
201 }
202 Self::connect(config, namespace).await
203 }
204 }
205
206 fn recreate_and_connect(
208 config: &Self::Config,
209 namespace: &str,
210 ) -> impl Future<Output = Result<Self, Self::Error>> {
211 async {
212 if Self::exists(config, namespace).await? {
213 Self::delete(config, namespace).await?;
214 }
215 Self::create(config, namespace).await?;
216 Self::connect(config, namespace).await
217 }
218 }
219}
220
221pub trait DirectKeyValueStore: ReadableKeyValueStore + DirectWritableKeyValueStore {}
227
228impl<T> DirectKeyValueStore for T where T: ReadableKeyValueStore + DirectWritableKeyValueStore {}
229
230pub trait KeyValueStore: ReadableKeyValueStore + WritableKeyValueStore {}
236
237impl<T> KeyValueStore for T where T: ReadableKeyValueStore + WritableKeyValueStore {}
238
239#[cfg(with_testing)]
241pub trait TestKeyValueDatabase: KeyValueDatabase {
242 async fn new_test_config() -> Result<Self::Config, Self::Error>;
244
245 async fn connect_test_namespace() -> Result<Self, Self::Error> {
247 let config = Self::new_test_config().await?;
248 let namespace = generate_test_namespace();
249 Self::recreate_and_connect(&config, &namespace).await
250 }
251
252 async fn new_test_store() -> Result<Self::Store, Self::Error> {
254 let database = Self::connect_test_namespace().await?;
255 database.open_shared(&[])
256 }
257}
258
259pub mod inactive_store {
261 use super::*;
262
263 pub struct InactiveStore;
265
266 #[derive(Clone, Copy, Debug)]
268 pub struct InactiveStoreError;
269
270 impl std::fmt::Display for InactiveStoreError {
271 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
272 write!(f, "inactive store error")
273 }
274 }
275
276 impl From<bcs::Error> for InactiveStoreError {
277 fn from(_other: bcs::Error) -> Self {
278 Self
279 }
280 }
281
282 impl std::error::Error for InactiveStoreError {}
283
284 impl KeyValueStoreError for InactiveStoreError {
285 const BACKEND: &'static str = "inactive";
286 }
287
288 impl WithError for InactiveStore {
289 type Error = InactiveStoreError;
290 }
291
292 impl ReadableKeyValueStore for InactiveStore {
293 const MAX_KEY_SIZE: usize = 0;
294
295 fn max_stream_queries(&self) -> usize {
296 0
297 }
298
299 async fn read_value_bytes(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
300 panic!("attempt to read from an inactive store!")
301 }
302
303 async fn contains_key(&self, _key: &[u8]) -> Result<bool, Self::Error> {
304 panic!("attempt to read from an inactive store!")
305 }
306
307 async fn contains_keys(&self, _keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
308 panic!("attempt to read from an inactive store!")
309 }
310
311 async fn read_multi_values_bytes(
312 &self,
313 _keys: Vec<Vec<u8>>,
314 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
315 panic!("attempt to read from an inactive store!")
316 }
317
318 async fn find_keys_by_prefix(
319 &self,
320 _key_prefix: &[u8],
321 ) -> Result<Vec<Vec<u8>>, Self::Error> {
322 panic!("attempt to read from an inactive store!")
323 }
324
325 async fn find_key_values_by_prefix(
327 &self,
328 _key_prefix: &[u8],
329 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
330 panic!("attempt to read from an inactive store!")
331 }
332 }
333
334 impl WritableKeyValueStore for InactiveStore {
335 const MAX_VALUE_SIZE: usize = 0;
336
337 async fn write_batch(&self, _batch: Batch) -> Result<(), Self::Error> {
338 panic!("attempt to write to an inactive store!")
339 }
340
341 async fn clear_journal(&self) -> Result<(), Self::Error> {
342 panic!("attempt to write to an inactive store!")
343 }
344 }
345}