linera_views/backends/
dual.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! Implements [`crate::store::KeyValueStore`] by combining two existing stores:
//! each root key is statically assigned to exactly one of them.
5
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[cfg(with_testing)]
10use crate::store::TestKeyValueStore;
11use crate::{
12    batch::Batch,
13    store::{
14        AdminKeyValueStore, KeyIterable, KeyValueIterable, KeyValueStoreError,
15        ReadableKeyValueStore, WithError, WritableKeyValueStore,
16    },
17};
18
/// The configuration of a [`DualStore`]: one configuration per underlying store.
///
/// `C1` and `C2` are the configuration types of the first and second store. In the
/// `AdminKeyValueStore` implementation of [`DualStore`] they are instantiated with
/// `S1::Config` and `S2::Config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DualStoreConfig<C1, C2> {
    /// The configuration handed to the first store.
    pub first_config: C1,
    /// The configuration handed to the second store.
    pub second_config: C2,
}
27
/// Selects which of the two underlying stores of a [`DualStore`] is in use.
///
/// The enum is a plain field-less tag: it is `Copy` and can be compared for equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StoreInUse {
    /// The first store.
    First,
    /// The second store.
    Second,
}
36
/// The trait for a (static) root key assignment.
///
/// An implementor decides, from the bytes of a root key alone, which of the two
/// underlying stores of a [`DualStore`] is used for that root key. The function takes
/// no `self`, so the assignment is fixed by the implementing type.
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait DualStoreRootKeyAssignment {
    /// Obtains the store assigned to this root key.
    ///
    /// [`DualStore`] calls this with an empty slice when connecting and with the
    /// requested root key in `open_exclusive`.
    ///
    /// # Errors
    ///
    /// Returns a [`bcs::Error`]; presumably when the implementor cannot decode the
    /// root key (the exact failure conditions are up to the implementation).
    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error>;
}
43
44/// A store made of two existing stores.
45#[derive(Clone)]
46pub struct DualStore<S1, S2, A> {
47    /// The first underlying store.
48    first_store: S1,
49    /// The second underlying store.
50    second_store: S2,
51    /// Which store is currently in use given the root key. (The root key in the other store will be set arbitrarily.)
52    store_in_use: StoreInUse,
53    /// Marker for the static root key assignment.
54    _marker: std::marker::PhantomData<A>,
55}
56
57impl<S1, S2, A> WithError for DualStore<S1, S2, A>
58where
59    S1: WithError,
60    S2: WithError,
61{
62    type Error = DualStoreError<S1::Error, S2::Error>;
63}
64
65impl<S1, S2, A> ReadableKeyValueStore for DualStore<S1, S2, A>
66where
67    S1: ReadableKeyValueStore,
68    S2: ReadableKeyValueStore,
69    A: DualStoreRootKeyAssignment,
70{
71    // TODO(#2524): consider changing MAX_KEY_SIZE into a function.
72    const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE {
73        S1::MAX_KEY_SIZE
74    } else {
75        S2::MAX_KEY_SIZE
76    };
77
78    type Keys = DualStoreKeys<S1::Keys, S2::Keys>;
79    type KeyValues = DualStoreKeyValues<S1::KeyValues, S2::KeyValues>;
80
81    fn max_stream_queries(&self) -> usize {
82        match self.store_in_use {
83            StoreInUse::First => self.first_store.max_stream_queries(),
84            StoreInUse::Second => self.second_store.max_stream_queries(),
85        }
86    }
87
88    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
89        let result = match self.store_in_use {
90            StoreInUse::First => self
91                .first_store
92                .read_value_bytes(key)
93                .await
94                .map_err(DualStoreError::First)?,
95            StoreInUse::Second => self
96                .second_store
97                .read_value_bytes(key)
98                .await
99                .map_err(DualStoreError::Second)?,
100        };
101        Ok(result)
102    }
103
104    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
105        let result = match self.store_in_use {
106            StoreInUse::First => self
107                .first_store
108                .contains_key(key)
109                .await
110                .map_err(DualStoreError::First)?,
111            StoreInUse::Second => self
112                .second_store
113                .contains_key(key)
114                .await
115                .map_err(DualStoreError::Second)?,
116        };
117        Ok(result)
118    }
119
120    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
121        let result = match self.store_in_use {
122            StoreInUse::First => self
123                .first_store
124                .contains_keys(keys)
125                .await
126                .map_err(DualStoreError::First)?,
127            StoreInUse::Second => self
128                .second_store
129                .contains_keys(keys)
130                .await
131                .map_err(DualStoreError::Second)?,
132        };
133        Ok(result)
134    }
135
136    async fn read_multi_values_bytes(
137        &self,
138        keys: Vec<Vec<u8>>,
139    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
140        let result = match self.store_in_use {
141            StoreInUse::First => self
142                .first_store
143                .read_multi_values_bytes(keys)
144                .await
145                .map_err(DualStoreError::First)?,
146            StoreInUse::Second => self
147                .second_store
148                .read_multi_values_bytes(keys)
149                .await
150                .map_err(DualStoreError::Second)?,
151        };
152        Ok(result)
153    }
154
155    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
156        let result = match self.store_in_use {
157            StoreInUse::First => DualStoreKeys::First(
158                self.first_store
159                    .find_keys_by_prefix(key_prefix)
160                    .await
161                    .map_err(DualStoreError::First)?,
162            ),
163            StoreInUse::Second => DualStoreKeys::Second(
164                self.second_store
165                    .find_keys_by_prefix(key_prefix)
166                    .await
167                    .map_err(DualStoreError::Second)?,
168            ),
169        };
170        Ok(result)
171    }
172
173    async fn find_key_values_by_prefix(
174        &self,
175        key_prefix: &[u8],
176    ) -> Result<Self::KeyValues, Self::Error> {
177        let result = match self.store_in_use {
178            StoreInUse::First => DualStoreKeyValues::First(
179                self.first_store
180                    .find_key_values_by_prefix(key_prefix)
181                    .await
182                    .map_err(DualStoreError::First)?,
183            ),
184            StoreInUse::Second => DualStoreKeyValues::Second(
185                self.second_store
186                    .find_key_values_by_prefix(key_prefix)
187                    .await
188                    .map_err(DualStoreError::Second)?,
189            ),
190        };
191        Ok(result)
192    }
193}
194
195impl<S1, S2, A> WritableKeyValueStore for DualStore<S1, S2, A>
196where
197    S1: WritableKeyValueStore,
198    S2: WritableKeyValueStore,
199    A: DualStoreRootKeyAssignment,
200{
201    const MAX_VALUE_SIZE: usize = usize::MAX;
202
203    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
204        match self.store_in_use {
205            StoreInUse::First => self
206                .first_store
207                .write_batch(batch)
208                .await
209                .map_err(DualStoreError::First)?,
210            StoreInUse::Second => self
211                .second_store
212                .write_batch(batch)
213                .await
214                .map_err(DualStoreError::Second)?,
215        }
216        Ok(())
217    }
218
219    async fn clear_journal(&self) -> Result<(), Self::Error> {
220        match self.store_in_use {
221            StoreInUse::First => self
222                .first_store
223                .clear_journal()
224                .await
225                .map_err(DualStoreError::First)?,
226            StoreInUse::Second => self
227                .second_store
228                .clear_journal()
229                .await
230                .map_err(DualStoreError::Second)?,
231        }
232        Ok(())
233    }
234}
235
236impl<S1, S2, A> AdminKeyValueStore for DualStore<S1, S2, A>
237where
238    S1: AdminKeyValueStore,
239    S2: AdminKeyValueStore,
240    A: DualStoreRootKeyAssignment,
241{
242    type Config = DualStoreConfig<S1::Config, S2::Config>;
243
244    fn get_name() -> String {
245        format!("dual {} and {}", S1::get_name(), S2::get_name())
246    }
247
248    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
249        let first_store = S1::connect(&config.first_config, namespace)
250            .await
251            .map_err(DualStoreError::First)?;
252        let second_store = S2::connect(&config.second_config, namespace)
253            .await
254            .map_err(DualStoreError::Second)?;
255        let store_in_use = A::assigned_store(&[])?;
256        Ok(Self {
257            first_store,
258            second_store,
259            store_in_use,
260            _marker: std::marker::PhantomData,
261        })
262    }
263
264    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
265        let first_store = self
266            .first_store
267            .open_exclusive(root_key)
268            .map_err(DualStoreError::First)?;
269        let second_store = self
270            .second_store
271            .open_exclusive(root_key)
272            .map_err(DualStoreError::Second)?;
273        let store_in_use = A::assigned_store(root_key)?;
274        Ok(Self {
275            first_store,
276            second_store,
277            store_in_use,
278            _marker: std::marker::PhantomData,
279        })
280    }
281
282    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
283        let namespaces1 = S1::list_all(&config.first_config)
284            .await
285            .map_err(DualStoreError::First)?;
286        let mut namespaces = Vec::new();
287        for namespace in namespaces1 {
288            if S2::exists(&config.second_config, &namespace)
289                .await
290                .map_err(DualStoreError::Second)?
291            {
292                namespaces.push(namespace);
293            } else {
294                tracing::warn!("Namespace {} only exists in the first store", namespace);
295            }
296        }
297        Ok(namespaces)
298    }
299
300    async fn list_root_keys(
301        config: &Self::Config,
302        namespace: &str,
303    ) -> Result<Vec<Vec<u8>>, Self::Error> {
304        let mut root_keys = S1::list_root_keys(&config.first_config, namespace)
305            .await
306            .map_err(DualStoreError::First)?;
307        root_keys.extend(
308            S2::list_root_keys(&config.second_config, namespace)
309                .await
310                .map_err(DualStoreError::Second)?,
311        );
312        Ok(root_keys)
313    }
314
315    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
316        Ok(S1::exists(&config.first_config, namespace)
317            .await
318            .map_err(DualStoreError::First)?
319            && S2::exists(&config.second_config, namespace)
320                .await
321                .map_err(DualStoreError::Second)?)
322    }
323
324    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
325        let exists1 = S1::exists(&config.first_config, namespace)
326            .await
327            .map_err(DualStoreError::First)?;
328        let exists2 = S2::exists(&config.second_config, namespace)
329            .await
330            .map_err(DualStoreError::Second)?;
331        if exists1 && exists2 {
332            return Err(DualStoreError::StoreAlreadyExists);
333        }
334        if !exists1 {
335            S1::create(&config.first_config, namespace)
336                .await
337                .map_err(DualStoreError::First)?;
338        }
339        if !exists2 {
340            S2::create(&config.second_config, namespace)
341                .await
342                .map_err(DualStoreError::Second)?;
343        }
344        Ok(())
345    }
346
347    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
348        S1::delete(&config.first_config, namespace)
349            .await
350            .map_err(DualStoreError::First)?;
351        S2::delete(&config.second_config, namespace)
352            .await
353            .map_err(DualStoreError::Second)?;
354        Ok(())
355    }
356}
357
/// Test support: a test configuration is the pair of test configurations of the two
/// underlying stores.
#[cfg(with_testing)]
impl<S1, S2, A> TestKeyValueStore for DualStore<S1, S2, A>
where
    S1: TestKeyValueStore,
    S2: TestKeyValueStore,
    A: DualStoreRootKeyAssignment,
{
    /// Builds the test configuration of the first store, then that of the second store.
    async fn new_test_config() -> Result<Self::Config, Self::Error> {
        Ok(DualStoreConfig {
            first_config: S1::new_test_config()
                .await
                .map_err(DualStoreError::First)?,
            second_config: S2::new_test_config()
                .await
                .map_err(DualStoreError::Second)?,
        })
    }
}
376
/// The error type for [`DualStore`].
///
/// Wraps the error of whichever underlying store failed, plus the failures that
/// originate in the dual store itself.
#[derive(Error, Debug)]
pub enum DualStoreError<E1, E2> {
    /// Store already exists during a create operation
    /// (returned by `create` when the namespace is already present in both stores).
    #[error("Store already exists during a create operation")]
    StoreAlreadyExists,

