1use crate::{
17 column_family::AsColumnFamilyRef,
18 column_family::BoundColumnFamily,
19 column_family::UnboundColumnFamily,
20 db_options::OptionsMustOutliveDB,
21 ffi,
22 ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23 ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24 DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25 IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26 WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
27};
28
29use crate::ffi_util::CSlice;
30use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
31use std::collections::BTreeMap;
32use std::ffi::{CStr, CString};
33use std::fmt;
34use std::fs;
35use std::iter;
36use std::path::Path;
37use std::path::PathBuf;
38use std::ptr;
39use std::slice;
40use std::str;
41use std::sync::Arc;
42use std::sync::RwLock;
43use std::time::Duration;
44
/// Abstraction over how a database stores its column family handles.
///
/// Implemented in this file by [`SingleThreaded`] and [`MultiThreaded`].
pub trait ThreadMode {
    /// Builds the implementor from a map of column family names to raw
    /// column family handles.
    fn new_cf_map_internal(
        cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
    ) -> Self;
    /// Removes every column family entry held by the implementor.
    fn drop_all_cfs_internal(&mut self);
}
63
/// [`ThreadMode`] implementation that keeps column families in a plain
/// `BTreeMap`, without any lock around it.
pub struct SingleThreaded {
    /// Column families keyed by name.
    pub(crate) cfs: BTreeMap<String, ColumnFamily>,
}
73
/// [`ThreadMode`] implementation that keeps column families behind an
/// `RwLock`, with each handle wrapped in an `Arc`.
pub struct MultiThreaded {
    /// Column families keyed by name, guarded by a read/write lock.
    pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
}
82
83impl ThreadMode for SingleThreaded {
84 fn new_cf_map_internal(
85 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
86 ) -> Self {
87 Self {
88 cfs: cfs
89 .into_iter()
90 .map(|(n, c)| (n, ColumnFamily { inner: c }))
91 .collect(),
92 }
93 }
94
95 fn drop_all_cfs_internal(&mut self) {
96 self.cfs.clear();
98 }
99}
100
101impl ThreadMode for MultiThreaded {
102 fn new_cf_map_internal(
103 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
104 ) -> Self {
105 Self {
106 cfs: RwLock::new(
107 cfs.into_iter()
108 .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
109 .collect(),
110 ),
111 }
112 }
113
114 fn drop_all_cfs_internal(&mut self) {
115 self.cfs.write().unwrap().clear();
117 }
118}
119
/// Implemented by types that own a raw `rocksdb_t` pointer.
pub trait DBInner {
    /// Returns the raw database pointer passed to the FFI functions.
    fn inner(&self) -> *mut ffi::rocksdb_t;
}
124
/// Database handle generic over the column family storage `T` and the owner
/// of the raw database pointer `D`.
pub struct DBCommon<T: ThreadMode, D: DBInner> {
    /// Owner of the raw `rocksdb_t` pointer.
    pub(crate) inner: D,
    /// Column family storage selected by the thread mode.
    cfs: T,
    /// Path the database was opened with; returned by `path()`.
    path: PathBuf,
    /// Values cloned from the `outlive` field of the options used at open
    /// time; presumably held so they live at least as long as the database.
    _outlive: Vec<OptionsMustOutliveDB>,
}
135
/// Minimal set of database operations (snapshots, iterators and reads)
/// exposed through a trait.
pub trait DBAccess {
    /// Creates a snapshot and returns the raw snapshot pointer.
    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;

