1use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueStore;
11use crate::{
12 batch::Batch,
13 store::{
14 AdminKeyValueStore, KeyIterable, KeyValueIterable, KeyValueStoreError,
15 ReadableKeyValueStore, WithError, WritableKeyValueStore,
16 },
17};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualStoreConfig<C1, C2> {
22 pub first_config: C1,
24 pub second_config: C2,
26}
27
28#[derive(Clone, Copy, Debug)]
30pub enum StoreInUse {
31 First,
33 Second,
35}
36
37#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
39pub trait DualStoreRootKeyAssignment {
40 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
42}
43
44#[derive(Clone)]
46pub struct DualStore<S1, S2, A> {
47 first_store: S1,
49 second_store: S2,
51 store_in_use: StoreInUse,
53 _marker: std::marker::PhantomData<A>,
55}
56
57impl<S1, S2, A> WithError for DualStore<S1, S2, A>
58where
59 S1: WithError,
60 S2: WithError,
61{
62 type Error = DualStoreError<S1::Error, S2::Error>;
63}
64
65impl<S1, S2, A> ReadableKeyValueStore for DualStore<S1, S2, A>
66where
67 S1: ReadableKeyValueStore,
68 S2: ReadableKeyValueStore,
69 A: DualStoreRootKeyAssignment,
70{
71 const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
73 S1::MAX_KEY_SIZE
74 } else {
75 S2::MAX_KEY_SIZE
76 };
77
78 type Keys = DualStoreKeys<S1::Keys, S2::Keys>;
79 type KeyValues = DualStoreKeyValues<S1::KeyValues, S2::KeyValues>;
80
81 fn max_stream_queries(&self) -> usize {
82 match self.store_in_use {
83 StoreInUse::First => self.first_store.max_stream_queries(),
84 StoreInUse::Second => self.second_store.max_stream_queries(),
85 }
86 }
87
88 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
89 let result = match self.store_in_use {
90 StoreInUse::First => self
91 .first_store
92 .read_value_bytes(key)
93 .await
94 .map_err(DualStoreError::First)?,
95 StoreInUse::Second => self
96 .second_store
97 .read_value_bytes(key)
98 .await
99 .map_err(DualStoreError::Second)?,
100 };
101 Ok(result)
102 }
103
104 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
105 let result = match self.store_in_use {
106 StoreInUse::First => self
107 .first_store
108 .contains_key(key)
109 .await
110 .map_err(DualStoreError::First)?,
111 StoreInUse::Second => self
112 .second_store
113 .contains_key(key)
114 .await
115 .map_err(DualStoreError::Second)?,
116 };
117 Ok(result)
118 }
119
120 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
121 let result = match self.store_in_use {
122 StoreInUse::First => self
123 .first_store
124 .contains_keys(keys)
125 .await
126 .map_err(DualStoreError::First)?,
127 StoreInUse::Second => self
128 .second_store
129 .contains_keys(keys)
130 .await
131 .map_err(DualStoreError::Second)?,
132 };
133 Ok(result)
134 }
135
136 async fn read_multi_values_bytes(
137 &self,
138 keys: Vec<Vec<u8>>,
139 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
140 let result = match self.store_in_use {
141 StoreInUse::First => self
142 .first_store
143 .read_multi_values_bytes(keys)
144 .await
145 .map_err(DualStoreError::First)?,
146 StoreInUse::Second => self
147 .second_store
148 .read_multi_values_bytes(keys)
149 .await
150 .map_err(DualStoreError::Second)?,
151 };
152 Ok(result)
153 }
154
155 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
156 let result = match self.store_in_use {
157 StoreInUse::First => DualStoreKeys::First(
158 self.first_store
159 .find_keys_by_prefix(key_prefix)
160 .await
161 .map_err(DualStoreError::First)?,
162 ),
163 StoreInUse::Second => DualStoreKeys::Second(
164 self.second_store
165 .find_keys_by_prefix(key_prefix)
166 .await
167 .map_err(DualStoreError::Second)?,
168 ),
169 };
170 Ok(result)
171 }
172
173 async fn find_key_values_by_prefix(
174 &self,
175 key_prefix: &[u8],
176 ) -> Result<Self::KeyValues, Self::Error> {
177 let result = match self.store_in_use {
178 StoreInUse::First => DualStoreKeyValues::First(
179 self.first_store
180 .find_key_values_by_prefix(key_prefix)
181 .await
182 .map_err(DualStoreError::First)?,
183 ),
184 StoreInUse::Second => DualStoreKeyValues::Second(
185 self.second_store
186 .find_key_values_by_prefix(key_prefix)
187 .await
188 .map_err(DualStoreError::Second)?,
189 ),
190 };
191 Ok(result)
192 }
193}
194
195impl<S1, S2, A> WritableKeyValueStore for DualStore<S1, S2, A>
196where
197 S1: WritableKeyValueStore,
198 S2: WritableKeyValueStore,
199 A: DualStoreRootKeyAssignment,
200{
201 const MAX_VALUE_SIZE: usize = usize::MAX;
202
203 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
204 match self.store_in_use {
205 StoreInUse::First => self
206 .first_store
207 .write_batch(batch)
208 .await
209 .map_err(DualStoreError::First)?,
210 StoreInUse::Second => self
211 .second_store
212 .write_batch(batch)
213 .await
214 .map_err(DualStoreError::Second)?,
215 }
216 Ok(())
217 }
218
219 async fn clear_journal(&self) -> Result<(), Self::Error> {
220 match self.store_in_use {
221 StoreInUse::First => self
222 .first_store
223 .clear_journal()
224 .await
225 .map_err(DualStoreError::First)?,
226 StoreInUse::Second => self
227 .second_store
228 .clear_journal()
229 .await
230 .map_err(DualStoreError::Second)?,
231 }
232 Ok(())
233 }
234}
235
236impl<S1, S2, A> AdminKeyValueStore for DualStore<S1, S2, A>
237where
238 S1: AdminKeyValueStore,
239 S2: AdminKeyValueStore,
240 A: DualStoreRootKeyAssignment,
241{
242 type Config = DualStoreConfig<S1::Config, S2::Config>;
243
244 fn get_name() -> String {
245 format!("dual {} and {}", S1::get_name(), S2::get_name())
246 }
247
248 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
249 let first_store = S1::connect(&config.first_config, namespace)
250 .await
251 .map_err(DualStoreError::First)?;
252 let second_store = S2::connect(&config.second_config, namespace)
253 .await
254 .map_err(DualStoreError::Second)?;
255 let store_in_use = A::assigned_store(&[])?;
256 Ok(Self {
257 first_store,
258 second_store,
259 store_in_use,
260 _marker: std::marker::PhantomData,
261 })
262 }
263
264 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
265 let first_store = self
266 .first_store
267 .open_exclusive(root_key)
268 .map_err(DualStoreError::First)?;
269 let second_store = self
270 .second_store
271 .open_exclusive(root_key)
272 .map_err(DualStoreError::Second)?;
273 let store_in_use = A::assigned_store(root_key)?;
274 Ok(Self {
275 first_store,
276 second_store,
277 store_in_use,
278 _marker: std::marker::PhantomData,
279 })
280 }
281
282 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
283 let namespaces1 = S1::list_all(&config.first_config)
284 .await
285 .map_err(DualStoreError::First)?;
286 let mut namespaces = Vec::new();
287 for namespace in namespaces1 {
288 if S2::exists(&config.second_config, &namespace)
289 .await
290 .map_err(DualStoreError::Second)?
291 {
292 namespaces.push(namespace);
293 } else {
294 tracing::warn!("Namespace {} only exists in the first store", namespace);
295 }
296 }
297 Ok(namespaces)
298 }
299
300 async fn list_root_keys(
301 config: &Self::Config,
302 namespace: &str,
303 ) -> Result<Vec<Vec<u8>>, Self::Error> {
304 let mut root_keys = S1::list_root_keys(&config.first_config, namespace)
305 .await
306 .map_err(DualStoreError::First)?;
307 root_keys.extend(
308 S2::list_root_keys(&config.second_config, namespace)
309 .await
310 .map_err(DualStoreError::Second)?,
311 );
312 Ok(root_keys)
313 }
314
315 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
316 Ok(S1::exists(&config.first_config, namespace)
317 .await
318 .map_err(DualStoreError::First)?
319 && S2::exists(&config.second_config, namespace)
320 .await
321 .map_err(DualStoreError::Second)?)
322 }
323
324 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
325 let exists1 = S1::exists(&config.first_config, namespace)
326 .await
327 .map_err(DualStoreError::First)?;
328 let exists2 = S2::exists(&config.second_config, namespace)
329 .await
330 .map_err(DualStoreError::Second)?;
331 if exists1 && exists2 {
332 return Err(DualStoreError::StoreAlreadyExists);
333 }
334 if !exists1 {
335 S1::create(&config.first_config, namespace)
336 .await
337 .map_err(DualStoreError::First)?;
338 }
339 if !exists2 {
340 S2::create(&config.second_config, namespace)
341 .await
342 .map_err(DualStoreError::Second)?;
343 }
344 Ok(())
345 }
346
347 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
348 S1::delete(&config.first_config, namespace)
349 .await
350 .map_err(DualStoreError::First)?;
351 S2::delete(&config.second_config, namespace)
352 .await
353 .map_err(DualStoreError::Second)?;
354 Ok(())
355 }
356}
357
358#[cfg(with_testing)]
359impl<S1, S2, A> TestKeyValueStore for DualStore<S1, S2, A>
360where
361 S1: TestKeyValueStore,
362 S2: TestKeyValueStore,
363 A: DualStoreRootKeyAssignment,
364{
365 async fn new_test_config() -> Result<Self::Config, Self::Error> {
366 let first_config = S1::new_test_config().await.map_err(DualStoreError::First)?;
367 let second_config = S2::new_test_config()
368 .await
369 .map_err(DualStoreError::Second)?;
370 Ok(DualStoreConfig {
371 first_config,
372 second_config,
373 })
374 }
375}
376
377#[derive(Error, Debug)]
379pub enum DualStoreError<E1, E2> {
380 #[error("Store already exists during a create operation")]
382 StoreAlreadyExists,
383
384 #[error(transparent)]
386 BcsError(#[from] bcs::Error),
387
388 #[error("Error in first store: {0}")]
390 First(E1),
391
392 #[error("Error in second store: {0}")]
394 Second(E2),
395}
396
397impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
398where
399 E1: KeyValueStoreError,
400 E2: KeyValueStoreError,
401{
402 const BACKEND: &'static str = "dual_store";
403}
404
405pub enum DualStoreKeys<K1, K2> {
407 First(K1),
409 Second(K2),
411}
412
413pub enum DualStoreKeyIterator<I1, I2> {
415 First(I1),
417 Second(I2),
419}
420
421pub enum DualStoreKeyValues<K1, K2> {
423 First(K1),
425 Second(K2),
427}
428
429pub enum DualStoreKeyValueIterator<I1, I2> {
431 First(I1),
433 Second(I2),
435}
436
437pub enum DualStoreKeyValueIteratorOwned<I1, I2> {
439 First(I1),
441 Second(I2),
443}
444
445impl<E1, E2, K1, K2> KeyIterable<DualStoreError<E1, E2>> for DualStoreKeys<K1, K2>
446where
447 K1: KeyIterable<E1>,
448 K2: KeyIterable<E2>,
449{
450 type Iterator<'a>
451 = DualStoreKeyIterator<K1::Iterator<'a>, K2::Iterator<'a>>
452 where
453 K1: 'a,
454 K2: 'a;
455
456 fn iterator(&self) -> Self::Iterator<'_> {
457 match self {
458 Self::First(keys) => DualStoreKeyIterator::First(keys.iterator()),
459 Self::Second(keys) => DualStoreKeyIterator::Second(keys.iterator()),
460 }
461 }
462}
463
464impl<'a, I1, I2, E1, E2> Iterator for DualStoreKeyIterator<I1, I2>
465where
466 I1: Iterator<Item = Result<&'a [u8], E1>>,
467 I2: Iterator<Item = Result<&'a [u8], E2>>,
468{
469 type Item = Result<&'a [u8], DualStoreError<E1, E2>>;
470
471 fn next(&mut self) -> Option<Self::Item> {
472 match self {
473 Self::First(iter) => iter
474 .next()
475 .map(|result| result.map_err(DualStoreError::First)),
476 Self::Second(iter) => iter
477 .next()
478 .map(|result| result.map_err(DualStoreError::Second)),
479 }
480 }
481}
482
483impl<E1, E2, K1, K2> KeyValueIterable<DualStoreError<E1, E2>> for DualStoreKeyValues<K1, K2>
484where
485 K1: KeyValueIterable<E1>,
486 K2: KeyValueIterable<E2>,
487{
488 type Iterator<'a>
489 = DualStoreKeyValueIterator<K1::Iterator<'a>, K2::Iterator<'a>>
490 where
491 K1: 'a,
492 K2: 'a;
493 type IteratorOwned = DualStoreKeyValueIteratorOwned<K1::IteratorOwned, K2::IteratorOwned>;
494
495 fn iterator(&self) -> Self::Iterator<'_> {
496 match self {
497 Self::First(keys) => DualStoreKeyValueIterator::First(keys.iterator()),
498 Self::Second(keys) => DualStoreKeyValueIterator::Second(keys.iterator()),
499 }
500 }
501
502 fn into_iterator_owned(self) -> Self::IteratorOwned {
503 match self {
504 Self::First(keys) => DualStoreKeyValueIteratorOwned::First(keys.into_iterator_owned()),
505 Self::Second(keys) => {
506 DualStoreKeyValueIteratorOwned::Second(keys.into_iterator_owned())
507 }
508 }
509 }
510}
511
512impl<'a, I1, I2, E1, E2> Iterator for DualStoreKeyValueIterator<I1, I2>
513where
514 I1: Iterator<Item = Result<(&'a [u8], &'a [u8]), E1>>,
515 I2: Iterator<Item = Result<(&'a [u8], &'a [u8]), E2>>,
516{
517 type Item = Result<(&'a [u8], &'a [u8]), DualStoreError<E1, E2>>;
518
519 fn next(&mut self) -> Option<Self::Item> {
520 match self {
521 Self::First(iter) => iter
522 .next()
523 .map(|result| result.map_err(DualStoreError::First)),
524 Self::Second(iter) => iter
525 .next()
526 .map(|result| result.map_err(DualStoreError::Second)),
527 }
528 }
529}
530
531impl<I1, I2, E1, E2> Iterator for DualStoreKeyValueIteratorOwned<I1, I2>
532where
533 I1: Iterator<Item = Result<(Vec<u8>, Vec<u8>), E1>>,
534 I2: Iterator<Item = Result<(Vec<u8>, Vec<u8>), E2>>,
535{
536 type Item = Result<(Vec<u8>, Vec<u8>), DualStoreError<E1, E2>>;
537
538 fn next(&mut self) -> Option<Self::Item> {
539 match self {
540 Self::First(iter) => iter
541 .next()
542 .map(|result| result.map_err(DualStoreError::First)),
543 Self::Second(iter) => iter
544 .next()
545 .map(|result| result.map_err(DualStoreError::Second)),
546 }
547 }
548}