linera_views/
store.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This provides the trait definitions for the stores.
5
6use std::{fmt::Debug, future::Future};
7
8use serde::{de::DeserializeOwned, Serialize};
9
10#[cfg(with_testing)]
11use crate::random::generate_test_namespace;
12use crate::{
13    batch::{Batch, SimplifiedBatch},
14    common::from_bytes_option,
15    ViewError,
16};
17
18/// The error type for the key-value stores.
19pub trait KeyValueStoreError:
20    std::error::Error + From<bcs::Error> + Debug + Send + Sync + 'static
21{
22    /// The name of the backend.
23    const BACKEND: &'static str;
24}
25
26impl<E: KeyValueStoreError> From<E> for ViewError {
27    fn from(error: E) -> Self {
28        Self::StoreError {
29            backend: E::BACKEND,
30            error: Box::new(error),
31        }
32    }
33}
34
/// Defines an associated [`KeyValueStoreError`] for the implementing type.
///
/// The store and database traits of this module have `WithError` as a supertrait
/// and report failures through `Self::Error`.
pub trait WithError {
    /// The error type; it must implement [`KeyValueStoreError`].
    type Error: KeyValueStoreError;
}
40
41/// Asynchronous read key-value operations.
42#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
43pub trait ReadableKeyValueStore: WithError {
44    /// The maximal size of keys that can be stored.
45    const MAX_KEY_SIZE: usize;
46
47    /// Retrieve the number of stream queries.
48    fn max_stream_queries(&self) -> usize;
49
50    /// Gets the root key of the store.
51    fn root_key(&self) -> Result<Vec<u8>, Self::Error>;
52
53    /// Retrieves a `Vec<u8>` from the database using the provided `key`.
54    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
55
56    /// Tests whether a key exists in the database
57    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error>;
58
59    /// Tests whether a list of keys exist in the database
60    async fn contains_keys(&self, keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error>;
61
62    /// Retrieves multiple `Vec<u8>` from the database using the provided `keys`.
63    async fn read_multi_values_bytes(
64        &self,
65        keys: &[Vec<u8>],
66    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;
67
68    /// Finds the `key` matching the prefix. The prefix is not included in the returned keys.
69    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error>;
70
71    /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
72    async fn find_key_values_by_prefix(
73        &self,
74        key_prefix: &[u8],
75    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error>;
76
77    // We can't use `async fn` here in the below implementations due to
78    // https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed
79    // we can revert them to `async fn` syntax, which is neater.
80
81    /// Reads a single `key` and deserializes the result if present.
82    fn read_value<V: DeserializeOwned>(
83        &self,
84        key: &[u8],
85    ) -> impl Future<Output = Result<Option<V>, Self::Error>> {
86        async { Ok(from_bytes_option(&self.read_value_bytes(key).await?)?) }
87    }
88
89    /// Reads multiple `keys` and deserializes the results if present.
90    fn read_multi_values<V: DeserializeOwned + Send + Sync>(
91        &self,
92        keys: &[Vec<u8>],
93    ) -> impl Future<Output = Result<Vec<Option<V>>, Self::Error>> {
94        async {
95            let mut values = Vec::with_capacity(keys.len());
96            for entry in self.read_multi_values_bytes(keys).await? {
97                values.push(from_bytes_option(&entry)?);
98            }
99            Ok(values)
100        }
101    }
102}
103
/// Asynchronous write key-value operations.
///
/// Writes are expressed as a [`Batch`]; failures are reported through the
/// `Error` type supplied by [`WithError`].
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait WritableKeyValueStore: WithError {
    /// The maximal size of values that can be stored.
    const MAX_VALUE_SIZE: usize;

    /// Writes the `batch` to the database.
    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error>;

    /// Clears any journal entry that may remain.
    /// The journal is located at the `root_key`.
    async fn clear_journal(&self) -> Result<(), Self::Error>;
}
117
/// Asynchronous direct write key-value operations with a simplified batch.
///
/// Some backends cannot implement `WritableKeyValueStore` directly and will require
/// journaling.
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait DirectWritableKeyValueStore: WithError {
    /// The maximal number of items in a batch.
    const MAX_BATCH_SIZE: usize;

    /// The maximal number of bytes of a batch.
    const MAX_BATCH_TOTAL_SIZE: usize;

    /// The maximal size of values that can be stored.
    const MAX_VALUE_SIZE: usize;

    /// The batch type accepted by [`Self::write_batch`]. It must be a
    /// [`SimplifiedBatch`] that is serializable, deserializable and has a default value.
    type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;

    /// Writes the batch to the database.
    async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
}
139
140/// The definition of a key-value database.
141#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
142pub trait KeyValueDatabase: WithError + Sized {
143    /// The configuration needed to interact with a new backend.
144    type Config: Send + Sync;
145
146    /// The result of opening a partition.
147    type Store;
148
149    /// The name of this database.
150    fn get_name() -> String;
151
152    /// Connects to an existing namespace using the given configuration.
153    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error>;
154
155    /// Opens a shared partition starting at `root_key`. It is understood that the
156    /// partition MAY be read and written simultaneously from other clients.
157    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
158
159    /// Opens an exclusive partition starting at `root_key`. It is assumed that the
160    /// partition WILL NOT be read and written simultaneously by other clients.
161    ///
162    /// IMPORTANT: This assumption is not enforced at the moment. However, future
163    /// implementations may choose to return an error if another client is detected.
164    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, Self::Error>;
165
166    /// Obtains the list of existing namespaces.
167    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;
168
169    /// Lists the root keys of the namespace.
170    /// It is possible that some root keys have no keys.
171    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, Self::Error>;
172
173    /// Deletes all the existing namespaces.
174    fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
175        async {
176            let namespaces = Self::list_all(config).await?;
177            for namespace in namespaces {
178                Self::delete(config, &namespace).await?;
179            }
180            Ok(())
181        }
182    }
183
184    /// Tests if a given namespace exists.
185    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error>;
186
187    /// Creates a namespace. Returns an error if the namespace exists.
188    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
189
190    /// Deletes the given namespace.
191    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
192
193    /// Initializes a storage if missing and provides it.
194    fn maybe_create_and_connect(
195        config: &Self::Config,
196        namespace: &str,
197    ) -> impl Future<Output = Result<Self, Self::Error>> {
198        async {
199            if !Self::exists(config, namespace).await? {
200                Self::create(config, namespace).await?;
201            }
202            Self::connect(config, namespace).await
203        }
204    }
205
206    /// Creates a new storage. Overwrites it if this namespace already exists.
207    fn recreate_and_connect(
208        config: &Self::Config,
209        namespace: &str,
210    ) -> impl Future<Output = Result<Self, Self::Error>> {
211        async {
212            if Self::exists(config, namespace).await? {
213                Self::delete(config, namespace).await?;
214            }
215            Self::create(config, namespace).await?;
216            Self::connect(config, namespace).await
217        }
218    }
219}
220
221/// A key-value store that can perform both read and direct write operations.
222///
223/// This trait combines the capabilities of [`ReadableKeyValueStore`] and
224/// [`DirectWritableKeyValueStore`], providing a full interface for stores
225/// that can handle simplified batches directly without journaling.
226pub trait DirectKeyValueStore: ReadableKeyValueStore + DirectWritableKeyValueStore {}
227
228impl<T> DirectKeyValueStore for T where T: ReadableKeyValueStore + DirectWritableKeyValueStore {}
229
230/// A key-value store that can perform both read and write operations.
231///
232/// This trait combines the capabilities of [`ReadableKeyValueStore`] and
233/// [`WritableKeyValueStore`], providing a full interface for stores that
234/// can handle complex batches with journaling support.
235pub trait KeyValueStore: ReadableKeyValueStore + WritableKeyValueStore {}
236
237impl<T> KeyValueStore for T where T: ReadableKeyValueStore + WritableKeyValueStore {}
238
/// Helper functions for testing a [`KeyValueDatabase`].
#[cfg(with_testing)]
pub trait TestKeyValueDatabase: KeyValueDatabase {
    /// Obtains a configuration suitable for tests.
    async fn new_test_config() -> Result<Self::Config, Self::Error>;

    /// Creates a database for testing purposes.
    ///
    /// A test configuration is obtained first, then a generated test namespace is
    /// (re)created and connected to.
    async fn connect_test_namespace() -> Result<Self, Self::Error> {
        let test_config = Self::new_test_config().await?;
        let test_namespace = generate_test_namespace();
        Self::recreate_and_connect(&test_config, &test_namespace).await
    }

    /// Creates a store for testing purposes.
    ///
    /// The store is the shared partition at the empty root key of a fresh test database.
    async fn new_test_store() -> Result<Self::Store, Self::Error> {
        Self::connect_test_namespace().await?.open_shared(&[])
    }
}
258
259/// A module containing a dummy store used for caching views.
260pub mod inactive_store {
261    use super::*;
262
263    /// A store which does not actually store anything - used for caching views.
264    pub struct InactiveStore;
265
266    /// An error struct for the inactive store.
267    #[derive(Clone, Copy, Debug)]
268    pub struct InactiveStoreError;
269
270    impl std::fmt::Display for InactiveStoreError {
271        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
272            write!(f, "inactive store error")
273        }
274    }
275
276    impl From<bcs::Error> for InactiveStoreError {
277        fn from(_other: bcs::Error) -> Self {
278            Self
279        }
280    }
281
282    impl std::error::Error for InactiveStoreError {}
283
284    impl KeyValueStoreError for InactiveStoreError {
285        const BACKEND: &'static str = "inactive";
286    }
287
288    impl WithError for InactiveStore {
289        type Error = InactiveStoreError;
290    }
291
292    impl ReadableKeyValueStore for InactiveStore {
293        const MAX_KEY_SIZE: usize = 0;
294
295        fn max_stream_queries(&self) -> usize {
296            0
297        }
298
299        fn root_key(&self) -> Result<Vec<u8>, Self::Error> {
300            panic!("attempt to read from an inactive store!")
301        }
302
303        async fn read_value_bytes(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
304            panic!("attempt to read from an inactive store!")
305        }
306
307        async fn contains_key(&self, _key: &[u8]) -> Result<bool, Self::Error> {
308            panic!("attempt to read from an inactive store!")
309        }
310
311        async fn contains_keys(&self, _keys: &[Vec<u8>]) -> Result<Vec<bool>, Self::Error> {
312            panic!("attempt to read from an inactive store!")
313        }
314
315        async fn read_multi_values_bytes(
316            &self,
317            _keys: &[Vec<u8>],
318        ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
319            panic!("attempt to read from an inactive store!")
320        }
321
322        async fn find_keys_by_prefix(
323            &self,
324            _key_prefix: &[u8],
325        ) -> Result<Vec<Vec<u8>>, Self::Error> {
326            panic!("attempt to read from an inactive store!")
327        }
328
329        /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
330        async fn find_key_values_by_prefix(
331            &self,
332            _key_prefix: &[u8],
333        ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
334            panic!("attempt to read from an inactive store!")
335        }
336    }
337
338    impl WritableKeyValueStore for InactiveStore {
339        const MAX_VALUE_SIZE: usize = 0;
340
341        async fn write_batch(&self, _batch: Batch) -> Result<(), Self::Error> {
342            panic!("attempt to write to an inactive store!")
343        }
344
345        async fn clear_journal(&self) -> Result<(), Self::Error> {
346            panic!("attempt to write to an inactive store!")
347        }
348    }
349}