linera_views/backends/
dual.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] by combining two existing stores.
5
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueDatabase;
11use crate::{
12    batch::Batch,
13    store::{
14        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
15        WritableKeyValueStore,
16    },
17};
18
19/// A dual database.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualDatabase<D1, D2, A> {
22    /// The first database.
23    pub first_database: D1,
24    /// The second database.
25    pub second_database: D2,
26    /// Marker for the static root key assignment.
27    _marker: std::marker::PhantomData<A>,
28}
29
30/// The initial configuration of the system.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DualStoreConfig<C1, C2> {
33    /// The first config.
34    pub first_config: C1,
35    /// The second config.
36    pub second_config: C2,
37}
38
39/// The store in use.
40#[derive(Clone, Copy, Debug)]
41pub enum StoreInUse {
42    /// The first store.
43    First,
44    /// The second store.
45    Second,
46}
47
48/// The trait for a (static) root key assignment.
49pub trait DualStoreRootKeyAssignment {
50    /// Obtains the store assigned to this root key.
51    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
52}
53
54/// A partition opened in one of the two databases.
55#[derive(Clone)]
56pub enum DualStore<S1, S2> {
57    /// The first store.
58    First(S1),
59    /// The second store.
60    Second(S2),
61}
62
63impl<D1, D2, A> WithError for DualDatabase<D1, D2, A>
64where
65    D1: WithError,
66    D2: WithError,
67{
68    type Error = DualStoreError<D1::Error, D2::Error>;
69}
70
71impl<S1, S2> WithError for DualStore<S1, S2>
72where
73    S1: WithError,
74    S2: WithError,
75{
76    type Error = DualStoreError<S1::Error, S2::Error>;
77}
78
79impl<S1, S2> ReadableKeyValueStore for DualStore<S1, S2>
80where
81    S1: ReadableKeyValueStore,
82    S2: ReadableKeyValueStore,
83{
84    // TODO(#2524): consider changing MAX_KEY_SIZE into a function.
85    const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
86        S1::MAX_KEY_SIZE
87    } else {
88        S2::MAX_KEY_SIZE
89    };
90
91    fn max_stream_queries(&self) -> usize {
92        match self {
93            Self::First(store) => store.max_stream_queries(),
94            Self::Second(store) => store.max_stream_queries(),
95        }
96    }
97
98    fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
99        Ok(match self {
100            Self::First(store) => store.root_key().map_err(DualStoreError::First)?,
101            Self::Second(store) => store.root_key().map_err(DualStoreError::Second)?,
102        })
103    }
104
105    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
106        let result = match self {
107            Self::First(store) => store
108                .read_value_bytes(key)
109                .await
110                .map_err(DualStoreError::First)?,
111            Self::Second(store) => store
112                .read_value_bytes(key)
113                .await
114                .map_err(DualStoreError::Second)?,
115        };
116        Ok(result)
117    }
118
119    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
120        let result = match self {
121            Self::First(store) => store
122                .contains_key(key)
123                .await
124                .map_err(DualStoreError::First)?,
125            Self::Second(store) => store
126                .contains_key(key)
127                .await
128                .map_err(DualStoreError::Second)?,
129        };
130        Ok(result)
131    }
132
133    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
134        let result = match self {
135            Self::First(store) => store
136                .contains_keys(keys)
137                .await
138                .map_err(DualStoreError::First)?,
139            Self::Second(store) => store
140                .contains_keys(keys)
141                .await
142                .map_err(DualStoreError::Second)?,
143        };
144        Ok(result)
145    }
146
147    async fn read_multi_values_bytes(
148        &self,
149        keys: &[Vec<u8>],
150    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
151        let result = match self {
152            Self::First(store) => store
153                .read_multi_values_bytes(keys)
154                .await
155                .map_err(DualStoreError::First)?,
156            Self::Second(store) => store
157                .read_multi_values_bytes(keys)
158                .await
159                .map_err(DualStoreError::Second)?,
160        };
161        Ok(result)
162    }
163
164    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
165        let result = match self {
166            Self::First(store) => store
167                .find_keys_by_prefix(key_prefix)
168                .await
169                .map_err(DualStoreError::First)?,
170            Self::Second(store) => store
171                .find_keys_by_prefix(key_prefix)
172                .await
173                .map_err(DualStoreError::Second)?,
174        };
175        Ok(result)
176    }
177
178    async fn find_key_values_by_prefix(
179        &self,
180        key_prefix: &[u8],
181    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
182        let result = match self {
183            Self::First(store) => store
184                .find_key_values_by_prefix(key_prefix)
185                .await
186                .map_err(DualStoreError::First)?,
187            Self::Second(store) => store
188                .find_key_values_by_prefix(key_prefix)
189                .await
190                .map_err(DualStoreError::Second)?,
191        };
192        Ok(result)
193    }
194}
195
196impl<S1, S2> WritableKeyValueStore for DualStore<S1, S2>
197where
198    S1: WritableKeyValueStore,
199    S2: WritableKeyValueStore,
200{
201    const MAX_VALUE_SIZE: usize = usize::MAX;
202
203    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
204        match self {
205            Self::First(store) => store
206                .write_batch(batch)
207                .await
208                .map_err(DualStoreError::First)?,
209            Self::Second(store) => store
210                .write_batch(batch)
211                .await
212                .map_err(DualStoreError::Second)?,
213        }
214        Ok(())
215    }
216
217    async fn clear_journal(&self) -> Result<(), Self::Error> {
218        match self {
219            Self::First(store) => store.clear_journal().await.map_err(DualStoreError::First)?,
220            Self::Second(store) => store
221                .clear_journal()
222                .await
223                .map_err(DualStoreError::Second)?,
224        }
225        Ok(())
226    }
227}
228
229impl<D1, D2, A> KeyValueDatabase for DualDatabase<D1, D2, A>
230where
231    D1: KeyValueDatabase,
232    D2: KeyValueDatabase,
233    A: DualStoreRootKeyAssignment + linera_base::util::traits::AutoTraits,
234{
235    type Config = DualStoreConfig<D1::Config, D2::Config>;
236    type Store = DualStore<D1::Store, D2::Store>;
237
238    fn get_name() -> String {
239        format!("dual {} and {}", D1::get_name(), D2::get_name())
240    }
241
242    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
243        let first_database = D1::connect(&config.first_config, namespace)
244            .await
245            .map_err(DualStoreError::First)?;
246        let second_database = D2::connect(&config.second_config, namespace)
247            .await
248            .map_err(DualStoreError::Second)?;
249        let database = Self {
250            first_database,
251            second_database,
252            _marker: std::marker::PhantomData,
253        };
254        Ok(database)
255    }
256
257    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
258        match A::assigned_store(root_key)? {
259            StoreInUse::First => {
260                let store = self
261                    .first_database
262                    .open_shared(root_key)
263                    .map_err(DualStoreError::First)?;
264                Ok(DualStore::First(store))
265            }
266            StoreInUse::Second => {
267                let store = self
268                    .second_database
269                    .open_shared(root_key)
270                    .map_err(DualStoreError::Second)?;
271                Ok(DualStore::Second(store))
272            }
273        }
274    }
275
276    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error> {
277        match A::assigned_store(root_key)? {
278            StoreInUse::First => {
279                let store = self
280                    .first_database
281                    .open_exclusive(root_key)
282                    .map_err(DualStoreError::First)?;
283                Ok(DualStore::First(store))
284            }
285            StoreInUse::Second => {
286                let store = self
287                    .second_database
288                    .open_exclusive(root_key)
289                    .map_err(DualStoreError::Second)?;
290                Ok(DualStore::Second(store))
291            }
292        }
293    }
294
295    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
296        let namespaces1 = D1::list_all(&config.first_config)
297            .await
298            .map_err(DualStoreError::First)?;
299        let mut namespaces = Vec::new();
300        for namespace in namespaces1 {
301            if D2::exists(&config.second_config, &namespace)
302                .await
303                .map_err(DualStoreError::Second)?
304            {
305                namespaces.push(namespace);
306            } else {
307                tracing::warn!("Namespace {} only exists in the first store", namespace);
308            }
309        }
310        Ok(namespaces)
311    }
312
313    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
314        let mut root_keys = self
315            .first_database
316            .list_root_keys()
317            .await
318            .map_err(DualStoreError::First)?;
319        root_keys.extend(
320            self.second_database
321                .list_root_keys()
322                .await
323                .map_err(DualStoreError::Second)?,
324        );
325        Ok(root_keys)
326    }
327
328    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
329        Ok(D1::exists(&config.first_config, namespace)
330            .await
331            .map_err(DualStoreError::First)?
332            && D2::exists(&config.second_config, namespace)
333                .await
334                .map_err(DualStoreError::Second)?)
335    }
336
337    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
338        let exists1 = D1::exists(&config.first_config, namespace)
339            .await
340            .map_err(DualStoreError::First)?;
341        let exists2 = D2::exists(&config.second_config, namespace)
342            .await
343            .map_err(DualStoreError::Second)?;
344        if exists1 && exists2 {
345            return Err(DualStoreError::StoreAlreadyExists);
346        }
347        if !exists1 {
348            D1::create(&config.first_config, namespace)
349                .await
350                .map_err(DualStoreError::First)?;
351        }
352        if !exists2 {
353            D2::create(&config.second_config, namespace)
354                .await
355                .map_err(DualStoreError::Second)?;
356        }
357        Ok(())
358    }
359
360    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
361        D1::delete(&config.first_config, namespace)
362            .await
363            .map_err(DualStoreError::First)?;
364        D2::delete(&config.second_config, namespace)
365            .await
366            .map_err(DualStoreError::Second)?;
367        Ok(())
368    }
369}
370
371#[cfg(with_testing)]
372impl<D1, D2, A> TestKeyValueDatabase for DualDatabase<D1, D2, A>
373where
374    D1: TestKeyValueDatabase,
375    D2: TestKeyValueDatabase,
376    A: DualStoreRootKeyAssignment + linera_base::util::traits::AutoTraits,
377{
378    async fn new_test_config() -> Result<Self::Config, Self::Error> {
379        let first_config = D1::new_test_config().await.map_err(DualStoreError::First)?;
380        let second_config = D2::new_test_config()
381            .await
382            .map_err(DualStoreError::Second)?;
383        Ok(DualStoreConfig {
384            first_config,
385            second_config,
386        })
387    }
388}
389
390/// The error type for [`DualStore`].
391#[derive(Error, Debug)]
392pub enum DualStoreError<E1, E2> {
393    /// Store already exists during a create operation
394    #[error("Store already exists during a create operation")]
395    StoreAlreadyExists,
396
397    /// Serialization error with BCS.
398    #[error(transparent)]
399    BcsError(#[from] bcs::Error),
400
401    /// First store.
402    #[error("Error in first store: {0}")]
403    First(E1),
404
405    /// Second store.
406    #[error("Error in second store: {0}")]
407    Second(E2),
408}
409
410impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
411where
412    E1: KeyValueStoreError,
413    E2: KeyValueStoreError,
414{
415    const BACKEND: &'static str = "dual_store";
416}