// linera_views/backends/rocks_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the RocksDB database.
5
6use std::{
7    ffi::OsString,
8    fmt::Display,
9    path::PathBuf,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14};
15
16use linera_base::ensure;
17use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, SliceTransform, WriteBufferManager};
18use serde::{Deserialize, Serialize};
19use sysinfo::{MemoryRefreshKind, RefreshKind, System};
20use tempfile::TempDir;
21use thiserror::Error;
22
23#[cfg(with_metrics)]
24use crate::metering::MeteredDatabase;
25#[cfg(with_testing)]
26use crate::store::TestKeyValueDatabase;
27use crate::{
28    batch::{Batch, WriteOperation},
29    common::get_upper_bound_option,
30    lru_caching::{LruCachingConfig, LruCachingDatabase},
31    store::{
32        KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
33        WritableKeyValueStore,
34    },
35    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
36};
37
/// The prefixes being used in the system:
/// the domain byte under which user root keys (and their data) live.
static ROOT_KEY_DOMAIN: [u8; 1] = [0];
/// The domain byte under which the set of written root keys is recorded.
static STORED_ROOT_KEYS_PREFIX: u8 = 1;

/// The number of streams for the test
#[cfg(with_testing)]
const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;

// The maximum size of values in RocksDB is 3 GiB.
// For offset reasons we decrease by 400.
const MAX_VALUE_SIZE: usize = 3 * 1024 * 1024 * 1024 - 400;

// The maximum size of keys in RocksDB is 8 MiB.
// For offset reasons we decrease by 400.
const MAX_KEY_SIZE: usize = 8 * 1024 * 1024 - 400;

// Size of a single memtable before it gets flushed to disk.
const WRITE_BUFFER_SIZE: usize = 256 * 1024 * 1024; // 256 MiB
// Maximum number of memtables kept around (active + immutable).
const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
56
57fn get_available_memory(sys: &System) -> usize {
58    sys.cgroup_limits()
59        .map_or_else(|| sys.total_memory() as usize, |c| c.total_memory as usize)
60}
61
/// Returns the number of CPUs available to this process, falling back to 1
/// when the available parallelism cannot be determined.
fn get_available_cpus() -> i32 {
    match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get() as i32,
        Err(_) => 1,
    }
}
67
// Estimated per-entry size hint passed to `Cache::new_hyper_clock_cache` (8 KiB).
const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; // 8 KiB

