rocksdb/
db.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use crate::{
17    column_family::AsColumnFamilyRef,
18    column_family::BoundColumnFamily,
19    column_family::UnboundColumnFamily,
20    db_options::OptionsMustOutliveDB,
21    ffi,
22    ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23    ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24    DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25    IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26    WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
27};
28
29use crate::ffi_util::CSlice;
30use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
31use std::collections::BTreeMap;
32use std::ffi::{CStr, CString};
33use std::fmt;
34use std::fs;
35use std::iter;
36use std::path::Path;
37use std::path::PathBuf;
38use std::ptr;
39use std::slice;
40use std::str;
41use std::sync::Arc;
42use std::sync::RwLock;
43use std::time::Duration;
44
45/// Marker trait to specify single or multi threaded column family alternations for
46/// [`DBWithThreadMode<T>`]
47///
48/// This arrangement makes differences in self mutability and return type in
49/// some of `DBWithThreadMode` methods.
50///
51/// While being a marker trait to be generic over `DBWithThreadMode`, this trait
52/// also has a minimum set of not-encapsulated internal methods between
53/// [`SingleThreaded`] and [`MultiThreaded`].  These methods aren't expected to be
54/// called and defined externally.
55pub trait ThreadMode {
56    /// Internal implementation for storing column family handles
57    fn new_cf_map_internal(
58        cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
59    ) -> Self;
60    /// Internal implementation for dropping column family handles
61    fn drop_all_cfs_internal(&mut self);
62}
63
64/// Actual marker type for the marker trait `ThreadMode`, which holds
65/// a collection of column families without synchronization primitive, providing
66/// no overhead for the single-threaded column family alternations. The other
67/// mode is [`MultiThreaded`].
68///
69/// See [`DB`] for more details, including performance implications for each mode
70pub struct SingleThreaded {
71    pub(crate) cfs: BTreeMap<String, ColumnFamily>,
72}
73
74/// Actual marker type for the marker trait `ThreadMode`, which holds
75/// a collection of column families wrapped in a RwLock to be mutated
76/// concurrently. The other mode is [`SingleThreaded`].
77///
78/// See [`DB`] for more details, including performance implications for each mode
79pub struct MultiThreaded {
80    pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
81}
82
83impl ThreadMode for SingleThreaded {
84    fn new_cf_map_internal(
85        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
86    ) -> Self {
87        Self {
88            cfs: cfs
89                .into_iter()
90                .map(|(n, c)| (n, ColumnFamily { inner: c }))
91                .collect(),
92        }
93    }
94
95    fn drop_all_cfs_internal(&mut self) {
96        // Cause all ColumnFamily objects to be Drop::drop()-ed.
97        self.cfs.clear();
98    }
99}
100
101impl ThreadMode for MultiThreaded {
102    fn new_cf_map_internal(
103        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
104    ) -> Self {
105        Self {
106            cfs: RwLock::new(
107                cfs.into_iter()
108                    .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
109                    .collect(),
110            ),
111        }
112    }
113
114    fn drop_all_cfs_internal(&mut self) {
115        // Cause all UnboundColumnFamily objects to be Drop::drop()-ed.
116        self.cfs.write().unwrap().clear();
117    }
118}
119
120/// Get underlying `rocksdb_t`.
121pub trait DBInner {
122    fn inner(&self) -> *mut ffi::rocksdb_t;
123}
124
125/// A helper type to implement some common methods for [`DBWithThreadMode`]
126/// and [`OptimisticTransactionDB`].
127///
128/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
129pub struct DBCommon<T: ThreadMode, D: DBInner> {
130    pub(crate) inner: D,
131    cfs: T, // Column families are held differently depending on thread mode
132    path: PathBuf,
133    _outlive: Vec<OptionsMustOutliveDB>,
134}
135
136/// Minimal set of DB-related methods, intended to be generic over
137/// `DBWithThreadMode<T>`. Mainly used internally
138pub trait DBAccess {
139    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
140
141    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
142
143    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
144
145    unsafe fn create_iterator_cf(
146        &self,
147        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
148        readopts: &ReadOptions,
149    ) -> *mut ffi::rocksdb_iterator_t;
150
151    fn get_opt<K: AsRef<[u8]>>(
152        &self,
153        key: K,
154        readopts: &ReadOptions,
155    ) -> Result<Option<Vec<u8>>, Error>;
156
157    fn get_cf_opt<K: AsRef<[u8]>>(
158        &self,
159        cf: &impl AsColumnFamilyRef,
160        key: K,
161        readopts: &ReadOptions,
162    ) -> Result<Option<Vec<u8>>, Error>;
163
164    fn get_pinned_opt<K: AsRef<[u8]>>(
165        &self,
166        key: K,
167        readopts: &ReadOptions,
168    ) -> Result<Option<DBPinnableSlice>, Error>;
169
170    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
171        &self,
172        cf: &impl AsColumnFamilyRef,
173        key: K,
174        readopts: &ReadOptions,
175    ) -> Result<Option<DBPinnableSlice>, Error>;
176
177    fn multi_get_opt<K, I>(
178        &self,
179        keys: I,
180        readopts: &ReadOptions,
181    ) -> Vec<Result<Option<Vec<u8>>, Error>>
182    where
183        K: AsRef<[u8]>,
184        I: IntoIterator<Item = K>;
185
186    fn multi_get_cf_opt<'b, K, I, W>(
187        &self,
188        keys_cf: I,
189        readopts: &ReadOptions,
190    ) -> Vec<Result<Option<Vec<u8>>, Error>>
191    where
192        K: AsRef<[u8]>,
193        I: IntoIterator<Item = (&'b W, K)>,
194        W: AsColumnFamilyRef + 'b;
195}
196
197impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
198    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
199        ffi::rocksdb_create_snapshot(self.inner.inner())
200    }
201
202    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
203        ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
204    }
205
206    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
207        ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
208    }
209
210    unsafe fn create_iterator_cf(
211        &self,
212        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
213        readopts: &ReadOptions,
214    ) -> *mut ffi::rocksdb_iterator_t {
215        ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
216    }
217
218    fn get_opt<K: AsRef<[u8]>>(
219        &self,
220        key: K,
221        readopts: &ReadOptions,
222    ) -> Result<Option<Vec<u8>>, Error> {
223        self.get_opt(key, readopts)
224    }
225
226    fn get_cf_opt<K: AsRef<[u8]>>(
227        &self,
228        cf: &impl AsColumnFamilyRef,
229        key: K,
230        readopts: &ReadOptions,
231    ) -> Result<Option<Vec<u8>>, Error> {
232        self.get_cf_opt(cf, key, readopts)
233    }
234
235    fn get_pinned_opt<K: AsRef<[u8]>>(
236        &self,
237        key: K,
238        readopts: &ReadOptions,
239    ) -> Result<Option<DBPinnableSlice>, Error> {
240        self.get_pinned_opt(key, readopts)
241    }
242
243    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
244        &self,
245        cf: &impl AsColumnFamilyRef,
246        key: K,
247        readopts: &ReadOptions,
248    ) -> Result<Option<DBPinnableSlice>, Error> {
249        self.get_pinned_cf_opt(cf, key, readopts)
250    }
251
252    fn multi_get_opt<K, Iter>(
253        &self,
254        keys: Iter,
255        readopts: &ReadOptions,
256    ) -> Vec<Result<Option<Vec<u8>>, Error>>
257    where
258        K: AsRef<[u8]>,
259        Iter: IntoIterator<Item = K>,
260    {
261        self.multi_get_opt(keys, readopts)
262    }
263
264    fn multi_get_cf_opt<'b, K, Iter, W>(
265        &self,
266        keys_cf: Iter,
267        readopts: &ReadOptions,
268    ) -> Vec<Result<Option<Vec<u8>>, Error>>
269    where
270        K: AsRef<[u8]>,
271        Iter: IntoIterator<Item = (&'b W, K)>,
272        W: AsColumnFamilyRef + 'b,
273    {
274        self.multi_get_cf_opt(keys_cf, readopts)
275    }
276}
277
278pub struct DBWithThreadModeInner {
279    inner: *mut ffi::rocksdb_t,
280}
281
282impl DBInner for DBWithThreadModeInner {
283    fn inner(&self) -> *mut ffi::rocksdb_t {
284        self.inner
285    }
286}
287
288impl Drop for DBWithThreadModeInner {
289    fn drop(&mut self) {
290        unsafe {
291            ffi::rocksdb_close(self.inner);
292        }
293    }
294}
295
296/// A type alias to RocksDB database.
297///
298/// See crate level documentation for a simple usage example.
299/// See [`DBCommon`] for full list of methods.
300pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
301
302/// A type alias to DB instance type with the single-threaded column family
303/// creations/deletions
304///
305/// # Compatibility and multi-threaded mode
306///
307/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
308/// compatibility. Use `DBCommon<MultiThreaded>` for multi-threaded
309/// column family alternations.
310///
311/// # Limited performance implication for single-threaded mode
312///
313/// Even with [`SingleThreaded`], almost all of RocksDB operations is
314/// multi-threaded unless the underlying RocksDB instance is
315/// specifically configured otherwise. `SingleThreaded` only forces
316/// serialization of column family alternations by requiring `&mut self` of DB
317/// instance due to its wrapper implementation details.
318///
319/// # Multi-threaded mode
320///
321/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded
322/// workload including multi-threaded column family alternations, costing the
323/// RwLock overhead inside `DB`.
324#[cfg(not(feature = "multi-threaded-cf"))]
325pub type DB = DBWithThreadMode<SingleThreaded>;
326
327#[cfg(feature = "multi-threaded-cf")]
328pub type DB = DBWithThreadMode<MultiThreaded>;
329
330// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
331// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
332// rocksdb internally does not rely on thread-local information for its user-exposed types.
333unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
334
335// Sync is similarly safe for many types because they do not expose interior mutability, and their
336// use within the rocksdb library is generally behind a const reference
337unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
338
339// Specifies whether open DB for read only.
340enum AccessType<'a> {
341    ReadWrite,
342    ReadOnly { error_if_log_file_exist: bool },
343    Secondary { secondary_path: &'a Path },
344    WithTTL { ttl: Duration },
345}
346
347/// Methods of `DBWithThreadMode`.
348impl<T: ThreadMode> DBWithThreadMode<T> {
349    /// Opens a database with default options.
350    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
351        let mut opts = Options::default();
352        opts.create_if_missing(true);
353        Self::open(&opts, path)
354    }
355
356    /// Opens the database with the specified options.
357    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
358        Self::open_cf(opts, path, None::<&str>)
359    }
360
361    /// Opens the database for read only with the specified options.
362    pub fn open_for_read_only<P: AsRef<Path>>(
363        opts: &Options,
364        path: P,
365        error_if_log_file_exist: bool,
366    ) -> Result<Self, Error> {
367        Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
368    }
369
370    /// Opens the database as a secondary.
371    pub fn open_as_secondary<P: AsRef<Path>>(
372        opts: &Options,
373        primary_path: P,
374        secondary_path: P,
375    ) -> Result<Self, Error> {
376        Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
377    }
378
379    /// Opens the database with a Time to Live compaction filter.
380    pub fn open_with_ttl<P: AsRef<Path>>(
381        opts: &Options,
382        path: P,
383        ttl: Duration,
384    ) -> Result<Self, Error> {
385        Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
386    }
387
388    /// Opens the database with a Time to Live compaction filter and column family names.
389    ///
390    /// Column families opened using this function will be created with default `Options`.
391    pub fn open_cf_with_ttl<P, I, N>(
392        opts: &Options,
393        path: P,
394        cfs: I,
395        ttl: Duration,
396    ) -> Result<Self, Error>
397    where
398        P: AsRef<Path>,
399        I: IntoIterator<Item = N>,
400        N: AsRef<str>,
401    {
402        let cfs = cfs
403            .into_iter()
404            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
405
406        Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
407    }
408
409    /// Opens a database with the given database with a Time to Live compaction filter and
410    /// column family descriptors.
411    pub fn open_cf_descriptors_with_ttl<P, I>(
412        opts: &Options,
413        path: P,
414        cfs: I,
415        ttl: Duration,
416    ) -> Result<Self, Error>
417    where
418        P: AsRef<Path>,
419        I: IntoIterator<Item = ColumnFamilyDescriptor>,
420    {
421        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
422    }
423
424    /// Opens a database with the given database options and column family names.
425    ///
426    /// Column families opened using this function will be created with default `Options`.
427    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
428    where
429        P: AsRef<Path>,
430        I: IntoIterator<Item = N>,
431        N: AsRef<str>,
432    {
433        let cfs = cfs
434            .into_iter()
435            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
436
437        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
438    }
439
440    /// Opens a database with the given database options and column family names.
441    ///
442    /// Column families opened using given `Options`.
443    pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
444    where
445        P: AsRef<Path>,
446        I: IntoIterator<Item = (N, Options)>,
447        N: AsRef<str>,
448    {
449        let cfs = cfs
450            .into_iter()
451            .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
452
453        Self::open_cf_descriptors(opts, path, cfs)
454    }
455
456    /// Opens a database for read only with the given database options and column family names.
457    pub fn open_cf_for_read_only<P, I, N>(
458        opts: &Options,
459        path: P,
460        cfs: I,
461        error_if_log_file_exist: bool,
462    ) -> Result<Self, Error>
463    where
464        P: AsRef<Path>,
465        I: IntoIterator<Item = N>,
466        N: AsRef<str>,
467    {
468        let cfs = cfs
469            .into_iter()
470            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
471
472        Self::open_cf_descriptors_internal(
473            opts,
474            path,
475            cfs,
476            &AccessType::ReadOnly {
477                error_if_log_file_exist,
478            },
479        )
480    }
481
482    /// Opens a database for read only with the given database options and column family names.
483    pub fn open_cf_with_opts_for_read_only<P, I, N>(
484        db_opts: &Options,
485        path: P,
486        cfs: I,
487        error_if_log_file_exist: bool,
488    ) -> Result<Self, Error>
489    where
490        P: AsRef<Path>,
491        I: IntoIterator<Item = (N, Options)>,
492        N: AsRef<str>,
493    {
494        let cfs = cfs
495            .into_iter()
496            .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
497
498        Self::open_cf_descriptors_internal(
499            db_opts,
500            path,
501            cfs,
502            &AccessType::ReadOnly {
503                error_if_log_file_exist,
504            },
505        )
506    }
507
508    /// Opens a database for ready only with the given database options and
509    /// column family descriptors.
510    pub fn open_cf_descriptors_read_only<P, I>(
511        opts: &Options,
512        path: P,
513        cfs: I,
514        error_if_log_file_exist: bool,
515    ) -> Result<Self, Error>
516    where
517        P: AsRef<Path>,
518        I: IntoIterator<Item = ColumnFamilyDescriptor>,
519    {
520        Self::open_cf_descriptors_internal(
521            opts,
522            path,
523            cfs,
524            &AccessType::ReadOnly {
525                error_if_log_file_exist,
526            },
527        )
528    }
529
530    /// Opens the database as a secondary with the given database options and column family names.
531    pub fn open_cf_as_secondary<P, I, N>(
532        opts: &Options,
533        primary_path: P,
534        secondary_path: P,
535        cfs: I,
536    ) -> Result<Self, Error>
537    where
538        P: AsRef<Path>,
539        I: IntoIterator<Item = N>,
540        N: AsRef<str>,
541    {
542        let cfs = cfs
543            .into_iter()
544            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
545
546        Self::open_cf_descriptors_internal(
547            opts,
548            primary_path,
549            cfs,
550            &AccessType::Secondary {
551                secondary_path: secondary_path.as_ref(),
552            },
553        )
554    }
555
556    /// Opens the database as a secondary with the given database options and
557    /// column family descriptors.
558    pub fn open_cf_descriptors_as_secondary<P, I>(
559        opts: &Options,
560        path: P,
561        secondary_path: P,
562        cfs: I,
563    ) -> Result<Self, Error>
564    where
565        P: AsRef<Path>,
566        I: IntoIterator<Item = ColumnFamilyDescriptor>,
567    {
568        Self::open_cf_descriptors_internal(
569            opts,
570            path,
571            cfs,
572            &AccessType::Secondary {
573                secondary_path: secondary_path.as_ref(),
574            },
575        )
576    }
577
578    /// Opens a database with the given database options and column family descriptors.
579    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
580    where
581        P: AsRef<Path>,
582        I: IntoIterator<Item = ColumnFamilyDescriptor>,
583    {
584        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
585    }
586
587    /// Internal implementation for opening RocksDB.
588    fn open_cf_descriptors_internal<P, I>(
589        opts: &Options,
590        path: P,
591        cfs: I,
592        access_type: &AccessType,
593    ) -> Result<Self, Error>
594    where
595        P: AsRef<Path>,
596        I: IntoIterator<Item = ColumnFamilyDescriptor>,
597    {
598        let cfs: Vec<_> = cfs.into_iter().collect();
599        let outlive = iter::once(opts.outlive.clone())
600            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
601            .collect();
602
603        let cpath = to_cpath(&path)?;
604
605        if let Err(e) = fs::create_dir_all(&path) {
606            return Err(Error::new(format!(
607                "Failed to create RocksDB directory: `{e:?}`."
608            )));
609        }
610
611        let db: *mut ffi::rocksdb_t;
612        let mut cf_map = BTreeMap::new();
613
614        if cfs.is_empty() {
615            db = Self::open_raw(opts, &cpath, access_type)?;
616        } else {
617            let mut cfs_v = cfs;
618            // Always open the default column family.
619            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
620                cfs_v.push(ColumnFamilyDescriptor {
621                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
622                    options: Options::default(),
623                });
624            }
625            // We need to store our CStrings in an intermediate vector
626            // so that their pointers remain valid.
627            let c_cfs: Vec<CString> = cfs_v
628                .iter()
629                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
630                .collect();
631
632            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
633
634            // These handles will be populated by DB.
635            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
636
637            let cfopts: Vec<_> = cfs_v
638                .iter()
639                .map(|cf| cf.options.inner as *const _)
640                .collect();
641
642            db = Self::open_cf_raw(
643                opts,
644                &cpath,
645                &cfs_v,
646                &cfnames,
647                &cfopts,
648                &mut cfhandles,
649                access_type,
650            )?;
651            for handle in &cfhandles {
652                if handle.is_null() {
653                    return Err(Error::new(
654                        "Received null column family handle from DB.".to_owned(),
655                    ));
656                }
657            }
658
659            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
660                cf_map.insert(cf_desc.name.clone(), inner);
661            }
662        }
663
664        if db.is_null() {
665            return Err(Error::new("Could not initialize database.".to_owned()));
666        }
667
668        Ok(Self {
669            inner: DBWithThreadModeInner { inner: db },
670            path: path.as_ref().to_path_buf(),
671            cfs: T::new_cf_map_internal(cf_map),
672            _outlive: outlive,
673        })
674    }
675
676    fn open_raw(
677        opts: &Options,
678        cpath: &CString,
679        access_type: &AccessType,
680    ) -> Result<*mut ffi::rocksdb_t, Error> {
681        let db = unsafe {
682            match *access_type {
683                AccessType::ReadOnly {
684                    error_if_log_file_exist,
685                } => ffi_try!(ffi::rocksdb_open_for_read_only(
686                    opts.inner,
687                    cpath.as_ptr(),
688                    c_uchar::from(error_if_log_file_exist),
689                )),
690                AccessType::ReadWrite => {
691                    ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
692                }
693                AccessType::Secondary { secondary_path } => {
694                    ffi_try!(ffi::rocksdb_open_as_secondary(
695                        opts.inner,
696                        cpath.as_ptr(),
697                        to_cpath(secondary_path)?.as_ptr(),
698                    ))
699                }
700                AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
701                    opts.inner,
702                    cpath.as_ptr(),
703                    ttl.as_secs() as c_int,
704                )),
705            }
706        };
707        Ok(db)
708    }
709
710    #[allow(clippy::pedantic)]
711    fn open_cf_raw(
712        opts: &Options,
713        cpath: &CString,
714        cfs_v: &[ColumnFamilyDescriptor],
715        cfnames: &[*const c_char],
716        cfopts: &[*const ffi::rocksdb_options_t],
717        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
718        access_type: &AccessType,
719    ) -> Result<*mut ffi::rocksdb_t, Error> {
720        let db = unsafe {
721            match *access_type {
722                AccessType::ReadOnly {
723                    error_if_log_file_exist,
724                } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
725                    opts.inner,
726                    cpath.as_ptr(),
727                    cfs_v.len() as c_int,
728                    cfnames.as_ptr(),
729                    cfopts.as_ptr(),
730                    cfhandles.as_mut_ptr(),
731                    c_uchar::from(error_if_log_file_exist),
732                )),
733                AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
734                    opts.inner,
735                    cpath.as_ptr(),
736                    cfs_v.len() as c_int,
737                    cfnames.as_ptr(),
738                    cfopts.as_ptr(),
739                    cfhandles.as_mut_ptr(),
740                )),
741                AccessType::Secondary { secondary_path } => {
742                    ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
743                        opts.inner,
744                        cpath.as_ptr(),
745                        to_cpath(secondary_path)?.as_ptr(),
746                        cfs_v.len() as c_int,
747                        cfnames.as_ptr(),
748                        cfopts.as_ptr(),
749                        cfhandles.as_mut_ptr(),
750                    ))
751                }
752                AccessType::WithTTL { ttl } => {
753                    let ttls_v = vec![ttl.as_secs() as c_int; cfs_v.len()];
754                    ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
755                        opts.inner,
756                        cpath.as_ptr(),
757                        cfs_v.len() as c_int,
758                        cfnames.as_ptr(),
759                        cfopts.as_ptr(),
760                        cfhandles.as_mut_ptr(),
761                        ttls_v.as_ptr(),
762                    ))
763                }
764            }
765        };
766        Ok(db)
767    }
768
769    /// Removes the database entries in the range `["from", "to")` using given write options.
770    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
771        &self,
772        cf: &impl AsColumnFamilyRef,
773        from: K,
774        to: K,
775        writeopts: &WriteOptions,
776    ) -> Result<(), Error> {
777        let from = from.as_ref();
778        let to = to.as_ref();
779
780        unsafe {
781            ffi_try!(ffi::rocksdb_delete_range_cf(
782                self.inner.inner(),
783                writeopts.inner,
784                cf.inner(),
785                from.as_ptr() as *const c_char,
786                from.len() as size_t,
787                to.as_ptr() as *const c_char,
788                to.len() as size_t,
789            ));
790            Ok(())
791        }
792    }
793
794    /// Removes the database entries in the range `["from", "to")` using default write options.
795    pub fn delete_range_cf<K: AsRef<[u8]>>(
796        &self,
797        cf: &impl AsColumnFamilyRef,
798        from: K,
799        to: K,
800    ) -> Result<(), Error> {
801        self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
802    }
803
804    pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
805        unsafe {
806            ffi_try!(ffi::rocksdb_write(
807                self.inner.inner(),
808                writeopts.inner,
809                batch.inner
810            ));
811        }
812        Ok(())
813    }
814
815    pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
816        self.write_opt(batch, &WriteOptions::default())
817    }
818
819    pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
820        let mut wo = WriteOptions::new();
821        wo.disable_wal(true);
822        self.write_opt(batch, &wo)
823    }
824}
825
826/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
827impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
828    pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
829        Self {
830            inner,
831            cfs,
832            path,
833            _outlive: outlive,
834        }
835    }
836
837    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
838        let cpath = to_cpath(path)?;
839        let mut length = 0;
840
841        unsafe {
842            let ptr = ffi_try!(ffi::rocksdb_list_column_families(
843                opts.inner,
844                cpath.as_ptr(),
845                &mut length,
846            ));
847
848            let vec = slice::from_raw_parts(ptr, length)
849                .iter()
850                .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
851                .collect();
852            ffi::rocksdb_list_column_families_destroy(ptr, length);
853            Ok(vec)
854        }
855    }
856
857    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
858        let cpath = to_cpath(path)?;
859        unsafe {
860            ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
861        }
862        Ok(())
863    }
864
865    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
866        let cpath = to_cpath(path)?;
867        unsafe {
868            ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
869        }
870        Ok(())
871    }
872
873    pub fn path(&self) -> &Path {
874        self.path.as_path()
875    }
876
877    /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
878    /// the data to disk.
879    pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
880        unsafe {
881            ffi_try!(ffi::rocksdb_flush_wal(
882                self.inner.inner(),
883                c_uchar::from(sync)
884            ));
885        }
886        Ok(())
887    }
888
889    /// Flushes database memtables to SST files on the disk.
890    pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
891        unsafe {
892            ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
893        }
894        Ok(())
895    }
896
897    /// Flushes database memtables to SST files on the disk using default options.
898    pub fn flush(&self) -> Result<(), Error> {
899        self.flush_opt(&FlushOptions::default())
900    }
901
902    /// Flushes database memtables to SST files on the disk for a given column family.
903    pub fn flush_cf_opt(
904        &self,
905        cf: &impl AsColumnFamilyRef,
906        flushopts: &FlushOptions,
907    ) -> Result<(), Error> {
908        unsafe {
909            ffi_try!(ffi::rocksdb_flush_cf(
910                self.inner.inner(),
911                flushopts.inner,
912                cf.inner()
913            ));
914        }
915        Ok(())
916    }
917
918    /// Flushes database memtables to SST files on the disk for a given column family using default
919    /// options.
920    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
921        self.flush_cf_opt(cf, &FlushOptions::default())
922    }
923
924    /// Return the bytes associated with a key value with read options. If you only intend to use
925    /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
926    /// to avoid unnecessary memory copy.
927    pub fn get_opt<K: AsRef<[u8]>>(
928        &self,
929        key: K,
930        readopts: &ReadOptions,
931    ) -> Result<Option<Vec<u8>>, Error> {
932        self.get_pinned_opt(key, readopts)
933            .map(|x| x.map(|v| v.as_ref().to_vec()))
934    }
935
936    /// Return the bytes associated with a key value. If you only intend to use the vector returned
937    /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory
938    /// copy.
939    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
940        self.get_opt(key.as_ref(), &ReadOptions::default())
941    }
942
943    /// Return the bytes associated with a key value and the given column family with read options.
944    /// If you only intend to use the vector returned temporarily, consider using
945    /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
946    pub fn get_cf_opt<K: AsRef<[u8]>>(
947        &self,
948        cf: &impl AsColumnFamilyRef,
949        key: K,
950        readopts: &ReadOptions,
951    ) -> Result<Option<Vec<u8>>, Error> {
952        self.get_pinned_cf_opt(cf, key, readopts)
953            .map(|x| x.map(|v| v.as_ref().to_vec()))
954    }
955
956    /// Return the bytes associated with a key value and the given column family. If you only
957    /// intend to use the vector returned temporarily, consider using
958    /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
959    pub fn get_cf<K: AsRef<[u8]>>(
960        &self,
961        cf: &impl AsColumnFamilyRef,
962        key: K,
963    ) -> Result<Option<Vec<u8>>, Error> {
964        self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
965    }
966
967    /// Return the value associated with a key using RocksDB's PinnableSlice
968    /// so as to avoid unnecessary memory copy.
969    pub fn get_pinned_opt<K: AsRef<[u8]>>(
970        &self,
971        key: K,
972        readopts: &ReadOptions,
973    ) -> Result<Option<DBPinnableSlice>, Error> {
974        if readopts.inner.is_null() {
975            return Err(Error::new(
976                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
977                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
978                    .to_owned(),
979            ));
980        }
981
982        let key = key.as_ref();
983        unsafe {
984            let val = ffi_try!(ffi::rocksdb_get_pinned(
985                self.inner.inner(),
986                readopts.inner,
987                key.as_ptr() as *const c_char,
988                key.len() as size_t,
989            ));
990            if val.is_null() {
991                Ok(None)
992            } else {
993                Ok(Some(DBPinnableSlice::from_c(val)))
994            }
995        }
996    }
997
998    /// Return the value associated with a key using RocksDB's PinnableSlice
999    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1000    /// leverages default options.
1001    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
1002        self.get_pinned_opt(key, &ReadOptions::default())
1003    }
1004
1005    /// Return the value associated with a key using RocksDB's PinnableSlice
1006    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1007    /// allows specifying ColumnFamily
1008    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1009        &self,
1010        cf: &impl AsColumnFamilyRef,
1011        key: K,
1012        readopts: &ReadOptions,
1013    ) -> Result<Option<DBPinnableSlice>, Error> {
1014        if readopts.inner.is_null() {
1015            return Err(Error::new(
1016                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1017                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1018                    .to_owned(),
1019            ));
1020        }
1021
1022        let key = key.as_ref();
1023        unsafe {
1024            let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1025                self.inner.inner(),
1026                readopts.inner,
1027                cf.inner(),
1028                key.as_ptr() as *const c_char,
1029                key.len() as size_t,
1030            ));
1031            if val.is_null() {
1032                Ok(None)
1033            } else {
1034                Ok(Some(DBPinnableSlice::from_c(val)))
1035            }
1036        }
1037    }
1038
1039    /// Return the value associated with a key using RocksDB's PinnableSlice
1040    /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
1041    /// leverages default options.
1042    pub fn get_pinned_cf<K: AsRef<[u8]>>(
1043        &self,
1044        cf: &impl AsColumnFamilyRef,
1045        key: K,
1046    ) -> Result<Option<DBPinnableSlice>, Error> {
1047        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1048    }
1049
1050    /// Return the values associated with the given keys.
1051    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1052    where
1053        K: AsRef<[u8]>,
1054        I: IntoIterator<Item = K>,
1055    {
1056        self.multi_get_opt(keys, &ReadOptions::default())
1057    }
1058
1059    /// Return the values associated with the given keys using read options.
1060    pub fn multi_get_opt<K, I>(
1061        &self,
1062        keys: I,
1063        readopts: &ReadOptions,
1064    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1065    where
1066        K: AsRef<[u8]>,
1067        I: IntoIterator<Item = K>,
1068    {
1069        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1070            .into_iter()
1071            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1072            .unzip();
1073        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1074
1075        let mut values = vec![ptr::null_mut(); keys.len()];
1076        let mut values_sizes = vec![0_usize; keys.len()];
1077        let mut errors = vec![ptr::null_mut(); keys.len()];
1078        unsafe {
1079            ffi::rocksdb_multi_get(
1080                self.inner.inner(),
1081                readopts.inner,
1082                ptr_keys.len(),
1083                ptr_keys.as_ptr(),
1084                keys_sizes.as_ptr(),
1085                values.as_mut_ptr(),
1086                values_sizes.as_mut_ptr(),
1087                errors.as_mut_ptr(),
1088            );
1089        }
1090
1091        convert_values(values, values_sizes, errors)
1092    }
1093
1094    /// Return the values associated with the given keys and column families.
1095    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1096        &'a self,
1097        keys: I,
1098    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1099    where
1100        K: AsRef<[u8]>,
1101        I: IntoIterator<Item = (&'b W, K)>,
1102        W: 'b + AsColumnFamilyRef,
1103    {
1104        self.multi_get_cf_opt(keys, &ReadOptions::default())
1105    }
1106
1107    /// Return the values associated with the given keys and column families using read options.
1108    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1109        &'a self,
1110        keys: I,
1111        readopts: &ReadOptions,
1112    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1113    where
1114        K: AsRef<[u8]>,
1115        I: IntoIterator<Item = (&'b W, K)>,
1116        W: 'b + AsColumnFamilyRef,
1117    {
1118        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1119            .into_iter()
1120            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
1121            .unzip();
1122        let ptr_keys: Vec<_> = cfs_and_keys
1123            .iter()
1124            .map(|(_, k)| k.as_ptr() as *const c_char)
1125            .collect();
1126        let ptr_cfs: Vec<_> = cfs_and_keys
1127            .iter()
1128            .map(|(c, _)| c.inner() as *const _)
1129            .collect();
1130
1131        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1132        let mut values_sizes = vec![0_usize; ptr_keys.len()];
1133        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1134        unsafe {
1135            ffi::rocksdb_multi_get_cf(
1136                self.inner.inner(),
1137                readopts.inner,
1138                ptr_cfs.as_ptr(),
1139                ptr_keys.len(),
1140                ptr_keys.as_ptr(),
1141                keys_sizes.as_ptr(),
1142                values.as_mut_ptr(),
1143                values_sizes.as_mut_ptr(),
1144                errors.as_mut_ptr(),
1145            );
1146        }
1147
1148        convert_values(values, values_sizes, errors)
1149    }
1150
1151    /// Return the values associated with the given keys and the specified column family
1152    /// where internally the read requests are processed in batch if block-based table
1153    /// SST format is used.  It is a more optimized version of multi_get_cf.
1154    pub fn batched_multi_get_cf<K, I>(
1155        &self,
1156        cf: &impl AsColumnFamilyRef,
1157        keys: I,
1158        sorted_input: bool,
1159    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1160    where
1161        K: AsRef<[u8]>,
1162        I: IntoIterator<Item = K>,
1163    {
1164        self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1165    }
1166
1167    /// Return the values associated with the given keys and the specified column family
1168    /// where internally the read requests are processed in batch if block-based table
1169    /// SST format is used.  It is a more optimized version of multi_get_cf_opt.
1170    pub fn batched_multi_get_cf_opt<K, I>(
1171        &self,
1172        cf: &impl AsColumnFamilyRef,
1173        keys: I,
1174        sorted_input: bool,
1175        readopts: &ReadOptions,
1176    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1177    where
1178        K: AsRef<[u8]>,
1179        I: IntoIterator<Item = K>,
1180    {
1181        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1182            .into_iter()
1183            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1184            .unzip();
1185        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1186
1187        let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1188        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1189
1190        unsafe {
1191            ffi::rocksdb_batched_multi_get_cf(
1192                self.inner.inner(),
1193                readopts.inner,
1194                cf.inner(),
1195                ptr_keys.len(),
1196                ptr_keys.as_ptr(),
1197                keys_sizes.as_ptr(),
1198                pinned_values.as_mut_ptr(),
1199                errors.as_mut_ptr(),
1200                sorted_input,
1201            );
1202            pinned_values
1203                .into_iter()
1204                .zip(errors.into_iter())
1205                .map(|(v, e)| {
1206                    if e.is_null() {
1207                        if v.is_null() {
1208                            Ok(None)
1209                        } else {
1210                            Ok(Some(DBPinnableSlice::from_c(v)))
1211                        }
1212                    } else {
1213                        Err(Error::new(crate::ffi_util::error_message(e)))
1214                    }
1215                })
1216                .collect()
1217        }
1218    }
1219
1220    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1221    /// `true`. This function uses default `ReadOptions`.
1222    pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1223        self.key_may_exist_opt(key, &ReadOptions::default())
1224    }
1225
1226    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1227    /// `true`.
1228    pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1229        let key = key.as_ref();
1230        unsafe {
1231            0 != ffi::rocksdb_key_may_exist(
1232                self.inner.inner(),
1233                readopts.inner,
1234                key.as_ptr() as *const c_char,
1235                key.len() as size_t,
1236                ptr::null_mut(), /*value*/
1237                ptr::null_mut(), /*val_len*/
1238                ptr::null(),     /*timestamp*/
1239                0,               /*timestamp_len*/
1240                ptr::null_mut(), /*value_found*/
1241            )
1242        }
1243    }
1244
1245    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1246    /// otherwise returns `true`. This function uses default `ReadOptions`.
1247    pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1248        self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1249    }
1250
1251    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1252    /// otherwise returns `true`.
1253    pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1254        &self,
1255        cf: &impl AsColumnFamilyRef,
1256        key: K,
1257        readopts: &ReadOptions,
1258    ) -> bool {
1259        let key = key.as_ref();
1260        0 != unsafe {
1261            ffi::rocksdb_key_may_exist_cf(
1262                self.inner.inner(),
1263                readopts.inner,
1264                cf.inner(),
1265                key.as_ptr() as *const c_char,
1266                key.len() as size_t,
1267                ptr::null_mut(), /*value*/
1268                ptr::null_mut(), /*val_len*/
1269                ptr::null(),     /*timestamp*/
1270                0,               /*timestamp_len*/
1271                ptr::null_mut(), /*value_found*/
1272            )
1273        }
1274    }
1275
1276    /// If the key definitely does not exist in the database, then this method
1277    /// returns `(false, None)`, else `(true, None)` if it may.
1278    /// If the key is found in memory, then it returns `(true, Some<CSlice>)`.
1279    ///
1280    /// This check is potentially lighter-weight than calling `get()`. One way
1281    /// to make this lighter weight is to avoid doing any IOs.
1282    pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1283        &self,
1284        cf: &impl AsColumnFamilyRef,
1285        key: K,
1286        readopts: &ReadOptions,
1287    ) -> (bool, Option<CSlice>) {
1288        let key = key.as_ref();
1289        let mut val: *mut c_char = ptr::null_mut();
1290        let mut val_len: usize = 0;
1291        let mut value_found: c_uchar = 0;
1292        let may_exists = 0
1293            != unsafe {
1294                ffi::rocksdb_key_may_exist_cf(
1295                    self.inner.inner(),
1296                    readopts.inner,
1297                    cf.inner(),
1298                    key.as_ptr() as *const c_char,
1299                    key.len() as size_t,
1300                    &mut val,         /*value*/
1301                    &mut val_len,     /*val_len*/
1302                    ptr::null(),      /*timestamp*/
1303                    0,                /*timestamp_len*/
1304                    &mut value_found, /*value_found*/
1305                )
1306            };
1307        // The value is only allocated (using malloc) and returned if it is found and
1308        // value_found isn't NULL. In that case the user is responsible for freeing it.
1309        if may_exists && value_found != 0 {
1310            (
1311                may_exists,
1312                Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1313            )
1314        } else {
1315            (may_exists, None)
1316        }
1317    }
1318
1319    fn create_inner_cf_handle(
1320        &self,
1321        name: impl CStrLike,
1322        opts: &Options,
1323    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1324        let cf_name = name.bake().map_err(|err| {
1325            Error::new(format!(
1326                "Failed to convert path to CString when creating cf: {err}"
1327            ))
1328        })?;
1329        Ok(unsafe {
1330            ffi_try!(ffi::rocksdb_create_column_family(
1331                self.inner.inner(),
1332                opts.inner,
1333                cf_name.as_ptr(),
1334            ))
1335        })
1336    }
1337
1338    pub fn iterator<'a: 'b, 'b>(
1339        &'a self,
1340        mode: IteratorMode,
1341    ) -> DBIteratorWithThreadMode<'b, Self> {
1342        let readopts = ReadOptions::default();
1343        self.iterator_opt(mode, readopts)
1344    }
1345
1346    pub fn iterator_opt<'a: 'b, 'b>(
1347        &'a self,
1348        mode: IteratorMode,
1349        readopts: ReadOptions,
1350    ) -> DBIteratorWithThreadMode<'b, Self> {
1351        DBIteratorWithThreadMode::new(self, readopts, mode)
1352    }
1353
1354    /// Opens an iterator using the provided ReadOptions.
1355    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
1356    pub fn iterator_cf_opt<'a: 'b, 'b>(
1357        &'a self,
1358        cf_handle: &impl AsColumnFamilyRef,
1359        readopts: ReadOptions,
1360        mode: IteratorMode,
1361    ) -> DBIteratorWithThreadMode<'b, Self> {
1362        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1363    }
1364
1365    /// Opens an iterator with `set_total_order_seek` enabled.
1366    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
1367    /// with a Hash-based implementation.
1368    pub fn full_iterator<'a: 'b, 'b>(
1369        &'a self,
1370        mode: IteratorMode,
1371    ) -> DBIteratorWithThreadMode<'b, Self> {
1372        let mut opts = ReadOptions::default();
1373        opts.set_total_order_seek(true);
1374        DBIteratorWithThreadMode::new(self, opts, mode)
1375    }
1376
1377    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1378        &'a self,
1379        prefix: P,
1380    ) -> DBIteratorWithThreadMode<'b, Self> {
1381        let mut opts = ReadOptions::default();
1382        opts.set_prefix_same_as_start(true);
1383        DBIteratorWithThreadMode::new(
1384            self,
1385            opts,
1386            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1387        )
1388    }
1389
1390    pub fn iterator_cf<'a: 'b, 'b>(
1391        &'a self,
1392        cf_handle: &impl AsColumnFamilyRef,
1393        mode: IteratorMode,
1394    ) -> DBIteratorWithThreadMode<'b, Self> {
1395        let opts = ReadOptions::default();
1396        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1397    }
1398
1399    pub fn full_iterator_cf<'a: 'b, 'b>(
1400        &'a self,
1401        cf_handle: &impl AsColumnFamilyRef,
1402        mode: IteratorMode,
1403    ) -> DBIteratorWithThreadMode<'b, Self> {
1404        let mut opts = ReadOptions::default();
1405        opts.set_total_order_seek(true);
1406        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1407    }
1408
1409    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1410        &'a self,
1411        cf_handle: &impl AsColumnFamilyRef,
1412        prefix: P,
1413    ) -> DBIteratorWithThreadMode<'a, Self> {
1414        let mut opts = ReadOptions::default();
1415        opts.set_prefix_same_as_start(true);
1416        DBIteratorWithThreadMode::<'a, Self>::new_cf(
1417            self,
1418            cf_handle.inner(),
1419            opts,
1420            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1421        )
1422    }
1423
1424    /// Opens a raw iterator over the database, using the default read options
1425    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1426        let opts = ReadOptions::default();
1427        DBRawIteratorWithThreadMode::new(self, opts)
1428    }
1429
1430    /// Opens a raw iterator over the given column family, using the default read options
1431    pub fn raw_iterator_cf<'a: 'b, 'b>(
1432        &'a self,
1433        cf_handle: &impl AsColumnFamilyRef,
1434    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1435        let opts = ReadOptions::default();
1436        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1437    }
1438
1439    /// Opens a raw iterator over the database, using the given read options
1440    pub fn raw_iterator_opt<'a: 'b, 'b>(
1441        &'a self,
1442        readopts: ReadOptions,
1443    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1444        DBRawIteratorWithThreadMode::new(self, readopts)
1445    }
1446
1447    /// Opens a raw iterator over the given column family, using the given read options
1448    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1449        &'a self,
1450        cf_handle: &impl AsColumnFamilyRef,
1451        readopts: ReadOptions,
1452    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1453        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1454    }
1455
1456    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
1457        SnapshotWithThreadMode::<Self>::new(self)
1458    }
1459
1460    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1461    where
1462        K: AsRef<[u8]>,
1463        V: AsRef<[u8]>,
1464    {
1465        let key = key.as_ref();
1466        let value = value.as_ref();
1467
1468        unsafe {
1469            ffi_try!(ffi::rocksdb_put(
1470                self.inner.inner(),
1471                writeopts.inner,
1472                key.as_ptr() as *const c_char,
1473                key.len() as size_t,
1474                value.as_ptr() as *const c_char,
1475                value.len() as size_t,
1476            ));
1477            Ok(())
1478        }
1479    }
1480
1481    pub fn put_cf_opt<K, V>(
1482        &self,
1483        cf: &impl AsColumnFamilyRef,
1484        key: K,
1485        value: V,
1486        writeopts: &WriteOptions,
1487    ) -> Result<(), Error>
1488    where
1489        K: AsRef<[u8]>,
1490        V: AsRef<[u8]>,
1491    {
1492        let key = key.as_ref();
1493        let value = value.as_ref();
1494
1495        unsafe {
1496            ffi_try!(ffi::rocksdb_put_cf(
1497                self.inner.inner(),
1498                writeopts.inner,
1499                cf.inner(),
1500                key.as_ptr() as *const c_char,
1501                key.len() as size_t,
1502                value.as_ptr() as *const c_char,
1503                value.len() as size_t,
1504            ));
1505            Ok(())
1506        }
1507    }
1508
1509    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1510    where
1511        K: AsRef<[u8]>,
1512        V: AsRef<[u8]>,
1513    {
1514        let key = key.as_ref();
1515        let value = value.as_ref();
1516
1517        unsafe {
1518            ffi_try!(ffi::rocksdb_merge(
1519                self.inner.inner(),
1520                writeopts.inner,
1521                key.as_ptr() as *const c_char,
1522                key.len() as size_t,
1523                value.as_ptr() as *const c_char,
1524                value.len() as size_t,
1525            ));
1526            Ok(())
1527        }
1528    }
1529
1530    pub fn merge_cf_opt<K, V>(
1531        &self,
1532        cf: &impl AsColumnFamilyRef,
1533        key: K,
1534        value: V,
1535        writeopts: &WriteOptions,
1536    ) -> Result<(), Error>
1537    where
1538        K: AsRef<[u8]>,
1539        V: AsRef<[u8]>,
1540    {
1541        let key = key.as_ref();
1542        let value = value.as_ref();
1543
1544        unsafe {
1545            ffi_try!(ffi::rocksdb_merge_cf(
1546                self.inner.inner(),
1547                writeopts.inner,
1548                cf.inner(),
1549                key.as_ptr() as *const c_char,
1550                key.len() as size_t,
1551                value.as_ptr() as *const c_char,
1552                value.len() as size_t,
1553            ));
1554            Ok(())
1555        }
1556    }
1557
1558    pub fn delete_opt<K: AsRef<[u8]>>(
1559        &self,
1560        key: K,
1561        writeopts: &WriteOptions,
1562    ) -> Result<(), Error> {
1563        let key = key.as_ref();
1564
1565        unsafe {
1566            ffi_try!(ffi::rocksdb_delete(
1567                self.inner.inner(),
1568                writeopts.inner,
1569                key.as_ptr() as *const c_char,
1570                key.len() as size_t,
1571            ));
1572            Ok(())
1573        }
1574    }
1575
1576    pub fn delete_cf_opt<K: AsRef<[u8]>>(
1577        &self,
1578        cf: &impl AsColumnFamilyRef,
1579        key: K,
1580        writeopts: &WriteOptions,
1581    ) -> Result<(), Error> {
1582        let key = key.as_ref();
1583
1584        unsafe {
1585            ffi_try!(ffi::rocksdb_delete_cf(
1586                self.inner.inner(),
1587                writeopts.inner,
1588                cf.inner(),
1589                key.as_ptr() as *const c_char,
1590                key.len() as size_t,
1591            ));
1592            Ok(())
1593        }
1594    }
1595
1596    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1597    where
1598        K: AsRef<[u8]>,
1599        V: AsRef<[u8]>,
1600    {
1601        self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1602    }
1603
1604    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1605    where
1606        K: AsRef<[u8]>,
1607        V: AsRef<[u8]>,
1608    {
1609        self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1610    }
1611
1612    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1613    where
1614        K: AsRef<[u8]>,
1615        V: AsRef<[u8]>,
1616    {
1617        self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1618    }
1619
1620    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1621    where
1622        K: AsRef<[u8]>,
1623        V: AsRef<[u8]>,
1624    {
1625        self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1626    }
1627
1628    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1629        self.delete_opt(key.as_ref(), &WriteOptions::default())
1630    }
1631
1632    pub fn delete_cf<K: AsRef<[u8]>>(
1633        &self,
1634        cf: &impl AsColumnFamilyRef,
1635        key: K,
1636    ) -> Result<(), Error> {
1637        self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1638    }
1639
1640    /// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
1641    pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1642        unsafe {
1643            let start = start.as_ref().map(AsRef::as_ref);
1644            let end = end.as_ref().map(AsRef::as_ref);
1645
1646            ffi::rocksdb_compact_range(
1647                self.inner.inner(),
1648                opt_bytes_to_ptr(start),
1649                start.map_or(0, <[u8]>::len) as size_t,
1650                opt_bytes_to_ptr(end),
1651                end.map_or(0, <[u8]>::len) as size_t,
1652            );
1653        }
1654    }
1655
1656    /// Same as `compact_range` but with custom options.
1657    pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1658        &self,
1659        start: Option<S>,
1660        end: Option<E>,
1661        opts: &CompactOptions,
1662    ) {
1663        unsafe {
1664            let start = start.as_ref().map(AsRef::as_ref);
1665            let end = end.as_ref().map(AsRef::as_ref);
1666
1667            ffi::rocksdb_compact_range_opt(
1668                self.inner.inner(),
1669                opts.inner,
1670                opt_bytes_to_ptr(start),
1671                start.map_or(0, <[u8]>::len) as size_t,
1672                opt_bytes_to_ptr(end),
1673                end.map_or(0, <[u8]>::len) as size_t,
1674            );
1675        }
1676    }
1677
1678    /// Runs a manual compaction on the Range of keys given on the
1679    /// given column family. This is not likely to be needed for typical usage.
1680    pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1681        &self,
1682        cf: &impl AsColumnFamilyRef,
1683        start: Option<S>,
1684        end: Option<E>,
1685    ) {
1686        unsafe {
1687            let start = start.as_ref().map(AsRef::as_ref);
1688            let end = end.as_ref().map(AsRef::as_ref);
1689
1690            ffi::rocksdb_compact_range_cf(
1691                self.inner.inner(),
1692                cf.inner(),
1693                opt_bytes_to_ptr(start),
1694                start.map_or(0, <[u8]>::len) as size_t,
1695                opt_bytes_to_ptr(end),
1696                end.map_or(0, <[u8]>::len) as size_t,
1697            );
1698        }
1699    }
1700
1701    /// Same as `compact_range_cf` but with custom options.
1702    pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1703        &self,
1704        cf: &impl AsColumnFamilyRef,
1705        start: Option<S>,
1706        end: Option<E>,
1707        opts: &CompactOptions,
1708    ) {
1709        unsafe {
1710            let start = start.as_ref().map(AsRef::as_ref);
1711            let end = end.as_ref().map(AsRef::as_ref);
1712
1713            ffi::rocksdb_compact_range_cf_opt(
1714                self.inner.inner(),
1715                cf.inner(),
1716                opts.inner,
1717                opt_bytes_to_ptr(start),
1718                start.map_or(0, <[u8]>::len) as size_t,
1719                opt_bytes_to_ptr(end),
1720                end.map_or(0, <[u8]>::len) as size_t,
1721            );
1722        }
1723    }
1724
1725    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
1726        let copts = convert_options(opts)?;
1727        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1728        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1729        let count = opts.len() as i32;
1730        unsafe {
1731            ffi_try!(ffi::rocksdb_set_options(
1732                self.inner.inner(),
1733                count,
1734                cnames.as_ptr(),
1735                cvalues.as_ptr(),
1736            ));
1737        }
1738        Ok(())
1739    }
1740
1741    pub fn set_options_cf(
1742        &self,
1743        cf: &impl AsColumnFamilyRef,
1744        opts: &[(&str, &str)],
1745    ) -> Result<(), Error> {
1746        let copts = convert_options(opts)?;
1747        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1748        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1749        let count = opts.len() as i32;
1750        unsafe {
1751            ffi_try!(ffi::rocksdb_set_options_cf(
1752                self.inner.inner(),
1753                cf.inner(),
1754                count,
1755                cnames.as_ptr(),
1756                cvalues.as_ptr(),
1757            ));
1758        }
1759        Ok(())
1760    }
1761
1762    /// Implementation for property_value et al methods.
1763    ///
1764    /// `name` is the name of the property.  It will be converted into a CString
1765    /// and passed to `get_property` as argument.  `get_property` reads the
1766    /// specified property and either returns NULL or a pointer to a C allocated
1767    /// string; this method takes ownership of that string and will free it at
1768    /// the end. That string is parsed using `parse` callback which produces
1769    /// the returned result.
1770    fn property_value_impl<R>(
1771        name: impl CStrLike,
1772        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1773        parse: impl FnOnce(&str) -> Result<R, Error>,
1774    ) -> Result<Option<R>, Error> {
1775        let value = match name.bake() {
1776            Ok(prop_name) => get_property(prop_name.as_ptr()),
1777            Err(e) => {
1778                return Err(Error::new(format!(
1779                    "Failed to convert property name to CString: {e}"
1780                )));
1781            }
1782        };
1783        if value.is_null() {
1784            return Ok(None);
1785        }
1786        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1787            Ok(s) => parse(s).map(|value| Some(value)),
1788            Err(e) => Err(Error::new(format!(
1789                "Failed to convert property value to string: {e}"
1790            ))),
1791        };
1792        unsafe {
1793            libc::free(value as *mut c_void);
1794        }
1795        result
1796    }
1797
1798    /// Retrieves a RocksDB property by name.
1799    ///
1800    /// Full list of properties could be find
1801    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1802    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1803        Self::property_value_impl(
1804            name,
1805            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1806            |str_value| Ok(str_value.to_owned()),
1807        )
1808    }
1809
1810    /// Retrieves a RocksDB property by name, for a specific column family.
1811    ///
1812    /// Full list of properties could be find
1813    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1814    pub fn property_value_cf(
1815        &self,
1816        cf: &impl AsColumnFamilyRef,
1817        name: impl CStrLike,
1818    ) -> Result<Option<String>, Error> {
1819        Self::property_value_impl(
1820            name,
1821            |prop_name| unsafe {
1822                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1823            },
1824            |str_value| Ok(str_value.to_owned()),
1825        )
1826    }
1827
1828    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1829        value.parse::<u64>().map_err(|err| {
1830            Error::new(format!(
1831                "Failed to convert property value {value} to int: {err}"
1832            ))
1833        })
1834    }
1835
1836    /// Retrieves a RocksDB property and casts it to an integer.
1837    ///
1838    /// Full list of properties that return int values could be find
1839    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1840    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1841        Self::property_value_impl(
1842            name,
1843            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1844            Self::parse_property_int_value,
1845        )
1846    }
1847
1848    /// Retrieves a RocksDB property for a specific column family and casts it to an integer.
1849    ///
1850    /// Full list of properties that return int values could be find
1851    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1852    pub fn property_int_value_cf(
1853        &self,
1854        cf: &impl AsColumnFamilyRef,
1855        name: impl CStrLike,
1856    ) -> Result<Option<u64>, Error> {
1857        Self::property_value_impl(
1858            name,
1859            |prop_name| unsafe {
1860                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1861            },
1862            Self::parse_property_int_value,
1863        )
1864    }
1865
1866    /// The sequence number of the most recent transaction.
1867    pub fn latest_sequence_number(&self) -> u64 {
1868        unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
1869    }
1870
1871    /// Iterate over batches of write operations since a given sequence.
1872    ///
1873    /// Produce an iterator that will provide the batches of write operations
1874    /// that have occurred since the given sequence (see
1875    /// `latest_sequence_number()`). Use the provided iterator to retrieve each
1876    /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and
1877    /// deletes using the `WriteBatch::iterate()` function.
1878    ///
1879    /// Calling `get_updates_since()` with a sequence number that is out of
1880    /// bounds will return an error.
1881    pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
1882        unsafe {
1883            // rocksdb_wal_readoptions_t does not appear to have any functions
1884            // for creating and destroying it; fortunately we can pass a nullptr
1885            // here to get the default behavior
1886            let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
1887            let iter = ffi_try!(ffi::rocksdb_get_updates_since(
1888                self.inner.inner(),
1889                seq_number,
1890                opts
1891            ));
1892            Ok(DBWALIterator {
1893                inner: iter,
1894                start_seq_number: seq_number,
1895            })
1896        }
1897    }
1898
1899    /// Tries to catch up with the primary by reading as much as possible from the
1900    /// log files.
1901    pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
1902        unsafe {
1903            ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
1904        }
1905        Ok(())
1906    }
1907
1908    /// Loads a list of external SST files created with SstFileWriter into the DB with default opts
1909    pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
1910        let opts = IngestExternalFileOptions::default();
1911        self.ingest_external_file_opts(&opts, paths)
1912    }
1913
1914    /// Loads a list of external SST files created with SstFileWriter into the DB
1915    pub fn ingest_external_file_opts<P: AsRef<Path>>(
1916        &self,
1917        opts: &IngestExternalFileOptions,
1918        paths: Vec<P>,
1919    ) -> Result<(), Error> {
1920        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1921        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1922
1923        self.ingest_external_file_raw(opts, &paths_v, &cpaths)
1924    }
1925
1926    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1927    /// with default opts
1928    pub fn ingest_external_file_cf<P: AsRef<Path>>(
1929        &self,
1930        cf: &impl AsColumnFamilyRef,
1931        paths: Vec<P>,
1932    ) -> Result<(), Error> {
1933        let opts = IngestExternalFileOptions::default();
1934        self.ingest_external_file_cf_opts(cf, &opts, paths)
1935    }
1936
1937    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1938    pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
1939        &self,
1940        cf: &impl AsColumnFamilyRef,
1941        opts: &IngestExternalFileOptions,
1942        paths: Vec<P>,
1943    ) -> Result<(), Error> {
1944        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1945        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1946
1947        self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
1948    }
1949
1950    fn ingest_external_file_raw(
1951        &self,
1952        opts: &IngestExternalFileOptions,
1953        paths_v: &[CString],
1954        cpaths: &[*const c_char],
1955    ) -> Result<(), Error> {
1956        unsafe {
1957            ffi_try!(ffi::rocksdb_ingest_external_file(
1958                self.inner.inner(),
1959                cpaths.as_ptr(),
1960                paths_v.len(),
1961                opts.inner as *const _
1962            ));
1963            Ok(())
1964        }
1965    }
1966
1967    fn ingest_external_file_raw_cf(
1968        &self,
1969        cf: &impl AsColumnFamilyRef,
1970        opts: &IngestExternalFileOptions,
1971        paths_v: &[CString],
1972        cpaths: &[*const c_char],
1973    ) -> Result<(), Error> {
1974        unsafe {
1975            ffi_try!(ffi::rocksdb_ingest_external_file_cf(
1976                self.inner.inner(),
1977                cf.inner(),
1978                cpaths.as_ptr(),
1979                paths_v.len(),
1980                opts.inner as *const _
1981            ));
1982            Ok(())
1983        }
1984    }
1985
1986    /// Returns a list of all table files with their level, start key
1987    /// and end key
1988    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
1989        unsafe {
1990            let files = ffi::rocksdb_livefiles(self.inner.inner());
1991            if files.is_null() {
1992                Err(Error::new("Could not get live files".to_owned()))
1993            } else {
1994                let n = ffi::rocksdb_livefiles_count(files);
1995
1996                let mut livefiles = Vec::with_capacity(n as usize);
1997                let mut key_size: usize = 0;
1998
1999                for i in 0..n {
2000                    let column_family_name =
2001                        from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
2002                    let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
2003                    let size = ffi::rocksdb_livefiles_size(files, i);
2004                    let level = ffi::rocksdb_livefiles_level(files, i);
2005
2006                    // get smallest key inside file
2007                    let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
2008                    let smallest_key = raw_data(smallest_key, key_size);
2009
2010                    // get largest key inside file
2011                    let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
2012                    let largest_key = raw_data(largest_key, key_size);
2013
2014                    livefiles.push(LiveFile {
2015                        column_family_name,
2016                        name,
2017                        size,
2018                        level,
2019                        start_key: smallest_key,
2020                        end_key: largest_key,
2021                        num_entries: ffi::rocksdb_livefiles_entries(files, i),
2022                        num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
2023                    });
2024                }
2025
2026                // destroy livefiles metadata(s)
2027                ffi::rocksdb_livefiles_destroy(files);
2028
2029                // return
2030                Ok(livefiles)
2031            }
2032        }
2033    }
2034
2035    /// Delete sst files whose keys are entirely in the given range.
2036    ///
2037    /// Could leave some keys in the range which are in files which are not
2038    /// entirely in the range.
2039    ///
2040    /// Note: L0 files are left regardless of whether they're in the range.
2041    ///
2042    /// SnapshotWithThreadModes before the delete might not see the data in the given range.
2043    pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2044        let from = from.as_ref();
2045        let to = to.as_ref();
2046        unsafe {
2047            ffi_try!(ffi::rocksdb_delete_file_in_range(
2048                self.inner.inner(),
2049                from.as_ptr() as *const c_char,
2050                from.len() as size_t,
2051                to.as_ptr() as *const c_char,
2052                to.len() as size_t,
2053            ));
2054            Ok(())
2055        }
2056    }
2057
2058    /// Same as `delete_file_in_range` but only for specific column family
2059    pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2060        &self,
2061        cf: &impl AsColumnFamilyRef,
2062        from: K,
2063        to: K,
2064    ) -> Result<(), Error> {
2065        let from = from.as_ref();
2066        let to = to.as_ref();
2067        unsafe {
2068            ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2069                self.inner.inner(),
2070                cf.inner(),
2071                from.as_ptr() as *const c_char,
2072                from.len() as size_t,
2073                to.as_ptr() as *const c_char,
2074                to.len() as size_t,
2075            ));
2076            Ok(())
2077        }
2078    }
2079
2080    /// Request stopping background work, if wait is true wait until it's done.
2081    pub fn cancel_all_background_work(&self, wait: bool) {
2082        unsafe {
2083            ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2084        }
2085    }
2086
2087    fn drop_column_family<C>(
2088        &self,
2089        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2090        cf: C,
2091    ) -> Result<(), Error> {
2092        unsafe {
2093            // first mark the column family as dropped
2094            ffi_try!(ffi::rocksdb_drop_column_family(
2095                self.inner.inner(),
2096                cf_inner
2097            ));
2098        }
2099        // then finally reclaim any resources (mem, files) by destroying the only single column
2100        // family handle by drop()-ing it
2101        drop(cf);
2102        Ok(())
2103    }
2104}
2105
2106impl<I: DBInner> DBCommon<SingleThreaded, I> {
2107    /// Creates column family with given name and options
2108    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2109        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2110        self.cfs
2111            .cfs
2112            .insert(name.as_ref().to_string(), ColumnFamily { inner });
2113        Ok(())
2114    }
2115
2116    /// Drops the column family with the given name
2117    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2118        if let Some(cf) = self.cfs.cfs.remove(name) {
2119            self.drop_column_family(cf.inner, cf)
2120        } else {
2121            Err(Error::new(format!("Invalid column family: {name}")))
2122        }
2123    }
2124
2125    /// Returns the underlying column family handle
2126    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2127        self.cfs.cfs.get(name)
2128    }
2129}
2130
2131impl<I: DBInner> DBCommon<MultiThreaded, I> {
2132    /// Creates column family with given name and options
2133    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2134        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2135        self.cfs.cfs.write().unwrap().insert(
2136            name.as_ref().to_string(),
2137            Arc::new(UnboundColumnFamily { inner }),
2138        );
2139        Ok(())
2140    }
2141
2142    /// Drops the column family with the given name by internally locking the inner column
2143    /// family map. This avoids needing `&mut self` reference
2144    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2145        if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
2146            self.drop_column_family(cf.inner, cf)
2147        } else {
2148            Err(Error::new(format!("Invalid column family: {name}")))
2149        }
2150    }
2151
2152    /// Returns the underlying column family handle
2153    pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
2154        self.cfs
2155            .cfs
2156            .read()
2157            .unwrap()
2158            .get(name)
2159            .cloned()
2160            .map(UnboundColumnFamily::bound_column_family)
2161    }
2162}
2163
2164impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2165    fn drop(&mut self) {
2166        self.cfs.drop_all_cfs_internal();
2167    }
2168}
2169
2170impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2171    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2172        write!(f, "RocksDB {{ path: {:?} }}", self.path())
2173    }
2174}
2175
2176/// The metadata that describes a SST file
2177#[derive(Debug, Clone)]
2178pub struct LiveFile {
2179    /// Name of the column family the file belongs to
2180    pub column_family_name: String,
2181    /// Name of the file
2182    pub name: String,
2183    /// Size of the file
2184    pub size: usize,
2185    /// Level at which this file resides
2186    pub level: i32,
2187    /// Smallest user defined key in the file
2188    pub start_key: Option<Vec<u8>>,
2189    /// Largest user defined key in the file
2190    pub end_key: Option<Vec<u8>>,
2191    /// Number of entries/alive keys in the file
2192    pub num_entries: u64,
2193    /// Number of deletions/tomb key(s) in the file
2194    pub num_deletions: u64,
2195}
2196
2197fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2198    opts.iter()
2199        .map(|(name, value)| {
2200            let cname = match CString::new(name.as_bytes()) {
2201                Ok(cname) => cname,
2202                Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2203            };
2204            let cvalue = match CString::new(value.as_bytes()) {
2205                Ok(cvalue) => cvalue,
2206                Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2207            };
2208            Ok((cname, cvalue))
2209        })
2210        .collect()
2211}
2212
2213pub(crate) fn convert_values(
2214    values: Vec<*mut c_char>,
2215    values_sizes: Vec<usize>,
2216    errors: Vec<*mut c_char>,
2217) -> Vec<Result<Option<Vec<u8>>, Error>> {
2218    values
2219        .into_iter()
2220        .zip(values_sizes.into_iter())
2221        .zip(errors.into_iter())
2222        .map(|((v, s), e)| {
2223            if e.is_null() {
2224                let value = unsafe { crate::ffi_util::raw_data(v, s) };
2225                unsafe {
2226                    ffi::rocksdb_free(v as *mut c_void);
2227                }
2228                Ok(value)
2229            } else {
2230                Err(Error::new(crate::ffi_util::error_message(e)))
2231            }
2232        })
2233        .collect()
2234}