linera_views/
store.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This provides the trait definitions for the stores.
5
6use std::{fmt::Debug, future::Future};
7
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9
10#[cfg(with_testing)]
11use crate::random::generate_test_namespace;
12use crate::{
13    batch::Batch,
14    common::from_bytes_option,
15    lru_caching::{StorageCacheConfig, DEFAULT_STORAGE_CACHE_CONFIG},
16    ViewError,
17};
18
/// The common initialization parameters for the `KeyValueStore`, without the
/// storage cache settings.
///
/// This is the value produced by [`CommonStoreConfig::reduced`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommonStoreInternalConfig {
    /// The maximum number of concurrent queries to the database.
    // NOTE(review): `None` presumably means "no limit" — confirm with the backends.
    pub max_concurrent_queries: Option<usize>,
    /// The number of streams used for the async streams.
    pub max_stream_queries: usize,
    /// The replication factor for the keyspace.
    pub replication_factor: u32,
}
29
/// The common initialization parameters for the `KeyValueStore`, including the
/// storage cache settings.
///
/// Use [`CommonStoreConfig::reduced`] to obtain the same parameters without the
/// cache configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommonStoreConfig {
    /// The maximum number of concurrent queries to the database.
    // NOTE(review): `None` presumably means "no limit" — confirm with the backends.
    pub max_concurrent_queries: Option<usize>,
    /// The number of streams used for the async streams.
    pub max_stream_queries: usize,
    /// The cache size being used.
    pub storage_cache_config: StorageCacheConfig,
    /// The replication factor for the keyspace.
    pub replication_factor: u32,
}
42
43impl CommonStoreConfig {
44    /// Gets the reduced `CommonStoreInternalConfig`.
45    pub fn reduced(&self) -> CommonStoreInternalConfig {
46        CommonStoreInternalConfig {
47            max_concurrent_queries: self.max_concurrent_queries,
48            max_stream_queries: self.max_stream_queries,
49            replication_factor: self.replication_factor,
50        }
51    }
52}
53
54impl Default for CommonStoreConfig {
55    fn default() -> Self {
56        CommonStoreConfig {
57            max_concurrent_queries: None,
58            max_stream_queries: 10,
59            storage_cache_config: DEFAULT_STORAGE_CACHE_CONFIG,
60            replication_factor: 1,
61        }
62    }
63}
64
/// The error type for the key-value stores.
///
/// Any such error converts into a `ViewError::StoreError` tagged with
/// [`KeyValueStoreError::BACKEND`]. The `From<bcs::Error>` bound lets `bcs`
/// errors be propagated into the store error with `?`.
pub trait KeyValueStoreError:
    std::error::Error + From<bcs::Error> + Debug + Send + Sync + 'static
{
    /// The name of the backend.
    const BACKEND: &'static str;
}
72
73impl<E: KeyValueStoreError> From<E> for ViewError {
74    fn from(error: E) -> Self {
75        Self::StoreError {
76            backend: E::BACKEND,
77            error: Box::new(error),
78        }
79    }
80}
81
/// Defines an associated [`KeyValueStoreError`].
///
/// This is the common supertrait of the readable, writable and administrative
/// store traits, so that they all share a single error type per store.
pub trait WithError {
    /// The error type.
    type Error: KeyValueStoreError;
}
87
88/// Low-level, asynchronous read key-value operations. Useful for storage APIs not based on views.
89#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
90pub trait ReadableKeyValueStore: WithError {
91    /// The maximal size of keys that can be stored.
92    const MAX_KEY_SIZE: usize;
93
94    /// Returns type for key search operations.
95    type Keys: KeyIterable<Self::Error>;
96
97    /// Returns type for key-value search operations.
98    type KeyValues: KeyValueIterable<Self::Error>;
99
100    /// Retrieve the number of stream queries.
101    fn max_stream_queries(&self) -> usize;
102
103    /// Retrieves a `Vec<u8>` from the database using the provided `key`.
104    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
105
106    /// Tests whether a key exists in the database
107    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error>;
108
109    /// Tests whether a list of keys exist in the database
110    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error>;
111
112    /// Retrieves multiple `Vec<u8>` from the database using the provided `keys`.
113    async fn read_multi_values_bytes(
114        &self,
115        keys: Vec<Vec<u8>>,
116    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;
117
118    /// Finds the `key` matching the prefix. The prefix is not included in the returned keys.
119    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error>;
120
121    /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys.
122    async fn find_key_values_by_prefix(
123        &self,
124        key_prefix: &[u8],
125    ) -> Result<Self::KeyValues, Self::Error>;
126
127    // We can't use `async fn` here in the below implementations due to
128    // https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed
129    // we can revert them to `async fn` syntax, which is neater.
130
131    /// Reads a single `key` and deserializes the result if present.
132    fn read_value<V: DeserializeOwned>(
133        &self,
134        key: &[u8],
135    ) -> impl Future<Output = Result<Option<V>, Self::Error>> {
136        async { Ok(from_bytes_option(&self.read_value_bytes(key).await?)?) }
137    }
138
139    /// Reads multiple `keys` and deserializes the results if present.
140    fn read_multi_values<V: DeserializeOwned + Send + Sync>(
141        &self,
142        keys: Vec<Vec<u8>>,
143    ) -> impl Future<Output = Result<Vec<Option<V>>, Self::Error>> {
144        async {
145            let mut values = Vec::with_capacity(keys.len());
146            for entry in self.read_multi_values_bytes(keys).await? {
147                values.push(from_bytes_option(&entry)?);
148            }
149            Ok(values)
150        }
151    }
152}
153
/// Low-level, asynchronous write operations of a key-value store. Useful for storage
/// APIs not based on views.
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait WritableKeyValueStore: WithError {
    /// The maximal size of values that can be stored.
    const MAX_VALUE_SIZE: usize;

    /// Writes the `batch` in the database.
    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error>;

    /// Clears any journal entry that may remain.
    /// The journal is located at the `root_key`.
    async fn clear_journal(&self) -> Result<(), Self::Error>;
}
167
168/// Low-level trait for the administration of stores and their namespaces.
169#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
170pub trait AdminKeyValueStore: WithError + Sized {
171    /// The configuration needed to interact with a new store.
172    type Config: Send + Sync;
173    /// The name of this class of stores
174    fn get_name() -> String;
175
176    /// Connects to an existing namespace using the given configuration.
177    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error>;
178
179    /// Opens the key partition starting at `root_key` and returns a clone of the
180    /// connection to work in this partition.
181    ///
182    /// IMPORTANT: It is assumed that the returned connection is the only user of the
183    /// partition (for both read and write) and will remain so until it is ended. Future
184    /// implementations of this method may fail if this is not the case.
185    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error>;
186
187    /// Obtains the list of existing namespaces.
188    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error>;
189
190    /// Lists the root keys of the namespace.
191    /// It is possible that some root keys have no keys.
192    async fn list_root_keys(
193        config: &Self::Config,
194        namespace: &str,
195    ) -> Result<Vec<Vec<u8>>, Self::Error>;
196
197    /// Deletes all the existing namespaces.
198    fn delete_all(config: &Self::Config) -> impl Future<Output = Result<(), Self::Error>> {
199        async {
200            let namespaces = Self::list_all(config).await?;
201            for namespace in namespaces {
202                Self::delete(config, &namespace).await?;
203            }
204            Ok(())
205        }
206    }
207
208    /// Tests if a given namespace exists.
209    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error>;
210
211    /// Creates a namespace. Returns an error if the namespace exists.
212    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
213
214    /// Deletes the given namespace.
215    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error>;
216
217    /// Initializes a storage if missing and provides it.
218    fn maybe_create_and_connect(
219        config: &Self::Config,
220        namespace: &str,
221    ) -> impl Future<Output = Result<Self, Self::Error>> {
222        async {
223            if !Self::exists(config, namespace).await? {
224                Self::create(config, namespace).await?;
225            }
226            Self::connect(config, namespace).await
227        }
228    }
229
230    /// Creates a new storage. Overwrites it if this namespace already exists.
231    fn recreate_and_connect(
232        config: &Self::Config,
233        namespace: &str,
234    ) -> impl Future<Output = Result<Self, Self::Error>> {
235        async {
236            if Self::exists(config, namespace).await? {
237                Self::delete(config, namespace).await?;
238            }
239            Self::create(config, namespace).await?;
240            Self::connect(config, namespace).await
241        }
242    }
243}
244
245/// Low-level, asynchronous write and read key-value operations. Useful for storage APIs not based on views.
246pub trait RestrictedKeyValueStore: ReadableKeyValueStore + WritableKeyValueStore {}
247
248impl<T> RestrictedKeyValueStore for T where T: ReadableKeyValueStore + WritableKeyValueStore {}
249
250/// Low-level, asynchronous write and read key-value operations. Useful for storage APIs not based on views.
251pub trait KeyValueStore:
252    ReadableKeyValueStore + WritableKeyValueStore + AdminKeyValueStore
253{
254}
255
256impl<T> KeyValueStore for T where
257    T: ReadableKeyValueStore + WritableKeyValueStore + AdminKeyValueStore
258{
259}
260
/// The functions needed for testing purposes.
#[cfg(with_testing)]
pub trait TestKeyValueStore: KeyValueStore {
    /// Obtains a configuration suitable for tests.
    async fn new_test_config() -> Result<Self::Config, Self::Error>;

    /// Creates a store for testing purposes: obtains a test configuration, generates
    /// a test namespace and connects to it after (re)creating it.
    async fn new_test_store() -> Result<Self, Self::Error> {
        let test_config = Self::new_test_config().await?;
        let test_namespace = generate_test_namespace();
        let store = Self::recreate_and_connect(&test_config, &test_namespace).await;
        store
    }
}
274
#[doc(hidden)]
/// Iterates keys by reference in a vector of keys.
///
/// The iterator never yields an `Err`: the error parameter `E` only exists so that
/// the item type has the `Result` shape required by `KeyIterable::Iterator`.
///
/// Inspired by <https://depth-first.com/articles/2020/06/22/returning-rust-iterators/>
pub struct SimpleKeyIterator<'a, E> {
    /// The underlying iterator over the stored keys.
    iter: std::slice::Iter<'a, Vec<u8>>,
    /// Ties the otherwise unused error type `E` to the struct.
    _error_type: std::marker::PhantomData<E>,
}