/// The RocksDB client that we use (multi-threaded handle).
type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
72
/// The choice of the spawning mode.
/// `SpawnBlocking` always works and is the safest.
/// `BlockInPlace` can only be used in multi-threaded environment.
/// One way to select that is to select `BlockInPlace` when
/// `tokio::runtime::Handle::current().metrics().num_workers() > 1`.
/// `BlockInPlace` is documented in <https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html>
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum RocksDbSpawnMode {
    /// This uses the `spawn_blocking` function of Tokio: the closure is moved
    /// to a dedicated blocking thread pool.
    SpawnBlocking,
    /// This uses the `block_in_place` function of Tokio: the closure runs on
    /// the current worker thread (requires a multi-threaded runtime).
    BlockInPlace,
}
86
87impl RocksDbSpawnMode {
88    /// Obtains the spawning mode from runtime.
89    pub fn get_spawn_mode_from_runtime() -> Self {
90        if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
91            RocksDbSpawnMode::BlockInPlace
92        } else {
93            RocksDbSpawnMode::SpawnBlocking
94        }
95    }
96
97    /// Runs the computation for a function according to the selected policy.
98    #[inline]
99    async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
100    where
101        F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
102        I: Send + 'static,
103        O: Send + 'static,
104    {
105        Ok(match self {
106            RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
107            RocksDbSpawnMode::SpawnBlocking => {
108                tokio::task::spawn_blocking(move || f(input)).await??
109            }
110        })
111    }
112}
113
114impl Display for RocksDbSpawnMode {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match &self {
117            RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
118            RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
119        }
120    }
121}
122
123fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
124    ensure!(
125        key.len() <= MAX_KEY_SIZE,
126        RocksDbStoreInternalError::KeyTooLong
127    );
128    Ok(())
129}
130
/// Shared access to the RocksDB database, scoped to a key prefix.
#[derive(Clone)]
struct RocksDbStoreExecutor {
    // Handle to the underlying database, shared between clones.
    db: Arc<DB>,
    // Prefix prepended to every user key before accessing the database.
    start_key: Vec<u8>,
}
136
137impl RocksDbStoreExecutor {
138    fn contains_keys_internal(
139        &self,
140        keys: Vec<Vec<u8>>,
141    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
142        let size = keys.len();
143        let mut results = vec![false; size];
144        let mut indices = Vec::new();
145        let mut keys_red = Vec::new();
146        for (i, key) in keys.into_iter().enumerate() {
147            check_key_size(&key)?;
148            let mut full_key = self.start_key.to_vec();
149            full_key.extend(key);
150            if self.db.key_may_exist(&full_key) {
151                indices.push(i);
152                keys_red.push(full_key);
153            }
154        }
155        let values_red = self.db.multi_get(keys_red);
156        for (index, value) in indices.into_iter().zip(values_red) {
157            results[index] = value?.is_some();
158        }
159        Ok(results)
160    }
161
162    fn read_multi_values_bytes_internal(
163        &self,
164        keys: Vec<Vec<u8>>,
165    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
166        for key in &keys {
167            check_key_size(key)?;
168        }
169        let full_keys = keys
170            .into_iter()
171            .map(|key| {
172                let mut full_key = self.start_key.to_vec();
173                full_key.extend(key);
174                full_key
175            })
176            .collect::<Vec<_>>();
177        let entries = self.db.multi_get(&full_keys);
178        Ok(entries.into_iter().collect::<Result<_, _>>()?)
179    }
180
181    fn get_find_prefix_iterator(
182        &self,
183        prefix: &[u8],
184    ) -> rocksdb::DBRawIteratorWithThreadMode<'_, DB> {
185        // Configure ReadOptions optimized for SSDs and iterator performance
186        let mut read_opts = rocksdb::ReadOptions::default();
187        // Enable async I/O for better concurrency
188        read_opts.set_async_io(true);
189
190        // Set precise upper bound to minimize key traversal
191        let upper_bound = get_upper_bound_option(prefix);
192        if let Some(upper_bound) = upper_bound {
193            read_opts.set_iterate_upper_bound(upper_bound);
194        }
195
196        let mut iter = self.db.raw_iterator_opt(read_opts);
197        iter.seek(prefix);
198        iter
199    }
200
201    fn find_keys_by_prefix_internal(
202        &self,
203        key_prefix: Vec<u8>,
204    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
205        check_key_size(&key_prefix)?;
206
207        let mut prefix = self.start_key.clone();
208        prefix.extend(key_prefix);
209        let len = prefix.len();
210
211        let mut iter = self.get_find_prefix_iterator(&prefix);
212        let mut keys = Vec::new();
213        while let Some(key) = iter.key() {
214            keys.push(key[len..].to_vec());
215            iter.next();
216        }
217        Ok(keys)
218    }
219
220    #[expect(clippy::type_complexity)]
221    fn find_key_values_by_prefix_internal(
222        &self,
223        key_prefix: Vec<u8>,
224    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
225        check_key_size(&key_prefix)?;
226        let mut prefix = self.start_key.clone();
227        prefix.extend(key_prefix);
228        let len = prefix.len();
229
230        let mut iter = self.get_find_prefix_iterator(&prefix);
231        let mut key_values = Vec::new();
232        while let Some((key, value)) = iter.item() {
233            let key_value = (key[len..].to_vec(), value.to_vec());
234            key_values.push(key_value);
235            iter.next();
236        }
237        Ok(key_values)
238    }
239
240    fn write_batch_internal(
241        &self,
242        batch: Batch,
243        write_root_key: bool,
244    ) -> Result<(), RocksDbStoreInternalError> {
245        let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
246        for operation in batch.operations {
247            match operation {
248                WriteOperation::Delete { key } => {
249                    check_key_size(&key)?;
250                    let mut full_key = self.start_key.to_vec();
251                    full_key.extend(key);
252                    inner_batch.delete(&full_key)
253                }
254                WriteOperation::Put { key, value } => {
255                    check_key_size(&key)?;
256                    let mut full_key = self.start_key.to_vec();
257                    full_key.extend(key);
258                    inner_batch.put(&full_key, value)
259                }
260                WriteOperation::DeletePrefix { key_prefix } => {
261                    check_key_size(&key_prefix)?;
262                    let mut full_key1 = self.start_key.to_vec();
263                    full_key1.extend(&key_prefix);
264                    let full_key2 =
265                        get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
266                    inner_batch.delete_range(&full_key1, &full_key2);
267                }
268            }
269        }
270        if write_root_key {
271            let mut full_key = self.start_key.to_vec();
272            full_key[0] = STORED_ROOT_KEYS_PREFIX;
273            inner_batch.put(&full_key, vec![]);
274        }
275        self.db.write(inner_batch)?;
276        Ok(())
277    }
278}
279
/// The inner client
#[derive(Clone)]
pub struct RocksDbStoreInternal {
    // Executor holding the database handle and this store's key prefix.
    executor: RocksDbStoreExecutor,
    // Path of the database directory, plus the temp-dir guard if one is needed.
    path_with_guard: PathWithGuard,
    // Preferred buffer size for async streams.
    max_stream_queries: usize,
    // How blocking RocksDB calls are scheduled on the Tokio runtime.
    spawn_mode: RocksDbSpawnMode,
    // Set on the first write so the root key is recorded exactly once.
    root_key_written: Arc<AtomicBool>,
}
289
/// Database-level connection to RocksDB for managing namespaces and partitions.
#[derive(Clone)]
pub struct RocksDbDatabaseInternal {
    // Executor holding the shared database handle.
    executor: RocksDbStoreExecutor,
    // Path of the database directory, plus the temp-dir guard if one is needed.
    path_with_guard: PathWithGuard,
    // Preferred buffer size for async streams.
    max_stream_queries: usize,
    // How blocking RocksDB calls are scheduled on the Tokio runtime.
    spawn_mode: RocksDbSpawnMode,
}
298
impl WithError for RocksDbDatabaseInternal {
    // All database-level operations report `RocksDbStoreInternalError`.
    type Error = RocksDbStoreInternalError;
}
302
/// The initial configuration of the system.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RocksDbStoreInternalConfig {
    /// The path to the storage containing the namespaces (one subdirectory each).
    pub path_with_guard: PathWithGuard,
    /// The chosen spawn mode for blocking RocksDB calls.
    pub spawn_mode: RocksDbSpawnMode,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
}
313
314impl RocksDbDatabaseInternal {
315    fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
316        if !namespace
317            .chars()
318            .all(|character| character.is_ascii_alphanumeric() || character == '_')
319        {
320            return Err(RocksDbStoreInternalError::InvalidNamespace);
321        }
322        Ok(())
323    }
324
325    fn build(
326        config: &RocksDbStoreInternalConfig,
327        namespace: &str,
328    ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
329        let start_key = ROOT_KEY_DOMAIN.to_vec();
330        // Create a store to extract its executor and configuration
331        let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
332        Ok(RocksDbDatabaseInternal {
333            executor: temp_store.executor,
334            path_with_guard: temp_store.path_with_guard,
335            max_stream_queries: temp_store.max_stream_queries,
336            spawn_mode: temp_store.spawn_mode,
337        })
338    }
339}
340
impl RocksDbStoreInternal {
    /// Opens (creating it if needed) the RocksDB database for `namespace`
    /// under the configured path, and returns a store scoped to `start_key`.
    fn build(
        config: &RocksDbStoreInternalConfig,
        namespace: &str,
        start_key: Vec<u8>,
    ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
        RocksDbDatabaseInternal::check_namespace(namespace)?;
        // Each namespace lives in its own subdirectory of the configured path.
        let mut path_buf = config.path_with_guard.path_buf.clone();
        let mut path_with_guard = config.path_with_guard.clone();
        path_buf.push(namespace);
        path_with_guard.path_buf = path_buf.clone();
        let max_stream_queries = config.max_stream_queries;
        let spawn_mode = config.spawn_mode;
        if !std::path::Path::exists(&path_buf) {
            std::fs::create_dir_all(path_buf.clone())?;
        }
        // Only RAM information is needed to size the caches below.
        let sys = System::new_with_specifics(
            RefreshKind::nothing().with_memory(MemoryRefreshKind::nothing().with_ram()),
        );
        let num_cpus = get_available_cpus();
        let total_ram = get_available_memory(&sys);

        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.create_missing_column_families(true);

        // Flush in-memory buffer to disk more often
        options.set_write_buffer_size(WRITE_BUFFER_SIZE);
        options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
        // Start slowing/stopping writes at lower L0 file counts than the
        // defaults, to smooth out write stalls.
        options.set_level_zero_slowdown_writes_trigger(8);
        options.set_level_zero_stop_writes_trigger(12);
        options.set_level_zero_file_num_compaction_trigger(2);
        // We deliberately give RocksDB one background thread *per* CPU so that
        // flush + (N-1) compactions can hammer the NVMe at full bandwidth while
        // still leaving enough CPU time for the foreground application threads.
        options.increase_parallelism(num_cpus);
        options.set_max_background_jobs(num_cpus);
        options.set_max_subcompactions(num_cpus as u32);
        options.set_level_compaction_dynamic_level_bytes(true);

        options.set_compaction_style(DBCompactionStyle::Level);
        options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);

