linera_views/backends/
dual.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] by combining two existing stores.
5
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueStore;
11use crate::{
12    batch::Batch,
13    store::{
14        AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError,
15        WritableKeyValueStore,
16    },
17};
18
19/// The initial configuration of the system.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct DualStoreConfig<C1, C2> {
22    /// The first config.
23    pub first_config: C1,
24    /// The second config.
25    pub second_config: C2,
26}
27
/// Identifies which of the two underlying stores serves a given root key.
///
/// The type is a plain, copyable tag; it additionally supports equality comparison so
/// that assignments can be checked directly (e.g. in assertions).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StoreInUse {
    /// Operations are routed to the first store.
    First,
    /// Operations are routed to the second store.
    Second,
}
36
37/// The trait for a (static) root key assignment.
38#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
39pub trait DualStoreRootKeyAssignment {
40    /// Obtains the store assigned to this root key.
41    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
42}
43
44/// A store made of two existing stores.
45#[derive(Clone)]
46pub struct DualStore<S1, S2, A> {
47    /// The first underlying store.
48    first_store: S1,
49    /// The second underlying store.
50    second_store: S2,
51    /// Which store is currently in use given the root key. (The root key in the other store will be set arbitrarily.)
52    store_in_use: StoreInUse,
53    /// Marker for the static root key assignment.
54    _marker: std::marker::PhantomData<A>,
55}
56
57impl<S1, S2, A> WithError for DualStore<S1, S2, A>
58where
59    S1: WithError,
60    S2: WithError,
61{
62    type Error = DualStoreError<S1::Error, S2::Error>;
63}
64
65impl<S1, S2, A> ReadableKeyValueStore for DualStore<S1, S2, A>
66where
67    S1: ReadableKeyValueStore,
68    S2: ReadableKeyValueStore,
69    A: DualStoreRootKeyAssignment,
70{
71    // TODO(#2524): consider changing MAX_KEY_SIZE into a function.
72    const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
73        S1::MAX_KEY_SIZE
74    } else {
75        S2::MAX_KEY_SIZE
76    };
77
78    fn max_stream_queries(&self) -> usize {
79        match self.store_in_use {
80            StoreInUse::First => self.first_store.max_stream_queries(),
81            StoreInUse::Second => self.second_store.max_stream_queries(),
82        }
83    }
84
85    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
86        let result = match self.store_in_use {
87            StoreInUse::First => self
88                .first_store
89                .read_value_bytes(key)
90                .await
91                .map_err(DualStoreError::First)?,
92            StoreInUse::Second => self
93                .second_store
94                .read_value_bytes(key)
95                .await
96                .map_err(DualStoreError::Second)?,
97        };
98        Ok(result)
99    }
100
101    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
102        let result = match self.store_in_use {
103            StoreInUse::First => self
104                .first_store
105                .contains_key(key)
106                .await
107                .map_err(DualStoreError::First)?,
108            StoreInUse::Second => self
109                .second_store
110                .contains_key(key)
111                .await
112                .map_err(DualStoreError::Second)?,
113        };
114        Ok(result)
115    }
116
117    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
118        let result = match self.store_in_use {
119            StoreInUse::First => self
120                .first_store
121                .contains_keys(keys)
122                .await
123                .map_err(DualStoreError::First)?,
124            StoreInUse::Second => self
125                .second_store
126                .contains_keys(keys)
127                .await
128                .map_err(DualStoreError::Second)?,
129        };
130        Ok(result)
131    }
132
133    async fn read_multi_values_bytes(
134        &self,
135        keys: Vec<Vec<u8>>,
136    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
137        let result = match self.store_in_use {
138            StoreInUse::First => self
139                .first_store
140                .read_multi_values_bytes(keys)
141                .await
142                .map_err(DualStoreError::First)?,
143            StoreInUse::Second => self
144                .second_store
145                .read_multi_values_bytes(keys)
146                .await
147                .map_err(DualStoreError::Second)?,
148        };
149        Ok(result)
150    }
151
152    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
153        let result = match self.store_in_use {
154            StoreInUse::First => self
155                .first_store
156                .find_keys_by_prefix(key_prefix)
157                .await
158                .map_err(DualStoreError::First)?,
159            StoreInUse::Second => self
160                .second_store
161                .find_keys_by_prefix(key_prefix)
162                .await
163                .map_err(DualStoreError::Second)?,
164        };
165        Ok(result)
166    }
167
168    async fn find_key_values_by_prefix(
169        &self,
170        key_prefix: &[u8],
171    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
172        let result = match self.store_in_use {
173            StoreInUse::First => self
174                .first_store
175                .find_key_values_by_prefix(key_prefix)
176                .await
177                .map_err(DualStoreError::First)?,
178            StoreInUse::Second => self
179                .second_store
180                .find_key_values_by_prefix(key_prefix)
181                .await
182                .map_err(DualStoreError::Second)?,
183        };
184        Ok(result)
185    }
186}
187
188impl<S1, S2, A> WritableKeyValueStore for DualStore<S1, S2, A>
189where
190    S1: WritableKeyValueStore,
191    S2: WritableKeyValueStore,
192    A: DualStoreRootKeyAssignment,
193{
194    const MAX_VALUE_SIZE: usize = usize::MAX;
195
196    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
197        match self.store_in_use {
198            StoreInUse::First => self
199                .first_store
200                .write_batch(batch)
201                .await
202                .map_err(DualStoreError::First)?,
203            StoreInUse::Second => self
204                .second_store
205                .write_batch(batch)
206                .await
207                .map_err(DualStoreError::Second)?,
208        }
209        Ok(())
210    }
211
212    async fn clear_journal(&self) -> Result<(), Self::Error> {
213        match self.store_in_use {
214            StoreInUse::First => self
215                .first_store
216                .clear_journal()
217                .await
218                .map_err(DualStoreError::First)?,
219            StoreInUse::Second => self
220                .second_store
221                .clear_journal()
222                .await
223                .map_err(DualStoreError::Second)?,
224        }
225        Ok(())
226    }
227}
228
229impl<S1, S2, A> AdminKeyValueStore for DualStore<S1, S2, A>
230where
231    S1: AdminKeyValueStore,
232    S2: AdminKeyValueStore,
233    A: DualStoreRootKeyAssignment,
234{
235    type Config = DualStoreConfig<S1::Config, S2::Config>;
236
237    fn get_name() -> String {
238        format!("dual {} and {}", S1::get_name(), S2::get_name())
239    }
240
241    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
242        let first_store = S1::connect(&config.first_config, namespace)
243            .await
244            .map_err(DualStoreError::First)?;
245        let second_store = S2::connect(&config.second_config, namespace)
246            .await
247            .map_err(DualStoreError::Second)?;
248        let store_in_use = A::assigned_store(&[])?;
249        Ok(Self {
250            first_store,
251            second_store,
252            store_in_use,
253            _marker: std::marker::PhantomData,
254        })
255    }
256
257    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
258        let first_store = self
259            .first_store
260            .open_exclusive(root_key)
261            .map_err(DualStoreError::First)?;
262        let second_store = self
263            .second_store
264            .open_exclusive(root_key)
265            .map_err(DualStoreError::Second)?;
266        let store_in_use = A::assigned_store(root_key)?;
267        Ok(Self {
268            first_store,
269            second_store,
270            store_in_use,
271            _marker: std::marker::PhantomData,
272        })
273    }
274
275    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
276        let namespaces1 = S1::list_all(&config.first_config)
277            .await
278            .map_err(DualStoreError::First)?;
279        let mut namespaces = Vec::new();
280        for namespace in namespaces1 {
281            if S2::exists(&config.second_config, &namespace)
282                .await
283                .map_err(DualStoreError::Second)?
284            {
285                namespaces.push(namespace);
286            } else {
287                tracing::warn!("Namespace {} only exists in the first store", namespace);
288            }
289        }
290        Ok(namespaces)
291    }
292
293    async fn list_root_keys(
294        config: &Self::Config,
295        namespace: &str,
296    ) -> Result<Vec<Vec<u8>>, Self::Error> {
297        let mut root_keys = S1::list_root_keys(&config.first_config, namespace)
298            .await
299            .map_err(DualStoreError::First)?;
300        root_keys.extend(
301            S2::list_root_keys(&config.second_config, namespace)
302                .await
303                .map_err(DualStoreError::Second)?,
304        );
305        Ok(root_keys)
306    }
307
308    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
309        Ok(S1::exists(&config.first_config, namespace)
310            .await
311            .map_err(DualStoreError::First)?
312            && S2::exists(&config.second_config, namespace)
313                .await
314                .map_err(DualStoreError::Second)?)
315    }
316
317    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
318        let exists1 = S1::exists(&config.first_config, namespace)
319            .await
320            .map_err(DualStoreError::First)?;
321        let exists2 = S2::exists(&config.second_config, namespace)
322            .await
323            .map_err(DualStoreError::Second)?;
324        if exists1 && exists2 {
325            return Err(DualStoreError::StoreAlreadyExists);
326        }
327        if !exists1 {
328            S1::create(&config.first_config, namespace)
329                .await
330                .map_err(DualStoreError::First)?;
331        }
332        if !exists2 {
333            S2::create(&config.second_config, namespace)
334                .await
335                .map_err(DualStoreError::Second)?;
336        }
337        Ok(())
338    }
339
340    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
341        S1::delete(&config.first_config, namespace)
342            .await
343            .map_err(DualStoreError::First)?;
344        S2::delete(&config.second_config, namespace)
345            .await
346            .map_err(DualStoreError::Second)?;
347        Ok(())
348    }
349}
350
#[cfg(with_testing)]
impl<S1, S2, A> TestKeyValueStore for DualStore<S1, S2, A>
where
    S1: TestKeyValueStore,
    S2: TestKeyValueStore,
    A: DualStoreRootKeyAssignment,
{
    /// Builds a test configuration by pairing the test configurations of both stores.
    ///
    /// The first store's configuration is requested before the second one's.
    async fn new_test_config() -> Result<Self::Config, Self::Error> {
        Ok(DualStoreConfig {
            first_config: S1::new_test_config()
                .await
                .map_err(DualStoreError::First)?,
            second_config: S2::new_test_config()
                .await
                .map_err(DualStoreError::Second)?,
        })
    }
}
369
370/// The error type for [`DualStore`].
371#[derive(Error, Debug)]
372pub enum DualStoreError<E1, E2> {
373    /// Store already exists during a create operation
374    #[error("Store already exists during a create operation")]
375    StoreAlreadyExists,
376
377    /// Serialization error with BCS.
378    #[error(transparent)]
379    BcsError(#[from] bcs::Error),
380
381    /// First store.
382    #[error("Error in first store: {0}")]
383    First(E1),
384
385    /// Second store.
386    #[error("Error in second store: {0}")]
387    Second(E2),
388}
389
390impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
391where
392    E1: KeyValueStoreError,
393    E2: KeyValueStoreError,
394{
395    const BACKEND: &'static str = "dual_store";
396}