1use std::{
7 ffi::OsString,
8 fmt::Display,
9 path::PathBuf,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc,
13 },
14};
15
16use linera_base::ensure;
17use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle};
18use serde::{Deserialize, Serialize};
19use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
20use tempfile::TempDir;
21use thiserror::Error;
22
23#[cfg(with_metrics)]
24use crate::metering::MeteredDatabase;
25#[cfg(with_testing)]
26use crate::store::TestKeyValueDatabase;
27use crate::{
28 batch::{Batch, WriteOperation},
29 common::get_upper_bound_option,
30 lru_caching::{LruCachingConfig, LruCachingDatabase},
31 store::{
32 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
33 WritableKeyValueStore,
34 },
35 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
36};
37
/// Leading byte of every user key in the database: a store opened on `root_key`
/// keeps its entries under `ROOT_KEY_DOMAIN ++ root_key ++ key`.
static ROOT_KEY_DOMAIN: [u8; 1] = [0];

/// Leading byte of the root-key markers: each root key that has been written to is
/// recorded as an empty-valued entry `STORED_ROOT_KEYS_PREFIX ++ root_key`.
static STORED_ROOT_KEYS_PREFIX: u8 = 1;

/// `max_stream_queries` used by test configurations.
#[cfg(with_testing)]
const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;

/// Largest value size accepted by the store: 3 GiB minus 400 bytes.
const MAX_VALUE_SIZE: usize = (3 << 30) - 400;