    /// Releases a snapshot given its raw pointer.
    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);

    /// Creates a raw iterator using `readopts`.
    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;

    /// Creates a raw iterator over the column family `cf_handle` using
    /// `readopts`.
    unsafe fn create_iterator_cf(
        &self,
        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
        readopts: &ReadOptions,
    ) -> *mut ffi::rocksdb_iterator_t;

    /// Reads the value stored under `key`, copied into a `Vec<u8>`.
    fn get_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error>;

    /// Reads the value stored under `key` in column family `cf`, copied into
    /// a `Vec<u8>`.
    fn get_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error>;

    /// Reads the value stored under `key` as a `DBPinnableSlice`.
    fn get_pinned_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error>;

    /// Reads the value stored under `key` in column family `cf` as a
    /// `DBPinnableSlice`.
    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error>;

    /// Reads several keys at once; one result is returned per key.
    fn multi_get_opt<K, I>(
        &self,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = K>;

    /// Reads several `(column family, key)` pairs at once; one result is
    /// returned per pair.
    fn multi_get_cf_opt<'b, K, I, W>(
        &self,
        keys_cf: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = (&'b W, K)>,
        W: AsColumnFamilyRef + 'b;
}
196
// Trait implementation for `DBCommon`. The read methods forward to the
// inherent methods of the same name defined on `DBCommon`; inherent methods
// take priority over trait methods in method-call resolution, so these calls
// do not recurse into the trait implementation itself.
impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
    /// Calls `rocksdb_create_snapshot` on the raw database pointer.
    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
        ffi::rocksdb_create_snapshot(self.inner.inner())
    }

    /// Calls `rocksdb_release_snapshot` on the raw database pointer.
    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
        ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
    }

    /// Calls `rocksdb_create_iterator` with the raw read options.
    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
        ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
    }

    /// Calls `rocksdb_create_iterator_cf` with the raw read options and the
    /// given column family handle.
    unsafe fn create_iterator_cf(
        &self,
        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
        readopts: &ReadOptions,
    ) -> *mut ffi::rocksdb_iterator_t {
        ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
    }

    /// Forwards to the inherent `get_opt`.
    fn get_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error> {
        self.get_opt(key, readopts)
    }

    /// Forwards to the inherent `get_cf_opt`.
    fn get_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<Vec<u8>>, Error> {
        self.get_cf_opt(cf, key, readopts)
    }

    /// Forwards to the inherent `get_pinned_opt`.
    fn get_pinned_opt<K: AsRef<[u8]>>(
        &self,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error> {
        self.get_pinned_opt(key, readopts)
    }

    /// Forwards to the inherent `get_pinned_cf_opt`.
    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<DBPinnableSlice>, Error> {
        self.get_pinned_cf_opt(cf, key, readopts)
    }

    /// Forwards to the inherent `multi_get_opt`.
    fn multi_get_opt<K, Iter>(
        &self,
        keys: Iter,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        Iter: IntoIterator<Item = K>,
    {
        self.multi_get_opt(keys, readopts)
    }

    /// Forwards to the inherent `multi_get_cf_opt`.
    fn multi_get_cf_opt<'b, K, Iter, W>(
        &self,
        keys_cf: Iter,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<Vec<u8>>, Error>>
    where
        K: AsRef<[u8]>,
        Iter: IntoIterator<Item = (&'b W, K)>,
        W: AsColumnFamilyRef + 'b,
    {
        self.multi_get_cf_opt(keys_cf, readopts)
    }
}
277
/// Owner of a raw `rocksdb_t` pointer; the pointer is passed to
/// `rocksdb_close` when this value is dropped.
pub struct DBWithThreadModeInner {
    /// Raw database pointer.
    inner: *mut ffi::rocksdb_t,
}
281
282impl DBInner for DBWithThreadModeInner {
283 fn inner(&self) -> *mut ffi::rocksdb_t {
284 self.inner
285 }
286}
287
288impl Drop for DBWithThreadModeInner {
289 fn drop(&mut self) {
290 unsafe {
291 ffi::rocksdb_close(self.inner);
292 }
293 }
294}
295
/// A [`DBCommon`] whose raw handle is owned by [`DBWithThreadModeInner`],
/// generic over the thread mode `T`.
pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
301
/// Default database type: single-threaded column family handling unless the
/// `multi-threaded-cf` feature is enabled.
#[cfg(not(feature = "multi-threaded-cf"))]
pub type DB = DBWithThreadMode<SingleThreaded>;

/// Default database type when the `multi-threaded-cf` feature is enabled.
#[cfg(feature = "multi-threaded-cf")]
pub type DB = DBWithThreadMode<MultiThreaded>;
329
// NOTE(review): the soundness of this impl cannot be established from this
// file alone; it presumably relies on the underlying RocksDB handle being
// safe to move between threads - confirm. `T` is required to be `Send`.
unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
334
// NOTE(review): no `Sync` bound is placed on `T` or `I` here; the soundness
// of this impl presumably relies on shared access only going through `&self`
// FFI calls that RocksDB allows concurrently - confirm.
unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
338
/// Selects which FFI open function is used when opening a database.
enum AccessType<'a> {
    /// Open with `rocksdb_open` / `rocksdb_open_column_families`.
    ReadWrite,
    /// Open with the read-only FFI functions, forwarding
    /// `error_if_log_file_exist` to them.
    ReadOnly { error_if_log_file_exist: bool },
    /// Open with the secondary-instance FFI functions using `secondary_path`.
    Secondary { secondary_path: &'a Path },
    /// Open with the TTL FFI functions; `ttl` is passed on in whole seconds.
    WithTTL { ttl: Duration },
}
346
347impl<T: ThreadMode> DBWithThreadMode<T> {
349 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
351 let mut opts = Options::default();
352 opts.create_if_missing(true);
353 Self::open(&opts, path)
354 }
355
356 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
358 Self::open_cf(opts, path, None::<&str>)
359 }
360
361 pub fn open_for_read_only<P: AsRef<Path>>(
363 opts: &Options,
364 path: P,
365 error_if_log_file_exist: bool,
366 ) -> Result<Self, Error> {
367 Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
368 }
369
370 pub fn open_as_secondary<P: AsRef<Path>>(
372 opts: &Options,
373 primary_path: P,
374 secondary_path: P,
375 ) -> Result<Self, Error> {
376 Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
377 }
378
379 pub fn open_with_ttl<P: AsRef<Path>>(
381 opts: &Options,
382 path: P,
383 ttl: Duration,
384 ) -> Result<Self, Error> {
385 Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
386 }
387
388 pub fn open_cf_with_ttl<P, I, N>(
392 opts: &Options,
393 path: P,
394 cfs: I,
395 ttl: Duration,
396 ) -> Result<Self, Error>
397 where
398 P: AsRef<Path>,
399 I: IntoIterator<Item = N>,
400 N: AsRef<str>,
401 {
402 let cfs = cfs
403 .into_iter()
404 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
405
406 Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
407 }
408
409 pub fn open_cf_descriptors_with_ttl<P, I>(
412 opts: &Options,
413 path: P,
414 cfs: I,
415 ttl: Duration,
416 ) -> Result<Self, Error>
417 where
418 P: AsRef<Path>,
419 I: IntoIterator<Item = ColumnFamilyDescriptor>,
420 {
421 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
422 }
423
424 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
428 where
429 P: AsRef<Path>,
430 I: IntoIterator<Item = N>,
431 N: AsRef<str>,
432 {
433 let cfs = cfs
434 .into_iter()
435 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
436
437 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
438 }
439
440 pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
444 where
445 P: AsRef<Path>,
446 I: IntoIterator<Item = (N, Options)>,
447 N: AsRef<str>,
448 {
449 let cfs = cfs
450 .into_iter()
451 .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
452
453 Self::open_cf_descriptors(opts, path, cfs)
454 }
455
456 pub fn open_cf_for_read_only<P, I, N>(
458 opts: &Options,
459 path: P,
460 cfs: I,
461 error_if_log_file_exist: bool,
462 ) -> Result<Self, Error>
463 where
464 P: AsRef<Path>,
465 I: IntoIterator<Item = N>,
466 N: AsRef<str>,
467 {
468 let cfs = cfs
469 .into_iter()
470 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
471
472 Self::open_cf_descriptors_internal(
473 opts,
474 path,
475 cfs,
476 &AccessType::ReadOnly {
477 error_if_log_file_exist,
478 },
479 )
480 }
481
482 pub fn open_cf_with_opts_for_read_only<P, I, N>(
484 db_opts: &Options,
485 path: P,
486 cfs: I,
487 error_if_log_file_exist: bool,
488 ) -> Result<Self, Error>
489 where
490 P: AsRef<Path>,
491 I: IntoIterator<Item = (N, Options)>,
492 N: AsRef<str>,
493 {
494 let cfs = cfs
495 .into_iter()
496 .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
497
498 Self::open_cf_descriptors_internal(
499 db_opts,
500 path,
501 cfs,
502 &AccessType::ReadOnly {
503 error_if_log_file_exist,
504 },
505 )
506 }
507
508 pub fn open_cf_descriptors_read_only<P, I>(
511 opts: &Options,
512 path: P,
513 cfs: I,
514 error_if_log_file_exist: bool,
515 ) -> Result<Self, Error>
516 where
517 P: AsRef<Path>,
518 I: IntoIterator<Item = ColumnFamilyDescriptor>,
519 {
520 Self::open_cf_descriptors_internal(
521 opts,
522 path,
523 cfs,
524 &AccessType::ReadOnly {
525 error_if_log_file_exist,
526 },
527 )
528 }
529
530 pub fn open_cf_as_secondary<P, I, N>(
532 opts: &Options,
533 primary_path: P,
534 secondary_path: P,
535 cfs: I,
536 ) -> Result<Self, Error>
537 where
538 P: AsRef<Path>,
539 I: IntoIterator<Item = N>,
540 N: AsRef<str>,
541 {
542 let cfs = cfs
543 .into_iter()
544 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
545
546 Self::open_cf_descriptors_internal(
547 opts,
548 primary_path,
549 cfs,
550 &AccessType::Secondary {
551 secondary_path: secondary_path.as_ref(),
552 },
553 )
554 }
555
556 pub fn open_cf_descriptors_as_secondary<P, I>(
559 opts: &Options,
560 path: P,
561 secondary_path: P,
562 cfs: I,
563 ) -> Result<Self, Error>
564 where
565 P: AsRef<Path>,
566 I: IntoIterator<Item = ColumnFamilyDescriptor>,
567 {
568 Self::open_cf_descriptors_internal(
569 opts,
570 path,
571 cfs,
572 &AccessType::Secondary {
573 secondary_path: secondary_path.as_ref(),
574 },
575 )
576 }
577
578 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
580 where
581 P: AsRef<Path>,
582 I: IntoIterator<Item = ColumnFamilyDescriptor>,
583 {
584 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
585 }
586
587 fn open_cf_descriptors_internal<P, I>(
589 opts: &Options,
590 path: P,
591 cfs: I,
592 access_type: &AccessType,
593 ) -> Result<Self, Error>
594 where
595 P: AsRef<Path>,
596 I: IntoIterator<Item = ColumnFamilyDescriptor>,
597 {
598 let cfs: Vec<_> = cfs.into_iter().collect();
599 let outlive = iter::once(opts.outlive.clone())
600 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
601 .collect();
602
603 let cpath = to_cpath(&path)?;
604
605 if let Err(e) = fs::create_dir_all(&path) {
606 return Err(Error::new(format!(
607 "Failed to create RocksDB directory: `{e:?}`."
608 )));
609 }
610
611 let db: *mut ffi::rocksdb_t;
612 let mut cf_map = BTreeMap::new();
613
614 if cfs.is_empty() {
615 db = Self::open_raw(opts, &cpath, access_type)?;
616 } else {
617 let mut cfs_v = cfs;
618 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
620 cfs_v.push(ColumnFamilyDescriptor {
621 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
622 options: Options::default(),
623 });
624 }
625 let c_cfs: Vec<CString> = cfs_v
628 .iter()
629 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
630 .collect();
631
632 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
633
634 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
636
637 let cfopts: Vec<_> = cfs_v
638 .iter()
639 .map(|cf| cf.options.inner as *const _)
640 .collect();
641
642 db = Self::open_cf_raw(
643 opts,
644 &cpath,
645 &cfs_v,
646 &cfnames,
647 &cfopts,
648 &mut cfhandles,
649 access_type,
650 )?;
651 for handle in &cfhandles {
652 if handle.is_null() {
653 return Err(Error::new(
654 "Received null column family handle from DB.".to_owned(),
655 ));
656 }
657 }
658
659 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
660 cf_map.insert(cf_desc.name.clone(), inner);
661 }
662 }
663
664 if db.is_null() {
665 return Err(Error::new("Could not initialize database.".to_owned()));
666 }
667
668 Ok(Self {
669 inner: DBWithThreadModeInner { inner: db },
670 path: path.as_ref().to_path_buf(),
671 cfs: T::new_cf_map_internal(cf_map),
672 _outlive: outlive,
673 })
674 }
675
676 fn open_raw(
677 opts: &Options,
678 cpath: &CString,
679 access_type: &AccessType,
680 ) -> Result<*mut ffi::rocksdb_t, Error> {
681 let db = unsafe {
682 match *access_type {
683 AccessType::ReadOnly {
684 error_if_log_file_exist,
685 } => ffi_try!(ffi::rocksdb_open_for_read_only(
686 opts.inner,
687 cpath.as_ptr(),
688 c_uchar::from(error_if_log_file_exist),
689 )),
690 AccessType::ReadWrite => {
691 ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
692 }
693 AccessType::Secondary { secondary_path } => {
694 ffi_try!(ffi::rocksdb_open_as_secondary(
695 opts.inner,
696 cpath.as_ptr(),
697 to_cpath(secondary_path)?.as_ptr(),
698 ))
699 }
700 AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
701 opts.inner,
702 cpath.as_ptr(),
703 ttl.as_secs() as c_int,
704 )),
705 }
706 };
707 Ok(db)
708 }
709
710 #[allow(clippy::pedantic)]
711 fn open_cf_raw(
712 opts: &Options,
713 cpath: &CString,
714 cfs_v: &[ColumnFamilyDescriptor],
715 cfnames: &[*const c_char],
716 cfopts: &[*const ffi::rocksdb_options_t],
717 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
718 access_type: &AccessType,
719 ) -> Result<*mut ffi::rocksdb_t, Error> {
720 let db = unsafe {
721 match *access_type {
722 AccessType::ReadOnly {
723 error_if_log_file_exist,
724 } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
725 opts.inner,
726 cpath.as_ptr(),
727 cfs_v.len() as c_int,
728 cfnames.as_ptr(),
729 cfopts.as_ptr(),
730 cfhandles.as_mut_ptr(),
731 c_uchar::from(error_if_log_file_exist),
732 )),
733 AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
734 opts.inner,
735 cpath.as_ptr(),
736 cfs_v.len() as c_int,
737 cfnames.as_ptr(),
738 cfopts.as_ptr(),
739 cfhandles.as_mut_ptr(),
740 )),
741 AccessType::Secondary { secondary_path } => {
742 ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
743 opts.inner,
744 cpath.as_ptr(),
745 to_cpath(secondary_path)?.as_ptr(),
746 cfs_v.len() as c_int,
747 cfnames.as_ptr(),
748 cfopts.as_ptr(),
749 cfhandles.as_mut_ptr(),
750 ))
751 }
752 AccessType::WithTTL { ttl } => {
753 let ttls_v = vec![ttl.as_secs() as c_int; cfs_v.len()];
754 ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
755 opts.inner,
756 cpath.as_ptr(),
757 cfs_v.len() as c_int,
758 cfnames.as_ptr(),
759 cfopts.as_ptr(),
760 cfhandles.as_mut_ptr(),
761 ttls_v.as_ptr(),
762 ))
763 }
764 }
765 };
766 Ok(db)
767 }
768
769 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
771 &self,
772 cf: &impl AsColumnFamilyRef,
773 from: K,
774 to: K,
775 writeopts: &WriteOptions,
776 ) -> Result<(), Error> {
777 let from = from.as_ref();
778 let to = to.as_ref();
779
780 unsafe {
781 ffi_try!(ffi::rocksdb_delete_range_cf(
782 self.inner.inner(),
783 writeopts.inner,
784 cf.inner(),
785 from.as_ptr() as *const c_char,
786 from.len() as size_t,
787 to.as_ptr() as *const c_char,
788 to.len() as size_t,
789 ));
790 Ok(())
791 }
792 }
793
794 pub fn delete_range_cf<K: AsRef<[u8]>>(
796 &self,
797 cf: &impl AsColumnFamilyRef,
798 from: K,
799 to: K,
800 ) -> Result<(), Error> {
801 self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
802 }
803
804 pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
805 unsafe {
806 ffi_try!(ffi::rocksdb_write(
807 self.inner.inner(),
808 writeopts.inner,
809 batch.inner
810 ));
811 }
812 Ok(())
813 }
814
815 pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
816 self.write_opt(batch, &WriteOptions::default())
817 }
818
819 pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
820 let mut wo = WriteOptions::new();
821 wo.disable_wal(true);
822 self.write_opt(batch, &wo)
823 }
824}
825
826impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
828 pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
829 Self {
830 inner,
831 cfs,
832 path,
833 _outlive: outlive,
834 }
835 }
836
837 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
838 let cpath = to_cpath(path)?;
839 let mut length = 0;
840
841 unsafe {
842 let ptr = ffi_try!(ffi::rocksdb_list_column_families(
843 opts.inner,
844 cpath.as_ptr(),
845 &mut length,
846 ));
847
848 let vec = slice::from_raw_parts(ptr, length)
849 .iter()
850 .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
851 .collect();
852 ffi::rocksdb_list_column_families_destroy(ptr, length);
853 Ok(vec)
854 }
855 }
856
857 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
858 let cpath = to_cpath(path)?;
859 unsafe {
860 ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
861 }
862 Ok(())
863 }
864
865 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
866 let cpath = to_cpath(path)?;
867 unsafe {
868 ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
869 }
870 Ok(())
871 }
872
873 pub fn path(&self) -> &Path {
874 self.path.as_path()
875 }
876
877 pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
880 unsafe {
881 ffi_try!(ffi::rocksdb_flush_wal(
882 self.inner.inner(),
883 c_uchar::from(sync)
884 ));
885 }
886 Ok(())
887 }
888
889 pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
891 unsafe {
892 ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
893 }
894 Ok(())
895 }
896
897 pub fn flush(&self) -> Result<(), Error> {
899 self.flush_opt(&FlushOptions::default())
900 }
901
902 pub fn flush_cf_opt(
904 &self,
905 cf: &impl AsColumnFamilyRef,
906 flushopts: &FlushOptions,
907 ) -> Result<(), Error> {
908 unsafe {
909 ffi_try!(ffi::rocksdb_flush_cf(
910 self.inner.inner(),
911 flushopts.inner,
912 cf.inner()
913 ));
914 }
915 Ok(())
916 }
917
918 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
921 self.flush_cf_opt(cf, &FlushOptions::default())
922 }
923
924 pub fn get_opt<K: AsRef<[u8]>>(
928 &self,
929 key: K,
930 readopts: &ReadOptions,
931 ) -> Result<Option<Vec<u8>>, Error> {
932 self.get_pinned_opt(key, readopts)
933 .map(|x| x.map(|v| v.as_ref().to_vec()))
934 }
935
936 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
940 self.get_opt(key.as_ref(), &ReadOptions::default())
941 }
942
943 pub fn get_cf_opt<K: AsRef<[u8]>>(
947 &self,
948 cf: &impl AsColumnFamilyRef,
949 key: K,
950 readopts: &ReadOptions,
951 ) -> Result<Option<Vec<u8>>, Error> {
952 self.get_pinned_cf_opt(cf, key, readopts)
953 .map(|x| x.map(|v| v.as_ref().to_vec()))
954 }
955
956 pub fn get_cf<K: AsRef<[u8]>>(
960 &self,
961 cf: &impl AsColumnFamilyRef,
962 key: K,
963 ) -> Result<Option<Vec<u8>>, Error> {
964 self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
965 }
966
967 pub fn get_pinned_opt<K: AsRef<[u8]>>(
970 &self,
971 key: K,
972 readopts: &ReadOptions,
973 ) -> Result<Option<DBPinnableSlice>, Error> {
974 if readopts.inner.is_null() {
975 return Err(Error::new(
976 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
977 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
978 .to_owned(),
979 ));
980 }
981
982 let key = key.as_ref();
983 unsafe {
984 let val = ffi_try!(ffi::rocksdb_get_pinned(
985 self.inner.inner(),
986 readopts.inner,
987 key.as_ptr() as *const c_char,
988 key.len() as size_t,
989 ));
990 if val.is_null() {
991 Ok(None)
992 } else {
993 Ok(Some(DBPinnableSlice::from_c(val)))
994 }
995 }
996 }
997
998 pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
1002 self.get_pinned_opt(key, &ReadOptions::default())
1003 }
1004
1005 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1009 &self,
1010 cf: &impl AsColumnFamilyRef,
1011 key: K,
1012 readopts: &ReadOptions,
1013 ) -> Result<Option<DBPinnableSlice>, Error> {
1014 if readopts.inner.is_null() {
1015 return Err(Error::new(
1016 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1017 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1018 .to_owned(),
1019 ));
1020 }
1021
1022 let key = key.as_ref();
1023 unsafe {
1024 let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1025 self.inner.inner(),
1026 readopts.inner,
1027 cf.inner(),
1028 key.as_ptr() as *const c_char,
1029 key.len() as size_t,
1030 ));
1031 if val.is_null() {
1032 Ok(None)
1033 } else {
1034 Ok(Some(DBPinnableSlice::from_c(val)))
1035 }
1036 }
1037 }
1038
1039 pub fn get_pinned_cf<K: AsRef<[u8]>>(
1043 &self,
1044 cf: &impl AsColumnFamilyRef,
1045 key: K,
1046 ) -> Result<Option<DBPinnableSlice>, Error> {
1047 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1048 }
1049
1050 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1052 where
1053 K: AsRef<[u8]>,
1054 I: IntoIterator<Item = K>,
1055 {
1056 self.multi_get_opt(keys, &ReadOptions::default())
1057 }
1058
1059 pub fn multi_get_opt<K, I>(
1061 &self,
1062 keys: I,
1063 readopts: &ReadOptions,
1064 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1065 where
1066 K: AsRef<[u8]>,
1067 I: IntoIterator<Item = K>,
1068 {
1069 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1070 .into_iter()
1071 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1072 .unzip();
1073 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1074
1075 let mut values = vec![ptr::null_mut(); keys.len()];
1076 let mut values_sizes = vec![0_usize; keys.len()];
1077 let mut errors = vec![ptr::null_mut(); keys.len()];
1078 unsafe {
1079 ffi::rocksdb_multi_get(
1080 self.inner.inner(),
1081 readopts.inner,
1082 ptr_keys.len(),
1083 ptr_keys.as_ptr(),
1084 keys_sizes.as_ptr(),
1085 values.as_mut_ptr(),
1086 values_sizes.as_mut_ptr(),
1087 errors.as_mut_ptr(),
1088 );
1089 }
1090
1091 convert_values(values, values_sizes, errors)
1092 }
1093
1094 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1096 &'a self,
1097 keys: I,
1098 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1099 where
1100 K: AsRef<[u8]>,
1101 I: IntoIterator<Item = (&'b W, K)>,
1102 W: 'b + AsColumnFamilyRef,
1103 {
1104 self.multi_get_cf_opt(keys, &ReadOptions::default())
1105 }
1106
1107 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1109 &'a self,
1110 keys: I,
1111 readopts: &ReadOptions,
1112 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1113 where
1114 K: AsRef<[u8]>,
1115 I: IntoIterator<Item = (&'b W, K)>,
1116 W: 'b + AsColumnFamilyRef,
1117 {
1118 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1119 .into_iter()
1120 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
1121 .unzip();
1122 let ptr_keys: Vec<_> = cfs_and_keys
1123 .iter()
1124 .map(|(_, k)| k.as_ptr() as *const c_char)
1125 .collect();
1126 let ptr_cfs: Vec<_> = cfs_and_keys
1127 .iter()
1128 .map(|(c, _)| c.inner() as *const _)
1129 .collect();
1130
1131 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1132 let mut values_sizes = vec![0_usize; ptr_keys.len()];
1133 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1134 unsafe {
1135 ffi::rocksdb_multi_get_cf(
1136 self.inner.inner(),
1137 readopts.inner,
1138 ptr_cfs.as_ptr(),
1139 ptr_keys.len(),
1140 ptr_keys.as_ptr(),
1141 keys_sizes.as_ptr(),
1142 values.as_mut_ptr(),
1143 values_sizes.as_mut_ptr(),
1144 errors.as_mut_ptr(),
1145 );
1146 }
1147
1148 convert_values(values, values_sizes, errors)
1149 }
1150
1151 pub fn batched_multi_get_cf<K, I>(
1155 &self,
1156 cf: &impl AsColumnFamilyRef,
1157 keys: I,
1158 sorted_input: bool,
1159 ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1160 where
1161 K: AsRef<[u8]>,
1162 I: IntoIterator<Item = K>,
1163 {
1164 self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1165 }
1166
1167 pub fn batched_multi_get_cf_opt<K, I>(
1171 &self,
1172 cf: &impl AsColumnFamilyRef,
1173 keys: I,
1174 sorted_input: bool,
1175 readopts: &ReadOptions,
1176 ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1177 where
1178 K: AsRef<[u8]>,
1179 I: IntoIterator<Item = K>,
1180 {
1181 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1182 .into_iter()
1183 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1184 .unzip();
1185 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1186
1187 let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1188 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1189
1190 unsafe {
1191 ffi::rocksdb_batched_multi_get_cf(
1192 self.inner.inner(),
1193 readopts.inner,
1194 cf.inner(),
1195 ptr_keys.len(),
1196 ptr_keys.as_ptr(),
1197 keys_sizes.as_ptr(),
1198 pinned_values.as_mut_ptr(),
1199 errors.as_mut_ptr(),
1200 sorted_input,
1201 );
1202 pinned_values
1203 .into_iter()
1204 .zip(errors.into_iter())
1205 .map(|(v, e)| {
1206 if e.is_null() {
1207 if v.is_null() {
1208 Ok(None)
1209 } else {
1210 Ok(Some(DBPinnableSlice::from_c(v)))
1211 }
1212 } else {
1213 Err(Error::new(crate::ffi_util::error_message(e)))
1214 }
1215 })
1216 .collect()
1217 }
1218 }
1219
1220 pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1223 self.key_may_exist_opt(key, &ReadOptions::default())
1224 }
1225
1226 pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1229 let key = key.as_ref();
1230 unsafe {
1231 0 != ffi::rocksdb_key_may_exist(
1232 self.inner.inner(),
1233 readopts.inner,
1234 key.as_ptr() as *const c_char,
1235 key.len() as size_t,
1236 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1242 }
1243 }
1244
1245 pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1248 self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1249 }
1250
1251 pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1254 &self,
1255 cf: &impl AsColumnFamilyRef,
1256 key: K,
1257 readopts: &ReadOptions,
1258 ) -> bool {
1259 let key = key.as_ref();
1260 0 != unsafe {
1261 ffi::rocksdb_key_may_exist_cf(
1262 self.inner.inner(),
1263 readopts.inner,
1264 cf.inner(),
1265 key.as_ptr() as *const c_char,
1266 key.len() as size_t,
1267 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1273 }
1274 }
1275
1276 pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1283 &self,
1284 cf: &impl AsColumnFamilyRef,
1285 key: K,
1286 readopts: &ReadOptions,
1287 ) -> (bool, Option<CSlice>) {
1288 let key = key.as_ref();
1289 let mut val: *mut c_char = ptr::null_mut();
1290 let mut val_len: usize = 0;
1291 let mut value_found: c_uchar = 0;
1292 let may_exists = 0
1293 != unsafe {
1294 ffi::rocksdb_key_may_exist_cf(
1295 self.inner.inner(),
1296 readopts.inner,
1297 cf.inner(),
1298 key.as_ptr() as *const c_char,
1299 key.len() as size_t,
1300 &mut val, &mut val_len, ptr::null(), 0, &mut value_found, )
1306 };
1307 if may_exists && value_found != 0 {
1310 (
1311 may_exists,
1312 Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1313 )
1314 } else {
1315 (may_exists, None)
1316 }
1317 }
1318
1319 fn create_inner_cf_handle(
1320 &self,
1321 name: impl CStrLike,
1322 opts: &Options,
1323 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1324 let cf_name = name.bake().map_err(|err| {
1325 Error::new(format!(
1326 "Failed to convert path to CString when creating cf: {err}"
1327 ))
1328 })?;
1329 Ok(unsafe {
1330 ffi_try!(ffi::rocksdb_create_column_family(
1331 self.inner.inner(),
1332 opts.inner,
1333 cf_name.as_ptr(),
1334 ))
1335 })
1336 }
1337
1338 pub fn iterator<'a: 'b, 'b>(
1339 &'a self,
1340 mode: IteratorMode,
1341 ) -> DBIteratorWithThreadMode<'b, Self> {
1342 let readopts = ReadOptions::default();
1343 self.iterator_opt(mode, readopts)
1344 }
1345
1346 pub fn iterator_opt<'a: 'b, 'b>(
1347 &'a self,
1348 mode: IteratorMode,
1349 readopts: ReadOptions,
1350 ) -> DBIteratorWithThreadMode<'b, Self> {
1351 DBIteratorWithThreadMode::new(self, readopts, mode)
1352 }
1353
1354 pub fn iterator_cf_opt<'a: 'b, 'b>(
1357 &'a self,
1358 cf_handle: &impl AsColumnFamilyRef,
1359 readopts: ReadOptions,
1360 mode: IteratorMode,
1361 ) -> DBIteratorWithThreadMode<'b, Self> {
1362 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1363 }
1364
1365 pub fn full_iterator<'a: 'b, 'b>(
1369 &'a self,
1370 mode: IteratorMode,
1371 ) -> DBIteratorWithThreadMode<'b, Self> {
1372 let mut opts = ReadOptions::default();
1373 opts.set_total_order_seek(true);
1374 DBIteratorWithThreadMode::new(self, opts, mode)
1375 }
1376
1377 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1378 &'a self,
1379 prefix: P,
1380 ) -> DBIteratorWithThreadMode<'b, Self> {
1381 let mut opts = ReadOptions::default();
1382 opts.set_prefix_same_as_start(true);
1383 DBIteratorWithThreadMode::new(
1384 self,
1385 opts,
1386 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1387 )
1388 }
1389
1390 pub fn iterator_cf<'a: 'b, 'b>(
1391 &'a self,
1392 cf_handle: &impl AsColumnFamilyRef,
1393 mode: IteratorMode,
1394 ) -> DBIteratorWithThreadMode<'b, Self> {
1395 let opts = ReadOptions::default();
1396 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1397 }
1398
1399 pub fn full_iterator_cf<'a: 'b, 'b>(
1400 &'a self,
1401 cf_handle: &impl AsColumnFamilyRef,
1402 mode: IteratorMode,
1403 ) -> DBIteratorWithThreadMode<'b, Self> {
1404 let mut opts = ReadOptions::default();
1405 opts.set_total_order_seek(true);
1406 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1407 }
1408
1409 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1410 &'a self,
1411 cf_handle: &impl AsColumnFamilyRef,
1412 prefix: P,
1413 ) -> DBIteratorWithThreadMode<'a, Self> {
1414 let mut opts = ReadOptions::default();
1415 opts.set_prefix_same_as_start(true);
1416 DBIteratorWithThreadMode::<'a, Self>::new_cf(
1417 self,
1418 cf_handle.inner(),
1419 opts,
1420 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1421 )
1422 }
1423
1424 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1426 let opts = ReadOptions::default();
1427 DBRawIteratorWithThreadMode::new(self, opts)
1428 }
1429
1430 pub fn raw_iterator_cf<'a: 'b, 'b>(
1432 &'a self,
1433 cf_handle: &impl AsColumnFamilyRef,
1434 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1435 let opts = ReadOptions::default();
1436 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1437 }
1438
1439 pub fn raw_iterator_opt<'a: 'b, 'b>(
1441 &'a self,
1442 readopts: ReadOptions,
1443 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1444 DBRawIteratorWithThreadMode::new(self, readopts)
1445 }
1446
1447 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1449 &'a self,
1450 cf_handle: &impl AsColumnFamilyRef,
1451 readopts: ReadOptions,
1452 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1453 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1454 }
1455
1456 pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
1457 SnapshotWithThreadMode::<Self>::new(self)
1458 }
1459
1460 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1461 where
1462 K: AsRef<[u8]>,
1463 V: AsRef<[u8]>,
1464 {
1465 let key = key.as_ref();
1466 let value = value.as_ref();
1467
1468 unsafe {
1469 ffi_try!(ffi::rocksdb_put(
1470 self.inner.inner(),
1471 writeopts.inner,
1472 key.as_ptr() as *const c_char,
1473 key.len() as size_t,
1474 value.as_ptr() as *const c_char,
1475 value.len() as size_t,
1476 ));
1477 Ok(())
1478 }
1479 }
1480
1481 pub fn put_cf_opt<K, V>(
1482 &self,
1483 cf: &impl AsColumnFamilyRef,
1484 key: K,
1485 value: V,
1486 writeopts: &WriteOptions,
1487 ) -> Result<(), Error>
1488 where
1489 K: AsRef<[u8]>,
1490 V: AsRef<[u8]>,
1491 {
1492 let key = key.as_ref();
1493 let value = value.as_ref();
1494
1495 unsafe {
1496 ffi_try!(ffi::rocksdb_put_cf(
1497 self.inner.inner(),
1498 writeopts.inner,
1499 cf.inner(),
1500 key.as_ptr() as *const c_char,
1501 key.len() as size_t,
1502 value.as_ptr() as *const c_char,
1503 value.len() as size_t,
1504 ));
1505 Ok(())
1506 }
1507 }
1508
1509 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1510 where
1511 K: AsRef<[u8]>,
1512 V: AsRef<[u8]>,
1513 {
1514 let key = key.as_ref();
1515 let value = value.as_ref();
1516
1517 unsafe {
1518 ffi_try!(ffi::rocksdb_merge(
1519 self.inner.inner(),
1520 writeopts.inner,
1521 key.as_ptr() as *const c_char,
1522 key.len() as size_t,
1523 value.as_ptr() as *const c_char,
1524 value.len() as size_t,
1525 ));
1526 Ok(())
1527 }
1528 }
1529
1530 pub fn merge_cf_opt<K, V>(
1531 &self,
1532 cf: &impl AsColumnFamilyRef,
1533 key: K,
1534 value: V,
1535 writeopts: &WriteOptions,
1536 ) -> Result<(), Error>
1537 where
1538 K: AsRef<[u8]>,
1539 V: AsRef<[u8]>,
1540 {
1541 let key = key.as_ref();
1542 let value = value.as_ref();
1543
1544 unsafe {
1545 ffi_try!(ffi::rocksdb_merge_cf(
1546 self.inner.inner(),
1547 writeopts.inner,
1548 cf.inner(),
1549 key.as_ptr() as *const c_char,
1550 key.len() as size_t,
1551 value.as_ptr() as *const c_char,
1552 value.len() as size_t,
1553 ));
1554 Ok(())
1555 }
1556 }
1557
1558 pub fn delete_opt<K: AsRef<[u8]>>(
1559 &self,
1560 key: K,
1561 writeopts: &WriteOptions,
1562 ) -> Result<(), Error> {
1563 let key = key.as_ref();
1564
1565 unsafe {
1566 ffi_try!(ffi::rocksdb_delete(
1567 self.inner.inner(),
1568 writeopts.inner,
1569 key.as_ptr() as *const c_char,
1570 key.len() as size_t,
1571 ));
1572 Ok(())
1573 }
1574 }
1575
1576 pub fn delete_cf_opt<K: AsRef<[u8]>>(
1577 &self,
1578 cf: &impl AsColumnFamilyRef,
1579 key: K,
1580 writeopts: &WriteOptions,
1581 ) -> Result<(), Error> {
1582 let key = key.as_ref();
1583
1584 unsafe {
1585 ffi_try!(ffi::rocksdb_delete_cf(
1586 self.inner.inner(),
1587 writeopts.inner,
1588 cf.inner(),
1589 key.as_ptr() as *const c_char,
1590 key.len() as size_t,
1591 ));
1592 Ok(())
1593 }
1594 }
1595
1596 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1597 where
1598 K: AsRef<[u8]>,
1599 V: AsRef<[u8]>,
1600 {
1601 self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1602 }
1603
1604 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1605 where
1606 K: AsRef<[u8]>,
1607 V: AsRef<[u8]>,
1608 {
1609 self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1610 }
1611
1612 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1613 where
1614 K: AsRef<[u8]>,
1615 V: AsRef<[u8]>,
1616 {
1617 self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1618 }
1619
1620 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1621 where
1622 K: AsRef<[u8]>,
1623 V: AsRef<[u8]>,
1624 {
1625 self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1626 }
1627
1628 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1629 self.delete_opt(key.as_ref(), &WriteOptions::default())
1630 }
1631
1632 pub fn delete_cf<K: AsRef<[u8]>>(
1633 &self,
1634 cf: &impl AsColumnFamilyRef,
1635 key: K,
1636 ) -> Result<(), Error> {
1637 self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1638 }
1639
1640 pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1642 unsafe {
1643 let start = start.as_ref().map(AsRef::as_ref);
1644 let end = end.as_ref().map(AsRef::as_ref);
1645
1646 ffi::rocksdb_compact_range(
1647 self.inner.inner(),
1648 opt_bytes_to_ptr(start),
1649 start.map_or(0, <[u8]>::len) as size_t,
1650 opt_bytes_to_ptr(end),
1651 end.map_or(0, <[u8]>::len) as size_t,
1652 );
1653 }
1654 }
1655
1656 pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1658 &self,
1659 start: Option<S>,
1660 end: Option<E>,
1661 opts: &CompactOptions,
1662 ) {
1663 unsafe {
1664 let start = start.as_ref().map(AsRef::as_ref);
1665 let end = end.as_ref().map(AsRef::as_ref);
1666
1667 ffi::rocksdb_compact_range_opt(
1668 self.inner.inner(),
1669 opts.inner,
1670 opt_bytes_to_ptr(start),
1671 start.map_or(0, <[u8]>::len) as size_t,
1672 opt_bytes_to_ptr(end),
1673 end.map_or(0, <[u8]>::len) as size_t,
1674 );
1675 }
1676 }
1677
1678 pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1681 &self,
1682 cf: &impl AsColumnFamilyRef,
1683 start: Option<S>,
1684 end: Option<E>,
1685 ) {
1686 unsafe {
1687 let start = start.as_ref().map(AsRef::as_ref);
1688 let end = end.as_ref().map(AsRef::as_ref);
1689
1690 ffi::rocksdb_compact_range_cf(
1691 self.inner.inner(),
1692 cf.inner(),
1693 opt_bytes_to_ptr(start),
1694 start.map_or(0, <[u8]>::len) as size_t,
1695 opt_bytes_to_ptr(end),
1696 end.map_or(0, <[u8]>::len) as size_t,
1697 );
1698 }
1699 }
1700
1701 pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1703 &self,
1704 cf: &impl AsColumnFamilyRef,
1705 start: Option<S>,
1706 end: Option<E>,
1707 opts: &CompactOptions,
1708 ) {
1709 unsafe {
1710 let start = start.as_ref().map(AsRef::as_ref);
1711 let end = end.as_ref().map(AsRef::as_ref);
1712
1713 ffi::rocksdb_compact_range_cf_opt(
1714 self.inner.inner(),
1715 cf.inner(),
1716 opts.inner,
1717 opt_bytes_to_ptr(start),
1718 start.map_or(0, <[u8]>::len) as size_t,
1719 opt_bytes_to_ptr(end),
1720 end.map_or(0, <[u8]>::len) as size_t,
1721 );
1722 }
1723 }
1724
1725 pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
1726 let copts = convert_options(opts)?;
1727 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1728 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1729 let count = opts.len() as i32;
1730 unsafe {
1731 ffi_try!(ffi::rocksdb_set_options(
1732 self.inner.inner(),
1733 count,
1734 cnames.as_ptr(),
1735 cvalues.as_ptr(),
1736 ));
1737 }
1738 Ok(())
1739 }
1740
1741 pub fn set_options_cf(
1742 &self,
1743 cf: &impl AsColumnFamilyRef,
1744 opts: &[(&str, &str)],
1745 ) -> Result<(), Error> {
1746 let copts = convert_options(opts)?;
1747 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1748 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1749 let count = opts.len() as i32;
1750 unsafe {
1751 ffi_try!(ffi::rocksdb_set_options_cf(
1752 self.inner.inner(),
1753 cf.inner(),
1754 count,
1755 cnames.as_ptr(),
1756 cvalues.as_ptr(),
1757 ));
1758 }
1759 Ok(())
1760 }
1761
1762 fn property_value_impl<R>(
1771 name: impl CStrLike,
1772 get_property: impl FnOnce(*const c_char) -> *mut c_char,
1773 parse: impl FnOnce(&str) -> Result<R, Error>,
1774 ) -> Result<Option<R>, Error> {
1775 let value = match name.bake() {
1776 Ok(prop_name) => get_property(prop_name.as_ptr()),
1777 Err(e) => {
1778 return Err(Error::new(format!(
1779 "Failed to convert property name to CString: {e}"
1780 )));
1781 }
1782 };
1783 if value.is_null() {
1784 return Ok(None);
1785 }
1786 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1787 Ok(s) => parse(s).map(|value| Some(value)),
1788 Err(e) => Err(Error::new(format!(
1789 "Failed to convert property value to string: {e}"
1790 ))),
1791 };
1792 unsafe {
1793 libc::free(value as *mut c_void);
1794 }
1795 result
1796 }
1797
1798 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1803 Self::property_value_impl(
1804 name,
1805 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1806 |str_value| Ok(str_value.to_owned()),
1807 )
1808 }
1809
1810 pub fn property_value_cf(
1815 &self,
1816 cf: &impl AsColumnFamilyRef,
1817 name: impl CStrLike,
1818 ) -> Result<Option<String>, Error> {
1819 Self::property_value_impl(
1820 name,
1821 |prop_name| unsafe {
1822 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1823 },
1824 |str_value| Ok(str_value.to_owned()),
1825 )
1826 }
1827
1828 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1829 value.parse::<u64>().map_err(|err| {
1830 Error::new(format!(
1831 "Failed to convert property value {value} to int: {err}"
1832 ))
1833 })
1834 }
1835
1836 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1841 Self::property_value_impl(
1842 name,
1843 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1844 Self::parse_property_int_value,
1845 )
1846 }
1847
1848 pub fn property_int_value_cf(
1853 &self,
1854 cf: &impl AsColumnFamilyRef,
1855 name: impl CStrLike,
1856 ) -> Result<Option<u64>, Error> {
1857 Self::property_value_impl(
1858 name,
1859 |prop_name| unsafe {
1860 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1861 },
1862 Self::parse_property_int_value,
1863 )
1864 }
1865
1866 pub fn latest_sequence_number(&self) -> u64 {
1868 unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
1869 }
1870
1871 pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
1882 unsafe {
1883 let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
1887 let iter = ffi_try!(ffi::rocksdb_get_updates_since(
1888 self.inner.inner(),
1889 seq_number,
1890 opts
1891 ));
1892 Ok(DBWALIterator {
1893 inner: iter,
1894 start_seq_number: seq_number,
1895 })
1896 }
1897 }
1898
1899 pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
1902 unsafe {
1903 ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
1904 }
1905 Ok(())
1906 }
1907
1908 pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
1910 let opts = IngestExternalFileOptions::default();
1911 self.ingest_external_file_opts(&opts, paths)
1912 }
1913
1914 pub fn ingest_external_file_opts<P: AsRef<Path>>(
1916 &self,
1917 opts: &IngestExternalFileOptions,
1918 paths: Vec<P>,
1919 ) -> Result<(), Error> {
1920 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1921 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1922
1923 self.ingest_external_file_raw(opts, &paths_v, &cpaths)
1924 }
1925
1926 pub fn ingest_external_file_cf<P: AsRef<Path>>(
1929 &self,
1930 cf: &impl AsColumnFamilyRef,
1931 paths: Vec<P>,
1932 ) -> Result<(), Error> {
1933 let opts = IngestExternalFileOptions::default();
1934 self.ingest_external_file_cf_opts(cf, &opts, paths)
1935 }
1936
1937 pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
1939 &self,
1940 cf: &impl AsColumnFamilyRef,
1941 opts: &IngestExternalFileOptions,
1942 paths: Vec<P>,
1943 ) -> Result<(), Error> {
1944 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1945 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1946
1947 self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
1948 }
1949
1950 fn ingest_external_file_raw(
1951 &self,
1952 opts: &IngestExternalFileOptions,
1953 paths_v: &[CString],
1954 cpaths: &[*const c_char],
1955 ) -> Result<(), Error> {
1956 unsafe {
1957 ffi_try!(ffi::rocksdb_ingest_external_file(
1958 self.inner.inner(),
1959 cpaths.as_ptr(),
1960 paths_v.len(),
1961 opts.inner as *const _
1962 ));
1963 Ok(())
1964 }
1965 }
1966
1967 fn ingest_external_file_raw_cf(
1968 &self,
1969 cf: &impl AsColumnFamilyRef,
1970 opts: &IngestExternalFileOptions,
1971 paths_v: &[CString],
1972 cpaths: &[*const c_char],
1973 ) -> Result<(), Error> {
1974 unsafe {
1975 ffi_try!(ffi::rocksdb_ingest_external_file_cf(
1976 self.inner.inner(),
1977 cf.inner(),
1978 cpaths.as_ptr(),
1979 paths_v.len(),
1980 opts.inner as *const _
1981 ));
1982 Ok(())
1983 }
1984 }
1985
1986 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
1989 unsafe {
1990 let files = ffi::rocksdb_livefiles(self.inner.inner());
1991 if files.is_null() {
1992 Err(Error::new("Could not get live files".to_owned()))
1993 } else {
1994 let n = ffi::rocksdb_livefiles_count(files);
1995
1996 let mut livefiles = Vec::with_capacity(n as usize);
1997 let mut key_size: usize = 0;
1998
1999 for i in 0..n {
2000 let column_family_name =
2001 from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
2002 let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
2003 let size = ffi::rocksdb_livefiles_size(files, i);
2004 let level = ffi::rocksdb_livefiles_level(files, i);
2005
2006 let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
2008 let smallest_key = raw_data(smallest_key, key_size);
2009
2010 let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
2012 let largest_key = raw_data(largest_key, key_size);
2013
2014 livefiles.push(LiveFile {
2015 column_family_name,
2016 name,
2017 size,
2018 level,
2019 start_key: smallest_key,
2020 end_key: largest_key,
2021 num_entries: ffi::rocksdb_livefiles_entries(files, i),
2022 num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
2023 });
2024 }
2025
2026 ffi::rocksdb_livefiles_destroy(files);
2028
2029 Ok(livefiles)
2031 }
2032 }
2033 }
2034
2035 pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2044 let from = from.as_ref();
2045 let to = to.as_ref();
2046 unsafe {
2047 ffi_try!(ffi::rocksdb_delete_file_in_range(
2048 self.inner.inner(),
2049 from.as_ptr() as *const c_char,
2050 from.len() as size_t,
2051 to.as_ptr() as *const c_char,
2052 to.len() as size_t,
2053 ));
2054 Ok(())
2055 }
2056 }
2057
2058 pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2060 &self,
2061 cf: &impl AsColumnFamilyRef,
2062 from: K,
2063 to: K,
2064 ) -> Result<(), Error> {
2065 let from = from.as_ref();
2066 let to = to.as_ref();
2067 unsafe {
2068 ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2069 self.inner.inner(),
2070 cf.inner(),
2071 from.as_ptr() as *const c_char,
2072 from.len() as size_t,
2073 to.as_ptr() as *const c_char,
2074 to.len() as size_t,
2075 ));
2076 Ok(())
2077 }
2078 }
2079
2080 pub fn cancel_all_background_work(&self, wait: bool) {
2082 unsafe {
2083 ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2084 }
2085 }
2086
    /// Drops the column family identified by the raw handle `cf_inner`.
    ///
    /// `cf` is the Rust-side value that callers removed from their column
    /// family map; it is released explicitly after
    /// `rocksdb_drop_column_family` has been called.
    ///
    /// # Errors
    ///
    /// Returns the error surfaced by `ffi_try!` when the underlying call fails.
    fn drop_column_family<C>(
        &self,
        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
        cf: C,
    ) -> Result<(), Error> {
        unsafe {
            // Ask RocksDB to drop the column family first.
            // NOTE(review): `ffi_try!` presumably returns early on failure, in
            // which case `cf` is dropped at scope exit instead.
            ffi_try!(ffi::rocksdb_drop_column_family(
                self.inner.inner(),
                cf_inner
            ));
        }
        // Only now let go of the Rust-side owner of the handle.
        drop(cf);
        Ok(())
    }
