linera_views/backends/
rocks_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the RocksDB database.
5
6use std::{
7    ffi::OsString,
8    fmt::Display,
9    path::PathBuf,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14};
15
16use linera_base::ensure;
17use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, SliceTransform};
18use serde::{Deserialize, Serialize};
19use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
20use tempfile::TempDir;
21use thiserror::Error;
22
23#[cfg(with_metrics)]
24use crate::metering::MeteredDatabase;
25#[cfg(with_testing)]
26use crate::store::TestKeyValueDatabase;
27use crate::{
28    batch::{Batch, WriteOperation},
29    common::get_upper_bound_option,
30    lru_caching::{LruCachingConfig, LruCachingDatabase},
31    store::{
32        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
33        WritableKeyValueStore,
34    },
35    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
36};
37
/// The leading byte of every data key: a store's start key is this domain followed by
/// its root key.
static ROOT_KEY_DOMAIN: [u8; 1] = [0];
/// The leading byte of the bookkeeping entries that record which root keys have been
/// written to. These entries reuse the store's start key with its first byte replaced
/// by this value, and they are what `list_root_keys` scans.
static STORED_ROOT_KEYS_PREFIX: u8 = 1;

/// The `max_stream_queries` value used by the test configuration.
#[cfg(with_testing)]
const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;

// The maximum size of values in RocksDB is 3 GiB.
// For offset reasons we decrease it by 400.
const MAX_VALUE_SIZE: usize = 3 * 1024 * 1024 * 1024 - 400;

// The maximum size of keys in RocksDB is 8 MiB.
// For offset reasons we decrease it by 400.
const MAX_KEY_SIZE: usize = 8 * 1024 * 1024 - 400;

// Tuning values passed to the RocksDB options when a database is opened.
const WRITE_BUFFER_SIZE: usize = 256 * 1024 * 1024; // 256 MiB
const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; // 8 KiB

