rocksdb/transactions/
optimistic_transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int};
19
20use crate::{
21    db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction,
22    ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction,
23    WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
24};
25
26/// A type alias to RocksDB Optimistic Transaction DB.
27///
28/// Please read the official
29/// [guide](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb)
30/// to learn more about RocksDB OptimisticTransactionDB.
31///
32/// The default thread mode for [`OptimisticTransactionDB`] is [`SingleThreaded`]
33/// if feature `multi-threaded-cf` is not enabled.
34///
35/// See [`DBCommon`] for full list of methods.
36///
37/// # Examples
38///
39/// ```
40/// use rocksdb::{DB, Options, OptimisticTransactionDB, SingleThreaded};
41/// let path = "_path_for_optimistic_transaction_db";
42/// {
43///     let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(path).unwrap();
44///     db.put(b"my key", b"my value").unwrap();
45///     
46///     // create transaction
47///     let txn = db.transaction();
48///     txn.put(b"key2", b"value2");
49///     txn.put(b"key3", b"value3");
50///     txn.commit().unwrap();
51/// }
52/// let _ = DB::destroy(&Options::default(), path);
53/// ```
54///
55/// [`SingleThreaded`]: crate::SingleThreaded
56#[cfg(not(feature = "multi-threaded-cf"))]
57pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
58    DBCommon<T, OptimisticTransactionDBInner>;
59#[cfg(feature = "multi-threaded-cf")]
60pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
61    DBCommon<T, OptimisticTransactionDBInner>;
62
63pub struct OptimisticTransactionDBInner {
64    base: *mut ffi::rocksdb_t,
65    db: *mut ffi::rocksdb_optimistictransactiondb_t,
66}
67
68impl DBInner for OptimisticTransactionDBInner {
69    fn inner(&self) -> *mut ffi::rocksdb_t {
70        self.base
71    }
72}
73
74impl Drop for OptimisticTransactionDBInner {
75    fn drop(&mut self) {
76        unsafe {
77            ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
78            ffi::rocksdb_optimistictransactiondb_close(self.db);
79        }
80    }
81}
82
83/// Methods of `OptimisticTransactionDB`.
84impl<T: ThreadMode> OptimisticTransactionDB<T> {
85    /// Opens a database with default options.
86    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
87        let mut opts = Options::default();
88        opts.create_if_missing(true);
89        Self::open(&opts, path)
90    }
91
92    /// Opens the database with the specified options.
93    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
94        Self::open_cf(opts, path, None::<&str>)
95    }
96
97    /// Opens a database with the given database options and column family names.
98    ///
99    /// Column families opened using this function will be created with default `Options`.
100    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
101    where
102        P: AsRef<Path>,
103        I: IntoIterator<Item = N>,
104        N: AsRef<str>,
105    {
106        let cfs = cfs
107            .into_iter()
108            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
109
110        Self::open_cf_descriptors_internal(opts, path, cfs)
111    }
112
113    /// Opens a database with the given database options and column family descriptors.
114    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
115    where
116        P: AsRef<Path>,
117        I: IntoIterator<Item = ColumnFamilyDescriptor>,
118    {
119        Self::open_cf_descriptors_internal(opts, path, cfs)
120    }
121
122    /// Internal implementation for opening RocksDB.
123    fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
124    where
125        P: AsRef<Path>,
126        I: IntoIterator<Item = ColumnFamilyDescriptor>,
127    {
128        let cfs: Vec<_> = cfs.into_iter().collect();
129        let outlive = iter::once(opts.outlive.clone())
130            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
131            .collect();
132
133        let cpath = to_cpath(&path)?;
134
135        if let Err(e) = fs::create_dir_all(&path) {
136            return Err(Error::new(format!(
137                "Failed to create RocksDB directory: `{e:?}`."
138            )));
139        }
140
141        let db: *mut ffi::rocksdb_optimistictransactiondb_t;
142        let mut cf_map = BTreeMap::new();
143
144        if cfs.is_empty() {
145            db = Self::open_raw(opts, &cpath)?;
146        } else {
147            let mut cfs_v = cfs;
148            // Always open the default column family.
149            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
150                cfs_v.push(ColumnFamilyDescriptor {
151                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
152                    options: Options::default(),
153                });
154            }
155            // We need to store our CStrings in an intermediate vector
156            // so that their pointers remain valid.
157            let c_cfs: Vec<CString> = cfs_v
158                .iter()
159                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
160                .collect();
161
162            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
163
164            // These handles will be populated by DB.
165            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
166
167            let cfopts: Vec<_> = cfs_v
168                .iter()
169                .map(|cf| cf.options.inner as *const _)
170                .collect();
171
172            db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
173
174            for handle in &cfhandles {
175                if handle.is_null() {
176                    return Err(Error::new(
177                        "Received null column family handle from DB.".to_owned(),
178                    ));
179                }
180            }
181
182            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
183                cf_map.insert(cf_desc.name.clone(), inner);
184            }
185        }
186
187        if db.is_null() {
188            return Err(Error::new("Could not initialize database.".to_owned()));
189        }
190
191        let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
192        if base.is_null() {
193            unsafe {
194                ffi::rocksdb_optimistictransactiondb_close(db);
195            }
196            return Err(Error::new("Could not initialize database.".to_owned()));
197        }
198        let inner = OptimisticTransactionDBInner { base, db };
199
200        Ok(Self::new(
201            inner,
202            T::new_cf_map_internal(cf_map),
203            path.as_ref().to_path_buf(),
204            outlive,
205        ))
206    }
207
208    fn open_raw(
209        opts: &Options,
210        cpath: &CString,
211    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
212        unsafe {
213            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
214                opts.inner,
215                cpath.as_ptr()
216            ));
217            Ok(db)
218        }
219    }
220
221    fn open_cf_raw(
222        opts: &Options,
223        cpath: &CString,
224        cfs_v: &[ColumnFamilyDescriptor],
225        cfnames: &[*const c_char],
226        cfopts: &[*const ffi::rocksdb_options_t],
227        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
228    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
229        unsafe {
230            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
231                opts.inner,
232                cpath.as_ptr(),
233                cfs_v.len() as c_int,
234                cfnames.as_ptr(),
235                cfopts.as_ptr(),
236                cfhandles.as_mut_ptr(),
237            ));
238            Ok(db)
239        }
240    }
241
242    /// Creates a transaction with default options.
243    pub fn transaction(&self) -> Transaction<Self> {
244        self.transaction_opt(
245            &WriteOptions::default(),
246            &OptimisticTransactionOptions::default(),
247        )
248    }
249
250    /// Creates a transaction with default options.
251    pub fn transaction_opt(
252        &self,
253        writeopts: &WriteOptions,
254        otxn_opts: &OptimisticTransactionOptions,
255    ) -> Transaction<Self> {
256        Transaction {
257            inner: unsafe {
258                ffi::rocksdb_optimistictransaction_begin(
259                    self.inner.db,
260                    writeopts.inner,
261                    otxn_opts.inner,
262                    std::ptr::null_mut(),
263                )
264            },
265            _marker: PhantomData::default(),
266        }
267    }
268
269    pub fn write_opt(
270        &self,
271        batch: WriteBatchWithTransaction<true>,
272        writeopts: &WriteOptions,
273    ) -> Result<(), Error> {
274        unsafe {
275            ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
276                self.inner.db,
277                writeopts.inner,
278                batch.inner
279            ));
280        }
281        Ok(())
282    }
283
284    pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
285        self.write_opt(batch, &WriteOptions::default())
286    }
287
288    pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
289        let mut wo = WriteOptions::new();
290        wo.disable_wal(true);
291        self.write_opt(batch, &wo)
292    }
293}