impl<'a, E> Iterator for SimpleKeyIterator<'a, E> {
    type Item = Result<&'a [u8], E>;

    /// Returns the next key as a byte slice, always wrapped in `Ok`.
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next().map(|key| Ok(key.as_slice()))
    }

    /// Forwards the exact bounds of the underlying slice iterator, so that
    /// consumers such as `collect` can allocate the right capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
290
291impl<E> KeyIterable<E> for Vec<Vec<u8>> {
292    type Iterator<'a> = SimpleKeyIterator<'a, E>;
293
294    fn iterator(&self) -> Self::Iterator<'_> {
295        SimpleKeyIterator {
296            iter: self.iter(),
297            _error_type: std::marker::PhantomData,
298        }
299    }
300}
301
#[doc(hidden)]
/// Same as `SimpleKeyIterator` but for key-value pairs.
///
/// The iterator never yields an `Err`: the error parameter `E` only exists so that
/// the item type has the `Result` shape required by `KeyValueIterable::Iterator`.
pub struct SimpleKeyValueIterator<'a, E> {
    /// The underlying iterator over the stored key-value pairs.
    iter: std::slice::Iter<'a, (Vec<u8>, Vec<u8>)>,
    /// Ties the otherwise unused error type `E` to the struct.
    _error_type: std::marker::PhantomData<E>,
}