/// Largest key size accepted by the store: 8 MiB minus 400 bytes.
const MAX_KEY_SIZE: usize = (8 << 20) - 400;
53
54const WRITE_BUFFER_SIZE: usize = 256 * 1024 * 1024; const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
56const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
60
/// How the blocking RocksDB calls made by the stores are run from async code.
///
/// Variant order is kept as-is: with index-based serde formats the variant index
/// is part of the serialized configuration.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum RocksDbSpawnMode {
    /// Run each call on tokio's blocking pool via `tokio::task::spawn_blocking`.
    SpawnBlocking,
    /// Run each call on the current worker via `tokio::task::block_in_place`
    /// (tokio panics if this is used on a current-thread runtime).
    BlockInPlace,
}
74
75impl RocksDbSpawnMode {
76 pub fn get_spawn_mode_from_runtime() -> Self {
78 if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
79 RocksDbSpawnMode::BlockInPlace
80 } else {
81 RocksDbSpawnMode::SpawnBlocking
82 }
83 }
84
85 #[inline]
87 async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
88 where
89 F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
90 I: Send + 'static,
91 O: Send + 'static,
92 {
93 Ok(match self {
94 RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
95 RocksDbSpawnMode::SpawnBlocking => {
96 tokio::task::spawn_blocking(move || f(input)).await??
97 }
98 })
99 }
100}
101
102impl Display for RocksDbSpawnMode {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match &self {
105 RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
106 RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
107 }
108 }
109}
110
111fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
112 ensure!(
113 key.len() <= MAX_KEY_SIZE,
114 RocksDbStoreInternalError::KeyTooLong
115 );
116 Ok(())
117}
118
/// Performs the synchronous RocksDB operations of a store.
///
/// Every key handed to its methods is prefixed with `start_key` before it
/// reaches the database. Clones share the same database handle.
#[derive(Clone)]
struct RocksDbStoreExecutor {
    /// The opened database, shared between clones.
    db: Arc<DB>,
    /// Prefix prepended to every key (e.g. `ROOT_KEY_DOMAIN` followed by a root
    /// key). Must be non-empty: `write_batch_internal` overwrites its first byte.
    start_key: Vec<u8>,
}
124
125impl RocksDbStoreExecutor {
126 fn contains_keys_internal(
127 &self,
128 keys: Vec<Vec<u8>>,
129 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
130 let size = keys.len();
131 let mut results = vec![false; size];
132 let mut indices = Vec::new();
133 let mut keys_red = Vec::new();
134 for (i, key) in keys.into_iter().enumerate() {
135 check_key_size(&key)?;
136 let mut full_key = self.start_key.to_vec();
137 full_key.extend(key);
138 if self.db.key_may_exist(&full_key) {
139 indices.push(i);
140 keys_red.push(full_key);
141 }
142 }
143 let values_red = self.db.multi_get(keys_red);
144 for (index, value) in indices.into_iter().zip(values_red) {
145 results[index] = value?.is_some();
146 }
147 Ok(results)
148 }
149
150 fn read_multi_values_bytes_internal(
151 &self,
152 keys: Vec<Vec<u8>>,
153 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
154 for key in &keys {
155 check_key_size(key)?;
156 }
157 let full_keys = keys
158 .into_iter()
159 .map(|key| {
160 let mut full_key = self.start_key.to_vec();
161 full_key.extend(key);
162 full_key
163 })
164 .collect::<Vec<_>>();
165 let entries = self.db.multi_get(&full_keys);
166 Ok(entries.into_iter().collect::<Result<_, _>>()?)
167 }
168
169 fn find_keys_by_prefix_internal(
170 &self,
171 key_prefix: Vec<u8>,
172 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
173 check_key_size(&key_prefix)?;
174 let mut prefix = self.start_key.clone();
175 prefix.extend(key_prefix);
176 let len = prefix.len();
177 let mut iter = self.db.raw_iterator();
178 let mut keys = Vec::new();
179 iter.seek(&prefix);
180 let mut next_key = iter.key();
181 while let Some(key) = next_key {
182 if !key.starts_with(&prefix) {
183 break;
184 }
185 keys.push(key[len..].to_vec());
186 iter.next();
187 next_key = iter.key();
188 }
189 Ok(keys)
190 }
191
192 #[expect(clippy::type_complexity)]
193 fn find_key_values_by_prefix_internal(
194 &self,
195 key_prefix: Vec<u8>,
196 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
197 check_key_size(&key_prefix)?;
198 let mut prefix = self.start_key.clone();
199 prefix.extend(key_prefix);
200 let len = prefix.len();
201 let mut iter = self.db.raw_iterator();
202 let mut key_values = Vec::new();
203 iter.seek(&prefix);
204 let mut next_key = iter.key();
205 while let Some(key) = next_key {
206 if !key.starts_with(&prefix) {
207 break;
208 }
209 if let Some(value) = iter.value() {
210 let key_value = (key[len..].to_vec(), value.to_vec());
211 key_values.push(key_value);
212 }
213 iter.next();
214 next_key = iter.key();
215 }
216 Ok(key_values)
217 }
218
219 fn write_batch_internal(
220 &self,
221 batch: Batch,
222 write_root_key: bool,
223 ) -> Result<(), RocksDbStoreInternalError> {
224 let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
225 for operation in batch.operations {
226 match operation {
227 WriteOperation::Delete { key } => {
228 check_key_size(&key)?;
229 let mut full_key = self.start_key.to_vec();
230 full_key.extend(key);
231 inner_batch.delete(&full_key)
232 }
233 WriteOperation::Put { key, value } => {
234 check_key_size(&key)?;
235 let mut full_key = self.start_key.to_vec();
236 full_key.extend(key);
237 inner_batch.put(&full_key, value)
238 }
239 WriteOperation::DeletePrefix { key_prefix } => {
240 check_key_size(&key_prefix)?;
241 let mut full_key1 = self.start_key.to_vec();
242 full_key1.extend(&key_prefix);
243 let full_key2 =
244 get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
245 inner_batch.delete_range(&full_key1, &full_key2);
246 }
247 }
248 }
249 if write_root_key {
250 let mut full_key = self.start_key.to_vec();
251 full_key[0] = STORED_ROOT_KEYS_PREFIX;
252 inner_batch.put(&full_key, vec![]);
253 }
254 self.db.write(inner_batch)?;
255 Ok(())
256 }
257}
258
/// A store over a RocksDB database whose keys are all prefixed with the
/// executor's `start_key`.
#[derive(Clone)]
pub struct RocksDbStoreInternal {
    /// Performs the reads and writes, prefixing keys with its `start_key`.
    executor: RocksDbStoreExecutor,
    /// Holds the database path and, for test configurations, the temporary
    /// directory guard that keeps it alive.
    _path_with_guard: PathWithGuard,
    /// Value returned by `max_stream_queries`.
    max_stream_queries: usize,
    /// How blocking RocksDB calls are run from async code.
    spawn_mode: RocksDbSpawnMode,
    /// Set by the first `write_batch` on this handle (shared by its clones); that
    /// write also adds the root-key marker.
    root_key_written: Arc<AtomicBool>,
}
268
/// An opened RocksDB namespace, from which per-root-key stores are derived.
#[derive(Clone)]
pub struct RocksDbDatabaseInternal {
    /// Executor over the whole database; its `start_key` is `ROOT_KEY_DOMAIN`.
    executor: RocksDbStoreExecutor,
    /// Holds the database path and, for test configurations, the temporary
    /// directory guard that keeps it alive.
    _path_with_guard: PathWithGuard,
    /// Copied into every store opened from this database.
    max_stream_queries: usize,
    /// Copied into every store opened from this database.
    spawn_mode: RocksDbSpawnMode,
}
277
/// Database-level operations report `RocksDbStoreInternalError`.
impl WithError for RocksDbDatabaseInternal {
    type Error = RocksDbStoreInternalError;
}
281
/// Configuration used to open RocksDB namespaces.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RocksDbStoreInternalConfig {
    /// Root directory; each namespace is opened in a sub-directory of it.
    pub path_with_guard: PathWithGuard,
    /// How blocking RocksDB calls are run from async code.
    pub spawn_mode: RocksDbSpawnMode,
    /// Value returned by the stores' `max_stream_queries`.
    pub max_stream_queries: usize,
}
292
293impl RocksDbDatabaseInternal {
294 fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
295 if !namespace
296 .chars()
297 .all(|character| character.is_ascii_alphanumeric() || character == '_')
298 {
299 return Err(RocksDbStoreInternalError::InvalidNamespace);
300 }
301 Ok(())
302 }
303
304 fn build(
305 config: &RocksDbStoreInternalConfig,
306 namespace: &str,
307 ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
308 let start_key = ROOT_KEY_DOMAIN.to_vec();
309 let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
311 Ok(RocksDbDatabaseInternal {
312 executor: temp_store.executor,
313 _path_with_guard: temp_store._path_with_guard,
314 max_stream_queries: temp_store.max_stream_queries,
315 spawn_mode: temp_store.spawn_mode,
316 })
317 }
318}
319
320impl RocksDbStoreInternal {
321 fn build(
322 config: &RocksDbStoreInternalConfig,
323 namespace: &str,
324 start_key: Vec<u8>,
325 ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
326 RocksDbDatabaseInternal::check_namespace(namespace)?;
327 let mut path_buf = config.path_with_guard.path_buf.clone();
328 let mut path_with_guard = config.path_with_guard.clone();
329 path_buf.push(namespace);
330 path_with_guard.path_buf = path_buf.clone();
331 let max_stream_queries = config.max_stream_queries;
332 let spawn_mode = config.spawn_mode;
333 if !std::path::Path::exists(&path_buf) {
334 std::fs::create_dir(path_buf.clone())?;
335 }
336 let sys = System::new_with_specifics(
337 RefreshKind::nothing()
338 .with_cpu(CpuRefreshKind::everything())
339 .with_memory(MemoryRefreshKind::nothing().with_ram()),
340 );
341 let num_cpus = sys.cpus().len() as i32;
342 let total_ram = sys.total_memory() as usize;
343 let mut options = rocksdb::Options::default();
344 options.create_if_missing(true);
345 options.create_missing_column_families(true);
346 options.set_write_buffer_size(WRITE_BUFFER_SIZE);
348 options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
349 options.set_compression_type(rocksdb::DBCompressionType::Lz4);
350 options.set_level_zero_slowdown_writes_trigger(8);
351 options.set_level_zero_stop_writes_trigger(12);
352 options.set_level_zero_file_num_compaction_trigger(2);
353 options.increase_parallelism(num_cpus);
357 options.set_max_background_jobs(num_cpus);
358 options.set_max_subcompactions(num_cpus as u32);
359 options.set_level_compaction_dynamic_level_bytes(true);
360
361 options.set_compaction_style(DBCompactionStyle::Level);
362 options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
363
364 let mut block_options = BlockBasedOptions::default();
365 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
366 block_options.set_cache_index_and_filter_blocks(true);
367 block_options.set_block_cache(&Cache::new_hyper_clock_cache(
373 total_ram / 4,
374 HYPER_CLOCK_CACHE_BLOCK_SIZE,
375 ));
376 options.set_block_based_table_factory(&block_options);
377
378 let db = DB::open(&options, path_buf)?;
379 let executor = RocksDbStoreExecutor {
380 db: Arc::new(db),
381 start_key,
382 };
383 Ok(RocksDbStoreInternal {
384 executor,
385 _path_with_guard: path_with_guard,
386 max_stream_queries,
387 spawn_mode,
388 root_key_written: Arc::new(AtomicBool::new(false)),
389 })
390 }
391}
392
/// Store-level operations report `RocksDbStoreInternalError`.
impl WithError for RocksDbStoreInternal {
    type Error = RocksDbStoreInternalError;
}
396
397impl ReadableKeyValueStore for RocksDbStoreInternal {
398 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
399
400 fn max_stream_queries(&self) -> usize {
401 self.max_stream_queries
402 }
403
404 async fn read_value_bytes(
405 &self,
406 key: &[u8],
407 ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
408 check_key_size(key)?;
409 let db = self.executor.db.clone();
410 let mut full_key = self.executor.start_key.to_vec();
411 full_key.extend(key);
412 self.spawn_mode
413 .spawn(move |x| Ok(db.get(&x)?), full_key)
414 .await
415 }
416
417 async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
418 check_key_size(key)?;
419 let db = self.executor.db.clone();
420 let mut full_key = self.executor.start_key.to_vec();
421 full_key.extend(key);
422 self.spawn_mode
423 .spawn(
424 move |x| {
425 if !db.key_may_exist(&x) {
426 return Ok(false);
427 }
428 Ok(db.get(&x)?.is_some())
429 },
430 full_key,
431 )
432 .await
433 }
434
435 async fn contains_keys(
436 &self,
437 keys: Vec<Vec<u8>>,
438 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
439 let executor = self.executor.clone();
440 self.spawn_mode
441 .spawn(move |x| executor.contains_keys_internal(x), keys)
442 .await
443 }
444
445 async fn read_multi_values_bytes(
446 &self,
447 keys: Vec<Vec<u8>>,
448 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
449 let executor = self.executor.clone();
450 self.spawn_mode
451 .spawn(move |x| executor.read_multi_values_bytes_internal(x), keys)
452 .await
453 }
454
455 async fn find_keys_by_prefix(
456 &self,
457 key_prefix: &[u8],
458 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
459 let executor = self.executor.clone();
460 let key_prefix = key_prefix.to_vec();
461 self.spawn_mode
462 .spawn(
463 move |x| executor.find_keys_by_prefix_internal(x),
464 key_prefix,
465 )
466 .await
467 }
468
469 async fn find_key_values_by_prefix(
470 &self,
471 key_prefix: &[u8],
472 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
473 let executor = self.executor.clone();
474 let key_prefix = key_prefix.to_vec();
475 self.spawn_mode
476 .spawn(
477 move |x| executor.find_key_values_by_prefix_internal(x),
478 key_prefix,
479 )
480 .await
481 }
482}
483
484impl WritableKeyValueStore for RocksDbStoreInternal {
485 const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
486
487 async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
488 let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
489 let executor = self.executor.clone();
490 self.spawn_mode
491 .spawn(
492 move |x| executor.write_batch_internal(x, write_root_key),
493 batch,
494 )
495 .await
496 }
497
498 async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
499 Ok(())
500 }
501}
502
503impl KeyValueDatabase for RocksDbDatabaseInternal {
504 type Config = RocksDbStoreInternalConfig;
505 type Store = RocksDbStoreInternal;
506
507 fn get_name() -> String {
508 "rocksdb internal".to_string()
509 }
510
511 async fn connect(
512 config: &Self::Config,
513 namespace: &str,
514 ) -> Result<Self, RocksDbStoreInternalError> {
515 Self::build(config, namespace)
516 }
517
518 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
519 let mut start_key = ROOT_KEY_DOMAIN.to_vec();
520 start_key.extend(root_key);
521 let mut executor = self.executor.clone();
522 executor.start_key = start_key;
523 Ok(RocksDbStoreInternal {
524 executor,
525 _path_with_guard: self._path_with_guard.clone(),
526 max_stream_queries: self.max_stream_queries,
527 spawn_mode: self.spawn_mode,
528 root_key_written: Arc::new(AtomicBool::new(false)),
529 })
530 }
531
532 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
533 self.open_shared(root_key)
534 }
535
536 async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
537 let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
538 let mut namespaces = Vec::new();
539 for entry in entries {
540 let entry = entry?;
541 if !entry.file_type()?.is_dir() {
542 return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
543 }
544 let namespace = match entry.file_name().into_string() {
545 Err(error) => {
546 return Err(RocksDbStoreInternalError::IntoStringError(error));
547 }
548 Ok(namespace) => namespace,
549 };
550 namespaces.push(namespace);
551 }
552 Ok(namespaces)
553 }
554
555 async fn list_root_keys(
556 config: &Self::Config,
557 namespace: &str,
558 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
559 let start_key = vec![STORED_ROOT_KEYS_PREFIX];
560 let store = RocksDbStoreInternal::build(config, namespace, start_key)?;
561 store.find_keys_by_prefix(&[]).await
562 }
563
564 async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
565 let namespaces = Self::list_all(config).await?;
566 for namespace in namespaces {
567 let mut path_buf = config.path_with_guard.path_buf.clone();
568 path_buf.push(&namespace);
569 std::fs::remove_dir_all(path_buf.as_path())?;
570 }
571 Ok(())
572 }
573
574 async fn exists(
575 config: &Self::Config,
576 namespace: &str,
577 ) -> Result<bool, RocksDbStoreInternalError> {
578 Self::check_namespace(namespace)?;
579 let mut path_buf = config.path_with_guard.path_buf.clone();
580 path_buf.push(namespace);
581 let test = std::path::Path::exists(&path_buf);
582 Ok(test)
583 }
584
585 async fn create(
586 config: &Self::Config,
587 namespace: &str,
588 ) -> Result<(), RocksDbStoreInternalError> {
589 Self::check_namespace(namespace)?;
590 let mut path_buf = config.path_with_guard.path_buf.clone();
591 path_buf.push(namespace);
592 if std::path::Path::exists(&path_buf) {
593 return Err(RocksDbStoreInternalError::StoreAlreadyExists);
594 }
595 std::fs::create_dir_all(path_buf)?;
596 Ok(())
597 }
598
599 async fn delete(
600 config: &Self::Config,
601 namespace: &str,
602 ) -> Result<(), RocksDbStoreInternalError> {
603 Self::check_namespace(namespace)?;
604 let mut path_buf = config.path_with_guard.path_buf.clone();
605 path_buf.push(namespace);
606 let path = path_buf.as_path();
607 std::fs::remove_dir_all(path)?;
608 Ok(())
609 }
610}
611
#[cfg(with_testing)]
impl TestKeyValueDatabase for RocksDbDatabaseInternal {
    /// Builds a configuration rooted in a fresh temporary directory, with the
    /// spawn mode chosen from the current tokio runtime.
    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
        Ok(RocksDbStoreInternalConfig {
            path_with_guard: PathWithGuard::new_testing(),
            spawn_mode: RocksDbSpawnMode::get_spawn_mode_from_runtime(),
            max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES,
        })
    }
}
625
/// Errors produced by the RocksDB storage backend.
#[derive(Error, Debug)]
pub enum RocksDbStoreInternalError {
    /// `create` was called for a namespace whose path already exists.
    #[error("Store already exists")]
    StoreAlreadyExists,