        let mut block_options = BlockBasedOptions::default();
        // Keep index/filter blocks in the block cache (pinned for L0) instead
        // of unbounded heap memory.
        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        block_options.set_cache_index_and_filter_blocks(true);
        // Allocate 1/4 of total RAM for RocksDB block cache, which is a reasonable balance:
        // - Large enough to significantly improve read performance by caching frequently accessed blocks
        // - Small enough to leave memory for other system components
        // - Follows common practice for database caching in server environments
        // - Prevents excessive memory pressure that could lead to swapping or OOM conditions
        block_options.set_block_cache(&Cache::new_hyper_clock_cache(
            total_ram / 4,
            HYPER_CLOCK_CACHE_BLOCK_SIZE,
        ));

        // Cap total memtable memory to prevent unbounded growth when multiple column
        // families are used or many memtables accumulate before flushing.
        let write_buffer_manager =
            WriteBufferManager::new_write_buffer_manager(total_ram / 4, true);
        options.set_write_buffer_manager(&write_buffer_manager);

        // Configure bloom filters for prefix iteration optimization
        block_options.set_bloom_filter(10.0, false);
        block_options.set_whole_key_filtering(false);

        // 32 KiB blocks instead of the 4 KiB default - reduces iterator seeks
        block_options.set_block_size(32 * 1024);
        // Table format version 5 for better compression and performance
        block_options.set_format_version(5);

