linera_views/backends/
journaling.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Turns a `DirectKeyValueStore` into a `KeyValueStore` by adding journaling.
//!
//! Journaling aims to allow writing arbitrarily large batches of data in an atomic way.
//! This is useful for database backends that limit the number of keys and/or the size of
//! the data that can be written atomically (i.e. in the same database transaction).
//!
//! Journaling requires setting aside a range of keys to hold a possible "header" and an
//! array of unwritten entries called "blocks".
//!
//! When a new batch to be written exceeds the capacity of the underlying storage, the
//! "slow path" is taken: the batch of operations is first written into blocks, then the
//! journal header is (atomically) updated to make the batch of updates persistent.
//!
//! Before any new read or write operation, if a journal is present, it must first be
//! cleared. This is done by processing every block of the journal successively. Each
//! time the data in a block is written, the journal header is updated in the same
//! transaction to mark the block as processed.
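//!
//! As an illustration (a sketch based on the key construction below; BCS encodes a
//! `u32` as 4 little-endian bytes), the reserved keys are laid out as follows:
//!
//! ```text
//! [0, 1, 0, 0, 0, 0]  // journal header  (JOURNAL_TAG, KeyTag::Journal, position 0)
//! [0, 2, 0, 0, 0, 0]  // journal block 0 (JOURNAL_TAG, KeyTag::Entry,   position 0)
//! [0, 2, 1, 0, 0, 0]  // journal block 1 (JOURNAL_TAG, KeyTag::Entry,   position 1)
//! ```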

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use static_assertions as sa;
use thiserror::Error;

use crate::{
    batch::{Batch, BatchValueWriter, DeletePrefixExpander, SimplifiedBatch},
    store::{
        AdminKeyValueStore, KeyIterable, ReadableKeyValueStore, WithError, WritableKeyValueStore,
    },
    views::MIN_VIEW_TAG,
};

/// The tag used for the journal metadata and entries.
const JOURNAL_TAG: u8 = 0;
// To prevent collisions, the tag value 0 is reserved for journals.
// The tags used by views must be greater than or equal to `MIN_VIEW_TAG`.
sa::const_assert!(JOURNAL_TAG < MIN_VIEW_TAG);

/// Data type indicating that the database is not consistent.
#[derive(Error, Debug)]
#[allow(missing_docs)]
pub enum JournalConsistencyError {
    #[error("The journal block could not be retrieved; it could be missing or corrupted.")]
    FailureToRetrieveJournalBlock,

    #[error("Refusing to use the journal without exclusive database access to the root object.")]
    JournalRequiresExclusiveAccess,
}

#[repr(u8)]
enum KeyTag {
    /// Prefix for storing the journal header.
    Journal = 1,
    /// Prefix for storing the journal block entries.
    Entry,
}

fn get_journaling_key(tag: u8, pos: u32) -> Result<Vec<u8>, bcs::Error> {
    let mut key = vec![JOURNAL_TAG];
    key.extend([tag]);
    bcs::serialize_into(&mut key, &pos)?;
    Ok(key)
}

/// Low-level, asynchronous direct write key-value operations with a simplified batch.
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait DirectWritableKeyValueStore: WithError {
    /// The maximal number of items in a batch.
    const MAX_BATCH_SIZE: usize;

    /// The maximal number of bytes of a batch.
    const MAX_BATCH_TOTAL_SIZE: usize;

    /// The maximal size of values that can be stored.
    const MAX_VALUE_SIZE: usize;

    /// The batch type.
    type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;

    /// Writes the batch to the database.
    async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
}
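
// A minimal sketch of how a backend might declare its limits. `MyStore`, the
// numbers, and the batch type below are hypothetical, purely for illustration:
//
// impl DirectWritableKeyValueStore for MyStore {
//     // At most 25 operations per atomic write.
//     const MAX_BATCH_SIZE: usize = 25;
//     // At most 4 MB of data per atomic write.
//     const MAX_BATCH_TOTAL_SIZE: usize = 4_000_000;
//     // Individual values are capped at 400 KB.
//     const MAX_VALUE_SIZE: usize = 400_000;
//     type Batch = MySimplifiedBatch;
//
//     async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error> {
//         // Submit all operations in `batch` as a single database transaction.
//         // ...
//     }
// }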

/// Low-level, asynchronous direct read/write key-value operations with a simplified batch.
pub trait DirectKeyValueStore:
    ReadableKeyValueStore + DirectWritableKeyValueStore + AdminKeyValueStore
{
}

impl<T> DirectKeyValueStore for T where
    T: ReadableKeyValueStore + DirectWritableKeyValueStore + AdminKeyValueStore
{
}

/// The header that contains the current state of the journal.
#[derive(Serialize, Deserialize, Debug, Default)]
struct JournalHeader {
    block_count: u32,
}

/// A journaling Key Value Store built from an inner [`DirectKeyValueStore`].
#[derive(Clone)]
pub struct JournalingKeyValueStore<K> {
    /// The inner store.
    store: K,
    /// Whether we have exclusive R/W access to the keys under the root key.
    has_exclusive_access: bool,
}

impl<K> DeletePrefixExpander for &JournalingKeyValueStore<K>
where
    K: DirectKeyValueStore,
{
    type Error = K::Error;
    async fn expand_delete_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        let mut vector_list = Vec::new();
        for key in self.store.find_keys_by_prefix(key_prefix).await?.iterator() {
            vector_list.push(key?.to_vec());
        }
        Ok(vector_list)
    }
}
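
// This expansion is used by `K::Batch::from_batch` in `write_batch` below: a
// `DeletePrefix` operation is replaced by one individual deletion per matching
// key, since simplified batches have no notion of key prefixes.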

impl<K> WithError for JournalingKeyValueStore<K>
where
    K: WithError,
{
    type Error = K::Error;
}

impl<K> ReadableKeyValueStore for JournalingKeyValueStore<K>
where
    K: ReadableKeyValueStore,
    K::Error: From<JournalConsistencyError>,
{
    /// The size constants do not change.
    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
    /// The basic types do not change.
    type Keys = K::Keys;
    type KeyValues = K::KeyValues;

    /// The read operations are simply forwarded to the inner store.
    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        self.store.read_value_bytes(key).await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        self.store.contains_key(key).await
    }

    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        self.store.contains_keys(keys).await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        self.store.read_multi_values_bytes(keys).await
    }

    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, Self::Error> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::KeyValues, Self::Error> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}

impl<K> AdminKeyValueStore for JournalingKeyValueStore<K>
where
    K: AdminKeyValueStore,
{
    type Config = K::Config;

    fn get_name() -> String {
        format!("journaling {}", K::get_name())
    }

    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let store = K::connect(config, namespace).await?;
        Ok(Self {
            store,
            has_exclusive_access: false,
        })
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
        let store = self.store.open_exclusive(root_key)?;
        Ok(Self {
            store,
            has_exclusive_access: true,
        })
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        K::list_all(config).await
    }

    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, Self::Error> {
        K::list_root_keys(config, namespace).await
    }

    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        K::delete_all(config).await
    }

    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        K::exists(config, namespace).await
    }

    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        K::create(config, namespace).await
    }

    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        K::delete(config, namespace).await
    }
}

impl<K> WritableKeyValueStore for JournalingKeyValueStore<K>
where
    K: DirectKeyValueStore,
    K::Error: From<JournalConsistencyError>,
{
    /// The size constants do not change.
    const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;

    async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
        let batch = K::Batch::from_batch(self, batch).await?;
        if Self::is_fastpath_feasible(&batch) {
            self.store.write_batch(batch).await
        } else {
            if !self.has_exclusive_access {
                return Err(JournalConsistencyError::JournalRequiresExclusiveAccess.into());
            }
            let header = self.write_journal(batch).await?;
            self.coherently_resolve_journal(header).await
        }
    }

    async fn clear_journal(&self) -> Result<(), Self::Error> {
        let key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        let value = self.read_value::<JournalHeader>(&key).await?;
        if let Some(header) = value {
            self.coherently_resolve_journal(header).await?;
        }
        Ok(())
    }
}

impl<K> JournalingKeyValueStore<K>
where
    K: DirectKeyValueStore,
    K::Error: From<JournalConsistencyError>,
{
    /// Resolves the pending operations that were previously stored in the database
    /// journal.
    ///
    /// For each block processed, we atomically update the journal header as well. When
    /// the last block is processed, this atomically clears the journal and makes the store
    /// finally available again (for the range of keys managed by the journal).
    ///
    /// This function respects the constraints of the underlying key-value store `K` if
    /// the following conditions are met:
    ///
    /// (1) each block contains at most `K::MAX_BATCH_SIZE - 2` operations;
    ///
    /// (2) the total size of all the operations in a block doesn't exceed:
    /// `K::MAX_BATCH_TOTAL_SIZE - sizeof(block_key) - sizeof(header_key) - sizeof(bcs_header)`;
    ///
    /// (3) every operation in a block satisfies the constraints on individual database
    /// operations represented by `K::MAX_KEY_SIZE` and `K::MAX_VALUE_SIZE`;
    ///
    /// (4) `block_key` and `header_key` don't exceed `K::MAX_KEY_SIZE` and `bcs_header`
    /// doesn't exceed `K::MAX_VALUE_SIZE`.
    async fn coherently_resolve_journal(&self, mut header: JournalHeader) -> Result<(), K::Error> {
        let header_key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        while header.block_count > 0 {
            let block_key = get_journaling_key(KeyTag::Entry as u8, header.block_count - 1)?;
            // Read the batch of updates (a.k.a. "block") previously saved in the journal.
            let mut batch = self
                .store
                .read_value::<K::Batch>(&block_key)
                .await?
                .ok_or(JournalConsistencyError::FailureToRetrieveJournalBlock)?;
            // Execute the block and delete it from the journal atomically.
            batch.add_delete(block_key);
            header.block_count -= 1;
            if header.block_count > 0 {
                let value = bcs::to_bytes(&header)?;
                batch.add_insert(header_key.clone(), value);
            } else {
                batch.add_delete(header_key.clone());
            }
            self.store.write_batch(batch).await?;
        }
        Ok(())
    }
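
    // To make the resolution above concrete: suppose (hypothetically) that the
    // header says `block_count == 3`. The loop then issues three atomic writes:
    //
    //   1. apply block 2, delete its key, and rewrite the header as { block_count: 2 };
    //   2. apply block 1, delete its key, and rewrite the header as { block_count: 1 };
    //   3. apply block 0, delete its key, and delete the header itself.
    //
    // A crash between any two of these writes leaves a shorter but still valid
    // journal, so resolution simply resumes from the stored header.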

    /// Writes the content of `batch` to the journal as a succession of blocks that can be
    /// interpreted later by `coherently_resolve_journal`.
    ///
    /// Starting with a batch of operations that is typically too large to be executed in
    /// one go (see `is_fastpath_feasible()` below), the goal of this function is to split
    /// the batch into smaller blocks so that `coherently_resolve_journal` respects the
    /// constraints of the underlying key-value store (see the analysis above).
    ///
    /// For efficiency reasons, we write as many blocks as possible in each "transaction"
    /// batch, using one write operation per block. Then we also update the journal header
    /// with the final number of blocks.
    ///
    /// As a result, the constraints of the underlying database are respected if the
    /// following conditions are met while a "transaction" batch is being built:
    ///
    /// (1) The number of blocks per transaction doesn't exceed `K::MAX_BATCH_SIZE`.
    /// (Note that `K::MAX_BATCH_SIZE` may be as large as `usize::MAX`.)
    ///
    /// (2) The total size of BCS-serialized blocks together with their corresponding keys
    /// does not exceed `K::MAX_BATCH_TOTAL_SIZE`.
    ///
    /// (3) The size of each BCS-serialized block doesn't exceed `K::MAX_VALUE_SIZE`.
    ///
    /// (4) When processing a journal block, we have to do two other operations:
    ///   (a) removing the existing block, at a cost of `key_len`;
    ///   (b) updating or removing the journal header, at a cost of
    ///       `key_len + header_value_len` or `key_len`, respectively. An upper bound is
    ///       thus `journal_len_upper_bound = key_len + header_value_len`.
    ///   Thus the following has to be taken as an upper bound on the block size:
    ///   `K::MAX_BATCH_TOTAL_SIZE - key_len - journal_len_upper_bound`.
    ///
    /// NOTE:
    /// * Since a block must contain at least one operation and M bytes of
    ///   serialization overhead (typically M is 2 or 3 bytes of vector sizes), condition (3)
    ///   requires that each operation in the original batch satisfies:
    ///   `sizeof(key) + sizeof(value) + M <= K::MAX_VALUE_SIZE`.
    ///
    /// * Similarly, a transaction must contain at least one block, so it is desirable that
    ///   the maximum size of a block insertion `1 + sizeof(block_key) + K::MAX_VALUE_SIZE`
    ///   plus M bytes of overhead doesn't exceed the threshold of condition (2).
    async fn write_journal(&self, batch: K::Batch) -> Result<JournalHeader, K::Error> {
        let header_key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        let key_len = header_key.len();
        let header_value_len = bcs::serialized_size(&JournalHeader::default())?;
        let journal_len_upper_bound = key_len + header_value_len;
        // Each block in a transaction comes with a key.
        let max_transaction_size = K::MAX_BATCH_TOTAL_SIZE;
        let max_block_size = std::cmp::min(
            K::MAX_VALUE_SIZE,
            K::MAX_BATCH_TOTAL_SIZE - key_len - journal_len_upper_bound,
        );

        let mut iter = batch.into_iter();
        let mut block_batch = K::Batch::default();
        let mut block_size = 0;
        let mut block_count = 0;
        let mut transaction_batch = K::Batch::default();
        let mut transaction_size = 0;
        while iter.write_next_value(&mut block_batch, &mut block_size)? {
            let (block_flush, transaction_flush) = {
                if iter.is_empty() || transaction_batch.len() == K::MAX_BATCH_SIZE - 1 {
                    (true, true)
                } else {
                    let next_block_size = iter
                        .next_batch_size(&block_batch, block_size)?
                        .expect("iter is not empty");
                    let next_transaction_size = transaction_size + next_block_size + key_len;
                    let transaction_flush = next_transaction_size > max_transaction_size;
                    let block_flush = transaction_flush
                        || block_batch.len() == K::MAX_BATCH_SIZE - 2
                        || next_block_size > max_block_size;
                    (block_flush, transaction_flush)
                }
            };
            if block_flush {
                block_size += block_batch.overhead_size();
                let value = bcs::to_bytes(&block_batch)?;
                block_batch = K::Batch::default();
                assert_eq!(value.len(), block_size);
                let key = get_journaling_key(KeyTag::Entry as u8, block_count)?;
                transaction_batch.add_insert(key, value);
                block_count += 1;
                transaction_size += block_size + key_len;
                block_size = 0;
            }
            if transaction_flush {
                let batch = std::mem::take(&mut transaction_batch);
                self.store.write_batch(batch).await?;
                transaction_size = 0;
            }
        }
        let header = JournalHeader { block_count };
        if block_count > 0 {
            let value = bcs::to_bytes(&header)?;
            let mut batch = K::Batch::default();
            batch.add_insert(header_key, value);
            self.store.write_batch(batch).await?;
        }
        Ok(header)
    }
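
    // A worked example of the bounds above, with hypothetical limits
    // `K::MAX_VALUE_SIZE = 400_000` and `K::MAX_BATCH_TOTAL_SIZE = 4_000_000`:
    // journaling keys are 6 bytes (`key_len = 6`) and the BCS-serialized default
    // header takes `header_value_len = 4` bytes, so `journal_len_upper_bound = 10`
    // and `max_block_size = min(400_000, 4_000_000 - 6 - 10) = 400_000`.
    // Each full block then adds `400_000 + 6` bytes to a transaction, so a
    // transaction is flushed after roughly `4_000_000 / 400_006`, i.e. 9, blocks.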

    fn is_fastpath_feasible(batch: &K::Batch) -> bool {
        batch.len() <= K::MAX_BATCH_SIZE && batch.num_bytes() <= K::MAX_BATCH_TOTAL_SIZE
    }
}

impl<K> JournalingKeyValueStore<K> {
    /// Creates a new journaling store.
    pub fn new(store: K) -> Self {
        Self {
            store,
            has_exclusive_access: false,
        }
    }
}
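
// A minimal usage sketch (`my_direct_store`, `root_key`, and `large_batch` are
// hypothetical placeholders, for illustration only):
//
// let store = JournalingKeyValueStore::new(my_direct_store);
// // Journaling oversized batches requires exclusive access to the root key:
// let store = store.open_exclusive(&root_key)?;
// // Small batches take the fast path; larger ones are journaled, then resolved.
// store.write_batch(large_batch).await?;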