linera_views/backends/
dual.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] by combining two existing stores.
5
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueDatabase;
11use crate::{
12    batch::Batch,
13    store::{
14        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
15        WritableKeyValueStore,
16    },
17};
18
19/// A dual database.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualDatabase<D1, D2, A> {
22    /// The first database.
23    pub first_database: D1,
24    /// The second database.
25    pub second_database: D2,
26    /// Marker for the static root key assignment.
27    _marker: std::marker::PhantomData<A>,
28}
29
30/// The initial configuration of the system.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DualStoreConfig<C1, C2> {
33    /// The first config.
34    pub first_config: C1,
35    /// The second config.
36    pub second_config: C2,
37}
38
/// Identifies which of the two underlying stores is in use.
///
/// The type is a plain, copyable tag. Equality is derived so that the outcome of a
/// root key assignment can be compared directly (for instance with `assert_eq!`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StoreInUse {
    /// The first store.
    First,
    /// The second store.
    Second,
}
47
48/// The trait for a (static) root key assignment.
49#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
50pub trait DualStoreRootKeyAssignment {
51    /// Obtains the store assigned to this root key.
52    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
53}
54
/// A store opened in exactly one of the two underlying databases.
///
/// Each variant wraps the store type of the corresponding database.
#[derive(Clone)]
pub enum DualStore<S1, S2> {
    /// A store opened in the first database.
    First(S1),
    /// A store opened in the second database.
    Second(S2),
}
63
64impl<D1, D2, A> WithError for DualDatabase<D1, D2, A>
65where
66    D1: WithError,
67    D2: WithError,
68{
69    type Error = DualStoreError<D1::Error, D2::Error>;
70}
71
72impl<S1, S2> WithError for DualStore<S1, S2>
73where
74    S1: WithError,
75    S2: WithError,
76{
77    type Error = DualStoreError<S1::Error, S2::Error>;
78}
79
80impl<S1, S2> ReadableKeyValueStore for DualStore<S1, S2>
81where
82    S1: ReadableKeyValueStore,
83    S2: ReadableKeyValueStore,
84{
85    // TODO(#2524): consider changing MAX_KEY_SIZE into a function.
86    const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
87        S1::MAX_KEY_SIZE
88    } else {
89        S2::MAX_KEY_SIZE
90    };
91
92    fn max_stream_queries(&self) -> usize {
93        match self {
94            Self::First(store) => store.max_stream_queries(),
95            Self::Second(store) => store.max_stream_queries(),
96        }
97    }
98
99    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
100        let result = match self {
101            Self::First(store) => store
102                .read_value_bytes(key)
103                .await
104                .map_err(DualStoreError::First)?,
105            Self::Second(store) => store
106                .read_value_bytes(key)
107                .await
108                .map_err(DualStoreError::Second)?,
109        };
110        Ok(result)
111    }
112
113    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
114        let result = match self {
115            Self::First(store) => store
116                .contains_key(key)
117                .await
118                .map_err(DualStoreError::First)?,
119            Self::Second(store) => store
120                .contains_key(key)
121                .await
122                .map_err(DualStoreError::Second)?,
123        };
124        Ok(result)
125    }
126
127    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
128        let result = match self {
129            Self::First(store) => store
130                .contains_keys(keys)
131                .await
132                .map_err(DualStoreError::First)?,
133            Self::Second(store) => store
134                .contains_keys(keys)
135                .await
136                .map_err(DualStoreError::Second)?,
137        };
138        Ok(result)
139    }
140
141    async fn read_multi_values_bytes(
142        &self,
143        keys: Vec<Vec<u8>>,
144    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
145        let result = match self {
146            Self::First(store) => store
147                .read_multi_values_bytes(keys)
148                .await
149                .map_err(DualStoreError::First)?,
150            Self::Second(store) => store
151                .read_multi_values_bytes(keys)
152                .await
153                .map_err(DualStoreError::Second)?,
154        };
155        Ok(result)
156    }
157
158    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
159        let result = match self {
160            Self::First(store) => store
161                .find_keys_by_prefix(key_prefix)
162                .await
163                .map_err(DualStoreError::First)?,
164            Self::Second(store) => store
165                .find_keys_by_prefix(key_prefix)
166                .await
167                .map_err(DualStoreError::Second)?,
168        };
169        Ok(result)
170    }
171
172    async fn find_key_values_by_prefix(
173        &self,
174        key_prefix: &[u8],
175    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
176        let result = match self {
177            Self::First(store) => store
178                .find_key_values_by_prefix(key_prefix)
179                .await
180                .map_err(DualStoreError::First)?,
181            Self::Second(store) => store
182                .find_key_values_by_prefix(key_prefix)
183                .await
184                .map_err(DualStoreError::Second)?,
185        };
186        Ok(result)
187    }
188}
189
190impl<S1, S2> WritableKeyValueStore for DualStore<S1, S2>
191where
192    S1: WritableKeyValueStore,
193    S2: WritableKeyValueStore,
194{
195    const MAX_VALUE_SIZE: usize = usize::MAX;
196
197    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
198        match self {
199            Self::First(store) => store
200                .write_batch(batch)
201                .await
202                .map_err(DualStoreError::First)?,
203            Self::Second(store) => store
204                .write_batch(batch)
205                .await
206                .map_err(DualStoreError::Second)?,
207        }
208        Ok(())
209    }
210
211    async fn clear_journal(&self) -> Result<(), Self::Error> {
212        match self {
213            Self::First(store) => store.clear_journal().await.map_err(DualStoreError::First)?,
214            Self::Second(store) => store
215                .clear_journal()
216                .await
217                .map_err(DualStoreError::Second)?,
218        }
219        Ok(())
220    }
221}
222
223impl<D1, D2, A> KeyValueDatabase for DualDatabase<D1, D2, A>
224where
225    D1: KeyValueDatabase,
226    D2: KeyValueDatabase,
227    A: DualStoreRootKeyAssignment,
228{
229    type Config = DualStoreConfig<D1::Config, D2::Config>;
230    type Store = DualStore<D1::Store, D2::Store>;
231
232    fn get_name() -> String {
233        format!("dual {} and {}", D1::get_name(), D2::get_name())
234    }
235
236    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
237        let first_database = D1::connect(&config.first_config, namespace)
238            .await
239            .map_err(DualStoreError::First)?;
240        let second_database = D2::connect(&config.second_config, namespace)
241            .await
242            .map_err(DualStoreError::Second)?;
243        let database = Self {
244            first_database,
245            second_database,
246            _marker: std::marker::PhantomData,
247        };
248        Ok(database)
249    }
250
251    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
252        match A::assigned_store(root_key)? {
253            StoreInUse::First => {
254                let store = self
255                    .first_database
256                    .open_shared(root_key)
257                    .map_err(DualStoreError::First)?;
258                Ok(DualStore::First(store))
259            }
260            StoreInUse::Second => {
261                let store = self
262                    .second_database
263                    .open_shared(root_key)
264                    .map_err(DualStoreError::Second)?;
265                Ok(DualStore::Second(store))
266            }
267        }
268    }
269
270    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
271        match A::assigned_store(root_key)? {
272            StoreInUse::First => {
273                let store = self
274                    .first_database
275                    .open_exclusive(root_key)
276                    .map_err(DualStoreError::First)?;
277                Ok(DualStore::First(store))
278            }
279            StoreInUse::Second => {
280                let store = self
281                    .second_database
282                    .open_exclusive(root_key)
283                    .map_err(DualStoreError::Second)?;
284                Ok(DualStore::Second(store))
285            }
286        }
287    }
288
289    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
290        let namespaces1 = D1::list_all(&config.first_config)
291            .await
292            .map_err(DualStoreError::First)?;
293        let mut namespaces = Vec::new();
294        for namespace in namespaces1 {
295            if D2::exists(&config.second_config, &namespace)
296                .await
297                .map_err(DualStoreError::Second)?
298            {
299                namespaces.push(namespace);
300            } else {
301                tracing::warn!("Namespace {} only exists in the first store", namespace);
302            }
303        }
304        Ok(namespaces)
305    }
306
307    async fn list_root_keys(
308        config: &Self::Config,
309        namespace: &str,
310    ) -> Result<Vec<Vec<u8>>, Self::Error> {
311        let mut root_keys = D1::list_root_keys(&config.first_config, namespace)
312            .await
313            .map_err(DualStoreError::First)?;
314        root_keys.extend(
315            D2::list_root_keys(&config.second_config, namespace)
316                .await
317                .map_err(DualStoreError::Second)?,
318        );
319        Ok(root_keys)
320    }
321
322    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
323        Ok(D1::exists(&config.first_config, namespace)
324            .await
325            .map_err(DualStoreError::First)?
326            && D2::exists(&config.second_config, namespace)
327                .await
328                .map_err(DualStoreError::Second)?)
329    }
330
331    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
332        let exists1 = D1::exists(&config.first_config, namespace)
333            .await
334            .map_err(DualStoreError::First)?;
335        let exists2 = D2::exists(&config.second_config, namespace)
336            .await
337            .map_err(DualStoreError::Second)?;
338        if exists1 && exists2 {
339            return Err(DualStoreError::StoreAlreadyExists);
340        }
341        if !exists1 {
342            D1::create(&config.first_config, namespace)
343                .await
344                .map_err(DualStoreError::First)?;
345        }
346        if !exists2 {
347            D2::create(&config.second_config, namespace)
348                .await
349                .map_err(DualStoreError::Second)?;
350        }
351        Ok(())
352    }
353
354    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
355        D1::delete(&config.first_config, namespace)
356            .await
357            .map_err(DualStoreError::First)?;
358        D2::delete(&config.second_config, namespace)
359            .await
360            .map_err(DualStoreError::Second)?;
361        Ok(())
362    }
363}
364
#[cfg(with_testing)]
impl<D1, D2, A> TestKeyValueDatabase for DualDatabase<D1, D2, A>
where
    D1: TestKeyValueDatabase,
    D2: TestKeyValueDatabase,
    A: DualStoreRootKeyAssignment,
{
    /// Builds a test configuration from the test configurations of both databases,
    /// requesting the first one before the second.
    async fn new_test_config() -> Result<Self::Config, Self::Error> {
        Ok(DualStoreConfig {
            first_config: D1::new_test_config()
                .await
                .map_err(DualStoreError::First)?,
            second_config: D2::new_test_config()
                .await
                .map_err(DualStoreError::Second)?,
        })
    }
}
383
384/// The error type for [`DualStore`].
385#[derive(Error, Debug)]
386pub enum DualStoreError<E1, E2> {
387    /// Store already exists during a create operation
388    #[error("Store already exists during a create operation")]
389    StoreAlreadyExists,
390
391    /// Serialization error with BCS.
392    #[error(transparent)]
393    BcsError(#[from] bcs::Error),
394
395    /// First store.
396    #[error("Error in first store: {0}")]
397    First(E1),
398
399    /// Second store.
400    #[error("Error in second store: {0}")]
401    Second(E2),
402}
403
404impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
405where
406    E1: KeyValueStoreError,
407    E2: KeyValueStoreError,
408{
409    const BACKEND: &'static str = "dual_store";
410}