        options.set_block_based_table_factory(&block_options);

        // Configure prefix extraction for bloom filter optimization.
        // Assumes 8 bytes covers ROOT_KEY_DOMAIN (1 byte) + BCS variant (1-2 bytes)
        // + identifier start (4-5 bytes) — NOTE(review): verify against actual key layouts.
        let prefix_extractor = SliceTransform::create_fixed_prefix(8);
        options.set_prefix_extractor(prefix_extractor);

        // 12.5% of memtable size for the memtable prefix bloom filter
        options.set_memtable_prefix_bloom_ratio(0.125);
        // Optimize filters for read-mostly workloads where lookups usually hit
        options.set_optimize_filters_for_hits(true);
        // Use memory-mapped files for faster reads
        options.set_allow_mmap_reads(true);
        // Don't use random access pattern since we do prefix scans
        options.set_advise_random_on_open(false);

        let db = DB::open(&options, path_buf)?;
        let executor = RocksDbStoreExecutor {
            db: Arc::new(db),
            start_key,
        };
        Ok(RocksDbStoreInternal {
            executor,
            path_with_guard,
            max_stream_queries,
            spawn_mode,
            root_key_written: Arc::new(AtomicBool::new(false)),
        })
    }
}
443
impl WithError for RocksDbStoreInternal {
    // All store-level operations report `RocksDbStoreInternalError`.
    type Error = RocksDbStoreInternalError;
}
447
448impl ReadableKeyValueStore for RocksDbStoreInternal {
449    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
450
451    fn max_stream_queries(&self) -> usize {
452        self.max_stream_queries
453    }
454
455    fn root_key(&self) -> Result<Vec<u8>, RocksDbStoreInternalError> {
456        assert!(self.executor.start_key.starts_with(&ROOT_KEY_DOMAIN));
457        let root_key = bcs::from_bytes(&self.executor.start_key[ROOT_KEY_DOMAIN.len()..])?;
458        Ok(root_key)
459    }
460
461    async fn read_value_bytes(
462        &self,
463        key: &[u8],
464    ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
465        check_key_size(key)?;
466        let db = self.executor.db.clone();
467        let mut full_key = self.executor.start_key.to_vec();
468        full_key.extend(key);
469        self.spawn_mode
470            .spawn(move |x| Ok(db.get(&x)?), full_key)
471            .await
472    }
473
474    async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
475        check_key_size(key)?;
476        let db = self.executor.db.clone();
477        let mut full_key = self.executor.start_key.to_vec();
478        full_key.extend(key);
479        self.spawn_mode
480            .spawn(
481                move |x| {
482                    if !db.key_may_exist(&x) {
483                        return Ok(false);
484                    }
485                    Ok(db.get(&x)?.is_some())
486                },
487                full_key,
488            )
489            .await
490    }
491
492    async fn contains_keys(
493        &self,
494        keys: &[Vec<u8>],
495    ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
496        let executor = self.executor.clone();
497        self.spawn_mode
498            .spawn(move |x| executor.contains_keys_internal(x), keys.to_vec())
499            .await
500    }
501
502    async fn read_multi_values_bytes(
503        &self,
504        keys: &[Vec<u8>],
505    ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
506        let executor = self.executor.clone();
507        self.spawn_mode
508            .spawn(
509                move |x| executor.read_multi_values_bytes_internal(x),
510                keys.to_vec(),
511            )
512            .await
513    }
514
515    async fn find_keys_by_prefix(
516        &self,
517        key_prefix: &[u8],
518    ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
519        let executor = self.executor.clone();
520        let key_prefix = key_prefix.to_vec();
521        self.spawn_mode
522            .spawn(
523                move |x| executor.find_keys_by_prefix_internal(x),
524                key_prefix,
525            )
526            .await
527    }
528
529    async fn find_key_values_by_prefix(
530        &self,
531        key_prefix: &[u8],
532    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
533        let executor = self.executor.clone();
534        let key_prefix = key_prefix.to_vec();
535        self.spawn_mode
536            .spawn(
537                move |x| executor.find_key_values_by_prefix_internal(x),
538                key_prefix,
539            )
540            .await
541    }
542}
543
544impl WritableKeyValueStore for RocksDbStoreInternal {
545    const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
546
547    async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
548        let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
549        let executor = self.executor.clone();
550        self.spawn_mode
551            .spawn(
552                move |x| executor.write_batch_internal(x, write_root_key),
553                batch,
554            )
555            .await
556    }
557
558    async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
559        Ok(())
560    }
561}
562
563impl KeyValueDatabase for RocksDbDatabaseInternal {
564    type Config = RocksDbStoreInternalConfig;
565    type Store = RocksDbStoreInternal;
566
567    fn get_name() -> String {
568        "rocksdb internal".to_string()
569    }
570
571    async fn connect(
572        config: &Self::Config,
573        namespace: &str,
574    ) -> Result<Self, RocksDbStoreInternalError> {
575        Self::build(config, namespace)
576    }
577
578    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
579        let mut start_key = ROOT_KEY_DOMAIN.to_vec();
580        start_key.extend(bcs::to_bytes(root_key)?);
581        let mut executor = self.executor.clone();
582        executor.start_key = start_key;
583        Ok(RocksDbStoreInternal {
584            executor,
585            path_with_guard: self.path_with_guard.clone(),
586            max_stream_queries: self.max_stream_queries,
587            spawn_mode: self.spawn_mode,
588            root_key_written: Arc::new(AtomicBool::new(false)),
589        })
590    }
591
592    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
593        self.open_shared(root_key)
594    }
595
596    async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
597        let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
598        let mut namespaces = Vec::new();
599        for entry in entries {
600            let entry = entry?;
601            if !entry.file_type()?.is_dir() {
602                return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
603            }
604            let namespace = match entry.file_name().into_string() {
605                Err(error) => {
606                    return Err(RocksDbStoreInternalError::IntoStringError(error));
607                }
608                Ok(namespace) => namespace,
609            };
610            namespaces.push(namespace);
611        }
612        Ok(namespaces)
613    }
614
615    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
616        let mut store = self.open_shared(&[])?;
617        store.executor.start_key = vec![STORED_ROOT_KEYS_PREFIX];
618        let bcs_root_keys = store.find_keys_by_prefix(&[]).await?;
619        let mut root_keys = Vec::new();
620        for bcs_root_key in bcs_root_keys {
621            let root_key = bcs::from_bytes::<Vec<u8>>(&bcs_root_key)?;
622            root_keys.push(root_key);
623        }
624        Ok(root_keys)
625    }
626
627    async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
628        let namespaces = Self::list_all(config).await?;
629        for namespace in namespaces {
630            let mut path_buf = config.path_with_guard.path_buf.clone();
631            path_buf.push(&namespace);
632            std::fs::remove_dir_all(path_buf.as_path())?;
633        }
634        Ok(())
635    }
636
637    async fn exists(
638        config: &Self::Config,
639        namespace: &str,
640    ) -> Result<bool, RocksDbStoreInternalError> {
641        Self::check_namespace(namespace)?;
642        let mut path_buf = config.path_with_guard.path_buf.clone();
643        path_buf.push(namespace);
644        let test = std::path::Path::exists(&path_buf);
645        Ok(test)
646    }
647
648    async fn create(
649        config: &Self::Config,
650        namespace: &str,
651    ) -> Result<(), RocksDbStoreInternalError> {
652        Self::check_namespace(namespace)?;
653        let mut path_buf = config.path_with_guard.path_buf.clone();
654        path_buf.push(namespace);
655        if std::path::Path::exists(&path_buf) {
656            return Err(RocksDbStoreInternalError::StoreAlreadyExists);
657        }
658        std::fs::create_dir_all(path_buf)?;
659        Ok(())
660    }
661
662    async fn delete(
663        config: &Self::Config,
664        namespace: &str,
665    ) -> Result<(), RocksDbStoreInternalError> {
666        Self::check_namespace(namespace)?;
667        let mut path_buf = config.path_with_guard.path_buf.clone();
668        path_buf.push(namespace);
669        let path = path_buf.as_path();
670        std::fs::remove_dir_all(path)?;
671        Ok(())
672    }
673}
674
#[cfg(with_testing)]
impl TestKeyValueDatabase for RocksDbDatabaseInternal {
    /// Builds a test configuration backed by a fresh temporary directory.
    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
        Ok(RocksDbStoreInternalConfig {
            path_with_guard: PathWithGuard::new_testing(),
            spawn_mode: RocksDbSpawnMode::get_spawn_mode_from_runtime(),
            max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES,
        })
    }
}
688
/// The error type for [`RocksDbStoreInternal`]
#[derive(Error, Debug)]
pub enum RocksDbStoreInternalError {
    /// Store already exists when trying to create it
    #[error("Store already exists")]
    StoreAlreadyExists,

