1use std::{
7 ffi::OsString,
8 fmt::Display,
9 path::PathBuf,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc,
13 },
14};
15
16use linera_base::ensure;
17use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle};
18use serde::{Deserialize, Serialize};
19use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
20use tempfile::TempDir;
21use thiserror::Error;
22
23#[cfg(with_metrics)]
24use crate::metering::MeteredStore;
25#[cfg(with_testing)]
26use crate::store::TestKeyValueStore;
27use crate::{
28 batch::{Batch, WriteOperation},
29 common::get_upper_bound_option,
30 lru_caching::{LruCachingConfig, LruCachingStore},
31 store::{
32 AdminKeyValueStore, CommonStoreInternalConfig, KeyValueStoreError, ReadableKeyValueStore,
33 WithError, WritableKeyValueStore,
34 },
35 value_splitting::{ValueSplittingError, ValueSplittingStore},
36};
37
38static ROOT_KEY_DOMAIN: [u8; 1] = [0];
40static STORED_ROOT_KEYS_PREFIX: u8 = 1;
41
42#[cfg(with_testing)]
44const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;
45
46const MAX_VALUE_SIZE: usize = 3 * 1024 * 1024 * 1024 - 400;
49
50const MAX_KEY_SIZE: usize = 8 * 1024 * 1024 - 400;
53
54const WRITE_BUFFER_SIZE: usize = 256 * 1024 * 1024; const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
56const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
60
61#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
68pub enum RocksDbSpawnMode {
69 SpawnBlocking,
71 BlockInPlace,
73}
74
75impl RocksDbSpawnMode {
76 pub fn get_spawn_mode_from_runtime() -> Self {
78 if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
79 RocksDbSpawnMode::BlockInPlace
80 } else {
81 RocksDbSpawnMode::SpawnBlocking
82 }
83 }
84
85 #[inline]
87 async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
88 where
89 F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
90 I: Send + 'static,
91 O: Send + 'static,
92 {
93 Ok(match self {
94 RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
95 RocksDbSpawnMode::SpawnBlocking => {
96 tokio::task::spawn_blocking(move || f(input)).await??
97 }
98 })
99 }
100}
101
102impl Display for RocksDbSpawnMode {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match &self {
105 RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
106 RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
107 }
108 }
109}
110
111fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
112 ensure!(
113 key.len() <= MAX_KEY_SIZE,
114 RocksDbStoreInternalError::KeyTooLong
115 );
116 Ok(())
117}
118
119#[derive(Clone)]
120struct RocksDbStoreExecutor {
121 db: Arc<DB>,
122 start_key: Vec<u8>,
123}
124
125impl RocksDbStoreExecutor {
126 pub fn contains_keys_internal(
127 &self,
128 keys: Vec<Vec<u8>>,
129 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
130 let size = keys.len();
131 let mut results = vec![false; size];
132 let mut indices = Vec::new();
133 let mut keys_red = Vec::new();
134 for (i, key) in keys.into_iter().enumerate() {
135 check_key_size(&key)?;
136 let mut full_key = self.start_key.to_vec();
137 full_key.extend(key);
138 if self.db.key_may_exist(&full_key) {
139 indices.push(i);
140 keys_red.push(full_key);
141 }
142 }
143 let values_red = self.db.multi_get(keys_red);
144 for (index, value) in indices.into_iter().zip(values_red) {
145 results[index] = value?.is_some();
146 }
147 Ok(results)
148 }
149
150 fn read_multi_values_bytes_internal(
151 &self,
152 keys: Vec<Vec<u8>>,
153 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
154 for key in &keys {
155 check_key_size(key)?;
156 }
157 let full_keys = keys
158 .into_iter()
159 .map(|key| {
160 let mut full_key = self.start_key.to_vec();
161 full_key.extend(key);
162 full_key
163 })
164 .collect::<Vec<_>>();
165 let entries = self.db.multi_get(&full_keys);
166 Ok(entries.into_iter().collect::<Result<_, _>>()?)
167 }
168
169 fn find_keys_by_prefix_internal(
170 &self,
171 key_prefix: Vec<u8>,
172 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
173 check_key_size(&key_prefix)?;
174 let mut prefix = self.start_key.clone();
175 prefix.extend(key_prefix);
176 let len = prefix.len();
177 let mut iter = self.db.raw_iterator();
178 let mut keys = Vec::new();
179 iter.seek(&prefix);
180 let mut next_key = iter.key();
181 while let Some(key) = next_key {
182 if !key.starts_with(&prefix) {
183 break;
184 }
185 keys.push(key[len..].to_vec());
186 iter.next();
187 next_key = iter.key();
188 }
189 Ok(keys)
190 }
191
192 #[expect(clippy::type_complexity)]
193 fn find_key_values_by_prefix_internal(
194 &self,
195 key_prefix: Vec<u8>,
196 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
197 check_key_size(&key_prefix)?;
198 let mut prefix = self.start_key.clone();
199 prefix.extend(key_prefix);
200 let len = prefix.len();
201 let mut iter = self.db.raw_iterator();
202 let mut key_values = Vec::new();
203 iter.seek(&prefix);
204 let mut next_key = iter.key();
205 while let Some(key) = next_key {
206 if !key.starts_with(&prefix) {
207 break;
208 }
209 if let Some(value) = iter.value() {
210 let key_value = (key[len..].to_vec(), value.to_vec());
211 key_values.push(key_value);
212 }
213 iter.next();
214 next_key = iter.key();
215 }
216 Ok(key_values)
217 }
218
219 fn write_batch_internal(
220 &self,
221 batch: Batch,
222 write_root_key: bool,
223 ) -> Result<(), RocksDbStoreInternalError> {
224 let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
225 for operation in batch.operations {
226 match operation {
227 WriteOperation::Delete { key } => {
228 check_key_size(&key)?;
229 let mut full_key = self.start_key.to_vec();
230 full_key.extend(key);
231 inner_batch.delete(&full_key)
232 }
233 WriteOperation::Put { key, value } => {
234 check_key_size(&key)?;
235 let mut full_key = self.start_key.to_vec();
236 full_key.extend(key);
237 inner_batch.put(&full_key, value)
238 }
239 WriteOperation::DeletePrefix { key_prefix } => {
240 check_key_size(&key_prefix)?;
241 let mut full_key1 = self.start_key.to_vec();
242 full_key1.extend(&key_prefix);
243 let full_key2 =
244 get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
245 inner_batch.delete_range(&full_key1, &full_key2);
246 }
247 }
248 }
249 if write_root_key {
250 let mut full_key = self.start_key.to_vec();
251 full_key[0] = STORED_ROOT_KEYS_PREFIX;
252 inner_batch.put(&full_key, vec![]);
253 }
254 self.db.write(inner_batch)?;
255 Ok(())
256 }
257}
258
259#[derive(Clone)]
261pub struct RocksDbStoreInternal {
262 executor: RocksDbStoreExecutor,
263 _path_with_guard: PathWithGuard,
264 max_stream_queries: usize,
265 spawn_mode: RocksDbSpawnMode,
266 root_key_written: Arc<AtomicBool>,
267}
268
269#[derive(Clone, Debug, Deserialize, Serialize)]
271pub struct RocksDbStoreInternalConfig {
272 pub path_with_guard: PathWithGuard,
274 spawn_mode: RocksDbSpawnMode,
276 common_config: CommonStoreInternalConfig,
278}
279
280impl RocksDbStoreInternal {
281 fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
282 if !namespace
283 .chars()
284 .all(|character| character.is_ascii_alphanumeric() || character == '_')
285 {
286 return Err(RocksDbStoreInternalError::InvalidNamespace);
287 }
288 Ok(())
289 }
290
291 fn build(
292 config: &RocksDbStoreInternalConfig,
293 namespace: &str,
294 start_key: Vec<u8>,
295 ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
296 Self::check_namespace(namespace)?;
297 let mut path_buf = config.path_with_guard.path_buf.clone();
298 let mut path_with_guard = config.path_with_guard.clone();
299 path_buf.push(namespace);
300 path_with_guard.path_buf = path_buf.clone();
301 let max_stream_queries = config.common_config.max_stream_queries;
302 let spawn_mode = config.spawn_mode;
303 if !std::path::Path::exists(&path_buf) {
304 std::fs::create_dir(path_buf.clone())?;
305 }
306 let sys = System::new_with_specifics(
307 RefreshKind::nothing()
308 .with_cpu(CpuRefreshKind::everything())
309 .with_memory(MemoryRefreshKind::nothing().with_ram()),
310 );
311 let num_cpus = sys.cpus().len() as i32;
312 let total_ram = sys.total_memory() as usize;
313 let mut options = rocksdb::Options::default();
314 options.create_if_missing(true);
315 options.create_missing_column_families(true);
316 options.set_write_buffer_size(WRITE_BUFFER_SIZE);
318 options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
319 options.set_compression_type(rocksdb::DBCompressionType::Lz4);
320 options.set_level_zero_slowdown_writes_trigger(8);
321 options.set_level_zero_stop_writes_trigger(12);
322 options.set_level_zero_file_num_compaction_trigger(2);
323 options.increase_parallelism(num_cpus);
327 options.set_max_background_jobs(num_cpus);
328 options.set_max_subcompactions(num_cpus as u32);
329 options.set_level_compaction_dynamic_level_bytes(true);
330
331 options.set_compaction_style(DBCompactionStyle::Level);
332 options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
333
334 let mut block_options = BlockBasedOptions::default();
335 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
336 block_options.set_cache_index_and_filter_blocks(true);
337 block_options.set_block_cache(&Cache::new_hyper_clock_cache(
343 total_ram / 4,
344 HYPER_CLOCK_CACHE_BLOCK_SIZE,
345 ));
346 options.set_block_based_table_factory(&block_options);
347
348 let db = DB::open(&options, path_buf)?;
349 let executor = RocksDbStoreExecutor {
350 db: Arc::new(db),
351 start_key,
352 };
353 Ok(RocksDbStoreInternal {
354 executor,
355 _path_with_guard: path_with_guard,
356 max_stream_queries,
357 spawn_mode,
358 root_key_written: Arc::new(AtomicBool::new(false)),
359 })
360 }
361}
362
363impl WithError for RocksDbStoreInternal {
364 type Error = RocksDbStoreInternalError;
365}
366
367impl ReadableKeyValueStore for RocksDbStoreInternal {
368 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
369 type Keys = Vec<Vec<u8>>;
370 type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
371
372 fn max_stream_queries(&self) -> usize {
373 self.max_stream_queries
374 }
375
376 async fn read_value_bytes(
377 &self,
378 key: &[u8],
379 ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
380 check_key_size(key)?;
381 let db = self.executor.db.clone();
382 let mut full_key = self.executor.start_key.to_vec();
383 full_key.extend(key);
384 self.spawn_mode
385 .spawn(move |x| Ok(db.get(&x)?), full_key)
386 .await
387 }
388
389 async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
390 check_key_size(key)?;
391 let db = self.executor.db.clone();
392 let mut full_key = self.executor.start_key.to_vec();
393 full_key.extend(key);
394 self.spawn_mode
395 .spawn(
396 move |x| {
397 if !db.key_may_exist(&x) {
398 return Ok(false);
399 }
400 Ok(db.get(&x)?.is_some())
401 },
402 full_key,
403 )
404 .await
405 }
406
407 async fn contains_keys(
408 &self,
409 keys: Vec<Vec<u8>>,
410 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
411 let executor = self.executor.clone();
412 self.spawn_mode
413 .spawn(move |x| executor.contains_keys_internal(x), keys)
414 .await
415 }
416
417 async fn read_multi_values_bytes(
418 &self,
419 keys: Vec<Vec<u8>>,
420 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
421 let executor = self.executor.clone();
422 self.spawn_mode
423 .spawn(move |x| executor.read_multi_values_bytes_internal(x), keys)
424 .await
425 }
426
427 async fn find_keys_by_prefix(
428 &self,
429 key_prefix: &[u8],
430 ) -> Result<Self::Keys, RocksDbStoreInternalError> {
431 let executor = self.executor.clone();
432 let key_prefix = key_prefix.to_vec();
433 self.spawn_mode
434 .spawn(
435 move |x| executor.find_keys_by_prefix_internal(x),
436 key_prefix,
437 )
438 .await
439 }
440
441 async fn find_key_values_by_prefix(
442 &self,
443 key_prefix: &[u8],
444 ) -> Result<Self::KeyValues, RocksDbStoreInternalError> {
445 let executor = self.executor.clone();
446 let key_prefix = key_prefix.to_vec();
447 self.spawn_mode
448 .spawn(
449 move |x| executor.find_key_values_by_prefix_internal(x),
450 key_prefix,
451 )
452 .await
453 }
454}
455
456impl WritableKeyValueStore for RocksDbStoreInternal {
457 const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
458
459 async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
460 let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
461 let executor = self.executor.clone();
462 self.spawn_mode
463 .spawn(
464 move |x| executor.write_batch_internal(x, write_root_key),
465 batch,
466 )
467 .await
468 }
469
470 async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
471 Ok(())
472 }
473}
474
475impl AdminKeyValueStore for RocksDbStoreInternal {
476 type Config = RocksDbStoreInternalConfig;
477
478 fn get_name() -> String {
479 "rocksdb internal".to_string()
480 }
481
482 async fn connect(
483 config: &Self::Config,
484 namespace: &str,
485 ) -> Result<Self, RocksDbStoreInternalError> {
486 let start_key = ROOT_KEY_DOMAIN.to_vec();
487 RocksDbStoreInternal::build(config, namespace, start_key)
488 }
489
490 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, RocksDbStoreInternalError> {
491 let mut store = self.clone();
492 let mut start_key = ROOT_KEY_DOMAIN.to_vec();
493 start_key.extend(root_key);
494 store.executor.start_key = start_key;
495 store.root_key_written = Arc::new(AtomicBool::new(false));
496 Ok(store)
497 }
498
499 async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
500 let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
501 let mut namespaces = Vec::new();
502 for entry in entries {
503 let entry = entry?;
504 if !entry.file_type()?.is_dir() {
505 return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
506 }
507 let namespace = match entry.file_name().into_string() {
508 Err(error) => {
509 return Err(RocksDbStoreInternalError::IntoStringError(error));
510 }
511 Ok(namespace) => namespace,
512 };
513 namespaces.push(namespace);
514 }
515 Ok(namespaces)
516 }
517
518 async fn list_root_keys(
519 config: &Self::Config,
520 namespace: &str,
521 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
522 let start_key = vec![STORED_ROOT_KEYS_PREFIX];
523 let store = RocksDbStoreInternal::build(config, namespace, start_key)?;
524 store.find_keys_by_prefix(&[]).await
525 }
526
527 async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
528 let namespaces = RocksDbStoreInternal::list_all(config).await?;
529 for namespace in namespaces {
530 let mut path_buf = config.path_with_guard.path_buf.clone();
531 path_buf.push(&namespace);
532 std::fs::remove_dir_all(path_buf.as_path())?;
533 }
534 Ok(())
535 }
536
537 async fn exists(
538 config: &Self::Config,
539 namespace: &str,
540 ) -> Result<bool, RocksDbStoreInternalError> {
541 Self::check_namespace(namespace)?;
542 let mut path_buf = config.path_with_guard.path_buf.clone();
543 path_buf.push(namespace);
544 let test = std::path::Path::exists(&path_buf);
545 Ok(test)
546 }
547
548 async fn create(
549 config: &Self::Config,
550 namespace: &str,
551 ) -> Result<(), RocksDbStoreInternalError> {
552 Self::check_namespace(namespace)?;
553 let mut path_buf = config.path_with_guard.path_buf.clone();
554 path_buf.push(namespace);
555 if std::path::Path::exists(&path_buf) {
556 return Err(RocksDbStoreInternalError::StoreAlreadyExists);
557 }
558 std::fs::create_dir_all(path_buf)?;
559 Ok(())
560 }
561
562 async fn delete(
563 config: &Self::Config,
564 namespace: &str,
565 ) -> Result<(), RocksDbStoreInternalError> {
566 Self::check_namespace(namespace)?;
567 let mut path_buf = config.path_with_guard.path_buf.clone();
568 path_buf.push(namespace);
569 let path = path_buf.as_path();
570 std::fs::remove_dir_all(path)?;
571 Ok(())
572 }
573}
574
#[cfg(with_testing)]
impl TestKeyValueStore for RocksDbStoreInternal {
    /// Builds a test configuration rooted in a fresh temporary directory.
    async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
        Ok(RocksDbStoreInternalConfig {
            path_with_guard: PathWithGuard::new_testing(),
            spawn_mode: RocksDbSpawnMode::get_spawn_mode_from_runtime(),
            common_config: CommonStoreInternalConfig {
                max_concurrent_queries: None,
                max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES,
                replication_factor: 1,
            },
        })
    }
}
592
593#[derive(Error, Debug)]
595pub enum RocksDbStoreInternalError {
596 #[error("Store already exists")]
598 StoreAlreadyExists,
599
600 #[error("tokio join error: {0}")]
602 TokioJoinError(#[from] tokio::task::JoinError),
603
604 #[error("RocksDB error: {0}")]
606 RocksDb(#[from] rocksdb::Error),
607
608 #[error("Namespaces should be directories")]
610 NonDirectoryNamespace,
611
612 #[error("error in the conversion from OsString: {0:?}")]
614 IntoStringError(OsString),
615
616 #[error("The key must have at most 8 MiB")]
618 KeyTooLong,
619
620 #[error("Namespace contains forbidden characters")]
622 InvalidNamespace,
623
624 #[error("Filesystem error: {0}")]
626 FsError(#[from] std::io::Error),
627
628 #[error(transparent)]
630 BcsError(#[from] bcs::Error),
631}
632
633#[derive(Clone, Debug, Deserialize, Serialize)]
635pub struct PathWithGuard {
636 pub path_buf: PathBuf,
638 #[serde(skip)]
640 _dir: Option<Arc<TempDir>>,
641}
642
643impl PathWithGuard {
644 pub fn new(path_buf: PathBuf) -> Self {
646 Self {
647 path_buf,
648 _dir: None,
649 }
650 }
651
652 #[cfg(with_testing)]
654 fn new_testing() -> PathWithGuard {
655 let dir = TempDir::new().unwrap();
656 let path_buf = dir.path().to_path_buf();
657 let _dir = Some(Arc::new(dir));
658 PathWithGuard { path_buf, _dir }
659 }
660}
661
662impl PartialEq for PathWithGuard {
663 fn eq(&self, other: &Self) -> bool {
664 self.path_buf == other.path_buf
665 }
666}
667impl Eq for PathWithGuard {}
668
669impl KeyValueStoreError for RocksDbStoreInternalError {
670 const BACKEND: &'static str = "rocks_db";
671}
672
673#[cfg(with_metrics)]
675pub type RocksDbStore = MeteredStore<
676 LruCachingStore<MeteredStore<ValueSplittingStore<MeteredStore<RocksDbStoreInternal>>>>,
677>;
678
679#[cfg(not(with_metrics))]
681pub type RocksDbStore = LruCachingStore<ValueSplittingStore<RocksDbStoreInternal>>;
682
683pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;
685
686pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;
688
689impl RocksDbStoreConfig {
690 pub fn new(
692 spawn_mode: RocksDbSpawnMode,
693 path_with_guard: PathWithGuard,
694 common_config: crate::store::CommonStoreConfig,
695 ) -> RocksDbStoreConfig {
696 let inner_config = RocksDbStoreInternalConfig {
697 path_with_guard,
698 spawn_mode,
699 common_config: common_config.reduced(),
700 };
701 RocksDbStoreConfig {
702 inner_config,
703 storage_cache_config: common_config.storage_cache_config,
704 }
705 }
706}