    /// Serialization error with BCS
    /// (for example, a failure reported by the root key assignment).
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// An error reported by the first store.
    #[error("Error in first store: {0}")]
    First(E1),

    /// An error reported by the second store.
    #[error("Error in second store: {0}")]
    Second(E2),
}
396
397impl<E1, E2> KeyValueStoreError for DualStoreError<E1, E2>
398where
399    E1: KeyValueStoreError,
400    E2: KeyValueStoreError,
401{
402    const BACKEND: &'static str = "dual_store";
403}
404
/// A set of keys returned by [`DualStore::find_keys_by_prefix`].
///
/// The variant records which underlying store produced the keys.
#[derive(Debug)]
pub enum DualStoreKeys<K1, K2> {
    /// A set of keys from the first store.
    First(K1),
    /// A set of keys from the second store.
    Second(K2),
}
412
/// An iterator over the keys in [`DualStoreKeys`].
///
/// The variant records which underlying store the wrapped iterator belongs to.
#[derive(Debug)]
pub enum DualStoreKeyIterator<I1, I2> {
    /// Iterating over keys from the first store.
    First(I1),
    /// Iterating over keys from the second store.
    Second(I2),
}
420
/// A set of key-values returned by [`DualStore::find_key_values_by_prefix`].
///
/// The variant records which underlying store produced the key-values.
#[derive(Debug)]
pub enum DualStoreKeyValues<K1, K2> {
    /// A set of key-values from the first store.
    First(K1),
    /// A set of key-values from the second store.
    Second(K2),
}
428
/// An iterator over the key-values in [`DualStoreKeyValues`].
///
/// The variant records which underlying store the wrapped iterator belongs to.
#[derive(Debug)]
pub enum DualStoreKeyValueIterator<I1, I2> {
    /// Iterating over key-values from the first store.
    First(I1),
    /// Iterating over key-values from the second store.
    Second(I2),
}
436
/// An owning iterator over the key-values in [`DualStoreKeyValues`].
///
/// The variant records which underlying store the wrapped iterator belongs to.
#[derive(Debug)]
pub enum DualStoreKeyValueIteratorOwned<I1, I2> {
    /// Iterating over key-values from the first store.
    First(I1),
    /// Iterating over key-values from the second store.
    Second(I2),
}
444
445impl<E1, E2, K1, K2> KeyIterable<DualStoreError<E1, E2>> for DualStoreKeys<K1, K2>
446where
447    K1: KeyIterable<E1>,
448    K2: KeyIterable<E2>,
449{
450    type Iterator<'a>
451        = DualStoreKeyIterator<K1::Iterator<'a>, K2::Iterator<'a>>
452    where
453        K1: 'a,
454        K2: 'a;
455
456    fn iterator(&self) -> Self::Iterator<'_> {
457        match self {
458            Self::First(keys) => DualStoreKeyIterator::First(keys.iterator()),
459            Self::Second(keys) => DualStoreKeyIterator::Second(keys.iterator()),
460        }
461    }
462}
463
464impl<'a, I1, I2, E1, E2> Iterator for DualStoreKeyIterator<I1, I2>
465where
466    I1: Iterator<Item = Result<&'a [u8], E1>>,
467    I2: Iterator<Item = Result<&'a [u8], E2>>,
468{
469    type Item = Result<&'a [u8], DualStoreError<E1, E2>>;
470
471    fn next(&mut self) -> Option<Self::Item> {
472        match self {
473            Self::First(iter) => iter
474                .next()
475                .map(|result| result.map_err(DualStoreError::First)),
476            Self::Second(iter) => iter
477                .next()
478                .map(|result| result.map_err(DualStoreError::Second)),
479        }
480    }
481}
482
483impl<E1, E2, K1, K2> KeyValueIterable<DualStoreError<E1, E2>> for DualStoreKeyValues<K1, K2>
484where
485    K1: KeyValueIterable<E1>,
486    K2: KeyValueIterable<E2>,
487{
488    type Iterator<'a>
489        = DualStoreKeyValueIterator<K1::Iterator<'a>, K2::Iterator<'a>>
490    where
491        K1: 'a,
492        K2: 'a;
493    type IteratorOwned = DualStoreKeyValueIteratorOwned<K1::IteratorOwned, K2::IteratorOwned>;
494
495    fn iterator(&self) -> Self::Iterator<'_> {
496        match self {
497            Self::First(keys) => DualStoreKeyValueIterator::First(keys.iterator()),
498            Self::Second(keys) => DualStoreKeyValueIterator::Second(keys.iterator()),
499        }
500    }
501
502    fn into_iterator_owned(self) -> Self::IteratorOwned {
503        match self {
504            Self::First(keys) => DualStoreKeyValueIteratorOwned::First(keys.into_iterator_owned()),
505            Self::Second(keys) => {
506                DualStoreKeyValueIteratorOwned::Second(keys.into_iterator_owned())
507            }
508        }
509    }
510}
511
512impl<'a, I1, I2, E1, E2> Iterator for DualStoreKeyValueIterator<I1, I2>
513where
514    I1: Iterator<Item = Result<(&'a [u8], &'a [u8]), E1>>,
515    I2: Iterator<Item = Result<(&'a [u8], &'a [u8]), E2>>,
516{
517    type Item = Result<(&'a [u8], &'a [u8]), DualStoreError<E1, E2>>;
518
519    fn next(&mut self) -> Option<Self::Item> {
520        match self {
521            Self::First(iter) => iter
522                .next()
523                .map(|result| result.map_err(DualStoreError::First)),
524            Self::Second(iter) => iter
525                .next()
526                .map(|result| result.map_err(DualStoreError::Second)),
527        }
528    }
529}
530
531impl<I1, I2, E1, E2> Iterator for DualStoreKeyValueIteratorOwned<I1, I2>
532where
533    I1: Iterator<Item = Result<(Vec<u8>, Vec<u8>), E1>>,
534    I2: Iterator<Item = Result<(Vec<u8>, Vec<u8>), E2>>,
535{
536    type Item = Result<(Vec<u8>, Vec<u8>), DualStoreError<E1, E2>>;
537
538    fn next(&mut self) -> Option<Self::Item> {
539        match self {
540            Self::First(iter) => iter
541                .next()
542                .map(|result| result.map_err(DualStoreError::First)),
543            Self::Second(iter) => iter
544                .next()
545                .map(|result| result.map_err(DualStoreError::Second)),
546        }
547    }
548}