linera_views/backends/
rocks_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the RocksDB database.
5
6use std::{
7    ffi::OsString,
8    fmt::Display,
9    path::PathBuf,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14};
15
16use linera_base::ensure;
17use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle};
18use serde::{Deserialize, Serialize};
19use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
20use tempfile::TempDir;
21use thiserror::Error;
22
23#[cfg(with_metrics)]
24use crate::metering::MeteredDatabase;
25#[cfg(with_testing)]
26use crate::store::TestKeyValueDatabase;
27use crate::{
28    batch::{Batch, WriteOperation},
29    common::get_upper_bound_option,
30    lru_caching::{LruCachingConfig, LruCachingDatabase},
31    store::{
32        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
33        WritableKeyValueStore,
34    },
35    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
36};
37
38/// The prefixes being used in the system
39static ROOT_KEY_DOMAIN: [u8; 1] = [0];
40static STORED_ROOT_KEYS_PREFIX: u8 = 1;
41
42/// The number of streams for the test
43#[cfg(with_testing)]
44const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;
45
46// The maximum size of values in RocksDB is 3 GiB
47// For offset reasons we decrease by 400
48const MAX_VALUE_SIZE: usize = 3 * 1024 * 1024 * 1024 - 400;
49
50// The maximum size of keys in RocksDB is 8 MiB
51// For offset reasons we decrease by 400
52const MAX_KEY_SIZE: usize = 8 * 1024 * 1024 - 400;
53
54const WRITE_BUFFER_SIZE: usize = 256 * 1024 * 1024; // 256 MiB
55const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
56const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; // 8 KiB
57
58/// The RocksDB client that we use.
59type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
60
61/// The choice of the spawning mode.
62/// `SpawnBlocking` always works and is the safest.
63/// `BlockInPlace` can only be used in multi-threaded environment.
64/// One way to select that is to select BlockInPlace when
65/// `tokio::runtime::Handle::current().metrics().num_workers() > 1`
66/// `BlockInPlace` is documented in <https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html>
67#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
68pub enum RocksDbSpawnMode {
69    /// This uses the `spawn_blocking` function of Tokio.
70    SpawnBlocking,
71    /// This uses the `block_in_place` function of Tokio.
72    BlockInPlace,
73}
74
75impl RocksDbSpawnMode {
76    /// Obtains the spawning mode from runtime.
77    pub fn get_spawn_mode_from_runtime() -> Self {
78        if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
79            RocksDbSpawnMode::BlockInPlace
80        } else {
81            RocksDbSpawnMode::SpawnBlocking
82        }
83    }
84
85    /// Runs the computation for a function according to the selected policy.
86    #[inline]
87    async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
88    where
89        F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
90        I: Send + 'static,
91        O: Send + 'static,
92    {
93        Ok(match self {
94            RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
95            RocksDbSpawnMode::SpawnBlocking => {
96                tokio::task::spawn_blocking(move || f(input)).await??
97            }
98        })
99    }
100}
101
102impl Display for RocksDbSpawnMode {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match &self {
105            RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
106            RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
107        }
108    }
109}
110
111fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
112    ensure!(
113        key.len() <= MAX_KEY_SIZE,
114        RocksDbStoreInternalError::KeyTooLong
115    );
116    Ok(())
117}
118
119#[derive(Clone)]
120struct RocksDbStoreExecutor {
121    db: Arc<DB>,
122    start_key: Vec<u8>,
123}
124
125impl RocksDbStoreExecutor {
126    fn contains_keys_internal(
127        &self,
128        keys: Vec<Vec<u8>>,
129    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
130        let size = keys.len();
131        let mut results = vec![false; size];
132        let mut indices = Vec::new();
133        let mut keys_red = Vec::new();
134        for (i, key) in keys.into_iter().enumerate() {
135            check_key_size(&key)?;
136            let mut full_key = self.start_key.to_vec();
137            full_key.extend(key);
138            if self.db.key_may_exist(&full_key) {
139                indices.push(i);
140                keys_red.push(full_key);
141            }
142        }
143        let values_red = self.db.multi_get(keys_red);
144        for (index, value) in indices.into_iter().zip(values_red) {
145            results[index] = value?.is_some();
146        }
147        Ok(results)
148    }
149
150    fn read_multi_values_bytes_internal(
151        &self,
152        keys: Vec<Vec<u8>>,
153    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
154        for key in &keys {
155            check_key_size(key)?;
156        }
157        let full_keys = keys
158            .into_iter()
159            .map(|key| {
160                let mut full_key = self.start_key.to_vec();
161                full_key.extend(key);
162                full_key
163            })
164            .collect::<Vec<_>>();
165        let entries = self.db.multi_get(&full_keys);
166        Ok(entries.into_iter().collect::<Result<_, _>>()?)
167    }
168
169    fn find_keys_by_prefix_internal(
170        &self,
171        key_prefix: Vec<u8>,
172    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
173        check_key_size(&key_prefix)?;
174        let mut prefix = self.start_key.clone();
175        prefix.extend(key_prefix);
176        let len = prefix.len();
177        let mut iter = self.db.raw_iterator();
178        let mut keys = Vec::new();
179        iter.seek(&prefix);
180        let mut next_key = iter.key();
181        while let Some(key) = next_key {
182            if !key.starts_with(&prefix) {
183                break;
184            }
185            keys.push(key[len..].to_vec());
186            iter.next();
187            next_key = iter.key();
188        }
189        Ok(keys)
190    }
191
192    #[expect(clippy::type_complexity)]
193    fn find_key_values_by_prefix_internal(
194        &self,
195        key_prefix: Vec<u8>,
196    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
197        check_key_size(&key_prefix)?;
198        let mut prefix = self.start_key.clone();
199        prefix.extend(key_prefix);
200        let len = prefix.len();
201        let mut iter = self.db.raw_iterator();
202        let mut key_values = Vec::new();
203        iter.seek(&prefix);
204        let mut next_key = iter.key();
205        while let Some(key) = next_key {
206            if !key.starts_with(&prefix) {
207                break;
208            }
209            if let Some(value) = iter.value() {
210                let key_value = (key[len..].to_vec(), value.to_vec());
211                key_values.push(key_value);
212            }
213            iter.next();
214            next_key = iter.key();
215        }
216        Ok(key_values)
217    }
218
219    fn write_batch_internal(
220        &self,
221        batch: Batch,
222        write_root_key: bool,
223    ) -> Result<(), RocksDbStoreInternalError> {
224        let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
225        for operation in batch.operations {
226            match operation {
227                WriteOperation::Delete { key } => {
228                    check_key_size(&key)?;
229                    let mut full_key = self.start_key.to_vec();
230                    full_key.extend(key);
231                    inner_batch.delete(&full_key)
232                }
233                WriteOperation::Put { key, value } => {
234                    check_key_size(&key)?;
235                    let mut full_key = self.start_key.to_vec();
236                    full_key.extend(key);
237                    inner_batch.put(&full_key, value)
238                }
239                WriteOperation::DeletePrefix { key_prefix } => {
240                    check_key_size(&key_prefix)?;
241                    let mut full_key1 = self.start_key.to_vec();
242                    full_key1.extend(&key_prefix);
243                    let full_key2 =
244                        get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
245                    inner_batch.delete_range(&full_key1, &full_key2);
246                }
247            }
248        }
249        if write_root_key {
250            let mut full_key = self.start_key.to_vec();
251            full_key[0] = STORED_ROOT_KEYS_PREFIX;
252            inner_batch.put(&full_key, vec![]);
253        }
254        self.db.write(inner_batch)?;
255        Ok(())
256    }
257}
258
259/// The inner client
260#[derive(Clone)]
261pub struct RocksDbStoreInternal {
262    executor: RocksDbStoreExecutor,
263    _path_with_guard: PathWithGuard,
264    max_stream_queries: usize,
265    spawn_mode: RocksDbSpawnMode,
266    root_key_written: Arc<AtomicBool>,
267}
268
269/// Database-level connection to RocksDB for managing namespaces and partitions.
270#[derive(Clone)]
271pub struct RocksDbDatabaseInternal {
272    executor: RocksDbStoreExecutor,
273    _path_with_guard: PathWithGuard,
274    max_stream_queries: usize,
275    spawn_mode: RocksDbSpawnMode,
276}
277
278impl WithError for RocksDbDatabaseInternal {
279    type Error = RocksDbStoreInternalError;
280}
281
282/// The initial configuration of the system
283#[derive(Clone, Debug, Deserialize, Serialize)]
284pub struct RocksDbStoreInternalConfig {
285    /// The path to the storage containing the namespaces
286    pub path_with_guard: PathWithGuard,
287    /// The chosen spawn mode
288    pub spawn_mode: RocksDbSpawnMode,
289    /// Preferred buffer size for async streams.
290    pub max_stream_queries: usize,
291}
292
293impl RocksDbDatabaseInternal {
294    fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
295        if !namespace
296            .chars()
297            .all(|character| character.is_ascii_alphanumeric() || character == '_')
298        {
299            return Err(RocksDbStoreInternalError::InvalidNamespace);
300        }
301        Ok(())
302    }
303
304    fn build(
305        config: &RocksDbStoreInternalConfig,
306        namespace: &str,
307    ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
308        let start_key = ROOT_KEY_DOMAIN.to_vec();
309        // Create a store to extract its executor and configuration
310        let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
311        Ok(RocksDbDatabaseInternal {
312            executor: temp_store.executor,
313            _path_with_guard: temp_store._path_with_guard,
314            max_stream_queries: temp_store.max_stream_queries,
315            spawn_mode: temp_store.spawn_mode,
316        })
317    }
318}
319
320impl RocksDbStoreInternal {
321    fn build(
322        config: &RocksDbStoreInternalConfig,
323        namespace: &str,
324        start_key: Vec<u8>,
325    ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
326        RocksDbDatabaseInternal::check_namespace(namespace)?;
327        let mut path_buf = config.path_with_guard.path_buf.clone();
328        let mut path_with_guard = config.path_with_guard.clone();
329        path_buf.push(namespace);
330        path_with_guard.path_buf = path_buf.clone();
331        let max_stream_queries = config.max_stream_queries;
332        let spawn_mode = config.spawn_mode;
333        if !std::path::Path::exists(&path_buf) {
334            std::fs::create_dir(path_buf.clone())?;
335        }
336        let sys = System::new_with_specifics(
337            RefreshKind::nothing()
338                .with_cpu(CpuRefreshKind::everything())
339                .with_memory(MemoryRefreshKind::nothing().with_ram()),
340        );
341        let num_cpus = sys.cpus().len() as i32;
342        let total_ram = sys.total_memory() as usize;
343        let mut options = rocksdb::Options::default();
344        options.create_if_missing(true);
345        options.create_missing_column_families(true);
346        // Flush in-memory buffer to disk more often
347        options.set_write_buffer_size(WRITE_BUFFER_SIZE);
348        options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
349        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
350        options.set_level_zero_slowdown_writes_trigger(8);
351        options.set_level_zero_stop_writes_trigger(12);
352        options.set_level_zero_file_num_compaction_trigger(2);
353        // We deliberately give RocksDB one background thread *per* CPU so that
354        // flush + (N-1) compactions can hammer the NVMe at full bandwidth while
355        // still leaving enough CPU time for the foreground application threads.
356        options.increase_parallelism(num_cpus);
357        options.set_max_background_jobs(num_cpus);
358        options.set_max_subcompactions(num_cpus as u32);
359        options.set_level_compaction_dynamic_level_bytes(true);
360
361        options.set_compaction_style(DBCompactionStyle::Level);
362        options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
363
364        let mut block_options = BlockBasedOptions::default();
365        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
366        block_options.set_cache_index_and_filter_blocks(true);
367        // Allocate 1/4 of total RAM for RocksDB block cache, which is a reasonable balance:
368        // - Large enough to significantly improve read performance by caching frequently accessed blocks
369        // - Small enough to leave memory for other system components
370        // - Follows common practice for database caching in server environments
371        // - Prevents excessive memory pressure that could lead to swapping or OOM conditions
372        block_options.set_block_cache(&Cache::new_hyper_clock_cache(
373            total_ram / 4,
374            HYPER_CLOCK_CACHE_BLOCK_SIZE,
375        ));
376        options.set_block_based_table_factory(&block_options);
377
378        let db = DB::open(&options, path_buf)?;
379        let executor = RocksDbStoreExecutor {
380            db: Arc::new(db),
381            start_key,
382        };
383        Ok(RocksDbStoreInternal {
384            executor,
385            _path_with_guard: path_with_guard,
386            max_stream_queries,
387            spawn_mode,
388            root_key_written: Arc::new(AtomicBool::new(false)),
389        })
390    }
391}
392
393impl WithError for RocksDbStoreInternal {
394    type Error = RocksDbStoreInternalError;
395}
396
397impl ReadableKeyValueStore for RocksDbStoreInternal {
398    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
399
400    fn max_stream_queries(&self) -> usize {
401        self.max_stream_queries
402    }
403
404    async fn read_value_bytes(
405        &self,
406        key: &[u8],
407    ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
408        check_key_size(key)?;
409        let db = self.executor.db.clone();
410        let mut full_key = self.executor.start_key.to_vec();
411        full_key.extend(key);
412        self.spawn_mode
413            .spawn(move |x| Ok(db.get(&x)?), full_key)
414            .await
415    }
416
417    async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
418        check_key_size(key)?;
419        let db = self.executor.db.clone();
420        let mut full_key = self.executor.start_key.to_vec();
421        full_key.extend(key);
422        self.spawn_mode
423            .spawn(
424                move |x| {
425                    if !db.key_may_exist(&x) {
426                        return Ok(false);
427                    }
428                    Ok(db.get(&x)?.is_some())
429                },
430                full_key,
431            )
432            .await
433    }
434
435    async fn contains_keys(
436        &self,
437        keys: Vec<Vec<u8>>,
438    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
439        let executor = self.executor.clone();
440        self.spawn_mode
441            .spawn(move |x| executor.contains_keys_internal(x), keys)
442            .await
443    }
444
445    async fn read_multi_values_bytes(
446        &self,
447        keys: Vec<Vec<u8>>,
448    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
449        let executor = self.executor.clone();
450        self.spawn_mode
451            .spawn(move |x| executor.read_multi_values_bytes_internal(x), keys)
452            .await
453    }
454
455    async fn find_keys_by_prefix(
456        &self,
457        key_prefix: &[u8],
458    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
459        let executor = self.executor.clone();
460        let key_prefix = key_prefix.to_vec();
461        self.spawn_mode
462            .spawn(
463                move |x| executor.find_keys_by_prefix_internal(x),
464                key_prefix,
465            )
466            .await
467    }
468
469    async fn find_key_values_by_prefix(
470        &self,
471        key_prefix: &[u8],
472    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
473        let executor = self.executor.clone();
474        let key_prefix = key_prefix.to_vec();
475        self.spawn_mode
476            .spawn(
477                move |x| executor.find_key_values_by_prefix_internal(x),
478                key_prefix,
479            )
480            .await
481    }
482}
483
484impl WritableKeyValueStore for RocksDbStoreInternal {
485    const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
486
487    async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
488        let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
489        let executor = self.executor.clone();
490        self.spawn_mode
491            .spawn(
492                move |x| executor.write_batch_internal(x, write_root_key),
493                batch,
494            )
495            .await
496    }
497
498    async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
499        Ok(())
500    }
501}
502
503impl KeyValueDatabase for RocksDbDatabaseInternal {
504    type Config = RocksDbStoreInternalConfig;
505    type Store = RocksDbStoreInternal;
506
507    fn get_name() -> String {
508        "rocksdb internal".to_string()
509    }
510
511    async fn connect(
512        config: &Self::Config,
513        namespace: &str,
514    ) -> Result<Self, RocksDbStoreInternalError> {
515        Self::build(config, namespace)
516    }
517
518    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
519        let mut start_key = ROOT_KEY_DOMAIN.to_vec();
520        start_key.extend(root_key);
521        let mut executor = self.executor.clone();
522        executor.start_key = start_key;
523        Ok(RocksDbStoreInternal {
524            executor,
525            _path_with_guard: self._path_with_guard.clone(),
526            max_stream_queries: self.max_stream_queries,
527            spawn_mode: self.spawn_mode,
528            root_key_written: Arc::new(AtomicBool::new(false)),
529        })
530    }
531
532    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
533        self.open_shared(root_key)
534    }
535
536    async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
537        let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
538        let mut namespaces = Vec::new();
539        for entry in entries {
540            let entry = entry?;
541            if !entry.file_type()?.is_dir() {
542                return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
543            }
544            let namespace = match entry.file_name().into_string() {
545                Err(error) => {
546                    return Err(RocksDbStoreInternalError::IntoStringError(error));
547                }
548                Ok(namespace) => namespace,
549            };
550            namespaces.push(namespace);
551        }
552        Ok(namespaces)
553    }
554
555    async fn list_root_keys(
556        config: &Self::Config,
557        namespace: &str,
558    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
559        let start_key = vec![STORED_ROOT_KEYS_PREFIX];
560        let store = RocksDbStoreInternal::build(config, namespace, start_key)?;
561        store.find_keys_by_prefix(&[]).await
562    }
563
564    async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
565        let namespaces = Self::list_all(config).await?;
566        for namespace in namespaces {
567            let mut path_buf = config.path_with_guard.path_buf.clone();
568            path_buf.push(&namespace);
569            std::fs::remove_dir_all(path_buf.as_path())?;
570        }
571        Ok(())
572    }
573
574    async fn exists(
575        config: &Self::Config,
576        namespace: &str,
577    ) -> Result<bool, RocksDbStoreInternalError> {
578        Self::check_namespace(namespace)?;
579        let mut path_buf = config.path_with_guard.path_buf.clone();
580        path_buf.push(namespace);
581        let test = std::path::Path::exists(&path_buf);
582        Ok(test)
583    }
584
585    async fn create(
586        config: &Self::Config,
587        namespace: &str,
588    ) -> Result<(), RocksDbStoreInternalError> {
589        Self::check_namespace(namespace)?;
590        let mut path_buf = config.path_with_guard.path_buf.clone();
591        path_buf.push(namespace);
592        if std::path::Path::exists(&path_buf) {
593            return Err(RocksDbStoreInternalError::StoreAlreadyExists);
594        }
595        std::fs::create_dir_all(path_buf)?;
596        Ok(())
597    }
598
599    async fn delete(
600        config: &Self::Config,
601        namespace: &str,
602    ) -> Result<(), RocksDbStoreInternalError> {
603        Self::check_namespace(namespace)?;
604        let mut path_buf = config.path_with_guard.path_buf.clone();
605        path_buf.push(namespace);
606        let path = path_buf.as_path();
607        std::fs::remove_dir_all(path)?;
608        Ok(())
609    }
610}
611
612#[cfg(with_testing)]
613impl TestKeyValueDatabase for RocksDbDatabaseInternal {
614    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
615        let path_with_guard = PathWithGuard::new_testing();
616        let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
617        let max_stream_queries = TEST_ROCKS_DB_MAX_STREAM_QUERIES;
618        Ok(RocksDbStoreInternalConfig {
619            path_with_guard,
620            spawn_mode,
621            max_stream_queries,
622        })
623    }
624}
625
626/// The error type for [`RocksDbStoreInternal`]
627#[derive(Error, Debug)]
628pub enum RocksDbStoreInternalError {
629    /// Store already exists
630    #[error("Store already exists")]
631    StoreAlreadyExists,
632
633    /// Tokio join error in RocksDB.
634    #[error("tokio join error: {0}")]
635    TokioJoinError(#[from] tokio::task::JoinError),
636
637    /// RocksDB error.
638    #[error("RocksDB error: {0}")]
639    RocksDb(#[from] rocksdb::Error),
640
641    /// The database contains a file which is not a directory
642    #[error("Namespaces should be directories")]
643    NonDirectoryNamespace,
644
645    /// Error converting `OsString` to `String`
646    #[error("error in the conversion from OsString: {0:?}")]
647    IntoStringError(OsString),
648
649    /// The key must have at most 8 MiB
650    #[error("The key must have at most 8 MiB")]
651    KeyTooLong,
652
653    /// Namespace contains forbidden characters
654    #[error("Namespace contains forbidden characters")]
655    InvalidNamespace,
656
657    /// Filesystem error
658    #[error("Filesystem error: {0}")]
659    FsError(#[from] std::io::Error),
660
661    /// BCS serialization error.
662    #[error(transparent)]
663    BcsError(#[from] bcs::Error),
664}
665
666/// A path and the guard for the temporary directory if needed
667#[derive(Clone, Debug, Deserialize, Serialize)]
668pub struct PathWithGuard {
669    /// The path to the data
670    pub path_buf: PathBuf,
671    /// The guard for the directory if one is needed
672    #[serde(skip)]
673    _dir: Option<Arc<TempDir>>,
674}
675
676impl PathWithGuard {
677    /// Creates a `PathWithGuard` from an existing path.
678    pub fn new(path_buf: PathBuf) -> Self {
679        Self {
680            path_buf,
681            _dir: None,
682        }
683    }
684
685    /// Returns the test path for RocksDB without common config.
686    #[cfg(with_testing)]
687    fn new_testing() -> PathWithGuard {
688        let dir = TempDir::new().unwrap();
689        let path_buf = dir.path().to_path_buf();
690        let _dir = Some(Arc::new(dir));
691        PathWithGuard { path_buf, _dir }
692    }
693}
694
695impl PartialEq for PathWithGuard {
696    fn eq(&self, other: &Self) -> bool {
697        self.path_buf == other.path_buf
698    }
699}
700impl Eq for PathWithGuard {}
701
702impl KeyValueStoreError for RocksDbStoreInternalError {
703    const BACKEND: &'static str = "rocks_db";
704}
705
706/// The composed error type for the `RocksDbStore`
707pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;
708
709/// The composed config type for the `RocksDbStore`
710pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;
711
712/// The `RocksDbDatabase` composed type with metrics
713#[cfg(with_metrics)]
714pub type RocksDbDatabase = MeteredDatabase<
715    LruCachingDatabase<
716        MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
717    >,
718>;
719/// The `RocksDbDatabase` composed type
720#[cfg(not(with_metrics))]
721pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;