/// The RocksDB client that we use (multi-threaded mode).
type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
60
/// The strategy used to run blocking RocksDB calls from async code.
///
/// * `SpawnBlocking` always works and is the safest choice.
/// * `BlockInPlace` can only be used in a multi-threaded environment.
///
/// One way to choose between them is to select `BlockInPlace` when
/// `tokio::runtime::Handle::current().metrics().num_workers() > 1`, which is what
/// [`RocksDbSpawnMode::get_spawn_mode_from_runtime`] does.
/// `BlockInPlace` is documented in <https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html>
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum RocksDbSpawnMode {
    /// This uses the `spawn_blocking` function of Tokio.
    SpawnBlocking,
    /// This uses the `block_in_place` function of Tokio.
    BlockInPlace,
}
74
75impl RocksDbSpawnMode {
76    /// Obtains the spawning mode from runtime.
77    pub fn get_spawn_mode_from_runtime() -> Self {
78        if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
79            RocksDbSpawnMode::BlockInPlace
80        } else {
81            RocksDbSpawnMode::SpawnBlocking
82        }
83    }
84
85    /// Runs the computation for a function according to the selected policy.
86    #[inline]
87    async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
88    where
89        F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
90        I: Send + 'static,
91        O: Send + 'static,
92    {
93        Ok(match self {
94            RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
95            RocksDbSpawnMode::SpawnBlocking => {
96                tokio::task::spawn_blocking(move || f(input)).await??
97            }
98        })
99    }
100}
101
102impl Display for RocksDbSpawnMode {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match &self {
105            RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
106            RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
107        }
108    }
109}
110
111fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
112    ensure!(
113        key.len() <= MAX_KEY_SIZE,
114        RocksDbStoreInternalError::KeyTooLong
115    );
116    Ok(())
117}
118
/// Runs the synchronous RocksDB operations on behalf of a store.
#[derive(Clone)]
struct RocksDbStoreExecutor {
    /// The shared handle to the open RocksDB database.
    db: Arc<DB>,
    /// The bytes prepended to every key before it is passed to the database.
    start_key: Vec<u8>,
}
124
125impl RocksDbStoreExecutor {
126    fn contains_keys_internal(
127        &self,
128        keys: Vec<Vec<u8>>,
129    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
130        let size = keys.len();
131        let mut results = vec![false; size];
132        let mut indices = Vec::new();
133        let mut keys_red = Vec::new();
134        for (i, key) in keys.into_iter().enumerate() {
135            check_key_size(&key)?;
136            let mut full_key = self.start_key.to_vec();
137            full_key.extend(key);
138            if self.db.key_may_exist(&full_key) {
139                indices.push(i);
140                keys_red.push(full_key);
141            }
142        }
143        let values_red = self.db.multi_get(keys_red);
144        for (index, value) in indices.into_iter().zip(values_red) {
145            results[index] = value?.is_some();
146        }
147        Ok(results)
148    }
149
150    fn read_multi_values_bytes_internal(
151        &self,
152        keys: Vec<Vec<u8>>,
153    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
154        for key in &keys {
155            check_key_size(key)?;
156        }
157        let full_keys = keys
158            .into_iter()
159            .map(|key| {
160                let mut full_key = self.start_key.to_vec();
161                full_key.extend(key);
162                full_key
163            })
164            .collect::<Vec<_>>();
165        let entries = self.db.multi_get(&full_keys);
166        Ok(entries.into_iter().collect::<Result<_, _>>()?)
167    }
168
169    fn get_find_prefix_iterator(&self, prefix: &[u8]) -> rocksdb::DBRawIteratorWithThreadMode<DB> {
170        // Configure ReadOptions optimized for SSDs and iterator performance
171        let mut read_opts = rocksdb::ReadOptions::default();
172        // Enable async I/O for better concurrency
173        read_opts.set_async_io(true);
174
175        // Set precise upper bound to minimize key traversal
176        let upper_bound = get_upper_bound_option(prefix);
177        if let Some(upper_bound) = upper_bound {
178            read_opts.set_iterate_upper_bound(upper_bound);
179        }
180
181        let mut iter = self.db.raw_iterator_opt(read_opts);
182        iter.seek(prefix);
183        iter
184    }
185
186    fn find_keys_by_prefix_internal(
187        &self,
188        key_prefix: Vec<u8>,
189    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
190        check_key_size(&key_prefix)?;
191
192        let mut prefix = self.start_key.clone();
193        prefix.extend(key_prefix);
194        let len = prefix.len();
195
196        let mut iter = self.get_find_prefix_iterator(&prefix);
197        let mut keys = Vec::new();
198        while let Some(key) = iter.key() {
199            keys.push(key[len..].to_vec());
200            iter.next();
201        }
202        Ok(keys)
203    }
204
205    #[expect(clippy::type_complexity)]
206    fn find_key_values_by_prefix_internal(
207        &self,
208        key_prefix: Vec<u8>,
209    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
210        check_key_size(&key_prefix)?;
211        let mut prefix = self.start_key.clone();
212        prefix.extend(key_prefix);
213        let len = prefix.len();
214
215        let mut iter = self.get_find_prefix_iterator(&prefix);
216        let mut key_values = Vec::new();
217        while let Some((key, value)) = iter.item() {
218            let key_value = (key[len..].to_vec(), value.to_vec());
219            key_values.push(key_value);
220            iter.next();
221        }
222        Ok(key_values)
223    }
224
225    fn write_batch_internal(
226        &self,
227        batch: Batch,
228        write_root_key: bool,
229    ) -> Result<(), RocksDbStoreInternalError> {
230        let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
231        for operation in batch.operations {
232            match operation {
233                WriteOperation::Delete { key } => {
234                    check_key_size(&key)?;
235                    let mut full_key = self.start_key.to_vec();
236                    full_key.extend(key);
237                    inner_batch.delete(&full_key)
238                }
239                WriteOperation::Put { key, value } => {
240                    check_key_size(&key)?;
241                    let mut full_key = self.start_key.to_vec();
242                    full_key.extend(key);
243                    inner_batch.put(&full_key, value)
244                }
245                WriteOperation::DeletePrefix { key_prefix } => {
246                    check_key_size(&key_prefix)?;
247                    let mut full_key1 = self.start_key.to_vec();
248                    full_key1.extend(&key_prefix);
249                    let full_key2 =
250                        get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
251                    inner_batch.delete_range(&full_key1, &full_key2);
252                }
253            }
254        }
255        if write_root_key {
256            let mut full_key = self.start_key.to_vec();
257            full_key[0] = STORED_ROOT_KEYS_PREFIX;
258            inner_batch.put(&full_key, vec![]);
259        }
260        self.db.write(inner_batch)?;
261        Ok(())
262    }
263}
264
/// The inner client: a view of a RocksDB database restricted to one root key.
#[derive(Clone)]
pub struct RocksDbStoreInternal {
    /// Runs the database operations with this store's start key.
    executor: RocksDbStoreExecutor,
    /// The namespace path, together with the guard of its temporary directory if any.
    _path_with_guard: PathWithGuard,
    /// The value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    /// How blocking RocksDB calls are dispatched.
    spawn_mode: RocksDbSpawnMode,
    /// Set by the first `write_batch` call, which is the one that also records the
    /// root key. Shared between the clones of this store.
    root_key_written: Arc<AtomicBool>,
}
274
/// Database-level connection to RocksDB for managing namespaces and partitions.
#[derive(Clone)]
pub struct RocksDbDatabaseInternal {
    /// The executor holding the open database; stores opened from this connection
    /// share its database handle.
    executor: RocksDbStoreExecutor,
    /// The namespace path, together with the guard of its temporary directory if any.
    _path_with_guard: PathWithGuard,
    /// The stream-query limit handed to the stores opened from this connection.
    max_stream_queries: usize,
    /// The spawn mode handed to the stores opened from this connection.
    spawn_mode: RocksDbSpawnMode,
}
283
// Database-level operations report failures as `RocksDbStoreInternalError`.
impl WithError for RocksDbDatabaseInternal {
    type Error = RocksDbStoreInternalError;
}
287
/// The initial configuration of the system: where the databases live and how they are
/// accessed.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RocksDbStoreInternalConfig {
    /// The path to the storage containing the namespaces; each namespace is a
    /// sub-directory of this path.
    pub path_with_guard: PathWithGuard,
    /// The chosen spawn mode
    pub spawn_mode: RocksDbSpawnMode,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
}
298
299impl RocksDbDatabaseInternal {
300    fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
301        if !namespace
302            .chars()
303            .all(|character| character.is_ascii_alphanumeric() || character == '_')
304        {
305            return Err(RocksDbStoreInternalError::InvalidNamespace);
306        }
307        Ok(())
308    }
309
310    fn build(
311        config: &RocksDbStoreInternalConfig,
312        namespace: &str,
313    ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
314        let start_key = ROOT_KEY_DOMAIN.to_vec();
315        // Create a store to extract its executor and configuration
316        let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
317        Ok(RocksDbDatabaseInternal {
318            executor: temp_store.executor,
319            _path_with_guard: temp_store._path_with_guard,
320            max_stream_queries: temp_store.max_stream_queries,
321            spawn_mode: temp_store.spawn_mode,
322        })
323    }
324}
325
326impl RocksDbStoreInternal {
327    fn build(
328        config: &RocksDbStoreInternalConfig,
329        namespace: &str,
330        start_key: Vec<u8>,
331    ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
332        RocksDbDatabaseInternal::check_namespace(namespace)?;
333        let mut path_buf = config.path_with_guard.path_buf.clone();
334        let mut path_with_guard = config.path_with_guard.clone();
335        path_buf.push(namespace);
336        path_with_guard.path_buf = path_buf.clone();
337        let max_stream_queries = config.max_stream_queries;
338        let spawn_mode = config.spawn_mode;
339        if !std::path::Path::exists(&path_buf) {
340            std::fs::create_dir(path_buf.clone())?;
341        }
342        let sys = System::new_with_specifics(
343            RefreshKind::nothing()
344                .with_cpu(CpuRefreshKind::everything())
345                .with_memory(MemoryRefreshKind::nothing().with_ram()),
346        );
347        let num_cpus = sys.cpus().len() as i32;
348        let total_ram = sys.total_memory() as usize;
349        let mut options = rocksdb::Options::default();
350        options.create_if_missing(true);
351        options.create_missing_column_families(true);
352        // Flush in-memory buffer to disk more often
353        options.set_write_buffer_size(WRITE_BUFFER_SIZE);
354        options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
355        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
356        options.set_level_zero_slowdown_writes_trigger(8);
357        options.set_level_zero_stop_writes_trigger(12);
358        options.set_level_zero_file_num_compaction_trigger(2);
359        // We deliberately give RocksDB one background thread *per* CPU so that
360        // flush + (N-1) compactions can hammer the NVMe at full bandwidth while
361        // still leaving enough CPU time for the foreground application threads.
362        options.increase_parallelism(num_cpus);
363        options.set_max_background_jobs(num_cpus);
364        options.set_max_subcompactions(num_cpus as u32);
365        options.set_level_compaction_dynamic_level_bytes(true);
366
367        options.set_compaction_style(DBCompactionStyle::Level);
368        options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
369
370        let mut block_options = BlockBasedOptions::default();
371        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
372        block_options.set_cache_index_and_filter_blocks(true);
373        // Allocate 1/4 of total RAM for RocksDB block cache, which is a reasonable balance:
374        // - Large enough to significantly improve read performance by caching frequently accessed blocks
375        // - Small enough to leave memory for other system components
376        // - Follows common practice for database caching in server environments
377        // - Prevents excessive memory pressure that could lead to swapping or OOM conditions
378        block_options.set_block_cache(&Cache::new_hyper_clock_cache(
379            total_ram / 4,
380            HYPER_CLOCK_CACHE_BLOCK_SIZE,
381        ));
382
383        // Configure bloom filters for prefix iteration optimization
384        block_options.set_bloom_filter(10.0, false);
385        block_options.set_whole_key_filtering(false);
386
387        // 32KB blocks instead of default 4KB - reduces iterator seeks
388        block_options.set_block_size(32 * 1024);
389        // Use latest format for better compression and performance
390        block_options.set_format_version(5);
391
392        options.set_block_based_table_factory(&block_options);
393
394        // Configure prefix extraction for bloom filter optimization
395        // Use 8 bytes: ROOT_KEY_DOMAIN (1 byte) + BCS variant (1-2 bytes) + identifier start (4-5 bytes)
396        let prefix_extractor = SliceTransform::create_fixed_prefix(8);
397        options.set_prefix_extractor(prefix_extractor);
398
399        // 12.5% of memtable size for bloom filter
400        options.set_memtable_prefix_bloom_ratio(0.125);
401        // Skip bloom filter for memtable when key exists
402        options.set_optimize_filters_for_hits(true);
403        // Use memory-mapped files for faster reads
404        options.set_allow_mmap_reads(true);
405        // Don't use random access pattern since we do prefix scans
406        options.set_advise_random_on_open(false);
407
408        let db = DB::open(&options, path_buf)?;
409        let executor = RocksDbStoreExecutor {
410            db: Arc::new(db),
411            start_key,
412        };
413        Ok(RocksDbStoreInternal {
414            executor,
415            _path_with_guard: path_with_guard,
416            max_stream_queries,
417            spawn_mode,
418            root_key_written: Arc::new(AtomicBool::new(false)),
419        })
420    }
421}
422
// Store-level operations report failures as `RocksDbStoreInternalError`.
impl WithError for RocksDbStoreInternal {
    type Error = RocksDbStoreInternalError;
}
426
427impl ReadableKeyValueStore for RocksDbStoreInternal {
428    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
429
430    fn max_stream_queries(&self) -> usize {
431        self.max_stream_queries
432    }
433
434    async fn read_value_bytes(
435        &self,
436        key: &[u8],
437    ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
438        check_key_size(key)?;
439        let db = self.executor.db.clone();
440        let mut full_key = self.executor.start_key.to_vec();
441        full_key.extend(key);
442        self.spawn_mode
443            .spawn(move |x| Ok(db.get(&x)?), full_key)
444            .await
445    }
446
447    async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
448        check_key_size(key)?;
449        let db = self.executor.db.clone();
450        let mut full_key = self.executor.start_key.to_vec();
451        full_key.extend(key);
452        self.spawn_mode
453            .spawn(
454                move |x| {
455                    if !db.key_may_exist(&x) {
456                        return Ok(false);
457                    }
458                    Ok(db.get(&x)?.is_some())
459                },
460                full_key,
461            )
462            .await
463    }
464
465    async fn contains_keys(
466        &self,
467        keys: Vec<Vec<u8>>,
468    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
469        let executor = self.executor.clone();
470        self.spawn_mode
471            .spawn(move |x| executor.contains_keys_internal(x), keys)
472            .await
473    }
474
475    async fn read_multi_values_bytes(
476        &self,
477        keys: Vec<Vec<u8>>,
478    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
479        let executor = self.executor.clone();
480        self.spawn_mode
481            .spawn(move |x| executor.read_multi_values_bytes_internal(x), keys)
482            .await
483    }
484
485    async fn find_keys_by_prefix(
486        &self,
487        key_prefix: &[u8],
488    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
489        let executor = self.executor.clone();
490        let key_prefix = key_prefix.to_vec();
491        self.spawn_mode
492            .spawn(
493                move |x| executor.find_keys_by_prefix_internal(x),
494                key_prefix,
495            )
496            .await
497    }
498
499    async fn find_key_values_by_prefix(
500        &self,
501        key_prefix: &[u8],
502    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
503        let executor = self.executor.clone();
504        let key_prefix = key_prefix.to_vec();
505        self.spawn_mode
506            .spawn(
507                move |x| executor.find_key_values_by_prefix_internal(x),
508                key_prefix,
509            )
510            .await
511    }
512}
513
514impl WritableKeyValueStore for RocksDbStoreInternal {
515    const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
516
517    async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
518        let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
519        let executor = self.executor.clone();
520        self.spawn_mode
521            .spawn(
522                move |x| executor.write_batch_internal(x, write_root_key),
523                batch,
524            )
525            .await
526    }
527
528    async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
529        Ok(())
530    }
531}
532
533impl KeyValueDatabase for RocksDbDatabaseInternal {
534    type Config = RocksDbStoreInternalConfig;
535    type Store = RocksDbStoreInternal;
536
537    fn get_name() -> String {
538        "rocksdb internal".to_string()
539    }
540
541    async fn connect(
542        config: &Self::Config,
543        namespace: &str,
544    ) -> Result<Self, RocksDbStoreInternalError> {
545        Self::build(config, namespace)
546    }
547
548    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
549        let mut start_key = ROOT_KEY_DOMAIN.to_vec();
550        start_key.extend(root_key);
551        let mut executor = self.executor.clone();
552        executor.start_key = start_key;
553        Ok(RocksDbStoreInternal {
554            executor,
555            _path_with_guard: self._path_with_guard.clone(),
556            max_stream_queries: self.max_stream_queries,
557            spawn_mode: self.spawn_mode,
558            root_key_written: Arc::new(AtomicBool::new(false)),
559        })
560    }
561
562    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
563        self.open_shared(root_key)
564    }
565
566    async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
567        let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
568        let mut namespaces = Vec::new();
569        for entry in entries {
570            let entry = entry?;
571            if !entry.file_type()?.is_dir() {
572                return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
573            }
574            let namespace = match entry.file_name().into_string() {
575                Err(error) => {
576                    return Err(RocksDbStoreInternalError::IntoStringError(error));
577                }
578                Ok(namespace) => namespace,
579            };
580            namespaces.push(namespace);
581        }
582        Ok(namespaces)
583    }
584
585    async fn list_root_keys(
586        config: &Self::Config,
587        namespace: &str,
588    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
589        let start_key = vec![STORED_ROOT_KEYS_PREFIX];
590        let store = RocksDbStoreInternal::build(config, namespace, start_key)?;
591        store.find_keys_by_prefix(&[]).await
592    }
593
594    async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
595        let namespaces = Self::list_all(config).await?;
596        for namespace in namespaces {
597            let mut path_buf = config.path_with_guard.path_buf.clone();
598            path_buf.push(&namespace);
599            std::fs::remove_dir_all(path_buf.as_path())?;
600        }
601        Ok(())
602    }
603
604    async fn exists(
605        config: &Self::Config,
606        namespace: &str,
607    ) -> Result<bool, RocksDbStoreInternalError> {
608        Self::check_namespace(namespace)?;
609        let mut path_buf = config.path_with_guard.path_buf.clone();
610        path_buf.push(namespace);
611        let test = std::path::Path::exists(&path_buf);
612        Ok(test)
613    }
614
615    async fn create(
616        config: &Self::Config,
617        namespace: &str,
618    ) -> Result<(), RocksDbStoreInternalError> {
619        Self::check_namespace(namespace)?;
620        let mut path_buf = config.path_with_guard.path_buf.clone();
621        path_buf.push(namespace);
622        if std::path::Path::exists(&path_buf) {
623            return Err(RocksDbStoreInternalError::StoreAlreadyExists);
624        }
625        std::fs::create_dir_all(path_buf)?;
626        Ok(())
627    }
628
629    async fn delete(
630        config: &Self::Config,
631        namespace: &str,
632    ) -> Result<(), RocksDbStoreInternalError> {
633        Self::check_namespace(namespace)?;
634        let mut path_buf = config.path_with_guard.path_buf.clone();
635        path_buf.push(namespace);
636        let path = path_buf.as_path();
637        std::fs::remove_dir_all(path)?;
638        Ok(())
639    }
640}
641
#[cfg(with_testing)]
impl TestKeyValueDatabase for RocksDbDatabaseInternal {
    /// Builds a configuration backed by a fresh temporary directory, with the spawn mode
    /// derived from the current Tokio runtime.
    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
        Ok(RocksDbStoreInternalConfig {
            path_with_guard: PathWithGuard::new_testing(),
            spawn_mode: RocksDbSpawnMode::get_spawn_mode_from_runtime(),
            max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES,
        })
    }
}
655
/// The error type for [`RocksDbStoreInternal`]
#[derive(Error, Debug)]
pub enum RocksDbStoreInternalError {
    /// The namespace to create already exists on disk.
    #[error("Store already exists")]
    StoreAlreadyExists,