    /// A task run through `spawn_blocking` could not be joined (e.g. it panicked).
    #[error("tokio join error: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),

    /// An error reported by RocksDB.
    #[error("RocksDB error: {0}")]
    RocksDb(#[from] rocksdb::Error),

    /// `list_all` found an entry in the root directory that is not a directory.
    #[error("Namespaces should be directories")]
    NonDirectoryNamespace,

    /// A namespace directory name could not be converted to a UTF-8 `String`.
    #[error("error in the conversion from OsString: {0:?}")]
    IntoStringError(OsString),

    /// A key or key prefix is longer than `MAX_KEY_SIZE` bytes.
    #[error("The key must have at most 8 MiB")]
    KeyTooLong,

    /// A namespace was rejected by `check_namespace`.
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// A filesystem operation failed.
    #[error("Filesystem error: {0}")]
    FsError(#[from] std::io::Error),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
}
665
/// A filesystem path, optionally paired with a temporary-directory guard that
/// is shared by all clones of this value.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PathWithGuard {
    /// The directory this value refers to: the root path in a configuration, or
    /// a namespace directory inside an opened store.
    pub path_buf: PathBuf,
    /// Temporary directory backing `path_buf`; only set by `new_testing`.
    /// Skipped by serde, so a deserialized value never holds a guard.
    #[serde(skip)]
    _dir: Option<Arc<TempDir>>,
}
675
676impl PathWithGuard {
677 pub fn new(path_buf: PathBuf) -> Self {
679 Self {
680 path_buf,
681 _dir: None,
682 }
683 }
684
685 #[cfg(with_testing)]
687 fn new_testing() -> PathWithGuard {
688 let dir = TempDir::new().unwrap();
689 let path_buf = dir.path().to_path_buf();
690 let _dir = Some(Arc::new(dir));
691 PathWithGuard { path_buf, _dir }
692 }
693}
694
695impl PartialEq for PathWithGuard {
696 fn eq(&self, other: &Self) -> bool {
697 self.path_buf == other.path_buf
698 }
699}
700impl Eq for PathWithGuard {}
701
/// Identifies the backend of these errors as `"rocks_db"` (presumably used for
/// reporting/metrics — confirm against `KeyValueStoreError`).
impl KeyValueStoreError for RocksDbStoreInternalError {
    const BACKEND: &'static str = "rocks_db";
}
705
/// Error type of the RocksDB store once wrapped in the value-splitting layer.
pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;

/// Configuration of the RocksDB store wrapped in the LRU-caching layer.
pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;

/// The RocksDB database as exposed to callers: LRU caching over value splitting
/// over the internal database, with a metering layer around each level.
#[cfg(with_metrics)]
pub type RocksDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
    >,
>;
/// The RocksDB database as exposed to callers: LRU caching over value splitting
/// over the internal database.
#[cfg(not(with_metrics))]
pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;