1use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueDatabase;
11use crate::{
12 batch::Batch,
13 store::{
14 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
15 WritableKeyValueStore,
16 },
17};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualDatabase<D1, D2, A> {
22 pub first_database: D1,
24 pub second_database: D2,
26 _marker: std::marker::PhantomData<A>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DualStoreConfig<C1, C2> {
33 pub first_config: C1,
35 pub second_config: C2,
37}
38
/// Identifies which of the two underlying stores is selected.
#[derive(Debug, Clone, Copy)]
pub enum StoreInUse {
    /// The first store is selected.
    First,
    /// The second store is selected.
    Second,
}
47
48#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
50pub trait DualStoreRootKeyAssignment {
51 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
53}
54
/// A store that wraps exactly one of two possible underlying stores.
#[derive(Clone)]
pub enum DualStore<S1, S2> {
    /// Wraps a store of the first kind.
    First(S1),
    /// Wraps a store of the second kind.
    Second(S2),
}
63
64impl<D1, D2, A> WithError for DualDatabase<D1, D2, A>
65where
66 D1: WithError,
67 D2: WithError,
68{
69 type Error = DualStoreError<D1::Error, D2::Error>;
70}
71
72impl<S1, S2> WithError for DualStore<S1, S2>
73where
74 S1: WithError,
75 S2: WithError,
76{
77 type Error = DualStoreError<S1::Error, S2::Error>;
78}
79
80impl<S1, S2> ReadableKeyValueStore for DualStore<S1, S2>
81where
82 S1: ReadableKeyValueStore,
83 S2: ReadableKeyValueStore,
84{
85 const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
87 S1::MAX_KEY_SIZE
88 } else {
89 S2::MAX_KEY_SIZE
90 };
91
92 fn max_stream_queries(&self) -> usize {
93 match self {
94 Self::First(store) => store.max_stream_queries(),
95 Self::Second(store) => store.max_stream_queries(),
96 }
97 }
98
99 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
100 let result = match self {
101 Self::First(store) => store
102 .read_value_bytes(key)
103 .await
104 .map_err(DualStoreError::First)?,
105 Self::Second(store) => store
106 .read_value_bytes(key)
107 .await
108 .map_err(DualStoreError::Second)?,
109 };
110 Ok(result)
111 }
112
113 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
114 let result = match self {
115 Self::First(store) => store
116 .contains_key(key)
117 .await
118 .map_err(DualStoreError::First)?,
119 Self::Second(store) => store
120 .contains_key(key)
121 .await
122 .map_err(DualStoreError::Second)?,
123 };
124 Ok(result)
125 }
126
127 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
128 let result = match self {
129 Self::First(store) => store
130 .contains_keys(keys)
131 .await
132 .map_err(DualStoreError::First)?,
133 Self::Second(store) => store
134 .contains_keys(keys)
135 .await
136 .map_err(DualStoreError::Second)?,
137 };
138 Ok(result)
139 }
140
141 async fn read_multi_values_bytes(
142 &self,
143 keys: Vec<Vec<u8>>,
144 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
145 let result = match self {
146 Self::First(store) => store
147 .read_multi_values_bytes(keys)
148 .await
149 .map_err(DualStoreError::First)?,
150 Self::Second(store) => store
151 .read_multi_values_bytes(keys)
152 .await
153 .map_err(DualStoreError::Second)?,
154 };
155 Ok(result)
156 }
157
158 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
159 let result = match self {
160 Self::First(store) => store
161 .find_keys_by_prefix(key_prefix)
162 .await
163 .map_err(DualStoreError::First)?,
164 Self::Second(store) => store
165 .find_keys_by_prefix(key_prefix)
166 .await
167 .map_err(DualStoreError::Second)?,
168 };
169 Ok(result)
170 }
171
172 async fn find_key_values_by_prefix(
173 &self,
174 key_prefix: &[u8],
175 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
176 let result = match self {
177 Self::First(store) => store
178 .find_key_values_by_prefix(key_prefix)
179 .await
180 .map_err(DualStoreError::First)?,
181 Self::Second(store) => store
182 .find_key_values_by_prefix(key_prefix)
183 .await
184 .map_err(DualStoreError::Second)?,
185 };
186 Ok(result)
187 }
188}
189
190impl<S1, S2> WritableKeyValueStore for DualStore<S1, S2>
191where
192 S1: WritableKeyValueStore,
193 S2: WritableKeyValueStore,
194{
195 const MAX_VALUE_SIZE: usize = usize::MAX;
196
197 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
198 match self {
199 Self::First(store) => store
200 .write_batch(batch)
201 .await
202 .map_err(DualStoreError::First)?,
203 Self::Second(store) => store
204 .write_batch(batch)
205 .await
206 .map_err(DualStoreError::Second)?,
207 }
208 Ok(())
209 }
210
211 async fn clear_journal(&self) -> Result<(), Self::Error> {
212 match self {
213 Self::First(store) => store.clear_journal().await.map_err(DualStoreError::First)?,
214 Self::Second(store) => store
215 .clear_journal()
216 .await
217 .map_err(DualStoreError::Second)?,
218 }
219 Ok(())
220 }
221}
222
223impl<D1, D2, A> KeyValueDatabase for DualDatabase<D1, D2, A>
224where
225 D1: KeyValueDatabase,
226 D2: KeyValueDatabase,
227 A: DualStoreRootKeyAssignment,
228{
229 type Config = DualStoreConfig<D1::Config, D2::Config>;
230 type Store = DualStore<D1::Store, D2::Store>;
231
232 fn get_name() -> String {
233 format!("dual {} and {}", D1::get_name(), D2::get_name())
234 }
235
236 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
237 let first_database = D1::connect(&config.first_config, namespace)
238 .await
239 .map_err(DualStoreError::First)?;
240 let second_database = D2::connect(&config.second_config, namespace)
241 .await
242 .map_err(DualStoreError::Second)?;
243 let database = Self {
244 first_database,
245 second_database,
246 _marker: std::marker::PhantomData,
247 };
248 Ok(database)
249 }
250
251 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
252 match A::assigned_store(root_key)? {
253 StoreInUse::First => {
254 let store = self
255 .first_database
256 .open_shared(root_key)
257 .map_err(DualStoreError::First)?;
258 Ok(DualStore::First(store))
259 }
260 StoreInUse::Second => {
261 let store = self
262 .second_database
263 .open_shared(root_key)
264 .map_err(DualStoreError::Second)?;
265 Ok(DualStore::Second(store))
266 }
267 }
268 }
269
270 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
271 match A::assigned_store(root_key)? {
272 StoreInUse::First => {
273 let store = self
274 .first_database
275 .open_exclusive(root_key)
276 .map_err(DualStoreError::First)?;
277 Ok(DualStore::First(store))
278 }
279 StoreInUse::Second => {
280 let store = self
281 .second_database
282 .open_exclusive(root_key)
283 .map_err(DualStoreError::Second)?;
284 Ok(DualStore::Second(store))
285 }
286 }
287 }
288
289 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
290 let namespaces1 = D1::list_all(&config.first_config)
291 .await
292 .map_err(DualStoreError::First)?;
293 let mut namespaces = Vec::new();
294 for namespace in namespaces1 {
295 if D2::exists(&config.second_config, &namespace)
296 .await
297 .map_err(DualStoreError::Second)?
298 {
299 namespaces.push(namespace);
300 } else {
301 tracing::warn!("Namespace {} only exists in the first store", namespace);
302 }
303 }
304 Ok(namespaces)
305 }
306
307 async fn list_root_keys(
308 config: &Self::Config,
309 namespace: &str,
310 ) -> Result<Vec<Vec<u8>>, Self::Error> {
311 let mut root_keys = D1::list_root_keys(&config.first_config, namespace)
312 .await
313 .map_err(DualStoreError::First)?;
314 root_keys.extend(
315 D2::list_root_keys(&config.second_config, namespace)
316 .await
317 .map_err(DualStoreError::Second)?,
318 );
319 Ok(root_keys)
320 }
321
322 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
323 Ok(D1::exists(&config.first_config, namespace)
324 .await
325 .map_err(DualStoreError::First)?
326 && D2::exists(&config.second_config, namespace)
327 .await
328 .map_err(DualStoreError::Second)?)
329 }
330
331 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
332 let exists1 = D1::exists(&config.first_config, namespace)
333 .await
334 .map_err(DualStoreError::First)?;
335 let exists2 = D2::exists(&config.second_config, namespace)
336 .await
337 .map_err(DualStoreError::Second)?;
338 if exists1 && exists2 {
339 return Err(DualStoreError::StoreAlreadyExists);
340 }
341 if !exists1 {
342 D1::create(&config.first_config, namespace)
343 .await
344 .map_err(DualStoreError::First)?;
345 }
346 if !exists2 {
347 D2::create(&config.second_config, namespace)
348 .await
349 .map_err(DualStoreError::Second)?;
350 }
351 Ok(())
352 }
353
354 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
355 D1::delete(&config.first_config, namespace)
356 .await
357 .map_err(DualStoreError::First)?;
358 D2::delete(&config.second_config, namespace)
359 .await
360 .map_err(DualStoreError::Second)?;
361 Ok(())
362 }
363}
364
#[cfg(with_testing)]
impl<D1, D2, A> TestKeyValueDatabase for DualDatabase<D1, D2, A>
where
    D1: TestKeyValueDatabase,
    D2: TestKeyValueDatabase,
    A: DualStoreRootKeyAssignment,
{
    /// Builds a test configuration from the test configurations of both
    /// underlying databases, requesting the first one before the second.
    async fn new_test_config() -> Result<Self::Config, Self::Error> {
        // Struct fields are evaluated in the order written: first, then second.
        Ok(DualStoreConfig {
            first_config: D1::new_test_config()
                .await
                .map_err(DualStoreError::First)?,
            second_config: D2::new_test_config()
                .await
                .map_err(DualStoreError::Second)?,
        })
    }
}
383
384#[derive(Error, Debug)]
386pub enum DualStoreError<E1, E2> {
387 #[error("Store already exists during a create operation")]
389 StoreAlreadyExists,
390
391 #[error(transparent)]
393 BcsError(#[from] bcs::Error),
394
395 #[error("Error in first store: {0}")]
397 First(E1),
398
399 #[error("Error in second store: {0}")]
401 Second(E2),
402}
403
404impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
405where
406 E1: KeyValueStoreError,
407 E2: KeyValueStoreError,
408{
409 const BACKEND: &'static str = "dual_store";
410}