linera_views/backends/rocks_db.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Implements [`crate::store::KeyValueStore`] for the RocksDB database.

use std::{
    ffi::OsString,
    fmt::Display,
    path::PathBuf,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

use linera_base::ensure;
use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle};
use serde::{Deserialize, Serialize};
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
use tempfile::TempDir;
use thiserror::Error;

#[cfg(with_metrics)]
use crate::metering::MeteredStore;
#[cfg(with_testing)]
use crate::store::TestKeyValueStore;
use crate::{
    batch::{Batch, WriteOperation},
    common::get_upper_bound_option,
    lru_caching::{LruCachingConfig, LruCachingStore},
    store::{
        AdminKeyValueStore, CommonStoreInternalConfig, KeyValueStoreError, ReadableKeyValueStore,
        WithError, WritableKeyValueStore,
    },
    value_splitting::{ValueSplittingError, ValueSplittingStore},
};

/// The domain under which all user keys are stored (followed by the root key).
static ROOT_KEY_DOMAIN: [u8; 1] = [0];
/// The prefix under which the root keys that have been written are recorded.
static STORED_ROOT_KEYS_PREFIX: u8 = 1;

/// The number of concurrent stream queries used in tests.
#[cfg(with_testing)]
const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;

// The maximum size of values in RocksDB is 3 GiB.
// We subtract 400 bytes as a margin for internal offsets.
const MAX_VALUE_SIZE: usize = 3 * 1024 * 1024 * 1024 - 400;

// The maximum size of keys in RocksDB is 8 MiB.
// We subtract 400 bytes as a margin for internal offsets.
const MAX_KEY_SIZE: usize = 8 * 1024 * 1024 - 400;

const WRITE_BUFFER_SIZE: usize = 256 * 1024 * 1024; // 256 MiB
const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; // 8 KiB

/// The RocksDB client that we use.
type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;

/// The choice of spawning mode.
/// `SpawnBlocking` always works and is the safest choice.
/// `BlockInPlace` can only be used in a multi-threaded runtime.
/// One way to select it is to pick `BlockInPlace` when
/// `tokio::runtime::Handle::current().metrics().num_workers() > 1`.
/// `BlockInPlace` is documented in <https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html>.
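///
/// A minimal selection sketch (assuming a multi-thread Tokio runtime; the crate
/// path is omitted here, so the example is marked `ignore`):
/// ```ignore
/// #[tokio::main(flavor = "multi_thread")]
/// async fn main() {
///     // Picks `BlockInPlace` when the runtime has more than one worker thread,
///     // and falls back to `SpawnBlocking` otherwise.
///     let mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
///     println!("selected spawn mode: {}", mode);
/// }
/// ```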
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum RocksDbSpawnMode {
    /// This uses the `spawn_blocking` function of Tokio.
    SpawnBlocking,
    /// This uses the `block_in_place` function of Tokio.
    BlockInPlace,
}

impl RocksDbSpawnMode {
    /// Obtains the spawning mode from the current runtime.
    pub fn get_spawn_mode_from_runtime() -> Self {
        if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
            RocksDbSpawnMode::BlockInPlace
        } else {
            RocksDbSpawnMode::SpawnBlocking
        }
    }

    /// Runs the blocking computation `f` on `input` according to the selected policy.
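    ///
    /// Note that `tokio::task::block_in_place` panics when called from a
    /// current-thread runtime, which is why `SpawnBlocking` is the safe fallback
    /// chosen by `get_spawn_mode_from_runtime`.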
    #[inline]
    async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
    where
        F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
        I: Send + 'static,
        O: Send + 'static,
    {
        Ok(match self {
            RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
            RocksDbSpawnMode::SpawnBlocking => {
                tokio::task::spawn_blocking(move || f(input)).await??
            }
        })
    }
}

impl Display for RocksDbSpawnMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self {
            RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
            RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
        }
    }
}

fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
    ensure!(
        key.len() <= MAX_KEY_SIZE,
        RocksDbStoreInternalError::KeyTooLong
    );
    Ok(())
}

#[derive(Clone)]
struct RocksDbStoreExecutor {
    db: Arc<DB>,
    start_key: Vec<u8>,
}

impl RocksDbStoreExecutor {
    pub fn contains_keys_internal(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
        let size = keys.len();
        let mut results = vec![false; size];
        let mut indices = Vec::new();
        let mut candidate_keys = Vec::new();
        for (i, key) in keys.into_iter().enumerate() {
            check_key_size(&key)?;
            let mut full_key = self.start_key.to_vec();
            full_key.extend(key);
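            // `key_may_exist` is a cheap, in-memory check (e.g. Bloom filters) that can
            // return false positives but never false negatives, so only the candidate
            // keys are confirmed with the `multi_get` below.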
            if self.db.key_may_exist(&full_key) {
                indices.push(i);
                candidate_keys.push(full_key);
            }
        }
        let candidate_values = self.db.multi_get(candidate_keys);
        for (index, value) in indices.into_iter().zip(candidate_values) {
            results[index] = value?.is_some();
        }
        Ok(results)
    }

    fn read_multi_values_bytes_internal(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
        for key in &keys {
            check_key_size(key)?;
        }
        let full_keys = keys
            .into_iter()
            .map(|key| {
                let mut full_key = self.start_key.to_vec();
                full_key.extend(key);
                full_key
            })
            .collect::<Vec<_>>();
        let entries = self.db.multi_get(&full_keys);
        Ok(entries.into_iter().collect::<Result<_, _>>()?)
    }

    fn find_keys_by_prefix_internal(
        &self,
        key_prefix: Vec<u8>,
    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
        check_key_size(&key_prefix)?;
        let mut prefix = self.start_key.clone();
        prefix.extend(key_prefix);
        let len = prefix.len();
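        // Seek to the first key >= `prefix`, then walk forward until a key no longer
        // starts with the prefix; only the suffix after `prefix` is collected.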
        let mut iter = self.db.raw_iterator();
        let mut keys = Vec::new();
        iter.seek(&prefix);
        let mut next_key = iter.key();
        while let Some(key) = next_key {
            if !key.starts_with(&prefix) {
                break;
            }
            keys.push(key[len..].to_vec());
            iter.next();
            next_key = iter.key();
        }
        Ok(keys)
    }

    #[expect(clippy::type_complexity)]
    fn find_key_values_by_prefix_internal(
        &self,
        key_prefix: Vec<u8>,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
        check_key_size(&key_prefix)?;
        let mut prefix = self.start_key.clone();
        prefix.extend(key_prefix);
        let len = prefix.len();
        let mut iter = self.db.raw_iterator();
        let mut key_values = Vec::new();
        iter.seek(&prefix);
        let mut next_key = iter.key();
        while let Some(key) = next_key {
            if !key.starts_with(&prefix) {
                break;
            }
            if let Some(value) = iter.value() {
                let key_value = (key[len..].to_vec(), value.to_vec());
                key_values.push(key_value);
            }
            iter.next();
            next_key = iter.key();
        }
        Ok(key_values)
    }

    fn write_batch_internal(
        &self,
        batch: Batch,
        write_root_key: bool,
    ) -> Result<(), RocksDbStoreInternalError> {
        let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
        for operation in batch.operations {
            match operation {
                WriteOperation::Delete { key } => {
                    check_key_size(&key)?;
                    let mut full_key = self.start_key.to_vec();
                    full_key.extend(key);
                    inner_batch.delete(&full_key)
                }
                WriteOperation::Put { key, value } => {
                    check_key_size(&key)?;
                    let mut full_key = self.start_key.to_vec();
                    full_key.extend(key);
                    inner_batch.put(&full_key, value)
                }
                WriteOperation::DeletePrefix { key_prefix } => {
                    check_key_size(&key_prefix)?;
                    let mut full_key1 = self.start_key.to_vec();
                    full_key1.extend(&key_prefix);
                    let full_key2 =
                        get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
                    inner_batch.delete_range(&full_key1, &full_key2);
                }
            }
        }
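        // On the first batch written through this store, record its root key under the
        // `STORED_ROOT_KEYS_PREFIX` domain so that `list_root_keys` can enumerate it later.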
        if write_root_key {
            let mut full_key = self.start_key.to_vec();
            full_key[0] = STORED_ROOT_KEYS_PREFIX;
            inner_batch.put(&full_key, vec![]);
        }
        self.db.write(inner_batch)?;
        Ok(())
    }
}

/// The inner, low-level RocksDB store (without value splitting, caching, or metrics).
#[derive(Clone)]
pub struct RocksDbStoreInternal {
    executor: RocksDbStoreExecutor,
    _path_with_guard: PathWithGuard,
    max_stream_queries: usize,
    spawn_mode: RocksDbSpawnMode,
    root_key_written: Arc<AtomicBool>,
}

/// The initial configuration of the inner RocksDB store.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RocksDbStoreInternalConfig {
    /// The path to the storage containing the namespaces.
    pub path_with_guard: PathWithGuard,
    /// The chosen spawn mode.
    spawn_mode: RocksDbSpawnMode,
    /// The common configuration of the key value store.
    common_config: CommonStoreInternalConfig,
}

impl RocksDbStoreInternal {
    fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
        if !namespace
            .chars()
            .all(|character| character.is_ascii_alphanumeric() || character == '_')
        {
            return Err(RocksDbStoreInternalError::InvalidNamespace);
        }
        Ok(())
    }

    fn build(
        config: &RocksDbStoreInternalConfig,
        namespace: &str,
        start_key: Vec<u8>,
    ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let mut path_buf = config.path_with_guard.path_buf.clone();
        let mut path_with_guard = config.path_with_guard.clone();
        path_buf.push(namespace);
        path_with_guard.path_buf = path_buf.clone();
        let max_stream_queries = config.common_config.max_stream_queries;
        let spawn_mode = config.spawn_mode;
        if !std::path::Path::exists(&path_buf) {
            std::fs::create_dir(path_buf.clone())?;
        }
        let sys = System::new_with_specifics(
            RefreshKind::nothing()
                .with_cpu(CpuRefreshKind::everything())
                .with_memory(MemoryRefreshKind::nothing().with_ram()),
        );
        let num_cpus = sys.cpus().len() as i32;
        let total_ram = sys.total_memory() as usize;
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        // Flush the in-memory buffer to disk more often.
        options.set_write_buffer_size(WRITE_BUFFER_SIZE);
        options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
        options.set_level_zero_slowdown_writes_trigger(8);
        options.set_level_zero_stop_writes_trigger(12);
        options.set_level_zero_file_num_compaction_trigger(2);
        // Deliberately give RocksDB one background job per CPU so that the flush and
        // compactions can run in parallel and keep the storage device busy, while still
        // leaving CPU time for the foreground application threads.
        options.increase_parallelism(num_cpus);
        options.set_max_background_jobs(num_cpus);
        options.set_max_subcompactions(num_cpus as u32);
        options.set_level_compaction_dynamic_level_bytes(true);

        options.set_compaction_style(DBCompactionStyle::Level);
        options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);

        let mut block_options = BlockBasedOptions::default();
        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        block_options.set_cache_index_and_filter_blocks(true);
        // Allocate 1/4 of total RAM for the RocksDB block cache, which is a reasonable balance:
        // - Large enough to significantly improve read performance by caching frequently accessed blocks
        // - Small enough to leave memory for other system components
        // - Follows common practice for database caching in server environments
        // - Prevents excessive memory pressure that could lead to swapping or OOM conditions
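        // For example, a host with 16 GiB of RAM gets a 4 GiB block cache.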
        block_options.set_block_cache(&Cache::new_hyper_clock_cache(
            total_ram / 4,
            HYPER_CLOCK_CACHE_BLOCK_SIZE,
        ));
        options.set_block_based_table_factory(&block_options);

        let db = DB::open(&options, path_buf)?;
        let executor = RocksDbStoreExecutor {
            db: Arc::new(db),
            start_key,
        };
        Ok(RocksDbStoreInternal {
            executor,
            _path_with_guard: path_with_guard,
            max_stream_queries,
            spawn_mode,
            root_key_written: Arc::new(AtomicBool::new(false)),
        })
    }
}

impl WithError for RocksDbStoreInternal {
    type Error = RocksDbStoreInternalError;
}

impl ReadableKeyValueStore for RocksDbStoreInternal {
    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
    type Keys = Vec<Vec<u8>>;
    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;

    fn max_stream_queries(&self) -> usize {
        self.max_stream_queries
    }

    async fn read_value_bytes(
        &self,
        key: &[u8],
    ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
        check_key_size(key)?;
        let db = self.executor.db.clone();
        let mut full_key = self.executor.start_key.to_vec();
        full_key.extend(key);
        self.spawn_mode
            .spawn(move |x| Ok(db.get(&x)?), full_key)
            .await
    }

    async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
        check_key_size(key)?;
        let db = self.executor.db.clone();
        let mut full_key = self.executor.start_key.to_vec();
        full_key.extend(key);
        self.spawn_mode
            .spawn(
                move |x| {
                    if !db.key_may_exist(&x) {
                        return Ok(false);
                    }
                    Ok(db.get(&x)?.is_some())
                },
                full_key,
            )
            .await
    }

    async fn contains_keys(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
        let executor = self.executor.clone();
        self.spawn_mode
            .spawn(move |x| executor.contains_keys_internal(x), keys)
            .await
    }

    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
        let executor = self.executor.clone();
        self.spawn_mode
            .spawn(move |x| executor.read_multi_values_bytes_internal(x), keys)
            .await
    }

    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::Keys, RocksDbStoreInternalError> {
        let executor = self.executor.clone();
        let key_prefix = key_prefix.to_vec();
        self.spawn_mode
            .spawn(
                move |x| executor.find_keys_by_prefix_internal(x),
                key_prefix,
            )
            .await
    }

    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<Self::KeyValues, RocksDbStoreInternalError> {
        let executor = self.executor.clone();
        let key_prefix = key_prefix.to_vec();
        self.spawn_mode
            .spawn(
                move |x| executor.find_key_values_by_prefix_internal(x),
                key_prefix,
            )
            .await
    }
}

impl WritableKeyValueStore for RocksDbStoreInternal {
    const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;

    async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
        let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
        let executor = self.executor.clone();
        self.spawn_mode
            .spawn(
                move |x| executor.write_batch_internal(x, write_root_key),
                batch,
            )
            .await
    }

    async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
        Ok(())
    }
}

impl AdminKeyValueStore for RocksDbStoreInternal {
    type Config = RocksDbStoreInternalConfig;

    fn get_name() -> String {
        "rocksdb internal".to_string()
    }

    async fn connect(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Self, RocksDbStoreInternalError> {
        let start_key = ROOT_KEY_DOMAIN.to_vec();
        RocksDbStoreInternal::build(config, namespace, start_key)
    }

    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, RocksDbStoreInternalError> {
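        // The returned store shares the same underlying RocksDB instance but scopes all
        // of its keys under `ROOT_KEY_DOMAIN` followed by `root_key`, and tracks its own
        // "root key written" flag.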
        let mut store = self.clone();
        let mut start_key = ROOT_KEY_DOMAIN.to_vec();
        start_key.extend(root_key);
        store.executor.start_key = start_key;
        store.root_key_written = Arc::new(AtomicBool::new(false));
        Ok(store)
    }

    async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
        let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
        let mut namespaces = Vec::new();
        for entry in entries {
            let entry = entry?;
            if !entry.file_type()?.is_dir() {
                return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
            }
            let namespace = match entry.file_name().into_string() {
                Err(error) => {
                    return Err(RocksDbStoreInternalError::IntoStringError(error));
                }
                Ok(namespace) => namespace,
            };
            namespaces.push(namespace);
        }
        Ok(namespaces)
    }

    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
        let start_key = vec![STORED_ROOT_KEYS_PREFIX];
        let store = RocksDbStoreInternal::build(config, namespace, start_key)?;
        store.find_keys_by_prefix(&[]).await
    }

    async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
        let namespaces = RocksDbStoreInternal::list_all(config).await?;
        for namespace in namespaces {
            let mut path_buf = config.path_with_guard.path_buf.clone();
            path_buf.push(&namespace);
            std::fs::remove_dir_all(path_buf.as_path())?;
        }
        Ok(())
    }

    async fn exists(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<bool, RocksDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let mut path_buf = config.path_with_guard.path_buf.clone();
        path_buf.push(namespace);
        let test = std::path::Path::exists(&path_buf);
        Ok(test)
    }

    async fn create(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<(), RocksDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let mut path_buf = config.path_with_guard.path_buf.clone();
        path_buf.push(namespace);
        if std::path::Path::exists(&path_buf) {
            return Err(RocksDbStoreInternalError::StoreAlreadyExists);
        }
        std::fs::create_dir_all(path_buf)?;
        Ok(())
    }

    async fn delete(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<(), RocksDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let mut path_buf = config.path_with_guard.path_buf.clone();
        path_buf.push(namespace);
        let path = path_buf.as_path();
        std::fs::remove_dir_all(path)?;
        Ok(())
    }
}

#[cfg(with_testing)]
impl TestKeyValueStore for RocksDbStoreInternal {
    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
        let path_with_guard = PathWithGuard::new_testing();
        let common_config = CommonStoreInternalConfig {
            max_concurrent_queries: None,
            max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES,
            replication_factor: 1,
        };
        let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
        Ok(RocksDbStoreInternalConfig {
            path_with_guard,
            spawn_mode,
            common_config,
        })
    }
}

/// The error type for [`RocksDbStoreInternal`]
#[derive(Error, Debug)]
pub enum RocksDbStoreInternalError {
    /// Store already exists
    #[error("Store already exists")]
    StoreAlreadyExists,

    /// Tokio join error in RocksDB.
    #[error("tokio join error: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),

    /// RocksDB error.
    #[error("RocksDB error: {0}")]
    RocksDb(#[from] rocksdb::Error),

    /// The database contains a file which is not a directory
    #[error("Namespaces should be directories")]
    NonDirectoryNamespace,

    /// Error converting `OsString` to `String`
    #[error("error in the conversion from OsString: {0:?}")]
    IntoStringError(OsString),

    /// The key can be at most 8 MiB long
    #[error("The key can be at most 8 MiB long")]
    KeyTooLong,

    /// Namespace contains forbidden characters
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// Filesystem error
    #[error("Filesystem error: {0}")]
    FsError(#[from] std::io::Error),

    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}

/// A path and the guard for the temporary directory if needed
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PathWithGuard {
    /// The path to the data
    pub path_buf: PathBuf,
    /// The guard for the directory if one is needed
    #[serde(skip)]
    _dir: Option<Arc<TempDir>>,
}

impl PathWithGuard {
    /// Creates a `PathWithGuard` from an existing path.
    pub fn new(path_buf: PathBuf) -> Self {
        Self {
            path_buf,
            _dir: None,
        }
    }

    /// Returns a `PathWithGuard` pointing to a temporary directory for tests;
    /// the guard keeps the directory alive.
    #[cfg(with_testing)]
    fn new_testing() -> PathWithGuard {
        let dir = TempDir::new().unwrap();
        let path_buf = dir.path().to_path_buf();
        let _dir = Some(Arc::new(dir));
        PathWithGuard { path_buf, _dir }
    }
}

impl PartialEq for PathWithGuard {
    fn eq(&self, other: &Self) -> bool {
        self.path_buf == other.path_buf
    }
}
impl Eq for PathWithGuard {}

impl KeyValueStoreError for RocksDbStoreInternalError {
    const BACKEND: &'static str = "rocks_db";
}

/// The `RocksDbStore` composed type with metrics
#[cfg(with_metrics)]
pub type RocksDbStore = MeteredStore<
    LruCachingStore<MeteredStore<ValueSplittingStore<MeteredStore<RocksDbStoreInternal>>>>,
>;

/// The `RocksDbStore` composed type
#[cfg(not(with_metrics))]
pub type RocksDbStore = LruCachingStore<ValueSplittingStore<RocksDbStoreInternal>>;

/// The composed error type for the `RocksDbStore`
pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;

/// The composed config type for the `RocksDbStore`
pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;

impl RocksDbStoreConfig {
    /// Creates a new `RocksDbStoreConfig` from the input.
    pub fn new(
        spawn_mode: RocksDbSpawnMode,
        path_with_guard: PathWithGuard,
        common_config: crate::store::CommonStoreConfig,
    ) -> RocksDbStoreConfig {
        let inner_config = RocksDbStoreInternalConfig {
            path_with_guard,
            spawn_mode,
            common_config: common_config.reduced(),
        };
        RocksDbStoreConfig {
            inner_config,
            storage_cache_config: common_config.storage_cache_config,
        }
    }
}
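
// A minimal assembly sketch of how these pieces are typically combined. This is only
// an illustration: the fields of `crate::store::CommonStoreConfig` are not shown in this
// file, so `common_config` is assumed to be built elsewhere, and the path and namespace
// below are hypothetical.
//
//     let path_with_guard = PathWithGuard::new(PathBuf::from("/var/lib/linera/rocksdb"));
//     let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
//     let config = RocksDbStoreConfig::new(spawn_mode, path_with_guard, common_config);
//     let store = RocksDbStore::connect(&config, "my_namespace").await?;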