linera_views/backends/rocks_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the RocksDB database.
5
6use std::{
7    ffi::OsString,
8    fmt::Display,
9    path::PathBuf,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14};
15
16use linera_base::ensure;
17use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, SliceTransform};
18use serde::{Deserialize, Serialize};
19use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
20use tempfile::TempDir;
21use thiserror::Error;
22
23#[cfg(with_metrics)]
24use crate::metering::MeteredDatabase;
25#[cfg(with_testing)]
26use crate::store::TestKeyValueDatabase;
27use crate::{
28    batch::{Batch, WriteOperation},
29    common::get_upper_bound_option,
30    lru_caching::{LruCachingConfig, LruCachingDatabase},
31    store::{
32        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
33        WritableKeyValueStore,
34    },
35    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
36};
37
/// Leading byte of every key that belongs to a root-key partition.
static ROOT_KEY_DOMAIN: [u8; 1] = [0];
/// Leading byte of the marker entries that record which root keys have been written.
static STORED_ROOT_KEYS_PREFIX: u8 = 1;

/// Preferred stream buffer size used by test configurations.
#[cfg(with_testing)]
const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;

// RocksDB caps values at 3 GiB; the limit is lowered by 400 bytes for offset reasons
// (headroom below the hard limit).
const MAX_VALUE_SIZE: usize = (3 << 30) - 400;

// RocksDB caps keys at 8 MiB; the limit is lowered by 400 bytes for offset reasons
// (headroom below the hard limit).
const MAX_KEY_SIZE: usize = (8 << 20) - 400;

