1use serde::{de::DeserializeOwned, Deserialize, Serialize};
23use static_assertions as sa;
24use thiserror::Error;
25
26use crate::{
27 batch::{Batch, BatchValueWriter, DeletePrefixExpander, SimplifiedBatch},
28 store::{AdminKeyValueStore, ReadableKeyValueStore, WithError, WritableKeyValueStore},
29 views::MIN_VIEW_TAG,
30};
31
/// Key-prefix tag reserved for journal bookkeeping; kept below `MIN_VIEW_TAG`
/// so journal keys can never collide with keys used by views.
const JOURNAL_TAG: u8 = 0;
// Compile-time guarantee that the journal tag stays out of the view tag space.
sa::const_assert!(JOURNAL_TAG < MIN_VIEW_TAG);
37
/// Errors that can arise while writing or replaying the journal.
#[derive(Error, Debug)]
#[allow(missing_docs)]
pub enum JournalConsistencyError {
    #[error("The journal block could not be retrieved, it could be missing or corrupted.")]
    FailureToRetrieveJournalBlock,

    #[error("Refusing to use the journal without exclusive database access to the root object.")]
    JournalRequiresExclusiveAccess,
}
48
/// Second byte of a journaling key, written immediately after `JOURNAL_TAG`.
#[repr(u8)]
enum KeyTag {
    /// Tag of the key holding the serialized `JournalHeader`.
    Journal = 1,
    /// Tag of the keys holding the individual journal blocks (serialized batches).
    Entry,
}
56
57fn get_journaling_key(tag: u8, pos: u32) -> Result<Vec<u8>, bcs::Error> {
58 let mut key = vec![JOURNAL_TAG];
59 key.extend([tag]);
60 bcs::serialize_into(&mut key, &pos)?;
61 Ok(key)
62}
63
/// Low-level write interface for stores that can apply a simplified batch
/// directly, subject to the size limits declared below.
#[cfg_attr(not(web), trait_variant::make(Send + Sync))]
pub trait DirectWritableKeyValueStore: WithError {
    /// Maximum number of operations allowed in a single batch.
    const MAX_BATCH_SIZE: usize;

    /// Maximum total size (in bytes) of a single batch.
    const MAX_BATCH_TOTAL_SIZE: usize;

    /// Maximum size (in bytes) of a single stored value.
    const MAX_VALUE_SIZE: usize;

    /// The store's native batch type.
    type Batch: SimplifiedBatch + Serialize + DeserializeOwned + Default;

