1use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueStore;
11use crate::{
12 batch::Batch,
13 store::{
14 AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError,
15 WritableKeyValueStore,
16 },
17};
18
/// Configuration of a [`DualStore`]: one configuration per underlying store.
///
/// `C1` is the configuration type of the first store and `C2` the one of the
/// second store (see `AdminKeyValueStore::Config` for `DualStore`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DualStoreConfig<C1, C2> {
    /// Configuration handed to the first store (`S1`).
    pub first_config: C1,
    /// Configuration handed to the second store (`S2`).
    pub second_config: C2,
}
27
/// Selects which of the two underlying stores a [`DualStore`] instance
/// forwards its read and write operations to.
#[derive(Clone, Copy, Debug)]
pub enum StoreInUse {
    /// Operations are forwarded to the first store.
    First,
    /// Operations are forwarded to the second store.
    Second,
}
36
/// Policy deciding which underlying store serves a given root key.
///
/// `DualStore` calls this with the empty root key when connecting and with
/// the requested root key in `open_exclusive`.
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait DualStoreRootKeyAssignment {
    /// Returns the store assigned to `root_key`.
    ///
    /// # Errors
    ///
    /// Returns a `bcs::Error`; presumably raised when the root key cannot be
    /// decoded by the implementation (verify against implementors).
    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
}
43
/// A key-value store backed by two underlying stores.
///
/// Both stores are kept open, but every read/write issued through one
/// instance is forwarded to exactly one of them, chosen by `store_in_use`.
/// The assignment policy `A` is only used at the type level.
#[derive(Clone)]
pub struct DualStore<S1, S2, A> {
    /// Handle to the first underlying store.
    first_store: S1,
    /// Handle to the second underlying store.
    second_store: S2,
    /// Which of the two stores this instance forwards operations to.
    store_in_use: StoreInUse,
    /// Carries the assignment policy type `A` without storing a value of it.
    _marker: std::marker::PhantomData<A>,
}
56
/// Error plumbing for [`DualStore`]: failures of either underlying store are
/// wrapped in the matching variant of [`DualStoreError`].
impl<S1, S2, A> WithError for DualStore<S1, S2, A>
where
    S1: WithError,
    S2: WithError,
{
    /// Combined error type parameterized by both underlying error types.
    type Error = DualStoreError<S1::Error, S2::Error>;
}
64
65impl<S1, S2, A> ReadableKeyValueStore for DualStore<S1, S2, A>
66where
67 S1: ReadableKeyValueStore,
68 S2: ReadableKeyValueStore,
69 A: DualStoreRootKeyAssignment,
70{
71 const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
73 S1::MAX_KEY_SIZE
74 } else {
75 S2::MAX_KEY_SIZE
76 };
77
78 fn max_stream_queries(&self) -> usize {
79 match self.store_in_use {
80 StoreInUse::First => self.first_store.max_stream_queries(),
81 StoreInUse::Second => self.second_store.max_stream_queries(),
82 }
83 }
84
85 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
86 let result = match self.store_in_use {
87 StoreInUse::First => self
88 .first_store
89 .read_value_bytes(key)
90 .await
91 .map_err(DualStoreError::First)?,
92 StoreInUse::Second => self
93 .second_store
94 .read_value_bytes(key)
95 .await
96 .map_err(DualStoreError::Second)?,
97 };
98 Ok(result)
99 }
100
101 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
102 let result = match self.store_in_use {
103 StoreInUse::First => self
104 .first_store
105 .contains_key(key)
106 .await
107 .map_err(DualStoreError::First)?,
108 StoreInUse::Second => self
109 .second_store
110 .contains_key(key)
111 .await
112 .map_err(DualStoreError::Second)?,
113 };
114 Ok(result)
115 }
116
117 async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
118 let result = match self.store_in_use {
119 StoreInUse::First => self
120 .first_store
121 .contains_keys(keys)
122 .await
123 .map_err(DualStoreError::First)?,
124 StoreInUse::Second => self
125 .second_store
126 .contains_keys(keys)
127 .await
128 .map_err(DualStoreError::Second)?,
129 };
130 Ok(result)
131 }
132
133 async fn read_multi_values_bytes(
134 &self,
135 keys: Vec<Vec<u8>>,
136 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
137 let result = match self.store_in_use {
138 StoreInUse::First => self
139 .first_store
140 .read_multi_values_bytes(keys)
141 .await
142 .map_err(DualStoreError::First)?,
143 StoreInUse::Second => self
144 .second_store
145 .read_multi_values_bytes(keys)
146 .await
147 .map_err(DualStoreError::Second)?,
148 };
149 Ok(result)
150 }
151
152 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
153 let result = match self.store_in_use {
154 StoreInUse::First => self
155 .first_store
156 .find_keys_by_prefix(key_prefix)
157 .await
158 .map_err(DualStoreError::First)?,
159 StoreInUse::Second => self
160 .second_store
161 .find_keys_by_prefix(key_prefix)
162 .await
163 .map_err(DualStoreError::Second)?,
164 };
165 Ok(result)
166 }
167
168 async fn find_key_values_by_prefix(
169 &self,
170 key_prefix: &[u8],
171 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
172 let result = match self.store_in_use {
173 StoreInUse::First => self
174 .first_store
175 .find_key_values_by_prefix(key_prefix)
176 .await
177 .map_err(DualStoreError::First)?,
178 StoreInUse::Second => self
179 .second_store
180 .find_key_values_by_prefix(key_prefix)
181 .await
182 .map_err(DualStoreError::Second)?,
183 };
184 Ok(result)
185 }
186}
187
188impl<S1, S2, A> WritableKeyValueStore for DualStore<S1, S2, A>
189where
190 S1: WritableKeyValueStore,
191 S2: WritableKeyValueStore,
192 A: DualStoreRootKeyAssignment,
193{
194 const MAX_VALUE_SIZE: usize = usize::MAX;
195
196 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
197 match self.store_in_use {
198 StoreInUse::First => self
199 .first_store
200 .write_batch(batch)
201 .await
202 .map_err(DualStoreError::First)?,
203 StoreInUse::Second => self
204 .second_store
205 .write_batch(batch)
206 .await
207 .map_err(DualStoreError::Second)?,
208 }
209 Ok(())
210 }
211
212 async fn clear_journal(&self) -> Result<(), Self::Error> {
213 match self.store_in_use {
214 StoreInUse::First => self
215 .first_store
216 .clear_journal()
217 .await
218 .map_err(DualStoreError::First)?,
219 StoreInUse::Second => self
220 .second_store
221 .clear_journal()
222 .await
223 .map_err(DualStoreError::Second)?,
224 }
225 Ok(())
226 }
227}
228
229impl<S1, S2, A> AdminKeyValueStore for DualStore<S1, S2, A>
230where
231 S1: AdminKeyValueStore,
232 S2: AdminKeyValueStore,
233 A: DualStoreRootKeyAssignment,
234{
235 type Config = DualStoreConfig<S1::Config, S2::Config>;
236
237 fn get_name() -> String {
238 format!("dual {} and {}", S1::get_name(), S2::get_name())
239 }
240
241 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
242 let first_store = S1::connect(&config.first_config, namespace)
243 .await
244 .map_err(DualStoreError::First)?;
245 let second_store = S2::connect(&config.second_config, namespace)
246 .await
247 .map_err(DualStoreError::Second)?;
248 let store_in_use = A::assigned_store(&[])?;
249 Ok(Self {
250 first_store,
251 second_store,
252 store_in_use,
253 _marker: std::marker::PhantomData,
254 })
255 }
256
257 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
258 let first_store = self
259 .first_store
260 .open_exclusive(root_key)
261 .map_err(DualStoreError::First)?;
262 let second_store = self
263 .second_store
264 .open_exclusive(root_key)
265 .map_err(DualStoreError::Second)?;
266 let store_in_use = A::assigned_store(root_key)?;
267 Ok(Self {
268 first_store,
269 second_store,
270 store_in_use,
271 _marker: std::marker::PhantomData,
272 })
273 }
274
275 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
276 let namespaces1 = S1::list_all(&config.first_config)
277 .await
278 .map_err(DualStoreError::First)?;
279 let mut namespaces = Vec::new();
280 for namespace in namespaces1 {
281 if S2::exists(&config.second_config, &namespace)
282 .await
283 .map_err(DualStoreError::Second)?
284 {
285 namespaces.push(namespace);
286 } else {
287 tracing::warn!("Namespace {} only exists in the first store", namespace);
288 }
289 }
290 Ok(namespaces)
291 }
292
293 async fn list_root_keys(
294 config: &Self::Config,
295 namespace: &str,
296 ) -> Result<Vec<Vec<u8>>, Self::Error> {
297 let mut root_keys = S1::list_root_keys(&config.first_config, namespace)
298 .await
299 .map_err(DualStoreError::First)?;
300 root_keys.extend(
301 S2::list_root_keys(&config.second_config, namespace)
302 .await
303 .map_err(DualStoreError::Second)?,
304 );
305 Ok(root_keys)
306 }
307
308 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
309 Ok(S1::exists(&config.first_config, namespace)
310 .await
311 .map_err(DualStoreError::First)?
312 && S2::exists(&config.second_config, namespace)
313 .await
314 .map_err(DualStoreError::Second)?)
315 }
316
317 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
318 let exists1 = S1::exists(&config.first_config, namespace)
319 .await
320 .map_err(DualStoreError::First)?;
321 let exists2 = S2::exists(&config.second_config, namespace)
322 .await
323 .map_err(DualStoreError::Second)?;
324 if exists1 && exists2 {
325 return Err(DualStoreError::StoreAlreadyExists);
326 }
327 if !exists1 {
328 S1::create(&config.first_config, namespace)
329 .await
330 .map_err(DualStoreError::First)?;
331 }
332 if !exists2 {
333 S2::create(&config.second_config, namespace)
334 .await
335 .map_err(DualStoreError::Second)?;
336 }
337 Ok(())
338 }
339
340 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
341 S1::delete(&config.first_config, namespace)
342 .await
343 .map_err(DualStoreError::First)?;
344 S2::delete(&config.second_config, namespace)
345 .await
346 .map_err(DualStoreError::Second)?;
347 Ok(())
348 }
349}
350
/// Test support for [`DualStore`], available only under `with_testing`.
#[cfg(with_testing)]
impl<S1, S2, A> TestKeyValueStore for DualStore<S1, S2, A>
where
    S1: TestKeyValueStore,
    S2: TestKeyValueStore,
    A: DualStoreRootKeyAssignment,
{
    /// Builds a test configuration by asking each underlying store for its
    /// own test configuration, first store first.
    async fn new_test_config() -> Result<Self::Config, Self::Error> {
        // Struct fields are evaluated in source order, so the first store is
        // still queried before the second one.
        Ok(DualStoreConfig {
            first_config: S1::new_test_config()
                .await
                .map_err(DualStoreError::First)?,
            second_config: S2::new_test_config()
                .await
                .map_err(DualStoreError::Second)?,
        })
    }
}
369
/// Errors produced by a [`DualStore`].
///
/// `E1` and `E2` are the error types of the first and second underlying
/// stores respectively.
#[derive(Error, Debug)]
pub enum DualStoreError<E1, E2> {
    /// `create` was called for a namespace that already exists in both stores.
    #[error("Store already exists during a create operation")]
    StoreAlreadyExists,

    /// A BCS error, e.g. one returned by the root-key assignment policy.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// An error reported by the first underlying store.
    #[error("Error in first store: {0}")]
    First(E1),

    /// An error reported by the second underlying store.
    #[error("Error in second store: {0}")]
    Second(E2),
}
389
/// Registers [`DualStoreError`] as a key-value store error, provided both
/// wrapped error types are key-value store errors themselves.
impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
where
    E1: KeyValueStoreError,
    E2: KeyValueStoreError,
{
    /// Backend identifier associated with dual-store errors.
    const BACKEND: &'static str = "dual_store";
}