1#![allow(
9 clippy::cast_possible_truncation,
10 clippy::cast_sign_loss,
11 clippy::cast_possible_wrap
12)]
13
14use std::{
15 ffi::OsString,
16 fmt::Display,
17 path::PathBuf,
18 sync::{
19 atomic::{AtomicBool, Ordering},
20 Arc,
21 },
22};
23
24use linera_base::ensure;
25use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, SliceTransform, WriteBufferManager};
26use serde::{Deserialize, Serialize};
27use sysinfo::{MemoryRefreshKind, RefreshKind, System};
28use tempfile::TempDir;
29use thiserror::Error;
30
31#[cfg(with_metrics)]
32use crate::metering::MeteredDatabase;
33#[cfg(with_testing)]
34use crate::store::TestKeyValueDatabase;
35use crate::{
36 batch::{Batch, WriteOperation},
37 common::get_upper_bound_option,
38 lru_caching::{LruCachingConfig, LruCachingDatabase},
39 store::{
40 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
41 WritableKeyValueStore,
42 },
43 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
44};
45
46static ROOT_KEY_DOMAIN: [u8; 1] = [0];
48static STORED_ROOT_KEYS_PREFIX: u8 = 1;
49
50#[cfg(with_testing)]
52const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;
53
54const MAX_VALUE_SIZE: usize = 3 * 1024 * 1024 * 1024 - 400;
57
58const MAX_KEY_SIZE: usize = 8 * 1024 * 1024 - 400;
61
62const WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024; const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
68
69fn get_available_memory(sys: &System) -> usize {
70 sys.cgroup_limits()
71 .map_or_else(|| sys.total_memory() as usize, |c| c.total_memory as usize)
72}
73
74fn get_available_cpus() -> i32 {
75 std::thread::available_parallelism().map_or(1, |p| p.get() as i32)
76}
77
78const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
82
83#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
90pub enum RocksDbSpawnMode {
91 SpawnBlocking,
93 BlockInPlace,
95}
96
97impl RocksDbSpawnMode {
98 pub fn get_spawn_mode_from_runtime() -> Self {
100 if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
101 RocksDbSpawnMode::BlockInPlace
102 } else {
103 RocksDbSpawnMode::SpawnBlocking
104 }
105 }
106
107 #[inline]
109 async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
110 where
111 F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
112 I: Send + 'static,
113 O: Send + 'static,
114 {
115 Ok(match self {
116 RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
117 RocksDbSpawnMode::SpawnBlocking => {
118 tokio::task::spawn_blocking(move || f(input)).await??
119 }
120 })
121 }
122}
123
124impl Display for RocksDbSpawnMode {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 match &self {
127 RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
128 RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
129 }
130 }
131}
132
133fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
134 ensure!(
135 key.len() <= MAX_KEY_SIZE,
136 RocksDbStoreInternalError::KeyTooLong
137 );
138 Ok(())
139}
140
141#[derive(Clone)]
142struct RocksDbStoreExecutor {
143 db: Arc<DB>,
144 start_key: Vec<u8>,
145}
146
147impl RocksDbStoreExecutor {
148 fn contains_keys_internal(
149 &self,
150 keys: Vec<Vec<u8>>,
151 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
152 let size = keys.len();
153 let mut results = vec![false; size];
154 let mut indices = Vec::new();
155 let mut keys_red = Vec::new();
156 for (i, key) in keys.into_iter().enumerate() {
157 check_key_size(&key)?;
158 let mut full_key = self.start_key.to_vec();
159 full_key.extend(key);
160 if self.db.key_may_exist(&full_key) {
161 indices.push(i);
162 keys_red.push(full_key);
163 }
164 }
165 let values_red = self.db.multi_get(keys_red);
166 for (index, value) in indices.into_iter().zip(values_red) {
167 results[index] = value?.is_some();
168 }
169 Ok(results)
170 }
171
172 fn read_multi_values_bytes_internal(
173 &self,
174 keys: Vec<Vec<u8>>,
175 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
176 for key in &keys {
177 check_key_size(key)?;
178 }
179 let full_keys = keys
180 .into_iter()
181 .map(|key| {
182 let mut full_key = self.start_key.to_vec();
183 full_key.extend(key);
184 full_key
185 })
186 .collect::<Vec<_>>();
187 let entries = self.db.multi_get(&full_keys);
188 Ok(entries.into_iter().collect::<Result<_, _>>()?)
189 }
190
191 fn get_find_prefix_iterator(
192 &self,
193 prefix: &[u8],
194 ) -> rocksdb::DBRawIteratorWithThreadMode<'_, DB> {
195 let mut read_opts = rocksdb::ReadOptions::default();
197 read_opts.set_async_io(true);
199
200 let upper_bound = get_upper_bound_option(prefix);
202 if let Some(upper_bound) = upper_bound {
203 read_opts.set_iterate_upper_bound(upper_bound);
204 }
205
206 let mut iter = self.db.raw_iterator_opt(read_opts);
207 iter.seek(prefix);
208 iter
209 }
210
211 fn find_keys_by_prefix_internal(
212 &self,
213 key_prefix: Vec<u8>,
214 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
215 check_key_size(&key_prefix)?;
216
217 let mut prefix = self.start_key.clone();
218 prefix.extend(key_prefix);
219 let len = prefix.len();
220
221 let mut iter = self.get_find_prefix_iterator(&prefix);
222 let mut keys = Vec::new();
223 while let Some(key) = iter.key() {
224 keys.push(key[len..].to_vec());
225 iter.next();
226 }
227 Ok(keys)
228 }
229
230 #[expect(clippy::type_complexity)]
231 fn find_key_values_by_prefix_internal(
232 &self,
233 key_prefix: Vec<u8>,
234 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
235 check_key_size(&key_prefix)?;
236 let mut prefix = self.start_key.clone();
237 prefix.extend(key_prefix);
238 let len = prefix.len();
239
240 let mut iter = self.get_find_prefix_iterator(&prefix);
241 let mut key_values = Vec::new();
242 while let Some((key, value)) = iter.item() {
243 let key_value = (key[len..].to_vec(), value.to_vec());
244 key_values.push(key_value);
245 iter.next();
246 }
247 Ok(key_values)
248 }
249
250 fn write_batch_internal(
251 &self,
252 batch: Batch,
253 write_root_key: bool,
254 ) -> Result<(), RocksDbStoreInternalError> {
255 let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
256 for operation in batch.operations {
257 match operation {
258 WriteOperation::Delete { key } => {
259 check_key_size(&key)?;
260 let mut full_key = self.start_key.to_vec();
261 full_key.extend(key);
262 inner_batch.delete(&full_key)
263 }
264 WriteOperation::Put { key, value } => {
265 check_key_size(&key)?;
266 let mut full_key = self.start_key.to_vec();
267 full_key.extend(key);
268 inner_batch.put(&full_key, value)
269 }
270 WriteOperation::DeletePrefix { key_prefix } => {
271 check_key_size(&key_prefix)?;
272 let mut full_key1 = self.start_key.to_vec();
273 full_key1.extend(&key_prefix);
274 let full_key2 =
275 get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
276 inner_batch.delete_range(&full_key1, &full_key2);
277 }
278 }
279 }
280 if write_root_key {
281 let mut full_key = self.start_key.to_vec();
282 full_key[0] = STORED_ROOT_KEYS_PREFIX;
283 inner_batch.put(&full_key, vec![]);
284 }
285 self.db.write(inner_batch)?;
286 Ok(())
287 }
288}
289
290#[derive(Clone)]
292pub struct RocksDbStoreInternal {
293 executor: RocksDbStoreExecutor,
294 path_with_guard: PathWithGuard,
295 max_stream_queries: usize,
296 spawn_mode: RocksDbSpawnMode,
297 root_key_written: Arc<AtomicBool>,
298}
299
300#[derive(Clone)]
302pub struct RocksDbDatabaseInternal {
303 executor: RocksDbStoreExecutor,
304 path_with_guard: PathWithGuard,
305 max_stream_queries: usize,
306 spawn_mode: RocksDbSpawnMode,
307}
308
309impl WithError for RocksDbDatabaseInternal {
310 type Error = RocksDbStoreInternalError;
311}
312
313#[derive(Clone, Debug, Deserialize, Serialize)]
315pub struct RocksDbStoreInternalConfig {
316 pub path_with_guard: PathWithGuard,
318 pub spawn_mode: RocksDbSpawnMode,
320 pub max_stream_queries: usize,
322}
323
324impl RocksDbDatabaseInternal {
325 fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
326 if !namespace
327 .chars()
328 .all(|character| character.is_ascii_alphanumeric() || character == '_')
329 {
330 return Err(RocksDbStoreInternalError::InvalidNamespace);
331 }
332 Ok(())
333 }
334
335 fn build(
336 config: &RocksDbStoreInternalConfig,
337 namespace: &str,
338 ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
339 let start_key = ROOT_KEY_DOMAIN.to_vec();
340 let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
342 Ok(RocksDbDatabaseInternal {
343 executor: temp_store.executor,
344 path_with_guard: temp_store.path_with_guard,
345 max_stream_queries: temp_store.max_stream_queries,
346 spawn_mode: temp_store.spawn_mode,
347 })
348 }
349}
350
351impl RocksDbStoreInternal {
352 fn build(
353 config: &RocksDbStoreInternalConfig,
354 namespace: &str,
355 start_key: Vec<u8>,
356 ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
357 RocksDbDatabaseInternal::check_namespace(namespace)?;
358 let mut path_buf = config.path_with_guard.path_buf.clone();
359 let mut path_with_guard = config.path_with_guard.clone();
360 path_buf.push(namespace);
361 path_with_guard.path_buf = path_buf.clone();
362 let max_stream_queries = config.max_stream_queries;
363 let spawn_mode = config.spawn_mode;
364 if !std::path::Path::exists(&path_buf) {
365 std::fs::create_dir_all(path_buf.clone())?;
366 }
367 let sys = System::new_with_specifics(
368 RefreshKind::nothing().with_memory(MemoryRefreshKind::nothing().with_ram()),
369 );
370 let num_cpus = get_available_cpus();
371 let total_ram = get_available_memory(&sys);
372
373 let mut options = rocksdb::Options::default();
374 options.create_if_missing(true);
375 options.create_missing_column_families(true);
376
377 options.set_write_buffer_size(WRITE_BUFFER_SIZE);
379 options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
380 options.set_compression_type(rocksdb::DBCompressionType::Lz4);
381 options.set_level_zero_slowdown_writes_trigger(8);
382 options.set_level_zero_stop_writes_trigger(12);
383 options.set_level_zero_file_num_compaction_trigger(2);
384 options.increase_parallelism(num_cpus);
388 options.set_max_background_jobs(num_cpus);
389 options.set_max_subcompactions(num_cpus as u32);
390 options.set_level_compaction_dynamic_level_bytes(true);
391
392 options.set_compaction_style(DBCompactionStyle::Level);
393 options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
394
395 let mut block_options = BlockBasedOptions::default();
396 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
397 block_options.set_cache_index_and_filter_blocks(true);
398 block_options.set_block_cache(&Cache::new_hyper_clock_cache(
404 total_ram / 4,
405 HYPER_CLOCK_CACHE_BLOCK_SIZE,
406 ));
407
408 let write_buffer_manager =
411 WriteBufferManager::new_write_buffer_manager(total_ram / 4, true);
412 options.set_write_buffer_manager(&write_buffer_manager);
413
414 block_options.set_bloom_filter(10.0, false);
416 block_options.set_whole_key_filtering(false);
417
418 block_options.set_block_size(32 * 1024);
420 block_options.set_format_version(5);
422
423 options.set_block_based_table_factory(&block_options);
424
425 let prefix_extractor = SliceTransform::create_fixed_prefix(8);
428 options.set_prefix_extractor(prefix_extractor);
429
430 options.set_memtable_prefix_bloom_ratio(0.125);
432 options.set_optimize_filters_for_hits(true);
434 options.set_allow_mmap_reads(true);
436 options.set_advise_random_on_open(false);
438
439 let db = DB::open(&options, path_buf)?;
440 let executor = RocksDbStoreExecutor {
441 db: Arc::new(db),
442 start_key,
443 };
444 Ok(RocksDbStoreInternal {
445 executor,
446 path_with_guard,
447 max_stream_queries,
448 spawn_mode,
449 root_key_written: Arc::new(AtomicBool::new(false)),
450 })
451 }
452}
453
454impl WithError for RocksDbStoreInternal {
455 type Error = RocksDbStoreInternalError;
456}
457
458impl ReadableKeyValueStore for RocksDbStoreInternal {
459 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
460
461 fn max_stream_queries(&self) -> usize {
462 self.max_stream_queries
463 }
464
465 fn root_key(&self) -> Result<Vec<u8>, RocksDbStoreInternalError> {
466 assert!(self.executor.start_key.starts_with(&ROOT_KEY_DOMAIN));
467 let root_key = bcs::from_bytes(&self.executor.start_key[ROOT_KEY_DOMAIN.len()..])?;
468 Ok(root_key)
469 }
470
471 async fn read_value_bytes(
472 &self,
473 key: &[u8],
474 ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
475 check_key_size(key)?;
476 let db = self.executor.db.clone();
477 let mut full_key = self.executor.start_key.to_vec();
478 full_key.extend(key);
479 self.spawn_mode
480 .spawn(move |x| Ok(db.get(&x)?), full_key)
481 .await
482 }
483
484 async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
485 check_key_size(key)?;
486 let db = self.executor.db.clone();
487 let mut full_key = self.executor.start_key.to_vec();
488 full_key.extend(key);
489 self.spawn_mode
490 .spawn(
491 move |x| {
492 if !db.key_may_exist(&x) {
493 return Ok(false);
494 }
495 Ok(db.get(&x)?.is_some())
496 },
497 full_key,
498 )
499 .await
500 }
501
502 async fn contains_keys(
503 &self,
504 keys: &[Vec<u8>],
505 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
506 let executor = self.executor.clone();
507 self.spawn_mode
508 .spawn(move |x| executor.contains_keys_internal(x), keys.to_vec())
509 .await
510 }
511
512 async fn read_multi_values_bytes(
513 &self,
514 keys: &[Vec<u8>],
515 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
516 let executor = self.executor.clone();
517 self.spawn_mode
518 .spawn(
519 move |x| executor.read_multi_values_bytes_internal(x),
520 keys.to_vec(),
521 )
522 .await
523 }
524
525 async fn find_keys_by_prefix(
526 &self,
527 key_prefix: &[u8],
528 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
529 let executor = self.executor.clone();
530 let key_prefix = key_prefix.to_vec();
531 self.spawn_mode
532 .spawn(
533 move |x| executor.find_keys_by_prefix_internal(x),
534 key_prefix,
535 )
536 .await
537 }
538
539 async fn find_key_values_by_prefix(
540 &self,
541 key_prefix: &[u8],
542 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
543 let executor = self.executor.clone();
544 let key_prefix = key_prefix.to_vec();
545 self.spawn_mode
546 .spawn(
547 move |x| executor.find_key_values_by_prefix_internal(x),
548 key_prefix,
549 )
550 .await
551 }
552}
553
554impl WritableKeyValueStore for RocksDbStoreInternal {
555 const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
556
557 async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
558 let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
559 let executor = self.executor.clone();
560 self.spawn_mode
561 .spawn(
562 move |x| executor.write_batch_internal(x, write_root_key),
563 batch,
564 )
565 .await
566 }
567
568 async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
569 Ok(())
570 }
571}
572
573impl KeyValueDatabase for RocksDbDatabaseInternal {
574 type Config = RocksDbStoreInternalConfig;
575 type Store = RocksDbStoreInternal;
576
577 fn get_name() -> String {
578 "rocksdb internal".to_string()
579 }
580
581 async fn connect(
582 config: &Self::Config,
583 namespace: &str,
584 ) -> Result<Self, RocksDbStoreInternalError> {
585 Self::build(config, namespace)
586 }
587
588 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
589 let mut start_key = ROOT_KEY_DOMAIN.to_vec();
590 start_key.extend(bcs::to_bytes(root_key)?);
591 let mut executor = self.executor.clone();
592 executor.start_key = start_key;
593 Ok(RocksDbStoreInternal {
594 executor,
595 path_with_guard: self.path_with_guard.clone(),
596 max_stream_queries: self.max_stream_queries,
597 spawn_mode: self.spawn_mode,
598 root_key_written: Arc::new(AtomicBool::new(false)),
599 })
600 }
601
602 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
603 self.open_shared(root_key)
604 }
605
606 async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
607 let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
608 let mut namespaces = Vec::new();
609 for entry in entries {
610 let entry = entry?;
611 if !entry.file_type()?.is_dir() {
612 return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
613 }
614 let namespace = match entry.file_name().into_string() {
615 Err(error) => {
616 return Err(RocksDbStoreInternalError::IntoStringError(error));
617 }
618 Ok(namespace) => namespace,
619 };
620 namespaces.push(namespace);
621 }
622 Ok(namespaces)
623 }
624
625 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
626 let mut store = self.open_shared(&[])?;
627 store.executor.start_key = vec![STORED_ROOT_KEYS_PREFIX];
628 let bcs_root_keys = store.find_keys_by_prefix(&[]).await?;
629 let mut root_keys = Vec::new();
630 for bcs_root_key in bcs_root_keys {
631 let root_key = bcs::from_bytes::<Vec<u8>>(&bcs_root_key)?;
632 root_keys.push(root_key);
633 }
634 Ok(root_keys)
635 }
636
637 async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
638 let namespaces = Self::list_all(config).await?;
639 for namespace in namespaces {
640 let mut path_buf = config.path_with_guard.path_buf.clone();
641 path_buf.push(&namespace);
642 std::fs::remove_dir_all(path_buf.as_path())?;
643 }
644 Ok(())
645 }
646
647 async fn exists(
648 config: &Self::Config,
649 namespace: &str,
650 ) -> Result<bool, RocksDbStoreInternalError> {
651 Self::check_namespace(namespace)?;
652 let mut path_buf = config.path_with_guard.path_buf.clone();
653 path_buf.push(namespace);
654 let test = std::path::Path::exists(&path_buf);
655 Ok(test)
656 }
657
658 async fn create(
659 config: &Self::Config,
660 namespace: &str,
661 ) -> Result<(), RocksDbStoreInternalError> {
662 Self::check_namespace(namespace)?;
663 let mut path_buf = config.path_with_guard.path_buf.clone();
664 path_buf.push(namespace);
665 if std::path::Path::exists(&path_buf) {
666 return Err(RocksDbStoreInternalError::StoreAlreadyExists);
667 }
668 std::fs::create_dir_all(path_buf)?;
669 Ok(())
670 }
671
672 async fn delete(
673 config: &Self::Config,
674 namespace: &str,
675 ) -> Result<(), RocksDbStoreInternalError> {
676 Self::check_namespace(namespace)?;
677 let mut path_buf = config.path_with_guard.path_buf.clone();
678 path_buf.push(namespace);
679 let path = path_buf.as_path();
680 std::fs::remove_dir_all(path)?;
681 Ok(())
682 }
683}
684
685#[cfg(with_testing)]
686impl TestKeyValueDatabase for RocksDbDatabaseInternal {
687 async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
688 let path_with_guard = PathWithGuard::new_testing();
689 let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
690 let max_stream_queries = TEST_ROCKS_DB_MAX_STREAM_QUERIES;
691 Ok(RocksDbStoreInternalConfig {
692 path_with_guard,
693 spawn_mode,
694 max_stream_queries,
695 })
696 }
697}
698
699#[derive(Error, Debug)]
701pub enum RocksDbStoreInternalError {
702 #[error("Store already exists")]
704 StoreAlreadyExists,
705
706 #[error("tokio join error: {0}")]
708 TokioJoinError(#[from] tokio::task::JoinError),
709
710 #[error("RocksDB error: {0}")]
712 RocksDb(#[from] rocksdb::Error),
713
714 #[error("Namespaces should be directories")]
716 NonDirectoryNamespace,
717
718 #[error("error in the conversion from OsString: {0:?}")]
720 IntoStringError(OsString),
721
722 #[error("The key must have at most 8 MiB")]
724 KeyTooLong,
725
726 #[error("Namespace contains forbidden characters")]
728 InvalidNamespace,
729
730 #[error("Filesystem error: {0}")]
732 FsError(#[from] std::io::Error),
733
734 #[error(transparent)]
736 BcsError(#[from] bcs::Error),
737}
738
739#[derive(Clone, Debug, Deserialize, Serialize)]
741pub struct PathWithGuard {
742 pub path_buf: PathBuf,
744 #[serde(skip)]
746 _dir_guard: Option<Arc<TempDir>>,
747}
748
749impl PathWithGuard {
750 pub fn new(path_buf: PathBuf) -> Self {
752 Self {
753 path_buf,
754 _dir_guard: None,
755 }
756 }
757
758 #[cfg(with_testing)]
760 fn new_testing() -> PathWithGuard {
761 let dir = TempDir::new().unwrap();
762 let path_buf = dir.path().to_path_buf();
763 let dir_guard = Some(Arc::new(dir));
764 PathWithGuard {
765 path_buf,
766 _dir_guard: dir_guard,
767 }
768 }
769}
770
771impl PartialEq for PathWithGuard {
772 fn eq(&self, other: &Self) -> bool {
773 self.path_buf == other.path_buf
774 }
775}
776impl Eq for PathWithGuard {}
777
778impl KeyValueStoreError for RocksDbStoreInternalError {
779 const BACKEND: &'static str = "rocks_db";
780}
781
782pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;
784
785pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;
787
788#[cfg(with_metrics)]
790pub type RocksDbDatabase = MeteredDatabase<
791 LruCachingDatabase<
792 MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
793 >,
794>;
795#[cfg(not(with_metrics))]
797pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;
798
799#[cfg(with_testing)]
800impl crate::backends::DatabaseBackup for RocksDbDatabaseInternal {
801 fn backup_to(&self, dir: &std::path::Path) -> anyhow::Result<()> {
802 use rocksdb::{
803 backup::{BackupEngine, BackupEngineOptions},
804 Env,
805 };
806 let opts = BackupEngineOptions::new(dir)?;
807 let env = Env::new()?;
808 let mut engine = BackupEngine::open(&opts, &env)?;
809 engine.create_new_backup_flush(&*self.executor.db, true)?;
810 Ok(())
811 }
812}