1use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueDatabase;
11use crate::{
12 batch::Batch,
13 store::{
14 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
15 WritableKeyValueStore,
16 },
17};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualDatabase<D1, D2, A> {
22 pub first_database: D1,
24 pub second_database: D2,
26 _marker: std::marker::PhantomData<A>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DualStoreConfig<C1, C2> {
33 pub first_config: C1,
35 pub second_config: C2,
37}
38
/// Selector naming one of the two underlying stores.
#[derive(Debug, Copy, Clone)]
pub enum StoreInUse {
    /// The first store is selected.
    First,
    /// The second store is selected.
    Second,
}
47
48#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
50pub trait DualStoreRootKeyAssignment {
51 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
53}
54
/// A store that wraps exactly one of two possible underlying stores.
#[derive(Clone)]
pub enum DualStore<S1, S2> {
    /// Wraps a store of the first kind.
    First(S1),
    /// Wraps a store of the second kind.
    Second(S2),
}
63
64impl<D1, D2, A> WithError for DualDatabase<D1, D2, A>
65where
66 D1: WithError,
67 D2: WithError,
68{
69 type Error = DualStoreError<D1::Error, D2::Error>;
70}
71
72impl<S1, S2> WithError for DualStore<S1, S2>
73where
74 S1: WithError,
75 S2: WithError,
76{
77 type Error = DualStoreError<S1::Error, S2::Error>;
78}
79
80impl<S1, S2> ReadableKeyValueStore for DualStore<S1, S2>
81where
82 S1: ReadableKeyValueStore,
83 S2: ReadableKeyValueStore,
84{
85 const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
87 S1::MAX_KEY_SIZE
88 } else {
89 S2::MAX_KEY_SIZE
90 };
91
92 fn max_stream_queries(&self) -> usize {
93 match self {
94 Self::First(store) => store.max_stream_queries(),
95 Self::Second(store) => store.max_stream_queries(),
96 }
97 }
98
99 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
100 Ok(match self {
101 Self::First(store) => store.root_key().map_err(DualStoreError::First)?,
102 Self::Second(store) => store.root_key().map_err(DualStoreError::Second)?,
103 })
104 }
105
106 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
107 let result = match self {
108 Self::First(store) => store
109 .read_value_bytes(key)
110 .await
111 .map_err(DualStoreError::First)?,
112 Self::Second(store) => store
113 .read_value_bytes(key)
114 .await
115 .map_err(DualStoreError::Second)?,
116 };
117 Ok(result)
118 }
119
120 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
121 let result = match self {
122 Self::First(store) => store
123 .contains_key(key)
124 .await
125 .map_err(DualStoreError::First)?,
126 Self::Second(store) => store
127 .contains_key(key)
128 .await
129 .map_err(DualStoreError::Second)?,
130 };
131 Ok(result)
132 }
133
134 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
135 let result = match self {
136 Self::First(store) => store
137 .contains_keys(keys)
138 .await
139 .map_err(DualStoreError::First)?,
140 Self::Second(store) => store
141 .contains_keys(keys)
142 .await
143 .map_err(DualStoreError::Second)?,
144 };
145 Ok(result)
146 }
147
148 async fn read_multi_values_bytes(
149 &self,
150 keys: &[Vec<u8>],
151 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
152 let result = match self {
153 Self::First(store) => store
154 .read_multi_values_bytes(keys)
155 .await
156 .map_err(DualStoreError::First)?,
157 Self::Second(store) => store
158 .read_multi_values_bytes(keys)
159 .await
160 .map_err(DualStoreError::Second)?,
161 };
162 Ok(result)
163 }
164
165 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
166 let result = match self {
167 Self::First(store) => store
168 .find_keys_by_prefix(key_prefix)
169 .await
170 .map_err(DualStoreError::First)?,
171 Self::Second(store) => store
172 .find_keys_by_prefix(key_prefix)
173 .await
174 .map_err(DualStoreError::Second)?,
175 };
176 Ok(result)
177 }
178
179 async fn find_key_values_by_prefix(
180 &self,
181 key_prefix: &[u8],
182 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
183 let result = match self {
184 Self::First(store) => store
185 .find_key_values_by_prefix(key_prefix)
186 .await
187 .map_err(DualStoreError::First)?,
188 Self::Second(store) => store
189 .find_key_values_by_prefix(key_prefix)
190 .await
191 .map_err(DualStoreError::Second)?,
192 };
193 Ok(result)
194 }
195}
196
197impl<S1, S2> WritableKeyValueStore for DualStore<S1, S2>
198where
199 S1: WritableKeyValueStore,
200 S2: WritableKeyValueStore,
201{
202 const MAX_VALUE_SIZE: usize = usize::MAX;
203
204 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
205 match self {
206 Self::First(store) => store
207 .write_batch(batch)
208 .await
209 .map_err(DualStoreError::First)?,
210 Self::Second(store) => store
211 .write_batch(batch)
212 .await
213 .map_err(DualStoreError::Second)?,
214 }
215 Ok(())
216 }
217
218 async fn clear_journal(&self) -> Result<(), Self::Error> {
219 match self {
220 Self::First(store) => store.clear_journal().await.map_err(DualStoreError::First)?,
221 Self::Second(store) => store
222 .clear_journal()
223 .await
224 .map_err(DualStoreError::Second)?,
225 }
226 Ok(())
227 }
228}
229
230impl<D1, D2, A> KeyValueDatabase for DualDatabase<D1, D2, A>
231where
232 D1: KeyValueDatabase,
233 D2: KeyValueDatabase,
234 A: DualStoreRootKeyAssignment,
235{
236 type Config = DualStoreConfig<D1::Config, D2::Config>;
237 type Store = DualStore<D1::Store, D2::Store>;
238
239 fn get_name() -> String {
240 format!("dual {} and {}", D1::get_name(), D2::get_name())
241 }
242
243 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
244 let first_database = D1::connect(&config.first_config, namespace)
245 .await
246 .map_err(DualStoreError::First)?;
247 let second_database = D2::connect(&config.second_config, namespace)
248 .await
249 .map_err(DualStoreError::Second)?;
250 let database = Self {
251 first_database,
252 second_database,
253 _marker: std::marker::PhantomData,
254 };
255 Ok(database)
256 }
257
258 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
259 match A::assigned_store(root_key)? {
260 StoreInUse::First => {
261 let store = self
262 .first_database
263 .open_shared(root_key)
264 .map_err(DualStoreError::First)?;
265 Ok(DualStore::First(store))
266 }
267 StoreInUse::Second => {
268 let store = self
269 .second_database
270 .open_shared(root_key)
271 .map_err(DualStoreError::Second)?;
272 Ok(DualStore::Second(store))
273 }
274 }
275 }
276
277 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
278 match A::assigned_store(root_key)? {
279 StoreInUse::First => {
280 let store = self
281 .first_database
282 .open_exclusive(root_key)
283 .map_err(DualStoreError::First)?;
284 Ok(DualStore::First(store))
285 }
286 StoreInUse::Second => {
287 let store = self
288 .second_database
289 .open_exclusive(root_key)
290 .map_err(DualStoreError::Second)?;
291 Ok(DualStore::Second(store))
292 }
293 }
294 }
295
296 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
297 let namespaces1 = D1::list_all(&config.first_config)
298 .await
299 .map_err(DualStoreError::First)?;
300 let mut namespaces = Vec::new();
301 for namespace in namespaces1 {
302 if D2::exists(&config.second_config, &namespace)
303 .await
304 .map_err(DualStoreError::Second)?
305 {
306 namespaces.push(namespace);
307 } else {
308 tracing::warn!("Namespace {} only exists in the first store", namespace);
309 }
310 }
311 Ok(namespaces)
312 }
313
314 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
315 let mut root_keys = self
316 .first_database
317 .list_root_keys()
318 .await
319 .map_err(DualStoreError::First)?;
320 root_keys.extend(
321 self.second_database
322 .list_root_keys()
323 .await
324 .map_err(DualStoreError::Second)?,
325 );
326 Ok(root_keys)
327 }
328
329 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
330 Ok(D1::exists(&config.first_config, namespace)
331 .await
332 .map_err(DualStoreError::First)?
333 && D2::exists(&config.second_config, namespace)
334 .await
335 .map_err(DualStoreError::Second)?)
336 }
337
338 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
339 let exists1 = D1::exists(&config.first_config, namespace)
340 .await
341 .map_err(DualStoreError::First)?;
342 let exists2 = D2::exists(&config.second_config, namespace)
343 .await
344 .map_err(DualStoreError::Second)?;
345 if exists1 && exists2 {
346 return Err(DualStoreError::StoreAlreadyExists);
347 }
348 if !exists1 {
349 D1::create(&config.first_config, namespace)
350 .await
351 .map_err(DualStoreError::First)?;
352 }
353 if !exists2 {
354 D2::create(&config.second_config, namespace)
355 .await
356 .map_err(DualStoreError::Second)?;
357 }
358 Ok(())
359 }
360
361 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
362 D1::delete(&config.first_config, namespace)
363 .await
364 .map_err(DualStoreError::First)?;
365 D2::delete(&config.second_config, namespace)
366 .await
367 .map_err(DualStoreError::Second)?;
368 Ok(())
369 }
370}
371
/// Test support: a dual test configuration is built from the test
/// configurations of both underlying databases.
#[cfg(with_testing)]
impl<D1, D2, A> TestKeyValueDatabase for DualDatabase<D1, D2, A>
where
    D1: TestKeyValueDatabase,
    D2: TestKeyValueDatabase,
    A: DualStoreRootKeyAssignment,
{
    /// Builds the first database's test configuration, then the second's.
    async fn new_test_config() -> Result<Self::Config, Self::Error> {
        // Struct fields are evaluated in the order written: first, then second.
        Ok(DualStoreConfig {
            first_config: D1::new_test_config()
                .await
                .map_err(DualStoreError::First)?,
            second_config: D2::new_test_config()
                .await
                .map_err(DualStoreError::Second)?,
        })
    }
}
390
391#[derive(Error, Debug)]
393pub enum DualStoreError<E1, E2> {
394 #[error("Store already exists during a create operation")]
396 StoreAlreadyExists,
397
398 #[error(transparent)]
400 BcsError(#[from] bcs::Error),
401
402 #[error("Error in first store: {0}")]
404 First(E1),
405
406 #[error("Error in second store: {0}")]
408 Second(E2),
409}
410
411impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
412where
413 E1: KeyValueStoreError,
414 E2: KeyValueStoreError,
415{
416 const BACKEND: &'static str = "dual_store";
417}