linera_views/
store.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! This module provides the trait definitions for key-value stores and databases.
5
6use std::{fmt::Debug, future::Future};
7
8use serde::{de::DeserializeOwned, Serialize};
9
10#[cfg(with_testing)]
11use crate::random::generate_test_namespace;
12use crate::{
13    batch::{Batch, SimplifiedBatch},
14    common::from_bytes_option,
15    ViewError,
16};
17
18/// The error type for the key-value stores.
19pub trait KeyValueStoreError:
20    std::error::Error + From<bcs::Error> + Debug + Send + Sync + 'static
21{
22    /// The name of the backend.
23    const BACKEND: &'static str;
24}
25
26impl<E: KeyValueStoreError> From<E> for ViewError {
27    fn from(error: E) -> Self {
28        Self::StoreError {
29            backend: E::BACKEND,
30            error: Box::new(error),
31        }
32    }
33}
34
35/// Define an associated [`KeyValueStoreError`].
36pub trait WithError {
37    /// The error type.
38    type Error: KeyValueStoreError;
39}
40
41/// Asynchronous read key-value operations.
42#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
43pub trait ReadableKeyValueStore: WithError {
44    /// The maximal size of keys that can be stored.
45    const MAX_KEY_SIZE: usize;
46
47    /// Retrieve the number of stream queries.
48    fn max_stream_queries(&self) -> usize;
49
50    /// Retrieves a `Vec<u8>` from the database using the provided `key`.
51    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
52
53    /// Tests whether a key exists in the database
54    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error>;
55
56    /// Tests whether a list of keys exist in the database
57    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error>;
58
59    /// Retrieves multiple `Vec<u8>` from the database using the provided `keys`.
60    async fn read_multi_values_bytes(
61        &self,
62        keys: Vec<Vec<u8>>,
63    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;
64
65    /// Finds the `key` matching the prefix. The prefix is not included in the returned keys.
66    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
67
68    /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
69    async fn find_key_values_by_prefix(
70        &self,
71        key_prefix: &[u8],
72    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error>;
73
74    // We can't use `async fn` here in the below implementations due to
75    // https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed
76    // we can revert them to `async fn` syntax, which is neater.
77
78    /// Reads a single `key` and deserializes the result if present.
79    fn read_value<V: DeserializeOwned>(
80        &self,
81        key: &[u8],
82    ) -> impl Future<Output = Result<Option<V>, Self::Error>> {
83        async { Ok(from_bytes_option(&self.read_value_bytes(key).await?)?) }
84    }
85
86    /// Reads multiple `keys` and deserializes the results if present.
87    fn read_multi_values<V: DeserializeOwned + Send + Sync>(
88        &self,
89        keys: Vec<Vec<u8>>,
90    ) -> impl Future<Output = Result<Vec<Option<V>>, Self::Error>> {
91        async {
92            let mut values = Vec::with_capacity(keys.len());
93            for entry in self.read_multi_values_bytes(keys).await? {
94                values.push(from_bytes_option(&entry)?);
95            }
96            Ok(values)
97        }
98    }
99}
100
101/// Asynchronous write key-value operations.
102#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
103pub trait WritableKeyValueStore: WithError {
104    /// The maximal size of values that can be stored.
105    const MAX_VALUE_SIZE: usize;
106
107    /// Writes the `batch` in the database.
108    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error>;
109
110    /// Clears any journal entry that may remain.
111    /// The journal is located at the `root_key`.
112    async fn clear_journal(&self) -> Result<(), Self::Error>;
113}
114
115/// Asynchronous direct write key-value operations with simplified batch.
116///
117/// Some backend cannot implement `WritableKeyValueStore` directly and will require
118/// journaling.
119#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
120pub trait DirectWritableKeyValueStore: WithError {
121    /// The maximal number of items in a batch.
122    const MAX_BATCH_SIZE: usize;
123
124    /// The maximal number of bytes of a batch.
125    const MAX_BATCH_TOTAL_SIZE: usize;
126
127    /// The maximal size of values that can be stored.
128    const MAX_VALUE_SIZE: usize;
129
130    /// The batch type.
131    type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;
132
133    /// Writes the batch to the database.
134    async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
135}
136
137/// The definition of a key-value database.
138#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
139pub trait KeyValueDatabase: WithError + Sized {
140    /// The configuration needed to interact with a new backend.
141    type Config: Send + Sync;
142
143    /// The result of opening a partition.
144    type Store;
145
146    /// The name of this database.
147    fn get_name() -> String;
148
149    /// Connects to an existing namespace using the given configuration.
150    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error>;
151
152    /// Opens a shared partition starting at `root_key`. It is understood that the
153    /// partition MAY be read and written simultaneously from other clients.
154    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
155
156    /// Opens an exclusive partition starting at `root_key`. It is assumed that the
157    /// partition WILL NOT be read and written simultaneously by other clients.
158    ///
159    /// IMPORTANT: This assumption is not enforced at the moment. However, future
160    /// implementations may choose to return an error if another client is detected.
161    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
162
163    /// Obtains the list of existing namespaces.
164    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;
165
166    /// Lists the root keys of the namespace.
167    /// It is possible that some root keys have no keys.
168    async fn list_root_keys(
169        config: &Self::Config,
170        namespace: &str,
171    ) -> Result<Vec<Vec<u8>>, Self::Error>;
172
173    /// Deletes all the existing namespaces.
174    fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
175        async {
176            let namespaces = Self::list_all(config).await?;
177            for namespace in namespaces {
178                Self::delete(config, &namespace).await?;
179            }
180            Ok(())
181        }
182    }
183
184    /// Tests if a given namespace exists.
185    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error>;
186
187    /// Creates a namespace. Returns an error if the namespace exists.
188    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
189
190    /// Deletes the given namespace.
191    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
192
193    /// Initializes a storage if missing and provides it.
194    fn maybe_create_and_connect(
195        config: &Self::Config,
196        namespace: &str,
197    ) -> impl Future<Output = Result<Self, Self::Error>> {
198        async {
199            if !Self::exists(config, namespace).await? {
200                Self::create(config, namespace).await?;
201            }
202            Self::connect(config, namespace).await
203        }
204    }
205
206    /// Creates a new storage. Overwrites it if this namespace already exists.
207    fn recreate_and_connect(
208        config: &Self::Config,
209        namespace: &str,
210    ) -> impl Future<Output = Result<Self, Self::Error>> {
211        async {
212            if Self::exists(config, namespace).await? {
213                Self::delete(config, namespace).await?;
214            }
215            Self::create(config, namespace).await?;
216            Self::connect(config, namespace).await
217        }
218    }
219}
220
221/// A key-value store that can perform both read and direct write operations.
222///
223/// This trait combines the capabilities of [`ReadableKeyValueStore`] and
224/// [`DirectWritableKeyValueStore`], providing a full interface for stores
225/// that can handle simplified batches directly without journaling.
226pub trait DirectKeyValueStore: ReadableKeyValueStore + DirectWritableKeyValueStore {}
227
228impl<T> DirectKeyValueStore for T where T: ReadableKeyValueStore + DirectWritableKeyValueStore {}
229
230/// A key-value store that can perform both read and write operations.
231///
232/// This trait combines the capabilities of [`ReadableKeyValueStore`] and
233/// [`WritableKeyValueStore`], providing a full interface for stores that
234/// can handle complex batches with journaling support.
235pub trait KeyValueStore: ReadableKeyValueStore + WritableKeyValueStore {}
236
237impl<T> KeyValueStore for T where T: ReadableKeyValueStore + WritableKeyValueStore {}
238
/// Additional functions of a [`KeyValueDatabase`] that are needed for testing purposes.
#[cfg(with_testing)]
pub trait TestKeyValueDatabase: KeyValueDatabase {
    /// Obtains a configuration suitable for tests.
    async fn new_test_config() -> Result<Self::Config, Self::Error>;

    /// Creates a database for testing purposes.
    ///
    /// A test configuration is obtained first; then a generated test namespace is
    /// (re)created and connected to.
    async fn connect_test_namespace() -> Result<Self, Self::Error> {
        let test_config = Self::new_test_config().await?;
        let test_namespace = generate_test_namespace();
        Self::recreate_and_connect(&test_config, &test_namespace).await
    }

    /// Creates a store for testing purposes by opening a shared partition with an
    /// empty root key on a fresh test database.
    async fn new_test_store() -> Result<Self::Store, Self::Error> {
        let test_database = Self::connect_test_namespace().await?;
        let empty_root_key: &[u8] = &[];
        test_database.open_shared(empty_root_key)
    }
}
258
259/// A module containing a dummy store used for caching views.
260pub mod inactive_store {
261    use super::*;
262
263    /// A store which does not actually store anything - used for caching views.
264    pub struct InactiveStore;
265
266    /// An error struct for the inactive store.
267    #[derive(Clone, Copy, Debug)]
268    pub struct InactiveStoreError;
269
270    impl std::fmt::Display for InactiveStoreError {
271        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
272            write!(f, "inactive store error")
273        }
274    }
275
276    impl From<bcs::Error> for InactiveStoreError {
277        fn from(_other: bcs::Error) -> Self {
278            Self
279        }
280    }
281
282    impl std::error::Error for InactiveStoreError {}
283
284    impl KeyValueStoreError for InactiveStoreError {
285        const BACKEND: &'static str = "inactive";
286    }
287
288    impl WithError for InactiveStore {
289        type Error = InactiveStoreError;
290    }
291
292    impl ReadableKeyValueStore for InactiveStore {
293        const MAX_KEY_SIZE: usize = 0;
294
295        fn max_stream_queries(&self) -> usize {
296            0
297        }
298
299        async fn read_value_bytes(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
300            panic!("attempt to read from an inactive store!")
301        }
302
303        async fn contains_key(&self, _key: &[u8]) -> Result<bool, Self::Error> {
304            panic!("attempt to read from an inactive store!")
305        }
306
307        async fn contains_keys(&self, _keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
308            panic!("attempt to read from an inactive store!")
309        }
310
311        async fn read_multi_values_bytes(
312            &self,
313            _keys: Vec<Vec<u8>>,
314        ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
315            panic!("attempt to read from an inactive store!")
316        }
317
318        async fn find_keys_by_prefix(
319            &self,
320            _key_prefix: &[u8],
321        ) -> Result<Vec<Vec<u8>>, Self::Error> {
322            panic!("attempt to read from an inactive store!")
323        }
324
325        /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
326        async fn find_key_values_by_prefix(
327            &self,
328            _key_prefix: &[u8],
329        ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
330            panic!("attempt to read from an inactive store!")
331        }
332    }
333
334    impl WritableKeyValueStore for InactiveStore {
335        const MAX_VALUE_SIZE: usize = 0;
336
337        async fn write_batch(&self, _batch: Batch) -> Result<(), Self::Error> {
338            panic!("attempt to write to an inactive store!")
339        }
340
341        async fn clear_journal(&self) -> Result<(), Self::Error> {
342            panic!("attempt to write to an inactive store!")
343        }
344    }
345}