2104}
2105
2106impl<I: DBInner> DBCommon<SingleThreaded, I> {
2107 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2109 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2110 self.cfs
2111 .cfs
2112 .insert(name.as_ref().to_string(), ColumnFamily { inner });
2113 Ok(())
2114 }
2115
2116 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2118 if let Some(cf) = self.cfs.cfs.remove(name) {
2119 self.drop_column_family(cf.inner, cf)
2120 } else {
2121 Err(Error::new(format!("Invalid column family: {name}")))
2122 }
2123 }
2124
2125 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2127 self.cfs.cfs.get(name)
2128 }
2129}
2130
2131impl<I: DBInner> DBCommon<MultiThreaded, I> {
2132 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2134 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2135 self.cfs.cfs.write().unwrap().insert(
2136 name.as_ref().to_string(),
2137 Arc::new(UnboundColumnFamily { inner }),
2138 );
2139 Ok(())
2140 }
2141
2142 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2145 if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
2146 self.drop_column_family(cf.inner, cf)
2147 } else {
2148 Err(Error::new(format!("Invalid column family: {name}")))
2149 }
2150 }
2151
2152 pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
2154 self.cfs
2155 .cfs
2156 .read()
2157 .unwrap()
2158 .get(name)
2159 .cloned()
2160 .map(UnboundColumnFamily::bound_column_family)
2161 }
2162}
2163
2164impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2165 fn drop(&mut self) {
2166 self.cfs.drop_all_cfs_internal();
2167 }
2168}
2169
2170impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2171 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2172 write!(f, "RocksDB {{ path: {:?} }}", self.path())
2173 }
2174}
2175
/// Metadata for one entry of the list returned by `rocksdb_livefiles`, as
/// collected by `live_files`.
#[derive(Debug, Clone)]
pub struct LiveFile {
    /// Name of the column family the file belongs to
    /// (`rocksdb_livefiles_column_family_name`).
    pub column_family_name: String,
    /// Name of the file (`rocksdb_livefiles_name`).
    pub name: String,
    /// Size of the file as reported by `rocksdb_livefiles_size`.
    pub size: usize,
    /// Level the file is at (`rocksdb_livefiles_level`).
    pub level: i32,
    /// Smallest key in the file (`rocksdb_livefiles_smallestkey`), if any.
    pub start_key: Option<Vec<u8>>,
    /// Largest key in the file (`rocksdb_livefiles_largestkey`), if any.
    pub end_key: Option<Vec<u8>>,
    /// Number of entries reported by `rocksdb_livefiles_entries`.
    pub num_entries: u64,
    /// Number of deletions reported by `rocksdb_livefiles_deletions`.
    pub num_deletions: u64,
}
2196
2197fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2198 opts.iter()
2199 .map(|(name, value)| {
2200 let cname = match CString::new(name.as_bytes()) {
2201 Ok(cname) => cname,
2202 Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2203 };
2204 let cvalue = match CString::new(value.as_bytes()) {
2205 Ok(cvalue) => cvalue,
2206 Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2207 };
2208 Ok((cname, cvalue))
2209 })
2210 .collect()
2211}
2212
2213pub(crate) fn convert_values(
2214 values: Vec<*mut c_char>,
2215 values_sizes: Vec<usize>,
2216 errors: Vec<*mut c_char>,
2217) -> Vec<Result<Option<Vec<u8>>, Error>> {
2218 values
2219 .into_iter()
2220 .zip(values_sizes.into_iter())
2221 .zip(errors.into_iter())
2222 .map(|((v, s), e)| {
2223 if e.is_null() {
2224 let value = unsafe { crate::ffi_util::raw_data(v, s) };
2225 unsafe {
2226 ffi::rocksdb_free(v as *mut c_void);
2227 }
2228 Ok(value)
2229 } else {
2230 Err(Error::new(crate::ffi_util::error_message(e)))
2231 }
2232 })
2233 .collect()
2234}