Skip to main content

linera_views/backends/
rocks_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the RocksDB database.
5
6// RocksDB's C API uses `i32` and signed sizes; casts at this boundary are
7// by design.
8#![allow(
9    clippy::cast_possible_truncation,
10    clippy::cast_sign_loss,
11    clippy::cast_possible_wrap
12)]
13
14use std::{
15    ffi::OsString,
16    fmt::Display,
17    path::PathBuf,
18    sync::{
19        atomic::{AtomicBool, Ordering},
20        Arc,
21    },
22};
23
24use linera_base::ensure;
25use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, SliceTransform, WriteBufferManager};
26use serde::{Deserialize, Serialize};
27use sysinfo::{MemoryRefreshKind, RefreshKind, System};
28use tempfile::TempDir;
29use thiserror::Error;
30
31#[cfg(with_metrics)]
32use crate::metering::MeteredDatabase;
33#[cfg(with_testing)]
34use crate::store::TestKeyValueDatabase;
35use crate::{
36    batch::{Batch, WriteOperation},
37    common::get_upper_bound_option,
38    lru_caching::{LruCachingConfig, LruCachingDatabase},
39    store::{
40        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
41        WritableKeyValueStore,
42    },
43    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
44};
45
/// Leading byte of every key stored under a root key (see `open_shared`).
static ROOT_KEY_DOMAIN: [u8; 1] = [0];
/// Leading byte of the marker keys that record which root keys have been written.
static STORED_ROOT_KEYS_PREFIX: u8 = 1;

/// Preferred async-stream buffer size used by test configurations.
#[cfg(with_testing)]
const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;

/// Largest accepted value: RocksDB's recommended 3 GiB ceiling minus a 400-byte safety margin.
const MAX_VALUE_SIZE: usize = (3 << 30) - 400;

/// Largest accepted key: RocksDB's recommended 8 MiB ceiling minus a 400-byte safety margin.
const MAX_KEY_SIZE: usize = (8 << 20) - 400;