    /// Writes the batch to the store.
    async fn write_batch(&self, batch: Self::Batch) -> Result<(), Self::Error>;
}
82
/// Combination of the read, direct-write, and administrative store traits.
pub trait DirectKeyValueStore:
    ReadableKeyValueStore + DirectWritableKeyValueStore + AdminKeyValueStore
{
}
88
// Blanket implementation: any type with the three constituent traits is
// automatically a `DirectKeyValueStore`.
impl<T> DirectKeyValueStore for T where
    T: ReadableKeyValueStore + DirectWritableKeyValueStore + AdminKeyValueStore
{
}
93
/// Header of the journal, stored under the `KeyTag::Journal` key.
#[derive(Serialize, Deserialize, Debug, Default)]
struct JournalHeader {
    /// Number of journal blocks currently recorded and not yet resolved.
    block_count: u32,
}
99
/// A key-value store wrapping a `DirectKeyValueStore` and using a journal so
/// that batches exceeding the inner store's limits can be written in pieces.
#[derive(Clone)]
pub struct JournalingKeyValueStore<K> {
    /// The underlying store.
    store: K,
    /// Whether this handle was obtained via `open_exclusive`; the journal slow
    /// path in `write_batch` refuses to run without it.
    has_exclusive_access: bool,
}
108
// Lets `SimplifiedBatch::from_batch` expand delete-prefix operations by
// listing the matching keys from the underlying store.
impl<K> DeletePrefixExpander for &JournalingKeyValueStore<K>
where
    K: DirectKeyValueStore,
{
    type Error = K::Error;

    /// Returns every key under `key_prefix`, read from the underlying store.
    async fn expand_delete_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        self.store.find_keys_by_prefix(key_prefix).await
    }
}
119
impl<K> WithError for JournalingKeyValueStore<K>
where
    K: WithError,
{
    // The journaling wrapper surfaces the inner store's error type unchanged.
    type Error = K::Error;
}
126
/// All read operations delegate directly to the underlying store.
impl<K> ReadableKeyValueStore for JournalingKeyValueStore<K>
where
    K: ReadableKeyValueStore,
    // NOTE(review): this bound mirrors the writable impl; the read methods
    // below never construct a `JournalConsistencyError` — confirm the bound is
    // actually required by callers.
    K::Error: From<JournalConsistencyError>,
{
    /// The key-size limit of the underlying store.
    const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;

    /// Delegates to the underlying store.
    fn max_stream_queries(&self) -> usize {
        self.store.max_stream_queries()
    }

    /// Delegates to the underlying store.
    async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
        self.store.read_value_bytes(key).await
    }

    /// Delegates to the underlying store.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, Self::Error> {
        self.store.contains_key(key).await
    }

    /// Delegates to the underlying store.
    async fn contains_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<bool>, Self::Error> {
        self.store.contains_keys(keys).await
    }

    /// Delegates to the underlying store.
    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, Self::Error> {
        self.store.read_multi_values_bytes(keys).await
    }

    /// Delegates to the underlying store.
    async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Vec<Vec<u8>>, Self::Error> {
        self.store.find_keys_by_prefix(key_prefix).await
    }

    /// Delegates to the underlying store.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Self::Error> {
        self.store.find_key_values_by_prefix(key_prefix).await
    }
}
170
/// Administrative operations delegate to the underlying store; `connect` and
/// `open_exclusive` additionally track whether exclusive access was granted.
impl<K> AdminKeyValueStore for JournalingKeyValueStore<K>
where
    K: AdminKeyValueStore,
{
    type Config = K::Config;

    /// Prefixes the underlying store's name with "journaling".
    fn get_name() -> String {
        format!("journaling {}", K::get_name())
    }

    /// Connects to the store. The resulting handle does NOT have exclusive
    /// access, so only the fast path of `write_batch` is usable.
    async fn connect(config: &Self::Config, namespace: &str) -> Result<Self, Self::Error> {
        let store = K::connect(config, namespace).await?;
        Ok(Self {
            store,
            has_exclusive_access: false,
        })
    }

    /// Opens `root_key` exclusively; this marks the handle as exclusive, which
    /// enables the journal slow path of `write_batch`.
    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, Self::Error> {
        let store = self.store.open_exclusive(root_key)?;
        Ok(Self {
            store,
            has_exclusive_access: true,
        })
    }

    /// Delegates to the underlying store.
    async fn list_all(config: &Self::Config) -> Result<Vec<String>, Self::Error> {
        K::list_all(config).await
    }

    /// Delegates to the underlying store.
    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, Self::Error> {
        K::list_root_keys(config, namespace).await
    }

    /// Delegates to the underlying store.
    async fn delete_all(config: &Self::Config) -> Result<(), Self::Error> {
        K::delete_all(config).await
    }

    /// Delegates to the underlying store.
    async fn exists(config: &Self::Config, namespace: &str) -> Result<bool, Self::Error> {
        K::exists(config, namespace).await
    }

    /// Delegates to the underlying store.
    async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        K::create(config, namespace).await
    }

    /// Delegates to the underlying store.
    async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> {
        K::delete(config, namespace).await
    }
}
224
225impl<K> WritableKeyValueStore for JournalingKeyValueStore<K>
226where
227 K: DirectKeyValueStore,
228 K::Error: From<JournalConsistencyError>,
229{
230 const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
232
233 async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
234 let batch = K::Batch::from_batch(self, batch).await?;
235 if Self::is_fastpath_feasible(&batch) {
236 self.store.write_batch(batch).await
237 } else {
238 if !self.has_exclusive_access {
239 return Err(JournalConsistencyError::JournalRequiresExclusiveAccess.into());
240 }
241 let header = self.write_journal(batch).await?;
242 self.coherently_resolve_journal(header).await
243 }
244 }
245
246 async fn clear_journal(&self) -> Result<(), Self::Error> {
247 let key = get_journaling_key(KeyTag::Journal as u8, 0)?;
248 let value = self.read_value::<JournalHeader>(&key).await?;
249 if let Some(header) = value {
250 self.coherently_resolve_journal(header).await?;
251 }
252 Ok(())
253 }
254}
255
impl<K> JournalingKeyValueStore<K>
where
    K: DirectKeyValueStore,
    K::Error: From<JournalConsistencyError>,
{
    /// Applies and erases the journal described by `header`.
    ///
    /// Blocks are consumed from the highest index downwards. Each iteration
    /// writes a single batch containing: the block's recorded operations, the
    /// deletion of the block itself, and the decremented (or deleted) header.
    /// Because the header travels in the same batch as the data, the stored
    /// header always describes exactly the blocks still present, so an
    /// interrupted resolution can be resumed later.
    async fn coherently_resolve_journal(&self, mut header: JournalHeader) -> Result<(), K::Error> {
        let header_key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        while header.block_count > 0 {
            let block_key = get_journaling_key(KeyTag::Entry as u8, header.block_count - 1)?;
            // A missing block means the journal is corrupted.
            let mut batch = self
                .store
                .read_value::<K::Batch>(&block_key)
                .await?
                .ok_or(JournalConsistencyError::FailureToRetrieveJournalBlock)?;
            batch.add_delete(block_key);
            header.block_count -= 1;
            if header.block_count > 0 {
                // More blocks remain: persist the updated header atomically
                // with this block's operations.
                let value = bcs::to_bytes(&header)?;
                batch.add_insert(header_key.clone(), value);
            } else {
                // Last block: delete the header so the journal reads as empty.
                batch.add_delete(header_key.clone());
            }
            self.store.write_batch(batch).await?;
        }
        Ok(())
    }

    /// Writes `batch` into the journal and returns the resulting header.
    ///
    /// The batch is cut into blocks, each stored as one serialized `K::Batch`
    /// value; block inserts are grouped into transactions that respect the
    /// inner store's batch limits. The header is written last, in a batch of
    /// its own, so the journal only becomes visible once every block has been
    /// persisted; an interrupted call leaves no visible journal.
    async fn write_journal(&self, batch: K::Batch) -> Result<JournalHeader, K::Error> {
        let header_key = get_journaling_key(KeyTag::Journal as u8, 0)?;
        // All journaling keys have the same length, so `key_len` also bounds
        // the length of the block keys generated below.
        let key_len = header_key.len();
        let header_value_len = bcs::serialized_size(&JournalHeader::default())?;
        let journal_len_upper_bound = key_len + header_value_len;
        let max_transaction_size = K::MAX_BATCH_TOTAL_SIZE;
        // A block must fit in one stored value and — when replayed in
        // `coherently_resolve_journal` together with its key and a header
        // update — inside one batch, hence the subtractions.
        let max_block_size = std::cmp::min(
            K::MAX_VALUE_SIZE,
            K::MAX_BATCH_TOTAL_SIZE - key_len - journal_len_upper_bound,
        );

        let mut iter = batch.into_iter();
        let mut block_batch = K::Batch::default();
        let mut block_size = 0;
        let mut block_count = 0;
        let mut transaction_batch = K::Batch::default();
        let mut transaction_size = 0;
        while iter.write_next_value(&mut block_batch, &mut block_size)? {
            let (block_flush, transaction_flush) = {
                // `MAX_BATCH_SIZE - 1` leaves room for the block insert that
                // is appended to `transaction_batch` below.
                if iter.is_empty() || transaction_batch.len() == K::MAX_BATCH_SIZE - 1 {
                    (true, true)
                } else {
                    let next_block_size = iter
                        .next_batch_size(&block_batch, block_size)?
                        .expect("iter is not empty");
                    let next_transaction_size = transaction_size + next_block_size + key_len;
                    let transaction_flush = next_transaction_size > max_transaction_size;
                    // `MAX_BATCH_SIZE - 2` leaves room for the two extra
                    // operations (block deletion and header update) added when
                    // the block is replayed in `coherently_resolve_journal`.
                    let block_flush = transaction_flush
                        || block_batch.len() == K::MAX_BATCH_SIZE - 2
                        || next_block_size > max_block_size;
                    (block_flush, transaction_flush)
                }
            };
            if block_flush {
                block_size += block_batch.overhead_size();
                let value = bcs::to_bytes(&block_batch)?;
                block_batch = K::Batch::default();
                // The incremental size accounting must match the actual
                // serialized size, or the limits above would be meaningless.
                assert_eq!(value.len(), block_size);
                let key = get_journaling_key(KeyTag::Entry as u8, block_count)?;
                transaction_batch.add_insert(key, value);
                block_count += 1;
                transaction_size += block_size + key_len;
                block_size = 0;
            }
            if transaction_flush {
                let batch = std::mem::take(&mut transaction_batch);
                self.store.write_batch(batch).await?;
                transaction_size = 0;
            }
        }
        let header = JournalHeader { block_count };
        if block_count > 0 {
            // Publish the header only after all blocks have been written.
            let value = bcs::to_bytes(&header)?;
            let mut batch = K::Batch::default();
            batch.add_insert(header_key, value);
            self.store.write_batch(batch).await?;
        }
        Ok(header)
    }

    /// Returns whether `batch` fits the inner store's limits and can therefore
    /// be written directly, bypassing the journal.
    fn is_fastpath_feasible(batch: &K::Batch) -> bool {
        batch.len() <= K::MAX_BATCH_SIZE && batch.num_bytes() <= K::MAX_BATCH_TOTAL_SIZE
    }
}
410
411impl<K> JournalingKeyValueStore<K> {
412 pub fn new(store: K) -> Self {
414 Self {
415 store,
416 has_exclusive_access: false,
417 }
418 }
419}