/// Size of a single memtable (256 MiB).
const WRITE_BUFFER_SIZE: usize = 256 << 20;
/// Maximum number of memtables RocksDB may hold in memory at once.
const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
/// Block size hint passed to the hyper-clock block cache (8 KiB).
const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 << 10;
57
58/// The RocksDB client that we use.
59type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
60
61/// The choice of the spawning mode.
62/// `SpawnBlocking` always works and is the safest.
63/// `BlockInPlace` can only be used in multi-threaded environment.
64/// One way to select that is to select BlockInPlace when
65/// `tokio::runtime::Handle::current().metrics().num_workers() > 1`
66/// `BlockInPlace` is documented in <https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html>
67#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
68pub enum RocksDbSpawnMode {
69    /// This uses the `spawn_blocking` function of Tokio.
70    SpawnBlocking,
71    /// This uses the `block_in_place` function of Tokio.
72    BlockInPlace,
73}
74
75impl RocksDbSpawnMode {
76    /// Obtains the spawning mode from runtime.
77    pub fn get_spawn_mode_from_runtime() -> Self {
78        if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
79            RocksDbSpawnMode::BlockInPlace
80        } else {
81            RocksDbSpawnMode::SpawnBlocking
82        }
83    }
84
85    /// Runs the computation for a function according to the selected policy.
86    #[inline]
87    async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
88    where
89        F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
90        I: Send + 'static,
91        O: Send + 'static,
92    {
93        Ok(match self {
94            RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
95            RocksDbSpawnMode::SpawnBlocking => {
96                tokio::task::spawn_blocking(move || f(input)).await??
97            }
98        })
99    }
100}
101
102impl Display for RocksDbSpawnMode {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        match &self {
105            RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
106            RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
107        }
108    }
109}
110
111fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
112    ensure!(
113        key.len() <= MAX_KEY_SIZE,
114        RocksDbStoreInternalError::KeyTooLong
115    );
116    Ok(())
117}
118
119#[derive(Clone)]
120struct RocksDbStoreExecutor {
121    db: Arc<DB>,
122    start_key: Vec<u8>,
123}
124
125impl RocksDbStoreExecutor {
126    fn contains_keys_internal(
127        &self,
128        keys: Vec<Vec<u8>>,
129    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
130        let size = keys.len();
131        let mut results = vec![false; size];
132        let mut indices = Vec::new();
133        let mut keys_red = Vec::new();
134        for (i, key) in keys.into_iter().enumerate() {
135            check_key_size(&key)?;
136            let mut full_key = self.start_key.to_vec();
137            full_key.extend(key);
138            if self.db.key_may_exist(&full_key) {
139                indices.push(i);
140                keys_red.push(full_key);
141            }
142        }
143        let values_red = self.db.multi_get(keys_red);
144        for (index, value) in indices.into_iter().zip(values_red) {
145            results[index] = value?.is_some();
146        }
147        Ok(results)
148    }
149
150    fn read_multi_values_bytes_internal(
151        &self,
152        keys: Vec<Vec<u8>>,
153    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
154        for key in &keys {
155            check_key_size(key)?;
156        }
157        let full_keys = keys
158            .into_iter()
159            .map(|key| {
160                let mut full_key = self.start_key.to_vec();
161                full_key.extend(key);
162                full_key
163            })
164            .collect::<Vec<_>>();
165        let entries = self.db.multi_get(&full_keys);
166        Ok(entries.into_iter().collect::<Result<_, _>>()?)
167    }
168
169    fn get_find_prefix_iterator(
170        &self,
171        prefix: &[u8],
172    ) -> rocksdb::DBRawIteratorWithThreadMode<'_, DB> {
173        // Configure ReadOptions optimized for SSDs and iterator performance
174        let mut read_opts = rocksdb::ReadOptions::default();
175        // Enable async I/O for better concurrency
176        read_opts.set_async_io(true);
177
178        // Set precise upper bound to minimize key traversal
179        let upper_bound = get_upper_bound_option(prefix);
180        if let Some(upper_bound) = upper_bound {
181            read_opts.set_iterate_upper_bound(upper_bound);
182        }
183
184        let mut iter = self.db.raw_iterator_opt(read_opts);
185        iter.seek(prefix);
186        iter
187    }
188
189    fn find_keys_by_prefix_internal(
190        &self,
191        key_prefix: Vec<u8>,
192    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
193        check_key_size(&key_prefix)?;
194
195        let mut prefix = self.start_key.clone();
196        prefix.extend(key_prefix);
197        let len = prefix.len();
198
199        let mut iter = self.get_find_prefix_iterator(&prefix);
200        let mut keys = Vec::new();
201        while let Some(key) = iter.key() {
202            keys.push(key[len..].to_vec());
203            iter.next();
204        }
205        Ok(keys)
206    }
207
208    #[expect(clippy::type_complexity)]
209    fn find_key_values_by_prefix_internal(
210        &self,
211        key_prefix: Vec<u8>,
212    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
213        check_key_size(&key_prefix)?;
214        let mut prefix = self.start_key.clone();
215        prefix.extend(key_prefix);
216        let len = prefix.len();
217
218        let mut iter = self.get_find_prefix_iterator(&prefix);
219        let mut key_values = Vec::new();
220        while let Some((key, value)) = iter.item() {
221            let key_value = (key[len..].to_vec(), value.to_vec());
222            key_values.push(key_value);
223            iter.next();
224        }
225        Ok(key_values)
226    }
227
228    fn write_batch_internal(
229        &self,
230        batch: Batch,
231        write_root_key: bool,
232    ) -> Result<(), RocksDbStoreInternalError> {
233        let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
234        for operation in batch.operations {
235            match operation {
236                WriteOperation::Delete { key } => {
237                    check_key_size(&key)?;
238                    let mut full_key = self.start_key.to_vec();
239                    full_key.extend(key);
240                    inner_batch.delete(&full_key)
241                }
242                WriteOperation::Put { key, value } => {
243                    check_key_size(&key)?;
244                    let mut full_key = self.start_key.to_vec();
245                    full_key.extend(key);
246                    inner_batch.put(&full_key, value)
247                }
248                WriteOperation::DeletePrefix { key_prefix } => {
249                    check_key_size(&key_prefix)?;
250                    let mut full_key1 = self.start_key.to_vec();
251                    full_key1.extend(&key_prefix);
252                    let full_key2 =
253                        get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
254                    inner_batch.delete_range(&full_key1, &full_key2);
255                }
256            }
257        }
258        if write_root_key {
259            let mut full_key = self.start_key.to_vec();
260            full_key[0] = STORED_ROOT_KEYS_PREFIX;
261            inner_batch.put(&full_key, vec![]);
262        }
263        self.db.write(inner_batch)?;
264        Ok(())
265    }
266}
267
268/// The inner client
269#[derive(Clone)]
270pub struct RocksDbStoreInternal {
271    executor: RocksDbStoreExecutor,
272    _path_with_guard: PathWithGuard,
273    max_stream_queries: usize,
274    spawn_mode: RocksDbSpawnMode,
275    root_key_written: Arc<AtomicBool>,
276}
277
278/// Database-level connection to RocksDB for managing namespaces and partitions.
279#[derive(Clone)]
280pub struct RocksDbDatabaseInternal {
281    executor: RocksDbStoreExecutor,
282    _path_with_guard: PathWithGuard,
283    max_stream_queries: usize,
284    spawn_mode: RocksDbSpawnMode,
285}
286
287impl WithError for RocksDbDatabaseInternal {
288    type Error = RocksDbStoreInternalError;
289}
290
291/// The initial configuration of the system
292#[derive(Clone, Debug, Deserialize, Serialize)]
293pub struct RocksDbStoreInternalConfig {
294    /// The path to the storage containing the namespaces
295    pub path_with_guard: PathWithGuard,
296    /// The chosen spawn mode
297    pub spawn_mode: RocksDbSpawnMode,
298    /// Preferred buffer size for async streams.
299    pub max_stream_queries: usize,
300}
301
302impl RocksDbDatabaseInternal {
303    fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
304        if !namespace
305            .chars()
306            .all(|character| character.is_ascii_alphanumeric() || character == '_')
307        {
308            return Err(RocksDbStoreInternalError::InvalidNamespace);
309        }
310        Ok(())
311    }
312
313    fn build(
314        config: &RocksDbStoreInternalConfig,
315        namespace: &str,
316    ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
317        let start_key = ROOT_KEY_DOMAIN.to_vec();
318        // Create a store to extract its executor and configuration
319        let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
320        Ok(RocksDbDatabaseInternal {
321            executor: temp_store.executor,
322            _path_with_guard: temp_store._path_with_guard,
323            max_stream_queries: temp_store.max_stream_queries,
324            spawn_mode: temp_store.spawn_mode,
325        })
326    }
327}
328
329impl RocksDbStoreInternal {
330    fn build(
331        config: &RocksDbStoreInternalConfig,
332        namespace: &str,
333        start_key: Vec<u8>,
334    ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
335        RocksDbDatabaseInternal::check_namespace(namespace)?;
336        let mut path_buf = config.path_with_guard.path_buf.clone();
337        let mut path_with_guard = config.path_with_guard.clone();
338        path_buf.push(namespace);
339        path_with_guard.path_buf = path_buf.clone();
340        let max_stream_queries = config.max_stream_queries;
341        let spawn_mode = config.spawn_mode;
342        if !std::path::Path::exists(&path_buf) {
343            std::fs::create_dir(path_buf.clone())?;
344        }
345        let sys = System::new_with_specifics(
346            RefreshKind::nothing()
347                .with_cpu(CpuRefreshKind::everything())
348                .with_memory(MemoryRefreshKind::nothing().with_ram()),
349        );
350        let num_cpus = sys.cpus().len() as i32;
351        let total_ram = sys.total_memory() as usize;
352        let mut options = rocksdb::Options::default();
353        options.create_if_missing(true);
354        options.create_missing_column_families(true);
355        // Flush in-memory buffer to disk more often
356        options.set_write_buffer_size(WRITE_BUFFER_SIZE);
357        options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
358        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
359        options.set_level_zero_slowdown_writes_trigger(8);
360        options.set_level_zero_stop_writes_trigger(12);
361        options.set_level_zero_file_num_compaction_trigger(2);
362        // We deliberately give RocksDB one background thread *per* CPU so that
363        // flush + (N-1) compactions can hammer the NVMe at full bandwidth while
364        // still leaving enough CPU time for the foreground application threads.
365        options.increase_parallelism(num_cpus);
366        options.set_max_background_jobs(num_cpus);
367        options.set_max_subcompactions(num_cpus as u32);
368        options.set_level_compaction_dynamic_level_bytes(true);
369
370        options.set_compaction_style(DBCompactionStyle::Level);
371        options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
372
373        let mut block_options = BlockBasedOptions::default();
374        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
375        block_options.set_cache_index_and_filter_blocks(true);
376        // Allocate 1/4 of total RAM for RocksDB block cache, which is a reasonable balance:
377        // - Large enough to significantly improve read performance by caching frequently accessed blocks
378        // - Small enough to leave memory for other system components
379        // - Follows common practice for database caching in server environments
380        // - Prevents excessive memory pressure that could lead to swapping or OOM conditions
381        block_options.set_block_cache(&Cache::new_hyper_clock_cache(
382            total_ram / 4,
383            HYPER_CLOCK_CACHE_BLOCK_SIZE,
384        ));
385
386        // Configure bloom filters for prefix iteration optimization
387        block_options.set_bloom_filter(10.0, false);
388        block_options.set_whole_key_filtering(false);
389
390        // 32KB blocks instead of default 4KB - reduces iterator seeks
391        block_options.set_block_size(32 * 1024);
392        // Use latest format for better compression and performance
393        block_options.set_format_version(5);
394
395        options.set_block_based_table_factory(&block_options);
396
397        // Configure prefix extraction for bloom filter optimization
398        // Use 8 bytes: ROOT_KEY_DOMAIN (1 byte) + BCS variant (1-2 bytes) + identifier start (4-5 bytes)
399        let prefix_extractor = SliceTransform::create_fixed_prefix(8);
400        options.set_prefix_extractor(prefix_extractor);
401
402        // 12.5% of memtable size for bloom filter
403        options.set_memtable_prefix_bloom_ratio(0.125);
404        // Skip bloom filter for memtable when key exists
405        options.set_optimize_filters_for_hits(true);
406        // Use memory-mapped files for faster reads
407        options.set_allow_mmap_reads(true);
408        // Don't use random access pattern since we do prefix scans
409        options.set_advise_random_on_open(false);
410
411        let db = DB::open(&options, path_buf)?;
412        let executor = RocksDbStoreExecutor {
413            db: Arc::new(db),
414            start_key,
415        };
416        Ok(RocksDbStoreInternal {
417            executor,
418            _path_with_guard: path_with_guard,
419            max_stream_queries,
420            spawn_mode,
421            root_key_written: Arc::new(AtomicBool::new(false)),
422        })
423    }
424}
425
426impl WithError for RocksDbStoreInternal {
427    type Error = RocksDbStoreInternalError;
428}
429
430impl ReadableKeyValueStore for RocksDbStoreInternal {
431    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
432
433    fn max_stream_queries(&self) -> usize {
434        self.max_stream_queries
435    }
436
437    fn root_key(&self) -> Result<Vec<u8>, RocksDbStoreInternalError> {
438        assert!(self.executor.start_key.starts_with(&ROOT_KEY_DOMAIN));
439        let root_key = bcs::from_bytes(&self.executor.start_key[ROOT_KEY_DOMAIN.len()..])?;
440        Ok(root_key)
441    }
442
443    async fn read_value_bytes(
444        &self,
445        key: &[u8],
446    ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
447        check_key_size(key)?;
448        let db = self.executor.db.clone();
449        let mut full_key = self.executor.start_key.to_vec();
450        full_key.extend(key);
451        self.spawn_mode
452            .spawn(move |x| Ok(db.get(&x)?), full_key)
453            .await
454    }
455
456    async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
457        check_key_size(key)?;
458        let db = self.executor.db.clone();
459        let mut full_key = self.executor.start_key.to_vec();
460        full_key.extend(key);
461        self.spawn_mode
462            .spawn(
463                move |x| {
464                    if !db.key_may_exist(&x) {
465                        return Ok(false);
466                    }
467                    Ok(db.get(&x)?.is_some())
468                },
469                full_key,
470            )
471            .await
472    }
473
474    async fn contains_keys(
475        &self,
476        keys: &[Vec<u8>],
477    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
478        let executor = self.executor.clone();
479        self.spawn_mode
480            .spawn(move |x| executor.contains_keys_internal(x), keys.to_vec())
481            .await
482    }
483
484    async fn read_multi_values_bytes(
485        &self,
486        keys: &[Vec<u8>],
487    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
488        let executor = self.executor.clone();
489        self.spawn_mode
490            .spawn(
491                move |x| executor.read_multi_values_bytes_internal(x),
492                keys.to_vec(),
493            )
494            .await
495    }
496
497    async fn find_keys_by_prefix(
498        &self,
499        key_prefix: &[u8],
500    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
501        let executor = self.executor.clone();
502        let key_prefix = key_prefix.to_vec();
503        self.spawn_mode
504            .spawn(
505                move |x| executor.find_keys_by_prefix_internal(x),
506                key_prefix,
507            )
508            .await
509    }
510
511    async fn find_key_values_by_prefix(
512        &self,
513        key_prefix: &[u8],
514    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
515        let executor = self.executor.clone();
516        let key_prefix = key_prefix.to_vec();
517        self.spawn_mode
518            .spawn(
519                move |x| executor.find_key_values_by_prefix_internal(x),
520                key_prefix,
521            )
522            .await
523    }
524}
525
526impl WritableKeyValueStore for RocksDbStoreInternal {
527    const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
528
529    async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
530        let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
531        let executor = self.executor.clone();
532        self.spawn_mode
533            .spawn(
534                move |x| executor.write_batch_internal(x, write_root_key),
535                batch,
536            )
537            .await
538    }
539
540    async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
541        Ok(())
542    }
543}
544
545impl KeyValueDatabase for RocksDbDatabaseInternal {
546    type Config = RocksDbStoreInternalConfig;
547    type Store = RocksDbStoreInternal;
548
549    fn get_name() -> String {
550        "rocksdb internal".to_string()
551    }
552
553    async fn connect(
554        config: &Self::Config,
555        namespace: &str,
556    ) -> Result<Self, RocksDbStoreInternalError> {
557        Self::build(config, namespace)
558    }
559
560    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
561        let mut start_key = ROOT_KEY_DOMAIN.to_vec();
562        start_key.extend(bcs::to_bytes(root_key)?);
563        let mut executor = self.executor.clone();
564        executor.start_key = start_key;
565        Ok(RocksDbStoreInternal {
566            executor,
567            _path_with_guard: self._path_with_guard.clone(),
568            max_stream_queries: self.max_stream_queries,
569            spawn_mode: self.spawn_mode,
570            root_key_written: Arc::new(AtomicBool::new(false)),
571        })
572    }
573
574    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
575        self.open_shared(root_key)
576    }
577
578    async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
579        let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
580        let mut namespaces = Vec::new();
581        for entry in entries {
582            let entry = entry?;
583            if !entry.file_type()?.is_dir() {
584                return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
585            }
586            let namespace = match entry.file_name().into_string() {
587                Err(error) => {
588                    return Err(RocksDbStoreInternalError::IntoStringError(error));
589                }
590                Ok(namespace) => namespace,
591            };
592            namespaces.push(namespace);
593        }
594        Ok(namespaces)
595    }
596
597    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
598        let mut store = self.open_shared(&[])?;
599        store.executor.start_key = vec![STORED_ROOT_KEYS_PREFIX];
600        let bcs_root_keys = store.find_keys_by_prefix(&[]).await?;
601        let mut root_keys = Vec::new();
602        for bcs_root_key in bcs_root_keys {
603            let root_key = bcs::from_bytes::<Vec<u8>>(&bcs_root_key)?;
604            root_keys.push(root_key);
605        }
606        Ok(root_keys)
607    }
608
609    async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
610        let namespaces = Self::list_all(config).await?;
611        for namespace in namespaces {
612            let mut path_buf = config.path_with_guard.path_buf.clone();
613            path_buf.push(&namespace);
614            std::fs::remove_dir_all(path_buf.as_path())?;
615        }
616        Ok(())
617    }
618
619    async fn exists(
620        config: &Self::Config,
621        namespace: &str,
622    ) -> Result<bool, RocksDbStoreInternalError> {
623        Self::check_namespace(namespace)?;
624        let mut path_buf = config.path_with_guard.path_buf.clone();
625        path_buf.push(namespace);
626        let test = std::path::Path::exists(&path_buf);
627        Ok(test)
628    }
629
630    async fn create(
631        config: &Self::Config,
632        namespace: &str,
633    ) -> Result<(), RocksDbStoreInternalError> {
634        Self::check_namespace(namespace)?;
635        let mut path_buf = config.path_with_guard.path_buf.clone();
636        path_buf.push(namespace);
637        if std::path::Path::exists(&path_buf) {
638            return Err(RocksDbStoreInternalError::StoreAlreadyExists);
639        }
640        std::fs::create_dir_all(path_buf)?;
641        Ok(())
642    }
643
644    async fn delete(
645        config: &Self::Config,
646        namespace: &str,
647    ) -> Result<(), RocksDbStoreInternalError> {
648        Self::check_namespace(namespace)?;
649        let mut path_buf = config.path_with_guard.path_buf.clone();
650        path_buf.push(namespace);
651        let path = path_buf.as_path();
652        std::fs::remove_dir_all(path)?;
653        Ok(())
654    }
655}
656
#[cfg(with_testing)]
impl TestKeyValueDatabase for RocksDbDatabaseInternal {
    /// Builds a configuration backed by a fresh temporary directory, with the spawn mode
    /// derived from the current runtime.
    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
        Ok(RocksDbStoreInternalConfig {
            path_with_guard: PathWithGuard::new_testing(),
            spawn_mode: RocksDbSpawnMode::get_spawn_mode_from_runtime(),
            max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES,
        })
    }
}
670
671/// The error type for [`RocksDbStoreInternal`]
672#[derive(Error, Debug)]
673pub enum RocksDbStoreInternalError {
674    /// Store already exists
675    #[error("Store already exists")]
676    StoreAlreadyExists,
677
678    /// Tokio join error in RocksDB.
679    #[error("tokio join error: {0}")]
680    TokioJoinError(#[from] tokio::task::JoinError),
681
682    /// RocksDB error.
683    #[error("RocksDB error: {0}")]
684    RocksDb(#[from] rocksdb::Error),
685
686    /// The database contains a file which is not a directory
687    #[error("Namespaces should be directories")]
688    NonDirectoryNamespace,
689
690    /// Error converting `OsString` to `String`
691    #[error("error in the conversion from OsString: {0:?}")]
692    IntoStringError(OsString),
693
694    /// The key must have at most 8 MiB
695    #[error("The key must have at most 8 MiB")]
696    KeyTooLong,
697
698    /// Namespace contains forbidden characters
699    #[error("Namespace contains forbidden characters")]
700    InvalidNamespace,
701
702    /// Filesystem error
703    #[error("Filesystem error: {0}")]
704    FsError(#[from] std::io::Error),
705
706    /// BCS serialization error.
707    #[error(transparent)]
708    BcsError(#[from] bcs::Error),
709}
710
711/// A path and the guard for the temporary directory if needed
712#[derive(Clone, Debug, Deserialize, Serialize)]
713pub struct PathWithGuard {
714    /// The path to the data
715    pub path_buf: PathBuf,
716    /// The guard for the directory if one is needed
717    #[serde(skip)]
718    _dir: Option<Arc<TempDir>>,
719}
720
721impl PathWithGuard {
722    /// Creates a `PathWithGuard` from an existing path.
723    pub fn new(path_buf: PathBuf) -> Self {
724        Self {
725            path_buf,
726            _dir: None,
727        }
728    }
729
730    /// Returns the test path for RocksDB without common config.
731    #[cfg(with_testing)]
732    fn new_testing() -> PathWithGuard {
733        let dir = TempDir::new().unwrap();
734        let path_buf = dir.path().to_path_buf();
735        let _dir = Some(Arc::new(dir));
736        PathWithGuard { path_buf, _dir }
737    }
738}
739
740impl PartialEq for PathWithGuard {
741    fn eq(&self, other: &Self) -> bool {
742        self.path_buf == other.path_buf
743    }
744}
745impl Eq for PathWithGuard {}
746
747impl KeyValueStoreError for RocksDbStoreInternalError {
748    const BACKEND: &'static str = "rocks_db";
749}
750
751/// The composed error type for the `RocksDbStore`
752pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;
753
754/// The composed config type for the `RocksDbStore`
755pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;
756
757/// The `RocksDbDatabase` composed type with metrics
758#[cfg(with_metrics)]
759pub type RocksDbDatabase = MeteredDatabase<
760    LruCachingDatabase<
761        MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
762    >,
763>;
764/// The `RocksDbDatabase` composed type
765#[cfg(not(with_metrics))]
766pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;