1use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueDatabase;
11use crate::{
12 batch::Batch,
13 store::{
14 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
15 WritableKeyValueStore,
16 },
17};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualDatabase<D1, D2, A> {
22 pub first_database: D1,
24 pub second_database: D2,
26 _marker: std::marker::PhantomData<A>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DualStoreConfig<C1, C2> {
33 pub first_config: C1,
35 pub second_config: C2,
37}
38
39#[derive(Clone, Copy, Debug)]
41pub enum StoreInUse {
42 First,
44 Second,
46}
47
48pub trait DualStoreRootKeyAssignment {
50 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
52}
53
54#[derive(Clone)]
56pub enum DualStore<S1, S2> {
57 First(S1),
59 Second(S2),
61}
62
63impl<D1, D2, A> WithError for DualDatabase<D1, D2, A>
64where
65 D1: WithError,
66 D2: WithError,
67{
68 type Error = DualStoreError<D1::Error, D2::Error>;
69}
70
71impl<S1, S2> WithError for DualStore<S1, S2>
72where
73 S1: WithError,
74 S2: WithError,
75{
76 type Error = DualStoreError<S1::Error, S2::Error>;
77}
78
79impl<S1, S2> ReadableKeyValueStore for DualStore<S1, S2>
80where
81 S1: ReadableKeyValueStore,
82 S2: ReadableKeyValueStore,
83{
84 const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
86 S1::MAX_KEY_SIZE
87 } else {
88 S2::MAX_KEY_SIZE
89 };
90
91 fn max_stream_queries(&self) -> usize {
92 match self {
93 Self::First(store) => store.max_stream_queries(),
94 Self::Second(store) => store.max_stream_queries(),
95 }
96 }
97
98 fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
99 Ok(match self {
100 Self::First(store) => store.root_key().map_err(DualStoreError::First)?,
101 Self::Second(store) => store.root_key().map_err(DualStoreError::Second)?,
102 })
103 }
104
105 async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
106 let result = match self {
107 Self::First(store) => store
108 .read_value_bytes(key)
109 .await
110 .map_err(DualStoreError::First)?,
111 Self::Second(store) => store
112 .read_value_bytes(key)
113 .await
114 .map_err(DualStoreError::Second)?,
115 };
116 Ok(result)
117 }
118
119 async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
120 let result = match self {
121 Self::First(store) => store
122 .contains_key(key)
123 .await
124 .map_err(DualStoreError::First)?,
125 Self::Second(store) => store
126 .contains_key(key)
127 .await
128 .map_err(DualStoreError::Second)?,
129 };
130 Ok(result)
131 }
132
133 async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
134 let result = match self {
135 Self::First(store) => store
136 .contains_keys(keys)
137 .await
138 .map_err(DualStoreError::First)?,
139 Self::Second(store) => store
140 .contains_keys(keys)
141 .await
142 .map_err(DualStoreError::Second)?,
143 };
144 Ok(result)
145 }
146
147 async fn read_multi_values_bytes(
148 &self,
149 keys: &[Vec<u8>],
150 ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
151 let result = match self {
152 Self::First(store) => store
153 .read_multi_values_bytes(keys)
154 .await
155 .map_err(DualStoreError::First)?,
156 Self::Second(store) => store
157 .read_multi_values_bytes(keys)
158 .await
159 .map_err(DualStoreError::Second)?,
160 };
161 Ok(result)
162 }
163
164 async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
165 let result = match self {
166 Self::First(store) => store
167 .find_keys_by_prefix(key_prefix)
168 .await
169 .map_err(DualStoreError::First)?,
170 Self::Second(store) => store
171 .find_keys_by_prefix(key_prefix)
172 .await
173 .map_err(DualStoreError::Second)?,
174 };
175 Ok(result)
176 }
177
178 async fn find_key_values_by_prefix(
179 &self,
180 key_prefix: &[u8],
181 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
182 let result = match self {
183 Self::First(store) => store
184 .find_key_values_by_prefix(key_prefix)
185 .await
186 .map_err(DualStoreError::First)?,
187 Self::Second(store) => store
188 .find_key_values_by_prefix(key_prefix)
189 .await
190 .map_err(DualStoreError::Second)?,
191 };
192 Ok(result)
193 }
194}
195
196impl<S1, S2> WritableKeyValueStore for DualStore<S1, S2>
197where
198 S1: WritableKeyValueStore,
199 S2: WritableKeyValueStore,
200{
201 const MAX_VALUE_SIZE: usize = usize::MAX;
202
203 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
204 match self {
205 Self::First(store) => store
206 .write_batch(batch)
207 .await
208 .map_err(DualStoreError::First)?,
209 Self::Second(store) => store
210 .write_batch(batch)
211 .await
212 .map_err(DualStoreError::Second)?,
213 }
214 Ok(())
215 }
216
217 async fn clear_journal(&self) -> Result<(), Self::Error> {
218 match self {
219 Self::First(store) => store.clear_journal().await.map_err(DualStoreError::First)?,
220 Self::Second(store) => store
221 .clear_journal()
222 .await
223 .map_err(DualStoreError::Second)?,
224 }
225 Ok(())
226 }
227}
228
229impl<D1, D2, A> KeyValueDatabase for DualDatabase<D1, D2, A>
230where
231 D1: KeyValueDatabase,
232 D2: KeyValueDatabase,
233 A: DualStoreRootKeyAssignment + linera_base::util::traits::AutoTraits,
234{
235 type Config = DualStoreConfig<D1::Config, D2::Config>;
236 type Store = DualStore<D1::Store, D2::Store>;
237
238 fn get_name() -> String {
239 format!("dual {} and {}", D1::get_name(), D2::get_name())
240 }
241
242 async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
243 let first_database = D1::connect(&config.first_config, namespace)
244 .await
245 .map_err(DualStoreError::First)?;
246 let second_database = D2::connect(&config.second_config, namespace)
247 .await
248 .map_err(DualStoreError::Second)?;
249 let database = Self {
250 first_database,
251 second_database,
252 _marker: std::marker::PhantomData,
253 };
254 Ok(database)
255 }
256
257 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
258 match A::assigned_store(root_key)? {
259 StoreInUse::First => {
260 let store = self
261 .first_database
262 .open_shared(root_key)
263 .map_err(DualStoreError::First)?;
264 Ok(DualStore::First(store))
265 }
266 StoreInUse::Second => {
267 let store = self
268 .second_database
269 .open_shared(root_key)
270 .map_err(DualStoreError::Second)?;
271 Ok(DualStore::Second(store))
272 }
273 }
274 }
275
276 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
277 match A::assigned_store(root_key)? {
278 StoreInUse::First => {
279 let store = self
280 .first_database
281 .open_exclusive(root_key)
282 .map_err(DualStoreError::First)?;
283 Ok(DualStore::First(store))
284 }
285 StoreInUse::Second => {
286 let store = self
287 .second_database
288 .open_exclusive(root_key)
289 .map_err(DualStoreError::Second)?;
290 Ok(DualStore::Second(store))
291 }
292 }
293 }
294
295 async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
296 let namespaces1 = D1::list_all(&config.first_config)
297 .await
298 .map_err(DualStoreError::First)?;
299 let mut namespaces = Vec::new();
300 for namespace in namespaces1 {
301 if D2::exists(&config.second_config, &namespace)
302 .await
303 .map_err(DualStoreError::Second)?
304 {
305 namespaces.push(namespace);
306 } else {
307 tracing::warn!("Namespace {} only exists in the first store", namespace);
308 }
309 }
310 Ok(namespaces)
311 }
312
313 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
314 let mut root_keys = self
315 .first_database
316 .list_root_keys()
317 .await
318 .map_err(DualStoreError::First)?;
319 root_keys.extend(
320 self.second_database
321 .list_root_keys()
322 .await
323 .map_err(DualStoreError::Second)?,
324 );
325 Ok(root_keys)
326 }
327
328 async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
329 Ok(D1::exists(&config.first_config, namespace)
330 .await
331 .map_err(DualStoreError::First)?
332 && D2::exists(&config.second_config, namespace)
333 .await
334 .map_err(DualStoreError::Second)?)
335 }
336
337 async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
338 let exists1 = D1::exists(&config.first_config, namespace)
339 .await
340 .map_err(DualStoreError::First)?;
341 let exists2 = D2::exists(&config.second_config, namespace)
342 .await
343 .map_err(DualStoreError::Second)?;
344 if exists1 && exists2 {
345 return Err(DualStoreError::StoreAlreadyExists);
346 }
347 if !exists1 {
348 D1::create(&config.first_config, namespace)
349 .await
350 .map_err(DualStoreError::First)?;
351 }
352 if !exists2 {
353 D2::create(&config.second_config, namespace)
354 .await
355 .map_err(DualStoreError::Second)?;
356 }
357 Ok(())
358 }
359
360 async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
361 D1::delete(&config.first_config, namespace)
362 .await
363 .map_err(DualStoreError::First)?;
364 D2::delete(&config.second_config, namespace)
365 .await
366 .map_err(DualStoreError::Second)?;
367 Ok(())
368 }
369}
370
371#[cfg(with_testing)]
372impl<D1, D2, A> TestKeyValueDatabase for DualDatabase<D1, D2, A>
373where
374 D1: TestKeyValueDatabase,
375 D2: TestKeyValueDatabase,
376 A: DualStoreRootKeyAssignment + linera_base::util::traits::AutoTraits,
377{
378 async fn new_test_config() -> Result<Self::Config, Self::Error> {
379 let first_config = D1::new_test_config().await.map_err(DualStoreError::First)?;
380 let second_config = D2::new_test_config()
381 .await
382 .map_err(DualStoreError::Second)?;
383 Ok(DualStoreConfig {
384 first_config,
385 second_config,
386 })
387 }
388}
389
390#[derive(Error, Debug)]
392pub enum DualStoreError<E1, E2> {
393 #[error("Store already exists during a create operation")]
395 StoreAlreadyExists,
396
397 #[error(transparent)]
399 BcsError(#[from] bcs::Error),
400
401 #[error("Error in first store: {0}")]
403 First(E1),
404
405 #[error("Error in second store: {0}")]
407 Second(E2),
408}
409
410impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
411where
412 E1: KeyValueStoreError,
413 E2: KeyValueStoreError,
414{
415 const BACKEND: &'static str = "dual_store";
416}