    /// Tokio join error in RocksDB, raised when a blocking task cannot be joined.
    #[error("tokio join error: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),

    /// RocksDB error.
    #[error("RocksDB error: {0}")]
    RocksDb(#[from] rocksdb::Error),

    /// The storage directory contains an entry which is not a directory
    #[error("Namespaces should be directories")]
    NonDirectoryNamespace,

    /// Error converting `OsString` to `String`; carries the name that is not valid Unicode.
    #[error("error in the conversion from OsString: {0:?}")]
    IntoStringError(OsString),

    /// The key is longer than `MAX_KEY_SIZE` (8 MiB minus 400 bytes).
    #[error("The key must have at most 8 MiB")]
    KeyTooLong,

    /// Namespace contains forbidden characters, i.e. something other than ASCII
    /// alphanumeric characters and `_`.
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// Filesystem error
    #[error("Filesystem error: {0}")]
    FsError(#[from] std::io::Error),

    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
695
/// A path and the guard for the temporary directory if needed
///
/// The guard is skipped by serialization and ignored by equality: only the path counts.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PathWithGuard {
    /// The path to the data
    pub path_buf: PathBuf,
    /// The guard for the directory if one is needed; shared between the clones.
    #[serde(skip)]
    _dir: Option<Arc<TempDir>>,
}
705
706impl PathWithGuard {
707    /// Creates a `PathWithGuard` from an existing path.
708    pub fn new(path_buf: PathBuf) -> Self {
709        Self {
710            path_buf,
711            _dir: None,
712        }
713    }
714
715    /// Returns the test path for RocksDB without common config.
716    #[cfg(with_testing)]
717    fn new_testing() -> PathWithGuard {
718        let dir = TempDir::new().unwrap();
719        let path_buf = dir.path().to_path_buf();
720        let _dir = Some(Arc::new(dir));
721        PathWithGuard { path_buf, _dir }
722    }
723}
724
725impl PartialEq for PathWithGuard {
726    fn eq(&self, other: &Self) -> bool {
727        self.path_buf == other.path_buf
728    }
729}
730impl Eq for PathWithGuard {}
731
// Identifies errors of this type as coming from the "rocks_db" backend.
impl KeyValueStoreError for RocksDbStoreInternalError {
    const BACKEND: &'static str = "rocks_db";
}
735
/// The composed error type for the `RocksDbStore`: the internal error wrapped by the
/// value-splitting layer.
pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;

/// The composed config type for the `RocksDbStore`: the internal config wrapped by the
/// LRU-caching layer.
pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;

/// The `RocksDbDatabase` composed type with metrics: LRU caching over value splitting
/// over the internal database, with a metering wrapper around each layer.
#[cfg(with_metrics)]
pub type RocksDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
    >,
>;
/// The `RocksDbDatabase` composed type: LRU caching over value splitting over the
/// internal database.
#[cfg(not(with_metrics))]
pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;