1use std::{
7 ffi::OsString,
8 fmt::Display,
9 path::PathBuf,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc,
13 },
14};
15
16use linera_base::ensure;
17use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, SliceTransform};
18use serde::{Deserialize, Serialize};
19use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
20use tempfile::TempDir;
21use thiserror::Error;
22
23#[cfg(with_metrics)]
24use crate::metering::MeteredDatabase;
25#[cfg(with_testing)]
26use crate::store::TestKeyValueDatabase;
27use crate::{
28 batch::{Batch, WriteOperation},
29 common::get_upper_bound_option,
30 lru_caching::{LruCachingConfig, LruCachingDatabase},
31 store::{
32 KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError,
33 WritableKeyValueStore,
34 },
35 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
36};
37
38static ROOT_KEY_DOMAIN: [u8; 1] = [0];
40static STORED_ROOT_KEYS_PREFIX: u8 = 1;
41
42#[cfg(with_testing)]
44const TEST_ROCKS_DB_MAX_STREAM_QUERIES: usize = 10;
45
46const MAX_VALUE_SIZE: usize = 3 * 1024 * 1024 * 1024 - 400;
49
50const MAX_KEY_SIZE: usize = 8 * 1024 * 1024 - 400;
53
54const WRITE_BUFFER_SIZE: usize = 256 * 1024 * 1024; const MAX_WRITE_BUFFER_NUMBER: i32 = 6;
56const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
60
61#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
68pub enum RocksDbSpawnMode {
69 SpawnBlocking,
71 BlockInPlace,
73}
74
75impl RocksDbSpawnMode {
76 pub fn get_spawn_mode_from_runtime() -> Self {
78 if tokio::runtime::Handle::current().metrics().num_workers() > 1 {
79 RocksDbSpawnMode::BlockInPlace
80 } else {
81 RocksDbSpawnMode::SpawnBlocking
82 }
83 }
84
85 #[inline]
87 async fn spawn<F, I, O>(&self, f: F, input: I) -> Result<O, RocksDbStoreInternalError>
88 where
89 F: FnOnce(I) -> Result<O, RocksDbStoreInternalError> + Send + 'static,
90 I: Send + 'static,
91 O: Send + 'static,
92 {
93 Ok(match self {
94 RocksDbSpawnMode::BlockInPlace => tokio::task::block_in_place(move || f(input))?,
95 RocksDbSpawnMode::SpawnBlocking => {
96 tokio::task::spawn_blocking(move || f(input)).await??
97 }
98 })
99 }
100}
101
102impl Display for RocksDbSpawnMode {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match &self {
105 RocksDbSpawnMode::SpawnBlocking => write!(f, "spawn_blocking"),
106 RocksDbSpawnMode::BlockInPlace => write!(f, "block_in_place"),
107 }
108 }
109}
110
111fn check_key_size(key: &[u8]) -> Result<(), RocksDbStoreInternalError> {
112 ensure!(
113 key.len() <= MAX_KEY_SIZE,
114 RocksDbStoreInternalError::KeyTooLong
115 );
116 Ok(())
117}
118
119#[derive(Clone)]
120struct RocksDbStoreExecutor {
121 db: Arc<DB>,
122 start_key: Vec<u8>,
123}
124
125impl RocksDbStoreExecutor {
126 fn contains_keys_internal(
127 &self,
128 keys: Vec<Vec<u8>>,
129 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
130 let size = keys.len();
131 let mut results = vec![false; size];
132 let mut indices = Vec::new();
133 let mut keys_red = Vec::new();
134 for (i, key) in keys.into_iter().enumerate() {
135 check_key_size(&key)?;
136 let mut full_key = self.start_key.to_vec();
137 full_key.extend(key);
138 if self.db.key_may_exist(&full_key) {
139 indices.push(i);
140 keys_red.push(full_key);
141 }
142 }
143 let values_red = self.db.multi_get(keys_red);
144 for (index, value) in indices.into_iter().zip(values_red) {
145 results[index] = value?.is_some();
146 }
147 Ok(results)
148 }
149
150 fn read_multi_values_bytes_internal(
151 &self,
152 keys: Vec<Vec<u8>>,
153 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
154 for key in &keys {
155 check_key_size(key)?;
156 }
157 let full_keys = keys
158 .into_iter()
159 .map(|key| {
160 let mut full_key = self.start_key.to_vec();
161 full_key.extend(key);
162 full_key
163 })
164 .collect::<Vec<_>>();
165 let entries = self.db.multi_get(&full_keys);
166 Ok(entries.into_iter().collect::<Result<_, _>>()?)
167 }
168
169 fn get_find_prefix_iterator(&self, prefix: &[u8]) -> rocksdb::DBRawIteratorWithThreadMode<DB> {
170 let mut read_opts = rocksdb::ReadOptions::default();
172 read_opts.set_async_io(true);
174
175 let upper_bound = get_upper_bound_option(prefix);
177 if let Some(upper_bound) = upper_bound {
178 read_opts.set_iterate_upper_bound(upper_bound);
179 }
180
181 let mut iter = self.db.raw_iterator_opt(read_opts);
182 iter.seek(prefix);
183 iter
184 }
185
186 fn find_keys_by_prefix_internal(
187 &self,
188 key_prefix: Vec<u8>,
189 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
190 check_key_size(&key_prefix)?;
191
192 let mut prefix = self.start_key.clone();
193 prefix.extend(key_prefix);
194 let len = prefix.len();
195
196 let mut iter = self.get_find_prefix_iterator(&prefix);
197 let mut keys = Vec::new();
198 while let Some(key) = iter.key() {
199 keys.push(key[len..].to_vec());
200 iter.next();
201 }
202 Ok(keys)
203 }
204
205 #[expect(clippy::type_complexity)]
206 fn find_key_values_by_prefix_internal(
207 &self,
208 key_prefix: Vec<u8>,
209 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
210 check_key_size(&key_prefix)?;
211 let mut prefix = self.start_key.clone();
212 prefix.extend(key_prefix);
213 let len = prefix.len();
214
215 let mut iter = self.get_find_prefix_iterator(&prefix);
216 let mut key_values = Vec::new();
217 while let Some((key, value)) = iter.item() {
218 let key_value = (key[len..].to_vec(), value.to_vec());
219 key_values.push(key_value);
220 iter.next();
221 }
222 Ok(key_values)
223 }
224
225 fn write_batch_internal(
226 &self,
227 batch: Batch,
228 write_root_key: bool,
229 ) -> Result<(), RocksDbStoreInternalError> {
230 let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
231 for operation in batch.operations {
232 match operation {
233 WriteOperation::Delete { key } => {
234 check_key_size(&key)?;
235 let mut full_key = self.start_key.to_vec();
236 full_key.extend(key);
237 inner_batch.delete(&full_key)
238 }
239 WriteOperation::Put { key, value } => {
240 check_key_size(&key)?;
241 let mut full_key = self.start_key.to_vec();
242 full_key.extend(key);
243 inner_batch.put(&full_key, value)
244 }
245 WriteOperation::DeletePrefix { key_prefix } => {
246 check_key_size(&key_prefix)?;
247 let mut full_key1 = self.start_key.to_vec();
248 full_key1.extend(&key_prefix);
249 let full_key2 =
250 get_upper_bound_option(&full_key1).expect("the first entry cannot be 255");
251 inner_batch.delete_range(&full_key1, &full_key2);
252 }
253 }
254 }
255 if write_root_key {
256 let mut full_key = self.start_key.to_vec();
257 full_key[0] = STORED_ROOT_KEYS_PREFIX;
258 inner_batch.put(&full_key, vec![]);
259 }
260 self.db.write(inner_batch)?;
261 Ok(())
262 }
263}
264
265#[derive(Clone)]
267pub struct RocksDbStoreInternal {
268 executor: RocksDbStoreExecutor,
269 _path_with_guard: PathWithGuard,
270 max_stream_queries: usize,
271 spawn_mode: RocksDbSpawnMode,
272 root_key_written: Arc<AtomicBool>,
273}
274
275#[derive(Clone)]
277pub struct RocksDbDatabaseInternal {
278 executor: RocksDbStoreExecutor,
279 _path_with_guard: PathWithGuard,
280 max_stream_queries: usize,
281 spawn_mode: RocksDbSpawnMode,
282}
283
284impl WithError for RocksDbDatabaseInternal {
285 type Error = RocksDbStoreInternalError;
286}
287
288#[derive(Clone, Debug, Deserialize, Serialize)]
290pub struct RocksDbStoreInternalConfig {
291 pub path_with_guard: PathWithGuard,
293 pub spawn_mode: RocksDbSpawnMode,
295 pub max_stream_queries: usize,
297}
298
299impl RocksDbDatabaseInternal {
300 fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> {
301 if !namespace
302 .chars()
303 .all(|character| character.is_ascii_alphanumeric() || character == '_')
304 {
305 return Err(RocksDbStoreInternalError::InvalidNamespace);
306 }
307 Ok(())
308 }
309
310 fn build(
311 config: &RocksDbStoreInternalConfig,
312 namespace: &str,
313 ) -> Result<RocksDbDatabaseInternal, RocksDbStoreInternalError> {
314 let start_key = ROOT_KEY_DOMAIN.to_vec();
315 let temp_store = RocksDbStoreInternal::build(config, namespace, start_key)?;
317 Ok(RocksDbDatabaseInternal {
318 executor: temp_store.executor,
319 _path_with_guard: temp_store._path_with_guard,
320 max_stream_queries: temp_store.max_stream_queries,
321 spawn_mode: temp_store.spawn_mode,
322 })
323 }
324}
325
326impl RocksDbStoreInternal {
327 fn build(
328 config: &RocksDbStoreInternalConfig,
329 namespace: &str,
330 start_key: Vec<u8>,
331 ) -> Result<RocksDbStoreInternal, RocksDbStoreInternalError> {
332 RocksDbDatabaseInternal::check_namespace(namespace)?;
333 let mut path_buf = config.path_with_guard.path_buf.clone();
334 let mut path_with_guard = config.path_with_guard.clone();
335 path_buf.push(namespace);
336 path_with_guard.path_buf = path_buf.clone();
337 let max_stream_queries = config.max_stream_queries;
338 let spawn_mode = config.spawn_mode;
339 if !std::path::Path::exists(&path_buf) {
340 std::fs::create_dir(path_buf.clone())?;
341 }
342 let sys = System::new_with_specifics(
343 RefreshKind::nothing()
344 .with_cpu(CpuRefreshKind::everything())
345 .with_memory(MemoryRefreshKind::nothing().with_ram()),
346 );
347 let num_cpus = sys.cpus().len() as i32;
348 let total_ram = sys.total_memory() as usize;
349 let mut options = rocksdb::Options::default();
350 options.create_if_missing(true);
351 options.create_missing_column_families(true);
352 options.set_write_buffer_size(WRITE_BUFFER_SIZE);
354 options.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
355 options.set_compression_type(rocksdb::DBCompressionType::Lz4);
356 options.set_level_zero_slowdown_writes_trigger(8);
357 options.set_level_zero_stop_writes_trigger(12);
358 options.set_level_zero_file_num_compaction_trigger(2);
359 options.increase_parallelism(num_cpus);
363 options.set_max_background_jobs(num_cpus);
364 options.set_max_subcompactions(num_cpus as u32);
365 options.set_level_compaction_dynamic_level_bytes(true);
366
367 options.set_compaction_style(DBCompactionStyle::Level);
368 options.set_target_file_size_base(2 * WRITE_BUFFER_SIZE as u64);
369
370 let mut block_options = BlockBasedOptions::default();
371 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
372 block_options.set_cache_index_and_filter_blocks(true);
373 block_options.set_block_cache(&Cache::new_hyper_clock_cache(
379 total_ram / 4,
380 HYPER_CLOCK_CACHE_BLOCK_SIZE,
381 ));
382
383 block_options.set_bloom_filter(10.0, false);
385 block_options.set_whole_key_filtering(false);
386
387 block_options.set_block_size(32 * 1024);
389 block_options.set_format_version(5);
391
392 options.set_block_based_table_factory(&block_options);
393
394 let prefix_extractor = SliceTransform::create_fixed_prefix(8);
397 options.set_prefix_extractor(prefix_extractor);
398
399 options.set_memtable_prefix_bloom_ratio(0.125);
401 options.set_optimize_filters_for_hits(true);
403 options.set_allow_mmap_reads(true);
405 options.set_advise_random_on_open(false);
407
408 let db = DB::open(&options, path_buf)?;
409 let executor = RocksDbStoreExecutor {
410 db: Arc::new(db),
411 start_key,
412 };
413 Ok(RocksDbStoreInternal {
414 executor,
415 _path_with_guard: path_with_guard,
416 max_stream_queries,
417 spawn_mode,
418 root_key_written: Arc::new(AtomicBool::new(false)),
419 })
420 }
421}
422
423impl WithError for RocksDbStoreInternal {
424 type Error = RocksDbStoreInternalError;
425}
426
427impl ReadableKeyValueStore for RocksDbStoreInternal {
428 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
429
430 fn max_stream_queries(&self) -> usize {
431 self.max_stream_queries
432 }
433
434 async fn read_value_bytes(
435 &self,
436 key: &[u8],
437 ) -> Result<Option<Vec<u8>>, RocksDbStoreInternalError> {
438 check_key_size(key)?;
439 let db = self.executor.db.clone();
440 let mut full_key = self.executor.start_key.to_vec();
441 full_key.extend(key);
442 self.spawn_mode
443 .spawn(move |x| Ok(db.get(&x)?), full_key)
444 .await
445 }
446
447 async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreInternalError> {
448 check_key_size(key)?;
449 let db = self.executor.db.clone();
450 let mut full_key = self.executor.start_key.to_vec();
451 full_key.extend(key);
452 self.spawn_mode
453 .spawn(
454 move |x| {
455 if !db.key_may_exist(&x) {
456 return Ok(false);
457 }
458 Ok(db.get(&x)?.is_some())
459 },
460 full_key,
461 )
462 .await
463 }
464
465 async fn contains_keys(
466 &self,
467 keys: Vec<Vec<u8>>,
468 ) -> Result<Vec<bool>, RocksDbStoreInternalError> {
469 let executor = self.executor.clone();
470 self.spawn_mode
471 .spawn(move |x| executor.contains_keys_internal(x), keys)
472 .await
473 }
474
475 async fn read_multi_values_bytes(
476 &self,
477 keys: Vec<Vec<u8>>,
478 ) -> Result<Vec<Option<Vec<u8>>>, RocksDbStoreInternalError> {
479 let executor = self.executor.clone();
480 self.spawn_mode
481 .spawn(move |x| executor.read_multi_values_bytes_internal(x), keys)
482 .await
483 }
484
485 async fn find_keys_by_prefix(
486 &self,
487 key_prefix: &[u8],
488 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
489 let executor = self.executor.clone();
490 let key_prefix = key_prefix.to_vec();
491 self.spawn_mode
492 .spawn(
493 move |x| executor.find_keys_by_prefix_internal(x),
494 key_prefix,
495 )
496 .await
497 }
498
499 async fn find_key_values_by_prefix(
500 &self,
501 key_prefix: &[u8],
502 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, RocksDbStoreInternalError> {
503 let executor = self.executor.clone();
504 let key_prefix = key_prefix.to_vec();
505 self.spawn_mode
506 .spawn(
507 move |x| executor.find_key_values_by_prefix_internal(x),
508 key_prefix,
509 )
510 .await
511 }
512}
513
514impl WritableKeyValueStore for RocksDbStoreInternal {
515 const MAX_VALUE_SIZE: usize = MAX_VALUE_SIZE;
516
517 async fn write_batch(&self, batch: Batch) -> Result<(), RocksDbStoreInternalError> {
518 let write_root_key = !self.root_key_written.fetch_or(true, Ordering::SeqCst);
519 let executor = self.executor.clone();
520 self.spawn_mode
521 .spawn(
522 move |x| executor.write_batch_internal(x, write_root_key),
523 batch,
524 )
525 .await
526 }
527
528 async fn clear_journal(&self) -> Result<(), RocksDbStoreInternalError> {
529 Ok(())
530 }
531}
532
533impl KeyValueDatabase for RocksDbDatabaseInternal {
534 type Config = RocksDbStoreInternalConfig;
535 type Store = RocksDbStoreInternal;
536
537 fn get_name() -> String {
538 "rocksdb internal".to_string()
539 }
540
541 async fn connect(
542 config: &Self::Config,
543 namespace: &str,
544 ) -> Result<Self, RocksDbStoreInternalError> {
545 Self::build(config, namespace)
546 }
547
548 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
549 let mut start_key = ROOT_KEY_DOMAIN.to_vec();
550 start_key.extend(root_key);
551 let mut executor = self.executor.clone();
552 executor.start_key = start_key;
553 Ok(RocksDbStoreInternal {
554 executor,
555 _path_with_guard: self._path_with_guard.clone(),
556 max_stream_queries: self.max_stream_queries,
557 spawn_mode: self.spawn_mode,
558 root_key_written: Arc::new(AtomicBool::new(false)),
559 })
560 }
561
562 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, RocksDbStoreInternalError> {
563 self.open_shared(root_key)
564 }
565
566 async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreInternalError> {
567 let entries = std::fs::read_dir(config.path_with_guard.path_buf.clone())?;
568 let mut namespaces = Vec::new();
569 for entry in entries {
570 let entry = entry?;
571 if !entry.file_type()?.is_dir() {
572 return Err(RocksDbStoreInternalError::NonDirectoryNamespace);
573 }
574 let namespace = match entry.file_name().into_string() {
575 Err(error) => {
576 return Err(RocksDbStoreInternalError::IntoStringError(error));
577 }
578 Ok(namespace) => namespace,
579 };
580 namespaces.push(namespace);
581 }
582 Ok(namespaces)
583 }
584
585 async fn list_root_keys(
586 config: &Self::Config,
587 namespace: &str,
588 ) -> Result<Vec<Vec<u8>>, RocksDbStoreInternalError> {
589 let start_key = vec![STORED_ROOT_KEYS_PREFIX];
590 let store = RocksDbStoreInternal::build(config, namespace, start_key)?;
591 store.find_keys_by_prefix(&[]).await
592 }
593
594 async fn delete_all(config: &Self::Config) -> Result<(), RocksDbStoreInternalError> {
595 let namespaces = Self::list_all(config).await?;
596 for namespace in namespaces {
597 let mut path_buf = config.path_with_guard.path_buf.clone();
598 path_buf.push(&namespace);
599 std::fs::remove_dir_all(path_buf.as_path())?;
600 }
601 Ok(())
602 }
603
604 async fn exists(
605 config: &Self::Config,
606 namespace: &str,
607 ) -> Result<bool, RocksDbStoreInternalError> {
608 Self::check_namespace(namespace)?;
609 let mut path_buf = config.path_with_guard.path_buf.clone();
610 path_buf.push(namespace);
611 let test = std::path::Path::exists(&path_buf);
612 Ok(test)
613 }
614
615 async fn create(
616 config: &Self::Config,
617 namespace: &str,
618 ) -> Result<(), RocksDbStoreInternalError> {
619 Self::check_namespace(namespace)?;
620 let mut path_buf = config.path_with_guard.path_buf.clone();
621 path_buf.push(namespace);
622 if std::path::Path::exists(&path_buf) {
623 return Err(RocksDbStoreInternalError::StoreAlreadyExists);
624 }
625 std::fs::create_dir_all(path_buf)?;
626 Ok(())
627 }
628
629 async fn delete(
630 config: &Self::Config,
631 namespace: &str,
632 ) -> Result<(), RocksDbStoreInternalError> {
633 Self::check_namespace(namespace)?;
634 let mut path_buf = config.path_with_guard.path_buf.clone();
635 path_buf.push(namespace);
636 let path = path_buf.as_path();
637 std::fs::remove_dir_all(path)?;
638 Ok(())
639 }
640}
641
642#[cfg(with_testing)]
643impl TestKeyValueDatabase for RocksDbDatabaseInternal {
644 async fn new_test_config() -> Result<RocksDbStoreInternalConfig, RocksDbStoreInternalError> {
645 let path_with_guard = PathWithGuard::new_testing();
646 let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
647 let max_stream_queries = TEST_ROCKS_DB_MAX_STREAM_QUERIES;
648 Ok(RocksDbStoreInternalConfig {
649 path_with_guard,
650 spawn_mode,
651 max_stream_queries,
652 })
653 }
654}
655
656#[derive(Error, Debug)]
658pub enum RocksDbStoreInternalError {
659 #[error("Store already exists")]
661 StoreAlreadyExists,
662
663 #[error("tokio join error: {0}")]
665 TokioJoinError(#[from] tokio::task::JoinError),
666
667 #[error("RocksDB error: {0}")]
669 RocksDb(#[from] rocksdb::Error),
670
671 #[error("Namespaces should be directories")]
673 NonDirectoryNamespace,
674
675 #[error("error in the conversion from OsString: {0:?}")]
677 IntoStringError(OsString),
678
679 #[error("The key must have at most 8 MiB")]
681 KeyTooLong,
682
683 #[error("Namespace contains forbidden characters")]
685 InvalidNamespace,
686
687 #[error("Filesystem error: {0}")]
689 FsError(#[from] std::io::Error),
690
691 #[error(transparent)]
693 BcsError(#[from] bcs::Error),
694}
695
696#[derive(Clone, Debug, Deserialize, Serialize)]
698pub struct PathWithGuard {
699 pub path_buf: PathBuf,
701 #[serde(skip)]
703 _dir: Option<Arc<TempDir>>,
704}
705
706impl PathWithGuard {
707 pub fn new(path_buf: PathBuf) -> Self {
709 Self {
710 path_buf,
711 _dir: None,
712 }
713 }
714
715 #[cfg(with_testing)]
717 fn new_testing() -> PathWithGuard {
718 let dir = TempDir::new().unwrap();
719 let path_buf = dir.path().to_path_buf();
720 let _dir = Some(Arc::new(dir));
721 PathWithGuard { path_buf, _dir }
722 }
723}
724
725impl PartialEq for PathWithGuard {
726 fn eq(&self, other: &Self) -> bool {
727 self.path_buf == other.path_buf
728 }
729}
730impl Eq for PathWithGuard {}
731
732impl KeyValueStoreError for RocksDbStoreInternalError {
733 const BACKEND: &'static str = "rocks_db";
734}
735
736pub type RocksDbStoreError = ValueSplittingError<RocksDbStoreInternalError>;
738
739pub type RocksDbStoreConfig = LruCachingConfig<RocksDbStoreInternalConfig>;
741
742#[cfg(with_metrics)]
744pub type RocksDbDatabase = MeteredDatabase<
745 LruCachingDatabase<
746 MeteredDatabase<ValueSplittingDatabase<MeteredDatabase<RocksDbDatabaseInternal>>>,
747 >,
748>;
749#[cfg(not(with_metrics))]
751pub type RocksDbDatabase = LruCachingDatabase<ValueSplittingDatabase<RocksDbDatabaseInternal>>;