1use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int};
19
20use crate::{
21 db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction,
22 ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction,
23 WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
24};
25
26#[cfg(not(feature = "multi-threaded-cf"))]
57pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
58 DBCommon<T, OptimisticTransactionDBInner>;
59#[cfg(feature = "multi-threaded-cf")]
60pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
61 DBCommon<T, OptimisticTransactionDBInner>;
62
63pub struct OptimisticTransactionDBInner {
64 base: *mut ffi::rocksdb_t,
65 db: *mut ffi::rocksdb_optimistictransactiondb_t,
66}
67
68impl DBInner for OptimisticTransactionDBInner {
69 fn inner(&self) -> *mut ffi::rocksdb_t {
70 self.base
71 }
72}
73
74impl Drop for OptimisticTransactionDBInner {
75 fn drop(&mut self) {
76 unsafe {
77 ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
78 ffi::rocksdb_optimistictransactiondb_close(self.db);
79 }
80 }
81}
82
83impl<T: ThreadMode> OptimisticTransactionDB<T> {
85 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
87 let mut opts = Options::default();
88 opts.create_if_missing(true);
89 Self::open(&opts, path)
90 }
91
92 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
94 Self::open_cf(opts, path, None::<&str>)
95 }
96
97 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
101 where
102 P: AsRef<Path>,
103 I: IntoIterator<Item = N>,
104 N: AsRef<str>,
105 {
106 let cfs = cfs
107 .into_iter()
108 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
109
110 Self::open_cf_descriptors_internal(opts, path, cfs)
111 }
112
113 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
115 where
116 P: AsRef<Path>,
117 I: IntoIterator<Item = ColumnFamilyDescriptor>,
118 {
119 Self::open_cf_descriptors_internal(opts, path, cfs)
120 }
121
122 fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
124 where
125 P: AsRef<Path>,
126 I: IntoIterator<Item = ColumnFamilyDescriptor>,
127 {
128 let cfs: Vec<_> = cfs.into_iter().collect();
129 let outlive = iter::once(opts.outlive.clone())
130 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
131 .collect();
132
133 let cpath = to_cpath(&path)?;
134
135 if let Err(e) = fs::create_dir_all(&path) {
136 return Err(Error::new(format!(
137 "Failed to create RocksDB directory: `{e:?}`."
138 )));
139 }
140
141 let db: *mut ffi::rocksdb_optimistictransactiondb_t;
142 let mut cf_map = BTreeMap::new();
143
144 if cfs.is_empty() {
145 db = Self::open_raw(opts, &cpath)?;
146 } else {
147 let mut cfs_v = cfs;
148 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
150 cfs_v.push(ColumnFamilyDescriptor {
151 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
152 options: Options::default(),
153 });
154 }
155 let c_cfs: Vec<CString> = cfs_v
158 .iter()
159 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
160 .collect();
161
162 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
163
164 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
166
167 let cfopts: Vec<_> = cfs_v
168 .iter()
169 .map(|cf| cf.options.inner as *const _)
170 .collect();
171
172 db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
173
174 for handle in &cfhandles {
175 if handle.is_null() {
176 return Err(Error::new(
177 "Received null column family handle from DB.".to_owned(),
178 ));
179 }
180 }
181
182 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
183 cf_map.insert(cf_desc.name.clone(), inner);
184 }
185 }
186
187 if db.is_null() {
188 return Err(Error::new("Could not initialize database.".to_owned()));
189 }
190
191 let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
192 if base.is_null() {
193 unsafe {
194 ffi::rocksdb_optimistictransactiondb_close(db);
195 }
196 return Err(Error::new("Could not initialize database.".to_owned()));
197 }
198 let inner = OptimisticTransactionDBInner { base, db };
199
200 Ok(Self::new(
201 inner,
202 T::new_cf_map_internal(cf_map),
203 path.as_ref().to_path_buf(),
204 outlive,
205 ))
206 }
207
208 fn open_raw(
209 opts: &Options,
210 cpath: &CString,
211 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
212 unsafe {
213 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
214 opts.inner,
215 cpath.as_ptr()
216 ));
217 Ok(db)
218 }
219 }
220
221 fn open_cf_raw(
222 opts: &Options,
223 cpath: &CString,
224 cfs_v: &[ColumnFamilyDescriptor],
225 cfnames: &[*const c_char],
226 cfopts: &[*const ffi::rocksdb_options_t],
227 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
228 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
229 unsafe {
230 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
231 opts.inner,
232 cpath.as_ptr(),
233 cfs_v.len() as c_int,
234 cfnames.as_ptr(),
235 cfopts.as_ptr(),
236 cfhandles.as_mut_ptr(),
237 ));
238 Ok(db)
239 }
240 }
241
242 pub fn transaction(&self) -> Transaction<Self> {
244 self.transaction_opt(
245 &WriteOptions::default(),
246 &OptimisticTransactionOptions::default(),
247 )
248 }
249
250 pub fn transaction_opt(
252 &self,
253 writeopts: &WriteOptions,
254 otxn_opts: &OptimisticTransactionOptions,
255 ) -> Transaction<Self> {
256 Transaction {
257 inner: unsafe {
258 ffi::rocksdb_optimistictransaction_begin(
259 self.inner.db,
260 writeopts.inner,
261 otxn_opts.inner,
262 std::ptr::null_mut(),
263 )
264 },
265 _marker: PhantomData::default(),
266 }
267 }
268
269 pub fn write_opt(
270 &self,
271 batch: WriteBatchWithTransaction<true>,
272 writeopts: &WriteOptions,
273 ) -> Result<(), Error> {
274 unsafe {
275 ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
276 self.inner.db,
277 writeopts.inner,
278 batch.inner
279 ));
280 }
281 Ok(())
282 }
283
284 pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
285 self.write_opt(batch, &WriteOptions::default())
286 }
287
288 pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
289 let mut wo = WriteOptions::new();
290 wo.disable_wal(true);
291 self.write_opt(batch, &wo)
292 }
293}