impl<'a, E> Iterator for SimpleKeyValueIterator<'a, E> {
    type Item = Result<(&'a [u8], &'a [u8]), E>;

    /// Returns the next key-value pair as two byte slices, always wrapped in `Ok`.
    fn next(&mut self) -> Option<Self::Item> {
        self.iter
            .next()
            .map(|(key, value)| Ok((key.as_slice(), value.as_slice())))
    }

    /// Forwards the exact bounds of the underlying slice iterator, so that
    /// consumers such as `collect` can allocate the right capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
318
#[doc(hidden)]
/// Same as `SimpleKeyValueIterator` but key-value pairs are passed by value.
///
/// The iterator never yields an `Err`: the error parameter `E` only exists so that
/// the item type has the `Result` shape required by `KeyValueIterable::IteratorOwned`.
pub struct SimpleKeyValueIteratorOwned<E> {
    /// The underlying iterator owning the key-value pairs.
    iter: std::vec::IntoIter<(Vec<u8>, Vec<u8>)>,
    /// Ties the otherwise unused error type `E` to the struct.
    _error_type: std::marker::PhantomData<E>,
}

impl<E> Iterator for SimpleKeyValueIteratorOwned<E> {
    type Item = Result<(Vec<u8>, Vec<u8>), E>;

    /// Returns the next owned key-value pair, always wrapped in `Ok`.
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next().map(Ok)
    }

    /// Forwards the exact bounds of the underlying vector iterator, so that
    /// consumers such as `collect` can allocate the right capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
333
334impl<E> KeyValueIterable<E> for Vec<(Vec<u8>, Vec<u8>)> {
335    type Iterator<'a> = SimpleKeyValueIterator<'a, E>;
336    type IteratorOwned = SimpleKeyValueIteratorOwned<E>;
337
338    fn iterator(&self) -> Self::Iterator<'_> {
339        SimpleKeyValueIterator {
340            iter: self.iter(),
341            _error_type: std::marker::PhantomData,
342        }
343    }
344
345    fn into_iterator_owned(self) -> Self::IteratorOwned {
346        SimpleKeyValueIteratorOwned {
347            iter: self.into_iter(),
348            _error_type: std::marker::PhantomData,
349        }
350    }
351}
352
/// How to iterate over the keys returned by a search query.
///
/// `Error` is the error type carried by each item, so that implementations can
/// report a failure for an individual key.
pub trait KeyIterable<Error> {
    /// The iterator returning keys by reference.
    type Iterator<'a>: Iterator<Item = Result<&'a [u8], Error>>
    where
        Self: 'a;

    /// Iterates keys by reference.
    fn iterator(&self) -> Self::Iterator<'_>;
}
363
/// How to iterate over the key-value pairs returned by a search query.
///
/// `Error` is the error type carried by each item, so that implementations can
/// report a failure for an individual key-value pair.
pub trait KeyValueIterable<Error> {
    /// The iterator that returns key-value pairs by reference.
    type Iterator<'a>: Iterator<Item = Result<(&'a [u8], &'a [u8]), Error>>
    where
        Self: 'a;

    /// The iterator that returns key-value pairs by value.
    type IteratorOwned: Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>>;

    /// Iterates keys and values by reference.
    fn iterator(&self) -> Self::Iterator<'_>;

    /// Iterates keys and values by value, consuming `self`.
    fn into_iterator_owned(self) -> Self::IteratorOwned;
}