/// Memtable size (16 MiB) at which RocksDB switches to a new memtable and flushes the old one.
///
/// Kept deliberately small so memtables keep flushing even under low write rates. A large
/// buffer (e.g. 256 MiB) lets a slowly-filling memtable grow huge and accumulate range
/// tombstones that every point read must scan, which severely amplifies reads during e.g. a
/// long chain synchronization, where blocks are re-executed with little net data written.
const WRITE_BUFFER_SIZE: usize = 16 << 20;
/// Maximum number of memtables RocksDB may keep in memory at once.
const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
68
69fn get_available_memory(sys: &System) -> usize {
70    sys.cgroup_limits()
71        .map_or_else(|| sys.total_memory() as usize, |c| c.total_memory as usize)
72}
73
/// Returns how many CPUs this process may use, or 1 when that cannot be determined.
fn get_available_cpus() -> i32 {
    match std::thread::available_parallelism() {
        Ok(count) => count.get() as i32,
        Err(_) => 1,
    }
}
77
/// Estimated charge per entry (8 KiB) passed to `Cache::new_hyper_clock_cache`.
// NOTE(review): the block-based table options configured for this backend use 32 KiB blocks
// (`set_block_size`), so this per-entry estimate may be low — confirm it is intended.
const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 << 10;
79
/// The RocksDB client that we use: a `DBWithThreadMode` in `MultiThreaded` mode.
/// Executors share one opened instance through an `Arc`.
type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
82
/// How blocking RocksDB calls are moved off the async executor.
///
/// `SpawnBlocking` always works and is the safest.
/// `BlockInPlace` can only be used in a multi-threaded runtime.
/// One way to choose is to select `BlockInPlace` when
/// `tokio::runtime::Handle::current().metrics().num_workers() > 1`
/// (this is what `get_spawn_mode_from_runtime` does).
/// `BlockInPlace` is documented in <https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html>
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum RocksDbSpawnMode {
    /// This uses the `spawn_blocking` function of Tokio.
    SpawnBlocking,
    /// This uses the `block_in_place` function of Tokio.
    BlockInPlace,
}
96
97impl RocksDbSpawnMode {
98    /// Obtains the spawning mode from runtime.
99    pub fn get_spawn_mode_from_runtime() -> Self {
100        if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
101            RocksDbSpawnMode::BlockInPlace
102        } else {
103            RocksDbSpawnMode::SpawnBlocking
104        }
105    }
106
107    /// Runs the computation for a function according to the selected policy.
108    #[inline]
109    async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
110    where
111        F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
112        I: Send + 'static,
113        O: Send + 'static,
114    {
115        Ok(match self {
116            RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
117            RocksDbSpawnMode::SpawnBlocking => {
118                tokio::task::spawn_blocking(move || f(input)).await??
119            }
120        })
121    }
122}
123
124impl Display for RocksDbSpawnMode {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        match &self {
127            RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
128            RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
129        }
130    }
131}
132
133fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
134    ensure!(
135        key.len() <= MAX_KEY_SIZE,
136        RocksDbStoreInternalError::KeyTooLong
137    );
138    Ok(())
139}
140
/// Performs the blocking RocksDB operations for one key space.
///
/// Every user key is stored in RocksDB as `start_key` followed by the key itself.
#[derive(Clone)]
struct RocksDbStoreExecutor {
    /// Shared handle to the opened database.
    db: Arc<DB>,
    /// Prefix prepended to every key this executor reads or writes.
    start_key: Vec<u8>,
}
146
147impl RocksDbStoreExecutor {
148    fn contains_keys_internal(
149        &self,
150        keys: Vec<Vec<u8>>,
151    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
152        let size = keys.len();
153        let mut results = vec![false; size];
154        let mut indices = Vec::new();
155        let mut keys_red = Vec::new();
156        for (i, key) in keys.into_iter().enumerate() {
157            check_key_size(&key)?;
158            let mut full_key = self.start_key.to_vec();
159            full_key.extend(key);
160            if self.db.key_may_exist(&full_key) {
161                indices.push(i);
162                keys_red.push(full_key);
163            }
164        }
165        let values_red = self.db.multi_get(keys_red);
166        for (index, value) in indices.into_iter().zip(values_red) {
167            results[index] = value?.is_some();
168        }
169        Ok(results)
170    }
171
172    fn read_multi_values_bytes_internal(
173        &self,
174        keys: Vec<Vec<u8>>,
175    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
176        for key in &keys {
177            check_key_size(key)?;
178        }
179        let full_keys = keys
180            .into_iter()
181            .map(|key| {
182                let mut full_key = self.start_key.to_vec();
183                full_key.extend(key);
184                full_key
185            })
186            .collect::<Vec<_>>();
187        let entries = self.db.multi_get(&full_keys);
188        Ok(entries.into_iter().collect::<Result<_, _>>()?)
189    }
190
191    fn get_find_prefix_iterator(
192        &self,
193        prefix: &[u8],
194    ) -> rocksdb::DBRawIteratorWithThreadMode<'_, DB> {
195        // Configure ReadOptions optimized for SSDs and iterator performance
196        let mut read_opts = rocksdb::ReadOptions::default();
197        // Enable async I/O for better concurrency
198        read_opts.set_async_io(true);
199
200        // Set precise upper bound to minimize key traversal
201        let upper_bound = get_upper_bound_option(prefix);
202        if let Some(upper_bound) = upper_bound {
203            read_opts.set_iterate_upper_bound(upper_bound);
204        }
205
206        let mut iter = self.db.raw_iterator_opt(read_opts);
207        iter.seek(prefix);
208        iter
209    }
210
211    fn find_keys_by_prefix_internal(
212        &self,
213        key_prefix: Vec<u8>,
214    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
215        check_key_size(&key_prefix)?;
216
217        let mut prefix = self.start_key.clone();
218        prefix.extend(key_prefix);
219        let len = prefix.len();
220
221        let mut iter = self.get_find_prefix_iterator(&prefix);
222        let mut keys = Vec::new();
223        while let Some(key) = iter.key() {
224            keys.push(key[len..].to_vec());
225            iter.next();
226        }
227        Ok(keys)
228    }
229
230    #[expect(clippy::type_complexity)]
231    fn find_key_values_by_prefix_internal(
232        &self,
233        key_prefix: Vec<u8>,
234    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
235        check_key_size(&key_prefix)?;
236        let mut prefix = self.start_key.clone();
237        prefix.extend(key_prefix);
238        let len = prefix.len();
239
240        let mut iter = self.get_find_prefix_iterator(&prefix);
241        let mut key_values = Vec::new();
242        while let Some((key, value)) = iter.item() {
243            let key_value = (key[len..].to_vec(), value.to_vec());
244            key_values.push(key_value);
245            iter.next();
246        }
247        Ok(key_values)
248    }
249
250    fn write_batch_internal(
251        &self,
252        batch: Batch,
253        write_root_key: bool,
254    ) -> Result<(), RocksDbStoreInternalError> {
255        let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
256        for operation in batch.operations {
257            match operation {
258                WriteOperation::Delete { key } => {
259                    check_key_size(&key)?;
260                    let mut full_key = self.start_key.to_vec();
261                    full_key.extend(key);
262                    inner_batch.delete(&full_key)
263                }
264                WriteOperation::Put { key, value } => {
265                    check_key_size(&key)?;
266                    let mut full_key = self.start_key.to_vec();
267                    full_key.extend(key);
268                    inner_batch.put(&full_key, value)
269                }
270                WriteOperation::DeletePrefix { key_prefix } => {
271                    check_key_size(&key_prefix)?;
272                    let mut full_key1 = self.start_key.to_vec();
273                    full_key1.extend(&key_prefix);
274                    let full_key2 =
275                        get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
276                    inner_batch.delete_range(&full_key1, &full_key2);
277                }
278            }
279        }
280        if write_root_key {
281            let mut full_key = self.start_key.to_vec();
282            full_key[0] = STORED_ROOT_KEYS_PREFIX;
283            inner_batch.put(&full_key, vec![]);
284        }
285        self.db.write(inner_batch)?;
286        Ok(())
287    }
288}
289
/// The inner client: a view of one key space (selected by the executor's start key) of a
/// RocksDB namespace.
#[derive(Clone)]
pub struct RocksDbStoreInternal {
    /// Performs the blocking RocksDB calls; its `start_key` selects the key space.
    executor: RocksDbStoreExecutor,
    /// Directory of the namespace, plus the temporary-directory guard when testing.
    path_with_guard: PathWithGuard,
    /// Preferred buffer size for async streams.
    max_stream_queries: usize,
    /// How blocking RocksDB calls are moved off the async executor.
    spawn_mode: RocksDbSpawnMode,
    /// Tracks whether this handle has already arranged for its root-key marker to be written.
    /// Shared between clones of the handle.
    root_key_written: Arc<AtomicBool>,
}
299
/// Database-level connection to RocksDB for managing namespaces and partitions.
#[derive(Clone)]
pub struct RocksDbDatabaseInternal {
    /// Executor over the opened namespace; stores opened from it get their own start key.
    executor: RocksDbStoreExecutor,
    /// Directory of the namespace, plus the temporary-directory guard when testing.
    path_with_guard: PathWithGuard,
    /// Preferred buffer size for async streams, inherited by opened stores.
    max_stream_queries: usize,
    /// Spawn mode inherited by opened stores.
    spawn_mode: RocksDbSpawnMode,
}
308
/// Database-level operations report [`RocksDbStoreInternalError`].
impl WithError for RocksDbDatabaseInternal {
    type Error = RocksDbStoreInternalError;
}
312
/// The initial configuration of the system
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RocksDbStoreInternalConfig {
    /// The directory containing the namespaces (one subdirectory per namespace).
    pub path_with_guard: PathWithGuard,
    /// The chosen spawn mode for blocking RocksDB calls.
    pub spawn_mode: RocksDbSpawnMode,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
}
323
324impl RocksDbDatabaseInternal {
325    fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
326        if !namespace
327            .chars()
328            .all(|character| character.is_ascii_alphanumeric() || character == '_')
329        {
330            return Err(RocksDbStoreInternalError::InvalidNamespace);
331        }
332        Ok(())
333    }
334
335    fn build(
336        config: &RocksDbStoreInternalConfig,
337        namespace: &str,
338    ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
339        let start_key = ROOT_KEY_DOMAIN.to_vec();
340        // Create a store to extract its executor and configuration
341        let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
342        Ok(RocksDbDatabaseInternal {
343            executor: temp_store.executor,
344            path_with_guard: temp_store.path_with_guard,
345            max_stream_queries: temp_store.max_stream_queries,
346            spawn_mode: temp_store.spawn_mode,
347        })
348    }
349}
350
351impl RocksDbStoreInternal {
352    fn build(
353        config: &RocksDbStoreInternalConfig,
354        namespace: &str,
355        start_key: Vec<u8>,
356    ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
357        RocksDbDatabaseInternal::check_namespace(namespace)?;
358        let mut path_buf = config.path_with_guard.path_buf.clone();
359        let mut path_with_guard = config.path_with_guard.clone();
360        path_buf.push(namespace);
361        path_with_guard.path_buf = path_buf.clone();
362        let max_stream_queries = config.max_stream_queries;
363        let spawn_mode = config.spawn_mode;
364        if !std::path::Path::exists(&path_buf) {
365            std::fs::create_dir_all(path_buf.clone())?;
366        }
367        let sys = System::new_with_specifics(
368            RefreshKind::nothing().with_memory(MemoryRefreshKind::nothing().with_ram()),
369        );
370        let num_cpus = get_available_cpus();
371        let total_ram = get_available_memory(&sys);
372
373        let mut options = rocksdb::Options::default();
374        options.create_if_missing(true);
375        options.create_missing_column_families(true);
376
377        // Flush in-memory buffer to disk more often
378        options.set_write_buffer_size(WRITE_BUFFER_SIZE);
379        options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
380        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
381        options.set_level_zero_slowdown_writes_trigger(8);
382        options.set_level_zero_stop_writes_trigger(12);
383        options.set_level_zero_file_num_compaction_trigger(2);
384        // We deliberately give RocksDB one background thread *per* CPU so that
385        // flush + (N-1) compactions can hammer the NVMe at full bandwidth while
386        // still leaving enough CPU time for the foreground application threads.
387        options.increase_parallelism(num_cpus);
388        options.set_max_background_jobs(num_cpus);
389        options.set_max_subcompactions(num_cpus as u32);
390        options.set_level_compaction_dynamic_level_bytes(true);
391
392        options.set_compaction_style(DBCompactionStyle::Level);
393        options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
394
395        let mut block_options = BlockBasedOptions::default();
396        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
397        block_options.set_cache_index_and_filter_blocks(true);
398        // Allocate 1/4 of total RAM for RocksDB block cache, which is a reasonable balance:
399        // - Large enough to significantly improve read performance by caching frequently accessed blocks
400        // - Small enough to leave memory for other system components
401        // - Follows common practice for database caching in server environments
402        // - Prevents excessive memory pressure that could lead to swapping or OOM conditions
403        block_options.set_block_cache(&Cache::new_hyper_clock_cache(
404            total_ram / 4,
405            HYPER_CLOCK_CACHE_BLOCK_SIZE,
406        ));
407
408        // Cap total memtable memory to prevent unbounded growth when multiple column
409        // families are used or many memtables accumulate before flushing.
410        let write_buffer_manager =
411            WriteBufferManager::new_write_buffer_manager(total_ram / 4, true);
412        options.set_write_buffer_manager(&write_buffer_manager);
413
414        // Configure bloom filters for prefix iteration optimization
415        block_options.set_bloom_filter(10.0, false);
416        block_options.set_whole_key_filtering(false);
417
418        // 32KB blocks instead of default 4KB - reduces iterator seeks
419        block_options.set_block_size(32 * 1024);
420        // Use latest format for better compression and performance
421        block_options.set_format_version(5);
422
423        options.set_block_based_table_factory(&block_options);
424
425        // Configure prefix extraction for bloom filter optimization
426        // Use 8 bytes: ROOT_KEY_DOMAIN (1 byte) + BCS variant (1-2 bytes) + identifier start (4-5 bytes)
427        let prefix_extractor = SliceTransform::create_fixed_prefix(8);
428        options.set_prefix_extractor(prefix_extractor);
429
430        // 12.5% of memtable size for bloom filter
431        options.set_memtable_prefix_bloom_ratio(0.125);
432        // Skip bloom filter for memtable when key exists
433        options.set_optimize_filters_for_hits(true);
434        // Use memory-mapped files for faster reads
435        options.set_allow_mmap_reads(true);
436        // Don't use random access pattern since we do prefix scans
437        options.set_advise_random_on_open(false);
438
439        let db = DB::open(&options, path_buf)?;
440        let executor = RocksDbStoreExecutor {
441            db: Arc::new(db),
442            start_key,
443        };
444        Ok(RocksDbStoreInternal {
445            executor,
446            path_with_guard,
447            max_stream_queries,
448            spawn_mode,
449            root_key_written: Arc::new(AtomicBool::new(false)),
450        })
451    }
452}
453
/// Store operations report [`RocksDbStoreInternalError`].
impl WithError for RocksDbStoreInternal {
    type Error = RocksDbStoreInternalError;
}
457
458impl ReadableKeyValueStore for RocksDbStoreInternal {
459    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
460
461    fn max_stream_queries(&self) -> usize {
462        self.max_stream_queries
463    }
464
465    fn root_key(&self) -> Result<Vec<u8>, RocksDbStoreInternalError> {
466        assert!(self.executor.start_key.starts_with(&ROOT_KEY_DOMAIN));
467        let root_key = bcs::from_bytes(&self.executor.start_key[ROOT_KEY_DOMAIN.len()..])?;
468        Ok(root_key)
469    }
470
471    async fn read_value_bytes(
472        &self,
473        key: &[u8],
474    ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
475        check_key_size(key)?;
476        let db = self.executor.db.clone();
477        let mut full_key = self.executor.start_key.to_vec();
478        full_key.extend(key);
479        self.spawn_mode
480            .spawn(move |x| Ok(db.get(&x)?), full_key)
481            .await
482    }
483
484    async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
485        check_key_size(key)?;
486        let db = self.executor.db.clone();
487        let mut full_key = self.executor.start_key.to_vec();
488        full_key.extend(key);
489        self.spawn_mode
490            .spawn(
491                move |x| {
492                    if !db.key_may_exist(&x) {
493                        return Ok(false);
494                    }
495                    Ok(db.get(&x)?.is_some())
496                },
497                full_key,
498            )
499            .await
500    }
501
502    async fn contains_keys(
503        &self,
504        keys: &[Vec<u8>],
505    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
506        let executor = self.executor.clone();
507        self.spawn_mode
508            .spawn(move |x| executor.contains_keys_internal(x), keys.to_vec())
509            .await
510    }
511
512    async fn read_multi_values_bytes(
513        &self,
514        keys: &[Vec<u8>],
515    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
516        let executor = self.executor.clone();
517        self.spawn_mode
518            .spawn(
519                move |x| executor.read_multi_values_bytes_internal(x),
520                keys.to_vec(),
521            )
522            .await
523    }
524
525    async fn find_keys_by_prefix(
526        &self,
527        key_prefix: &[u8],
528    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
529        let executor = self.executor.clone();
530        let key_prefix = key_prefix.to_vec();
531        self.spawn_mode
532            .spawn(
533                move |x| executor.find_keys_by_prefix_internal(x),
534                key_prefix,
535            )
536            .await
537    }
538
539    async fn find_key_values_by_prefix(
540        &self,
541        key_prefix: &[u8],
542    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
543        let executor = self.executor.clone();
544        let key_prefix = key_prefix.to_vec();
545        self.spawn_mode
546            .spawn(
547                move |x| executor.find_key_values_by_prefix_internal(x),
548                key_prefix,
549            )
550            .await
551    }
552}
553
554impl WritableKeyValueStore for RocksDbStoreInternal {
555    const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
556
557    async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
558        let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
559        let executor = self.executor.clone();
560        self.spawn_mode
561            .spawn(
562                move |x| executor.write_batch_internal(x, write_root_key),
563                batch,
564            )
565            .await
566    }
567
568    async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
569        Ok(())
570    }
571}
572
573impl KeyValueDatabase for RocksDbDatabaseInternal {
574    type Config = RocksDbStoreInternalConfig;
575    type Store = RocksDbStoreInternal;
576
577    fn get_name() -> String {
578        "rocksdb internal".to_string()
579    }
580
581    async fn connect(
582        config: &Self::Config,
583        namespace: &str,
584    ) -> Result<Self, RocksDbStoreInternalError> {
585        Self::build(config, namespace)
586    }
587
588    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
589        let mut start_key = ROOT_KEY_DOMAIN.to_vec();
590        start_key.extend(bcs::to_bytes(root_key)?);
591        let mut executor = self.executor.clone();
592        executor.start_key = start_key;
593        Ok(RocksDbStoreInternal {
594            executor,
595            path_with_guard: self.path_with_guard.clone(),
596            max_stream_queries: self.max_stream_queries,
597            spawn_mode: self.spawn_mode,
598            root_key_written: Arc::new(AtomicBool::new(false)),
599        })
600    }
601
602    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
603        self.open_shared(root_key)
604    }
605
606    async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
607        let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
608        let mut namespaces = Vec::new();
609        for entry in entries {
610            let entry = entry?;
611            if !entry.file_type()?.is_dir() {
612                return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
613            }
614            let namespace = match entry.file_name().into_string() {
615                Err(error) => {
616                    return Err(RocksDbStoreInternalError::IntoStringError(error));
617                }
618                Ok(namespace) => namespace,
619            };
620            namespaces.push(namespace);
621        }
622        Ok(namespaces)
623    }
624
625    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
626        let mut store = self.open_shared(&[])?;
627        store.executor.start_key = vec![STORED_ROOT_KEYS_PREFIX];
628        let bcs_root_keys = store.find_keys_by_prefix(&[]).await?;
629        let mut root_keys = Vec::new();
630        for bcs_root_key in bcs_root_keys {
631            let root_key = bcs::from_bytes::<Vec<u8>>(&bcs_root_key)?;
632            root_keys.push(root_key);
633        }
634        Ok(root_keys)
635    }
636
637    async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
638        let namespaces = Self::list_all(config).await?;
639        for namespace in namespaces {
640            let mut path_buf = config.path_with_guard.path_buf.clone();
641            path_buf.push(&namespace);
642            std::fs::remove_dir_all(path_buf.as_path())?;
643        }
644        Ok(())
645    }
646
647    async fn exists(
648        config: &Self::Config,
649        namespace: &str,
650    ) -> Result<bool, RocksDbStoreInternalError> {
651        Self::check_namespace(namespace)?;
652        let mut path_buf = config.path_with_guard.path_buf.clone();
653        path_buf.push(namespace);
654        let test = std::path::Path::exists(&path_buf);
655        Ok(test)
656    }
657
658    async fn create(
659        config: &Self::Config,
660        namespace: &str,
661    ) -> Result<(), RocksDbStoreInternalError> {
662        Self::check_namespace(namespace)?;
663        let mut path_buf = config.path_with_guard.path_buf.clone();
664        path_buf.push(namespace);
665        if std::path::Path::exists(&path_buf) {
666            return Err(RocksDbStoreInternalError::StoreAlreadyExists);
667        }
668        std::fs::create_dir_all(path_buf)?;
669        Ok(())
670    }
671
672    async fn delete(
673        config: &Self::Config,
674        namespace: &str,
675    ) -> Result<(), RocksDbStoreInternalError> {
676        Self::check_namespace(namespace)?;
677        let mut path_buf = config.path_with_guard.path_buf.clone();
678        path_buf.push(namespace);
679        let path = path_buf.as_path();
680        std::fs::remove_dir_all(path)?;
681        Ok(())
682    }
683}
684
#[cfg(with_testing)]
impl TestKeyValueDatabase for RocksDbDatabaseInternal {
    /// Builds a configuration rooted in a fresh temporary directory.
    ///
    /// The spawn mode is derived from the current Tokio runtime.
    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
        Ok(RocksDbStoreInternalConfig {
            path_with_guard: PathWithGuard::new_testing(),
            spawn_mode: RocksDbSpawnMode::get_spawn_mode_from_runtime(),
            max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES,
        })
    }
}
698
/// The error type for [`RocksDbStoreInternal`]
#[derive(Error, Debug)]
pub enum RocksDbStoreInternalError {
    /// `create` was called for a namespace whose directory already exists.
    #[error("Store already exists")]
    StoreAlreadyExists,