    /// Tokio join error in RocksDB (the blocking task panicked or was cancelled).
    #[error("tokio join error: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),

    /// RocksDB error.
    #[error("RocksDB error: {0}")]
    RocksDb(#[from] rocksdb::Error),

    /// The database contains a file which is not a directory
    #[error("Namespaces should be directories")]
    NonDirectoryNamespace,

    /// Error converting `OsString` to `String`
    #[error("error in the conversion from OsString: {0:?}")]
    IntoStringError(OsString),

    /// The key exceeds `MAX_KEY_SIZE` (just under 8 MiB)
    #[error("The key must have at most 8 MiB")]
    KeyTooLong,

    /// Namespace contains forbidden characters
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// Filesystem error
    #[error("Filesystem error: {0}")]
    FsError(#[from] std::io::Error),

    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
728
/// A path and the guard for the temporary directory if needed
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PathWithGuard {
    /// The path to the data
    pub path_buf: PathBuf,
    /// The guard for the directory if one is needed. It is skipped during
    /// (de)serialization, so a deserialized value carries no guard.
    #[serde(skip)]
    _dir_guard: Option<Arc<TempDir>>,
}
738
739impl PathWithGuard {
740    /// Creates a `PathWithGuard` from an existing path.
741    pub fn new(path_buf: PathBuf) -> Self {
742        Self {
743            path_buf,
744            _dir_guard: None,
745        }
746    }
747
748    /// Returns the test path for RocksDB without common config.
749    #[cfg(with_testing)]
750    fn new_testing() -> PathWithGuard {
751        let dir = TempDir::new().unwrap();
752        let path_buf = dir.path().to_path_buf();
753        let dir_guard = Some(Arc::new(dir));
754        PathWithGuard {
755            path_buf,
756            _dir_guard: dir_guard,
757        }
758    }
759}
760
// Equality deliberately ignores the temporary-directory guard: two values
// compare equal whenever they point at the same path.
impl PartialEq for PathWithGuard {
    fn eq(&self, other: &Self) -> bool {
        self.path_buf == other.path_buf
    }
}
impl Eq for PathWithGuard {}
767
impl KeyValueStoreError for RocksDbStoreInternalError {
    /// Name identifying this backend in error reporting.
    const BACKEND: &'static str = "rocks_db";
}
771
/// The composed error type for the `RocksDbStore`
pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;

/// The composed config type for the `RocksDbStore`
pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;

/// The `RocksDbDatabase` composed type with metrics.
/// Metrics are collected at three layers: above the LRU cache, between the
/// cache and the value splitter, and at the raw RocksDB layer.
#[cfg(with_metrics)]
pub type RocksDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
    >,
>;
/// The `RocksDbDatabase` composed type: an LRU cache over a value splitter
/// over the raw RocksDB database.
#[cfg(not(with_metrics))]
pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;