linera_views/backends/
dual.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] by combining two existing stores.
5
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueDatabase;
11use crate::{
12    batch::Batch,
13    store::{
14        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
15        WritableKeyValueStore,
16    },
17};
18
19/// A dual database.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualDatabase<D1, D2, A> {
22    /// The first database.
23    pub first_database: D1,
24    /// The second database.
25    pub second_database: D2,
26    /// Marker for the static root key assignment.
27    _marker: std::marker::PhantomData<A>,
28}
29
30/// The initial configuration of the system.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DualStoreConfig<C1, C2> {
33    /// The first config.
34    pub first_config: C1,
35    /// The second config.
36    pub second_config: C2,
37}
38
39/// The store in use.
40#[derive(Clone, Copy, Debug)]
41pub enum StoreInUse {
42    /// The first store.
43    First,
44    /// The second store.
45    Second,
46}
47
48/// The trait for a (static) root key assignment.
49#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
50pub trait DualStoreRootKeyAssignment {
51    /// Obtains the store assigned to this root key.
52    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
53}
54
55/// A partition opened in one of the two databases.
56#[derive(Clone)]
57pub enum DualStore<S1, S2> {
58    /// The first store.
59    First(S1),
60    /// The second store.
61    Second(S2),
62}
63
64impl<D1, D2, A> WithError for DualDatabase<D1, D2, A>
65where
66    D1: WithError,
67    D2: WithError,
68{
69    type Error = DualStoreError<D1::Error, D2::Error>;
70}
71
72impl<S1, S2> WithError for DualStore<S1, S2>
73where
74    S1: WithError,
75    S2: WithError,
76{
77    type Error = DualStoreError<S1::Error, S2::Error>;
78}
79
80impl<S1, S2> ReadableKeyValueStore for DualStore<S1, S2>
81where
82    S1: ReadableKeyValueStore,
83    S2: ReadableKeyValueStore,
84{
85    // TODO(#2524): consider changing MAX_KEY_SIZE into a function.
86    const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
87        S1::MAX_KEY_SIZE
88    } else {
89        S2::MAX_KEY_SIZE
90    };
91
92    fn max_stream_queries(&self) -> usize {
93        match self {
94            Self::First(store) => store.max_stream_queries(),
95            Self::Second(store) => store.max_stream_queries(),
96        }
97    }
98
99    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
100        Ok(match self {
101            Self::First(store) => store.root_key().map_err(DualStoreError::First)?,
102            Self::Second(store) => store.root_key().map_err(DualStoreError::Second)?,
103        })
104    }
105
106    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
107        let result = match self {
108            Self::First(store) => store
109                .read_value_bytes(key)
110                .await
111                .map_err(DualStoreError::First)?,
112            Self::Second(store) => store
113                .read_value_bytes(key)
114                .await
115                .map_err(DualStoreError::Second)?,
116        };
117        Ok(result)
118    }
119
120    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
121        let result = match self {
122            Self::First(store) => store
123                .contains_key(key)
124                .await
125                .map_err(DualStoreError::First)?,
126            Self::Second(store) => store
127                .contains_key(key)
128                .await
129                .map_err(DualStoreError::Second)?,
130        };
131        Ok(result)
132    }
133
134    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
135        let result = match self {
136            Self::First(store) => store
137                .contains_keys(keys)
138                .await
139                .map_err(DualStoreError::First)?,
140            Self::Second(store) => store
141                .contains_keys(keys)
142                .await
143                .map_err(DualStoreError::Second)?,
144        };
145        Ok(result)
146    }
147
148    async fn read_multi_values_bytes(
149        &self,
150        keys: &[Vec<u8>],
151    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
152        let result = match self {
153            Self::First(store) => store
154                .read_multi_values_bytes(keys)
155                .await
156                .map_err(DualStoreError::First)?,
157            Self::Second(store) => store
158                .read_multi_values_bytes(keys)
159                .await
160                .map_err(DualStoreError::Second)?,
161        };
162        Ok(result)
163    }
164
165    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
166        let result = match self {
167            Self::First(store) => store
168                .find_keys_by_prefix(key_prefix)
169                .await
170                .map_err(DualStoreError::First)?,
171            Self::Second(store) => store
172                .find_keys_by_prefix(key_prefix)
173                .await
174                .map_err(DualStoreError::Second)?,
175        };
176        Ok(result)
177    }
178
179    async fn find_key_values_by_prefix(
180        &self,
181        key_prefix: &[u8],
182    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
183        let result = match self {
184            Self::First(store) => store
185                .find_key_values_by_prefix(key_prefix)
186                .await
187                .map_err(DualStoreError::First)?,
188            Self::Second(store) => store
189                .find_key_values_by_prefix(key_prefix)
190                .await
191                .map_err(DualStoreError::Second)?,
192        };
193        Ok(result)
194    }
195}
196
197impl<S1, S2> WritableKeyValueStore for DualStore<S1, S2>
198where
199    S1: WritableKeyValueStore,
200    S2: WritableKeyValueStore,
201{
202    const MAX_VALUE_SIZE: usize = usize::MAX;
203
204    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
205        match self {
206            Self::First(store) => store
207                .write_batch(batch)
208                .await
209                .map_err(DualStoreError::First)?,
210            Self::Second(store) => store
211                .write_batch(batch)
212                .await
213                .map_err(DualStoreError::Second)?,
214        }
215        Ok(())
216    }
217
218    async fn clear_journal(&self) -> Result<(), Self::Error> {
219        match self {
220            Self::First(store) => store.clear_journal().await.map_err(DualStoreError::First)?,
221            Self::Second(store) => store
222                .clear_journal()
223                .await
224                .map_err(DualStoreError::Second)?,
225        }
226        Ok(())
227    }
228}
229
230impl<D1, D2, A> KeyValueDatabase for DualDatabase<D1, D2, A>
231where
232    D1: KeyValueDatabase,
233    D2: KeyValueDatabase,
234    A: DualStoreRootKeyAssignment,
235{
236    type Config = DualStoreConfig<D1::Config, D2::Config>;
237    type Store = DualStore<D1::Store, D2::Store>;
238
239    fn get_name() -> String {
240        format!("dual {} and {}", D1::get_name(), D2::get_name())
241    }
242
243    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
244        let first_database = D1::connect(&config.first_config, namespace)
245            .await
246            .map_err(DualStoreError::First)?;
247        let second_database = D2::connect(&config.second_config, namespace)
248            .await
249            .map_err(DualStoreError::Second)?;
250        let database = Self {
251            first_database,
252            second_database,
253            _marker: std::marker::PhantomData,
254        };
255        Ok(database)
256    }
257
258    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
259        match A::assigned_store(root_key)? {
260            StoreInUse::First => {
261                let store = self
262                    .first_database
263                    .open_shared(root_key)
264                    .map_err(DualStoreError::First)?;
265                Ok(DualStore::First(store))
266            }
267            StoreInUse::Second => {
268                let store = self
269                    .second_database
270                    .open_shared(root_key)
271                    .map_err(DualStoreError::Second)?;
272                Ok(DualStore::Second(store))
273            }
274        }
275    }
276
277    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
278        match A::assigned_store(root_key)? {
279            StoreInUse::First => {
280                let store = self
281                    .first_database
282                    .open_exclusive(root_key)
283                    .map_err(DualStoreError::First)?;
284                Ok(DualStore::First(store))
285            }
286            StoreInUse::Second => {
287                let store = self
288                    .second_database
289                    .open_exclusive(root_key)
290                    .map_err(DualStoreError::Second)?;
291                Ok(DualStore::Second(store))
292            }
293        }
294    }
295
296    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
297        let namespaces1 = D1::list_all(&config.first_config)
298            .await
299            .map_err(DualStoreError::First)?;
300        let mut namespaces = Vec::new();
301        for namespace in namespaces1 {
302            if D2::exists(&config.second_config, &namespace)
303                .await
304                .map_err(DualStoreError::Second)?
305            {
306                namespaces.push(namespace);
307            } else {
308                tracing::warn!("Namespace {} only exists in the first store", namespace);
309            }
310        }
311        Ok(namespaces)
312    }
313
314    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
315        let mut root_keys = self
316            .first_database
317            .list_root_keys()
318            .await
319            .map_err(DualStoreError::First)?;
320        root_keys.extend(
321            self.second_database
322                .list_root_keys()
323                .await
324                .map_err(DualStoreError::Second)?,
325        );
326        Ok(root_keys)
327    }
328
329    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
330        Ok(D1::exists(&config.first_config, namespace)
331            .await
332            .map_err(DualStoreError::First)?
333            && D2::exists(&config.second_config, namespace)
334                .await
335                .map_err(DualStoreError::Second)?)
336    }
337
338    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
339        let exists1 = D1::exists(&config.first_config, namespace)
340            .await
341            .map_err(DualStoreError::First)?;
342        let exists2 = D2::exists(&config.second_config, namespace)
343            .await
344            .map_err(DualStoreError::Second)?;
345        if exists1 && exists2 {
346            return Err(DualStoreError::StoreAlreadyExists);
347        }
348        if !exists1 {
349            D1::create(&config.first_config, namespace)
350                .await
351                .map_err(DualStoreError::First)?;
352        }
353        if !exists2 {
354            D2::create(&config.second_config, namespace)
355                .await
356                .map_err(DualStoreError::Second)?;
357        }
358        Ok(())
359    }
360
361    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
362        D1::delete(&config.first_config, namespace)
363            .await
364            .map_err(DualStoreError::First)?;
365        D2::delete(&config.second_config, namespace)
366            .await
367            .map_err(DualStoreError::Second)?;
368        Ok(())
369    }
370}
371
372#[cfg(with_testing)]
373impl<D1, D2, A> TestKeyValueDatabase for DualDatabase<D1, D2, A>
374where
375    D1: TestKeyValueDatabase,
376    D2: TestKeyValueDatabase,
377    A: DualStoreRootKeyAssignment,
378{
379    async fn new_test_config() -> Result<Self::Config, Self::Error> {
380        let first_config = D1::new_test_config().await.map_err(DualStoreError::First)?;
381        let second_config = D2::new_test_config()
382            .await
383            .map_err(DualStoreError::Second)?;
384        Ok(DualStoreConfig {
385            first_config,
386            second_config,
387        })
388    }
389}
390
391/// The error type for [`DualStore`].
392#[derive(Error, Debug)]
393pub enum DualStoreError<E1, E2> {
394    /// Store already exists during a create operation
395    #[error("Store already exists during a create operation")]
396    StoreAlreadyExists,
397
398    /// Serialization error with BCS.
399    #[error(transparent)]
400    BcsError(#[from] bcs::Error),
401
402    /// First store.
403    #[error("Error in first store: {0}")]
404    First(E1),
405
406    /// Second store.
407    #[error("Error in second store: {0}")]
408    Second(E2),
409}
410
411impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
412where
413    E1: KeyValueStoreError,
414    E2: KeyValueStoreError,
415{
416    const BACKEND: &'static str = "dual_store";
417}