    /// Joining a `spawn_blocking` task failed (see `RocksDbSpawnMode::SpawnBlocking`).
    #[error("tokio join error: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),

    /// Error reported by RocksDB itself.
    #[error("RocksDB error: {0}")]
    RocksDb(#[from] rocksdb::Error),

    /// `list_all` found an entry in the storage directory that is not a directory.
    #[error("Namespaces should be directories")]
    NonDirectoryNamespace,

    /// A namespace directory name is not valid UTF-8; carries the original name.
    #[error("error in the conversion from OsString: {0:?}")]
    IntoStringError(OsString),

    /// A key or key prefix is longer than `MAX_KEY_SIZE` (8 MiB minus a 400-byte margin).
    #[error("The key must have at most 8 MiB")]
    KeyTooLong,

    /// The namespace was rejected by `check_namespace`.
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// Filesystem error while managing namespace directories.
    #[error("Filesystem error: {0}")]
    FsError(#[from] std::io::Error),

    /// BCS serialization error, e.g. while encoding or decoding root keys.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
738
/// A directory path, optionally paired with a guard that keeps a temporary directory alive.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PathWithGuard {
    /// The path to the data
    pub path_buf: PathBuf,
    /// Keeps a temporary directory alive while any clone of this value exists.
    /// Skipped by serde, so deserialized values never carry a guard.
    #[serde(skip)]
    _dir_guard: Option<Arc<TempDir>>,
}
748
749impl PathWithGuard {
750    /// Creates a `PathWithGuard` from an existing path.
751    pub fn new(path_buf: PathBuf) -> Self {
752        Self {
753            path_buf,
754            _dir_guard: None,
755        }
756    }
757
758    /// Returns the test path for RocksDB without common config.
759    #[cfg(with_testing)]
760    fn new_testing() -> PathWithGuard {
761        let dir = TempDir::new().unwrap();
762        let path_buf = dir.path().to_path_buf();
763        let dir_guard = Some(Arc::new(dir));
764        PathWithGuard {
765            path_buf,
766            _dir_guard: dir_guard,
767        }
768    }
769}
770
771impl PartialEq for PathWithGuard {
772    fn eq(&self, other: &Self) -> bool {
773        self.path_buf == other.path_buf
774    }
775}
776impl Eq for PathWithGuard {}
777
/// Identifies this backend as `"rocks_db"`.
// NOTE(review): presumably used to label errors/metrics by backend — confirm against the trait.
impl KeyValueStoreError for RocksDbStoreInternalError {
    const BACKEND: &'static str = "rocks_db";
}
781
/// The composed error type for the `RocksDbStore`: internal errors wrapped by the
/// value-splitting layer.
pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;

/// The composed config type for the `RocksDbStore`: the internal config wrapped with
/// LRU-caching settings.
pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;

/// The `RocksDbDatabase` composed type with metrics: LRU caching over value splitting over the
/// raw RocksDB database, with a metering layer around each level.
#[cfg(with_metrics)]
pub type RocksDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
    >,
>;
/// The `RocksDbDatabase` composed type: LRU caching over value splitting over the raw
/// RocksDB database.
#[cfg(not(with_metrics))]
pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;
798
#[cfg(with_testing)]
impl crate::backends::DatabaseBackup for RocksDbDatabaseInternal {
    /// Creates a RocksDB backup of this database in `dir`, asking RocksDB to flush first.
    fn backup_to(&self, dir: &std::path::Path) -> anyhow::Result<()> {
        use rocksdb::{
            backup::{BackupEngine, BackupEngineOptions},
            Env,
        };
        let backup_options = BackupEngineOptions::new(dir)?;
        let environment = Env::new()?;
        let mut backup_engine = BackupEngine::open(&backup_options, &environment)?;
        let flush_before_backup = true;
        backup_engine.create_new_backup_flush(&*self.executor.db, flush_before_backup)?;
        Ok(())
    }
}