1use std::{
11 collections::{BTreeSet, HashMap},
12 ops::Deref,
13 sync::{
14 atomic::{AtomicI64, Ordering},
15 Arc,
16 },
17 time::{SystemTime, UNIX_EPOCH},
18};
19
20use async_lock::{Semaphore, SemaphoreGuard};
21use futures::{future::join_all, StreamExt as _};
22use linera_base::{ensure, util::future::FutureSyncExt as _};
23use scylla::{
24 client::{
25 execution_profile::{ExecutionProfile, ExecutionProfileHandle},
26 session::Session,
27 session_builder::SessionBuilder,
28 },
29 deserialize::{DeserializationError, TypeCheckError},
30 errors::{
31 DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
32 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
33 },
34 policies::{
35 load_balancing::{DefaultPolicy, LoadBalancingPolicy},
36 retry::DefaultRetryPolicy,
37 },
38 response::PagingState,
39 statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
40 value::CqlValue,
41};
42use serde::{Deserialize, Serialize};
43use thiserror::Error;
44
45#[cfg(with_metrics)]
46use crate::metering::MeteredDatabase;
47#[cfg(with_testing)]
48use crate::store::TestKeyValueDatabase;
49use crate::{
50 batch::{SimpleUnorderedBatch, UnorderedBatch},
51 common::{get_uleb128_size, get_upper_bound_option},
52 journaling::{JournalingError, JournalingKeyValueDatabase},
53 lru_caching::{LruCachingConfig, LruCachingDatabase},
54 store::{
55 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
56 WithError,
57 },
58 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
59};
60
/// Maximum number of keys sent in one multi-key (`k IN (...)`) query. Larger
/// requests are split into chunks of this size by `contains_keys` and
/// `read_multi_values_bytes`.
// NOTE(review): the `- 1` presumably leaves room for the `root_key` bind marker — confirm.
const MAX_MULTI_KEYS: usize = 100 - 1;

/// Largest value size, in bytes, accepted by `check_value_size`.
// NOTE(review): looks derived from a 16 MiB server-side limit minus key and
// framing overhead — confirm against the ScyllaDB settings in use.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// Largest key size, in bytes, accepted by `check_key_size`.
const MAX_KEY_SIZE: usize = 10 * 1024;
/// Total batch size exposed as `DirectWritableKeyValueStore::MAX_BATCH_TOTAL_SIZE`.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;

/// Value size exposed as `DirectWritableKeyValueStore::MAX_VALUE_SIZE`: the raw
/// limit minus the maximum key size, the ULEB128 encodings of both limits, and
/// three further bytes.
// NOTE(review): the subtracted terms presumably cover the overhead added by the
// wrapping layers (journaling / value splitting) — confirm.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;

/// Upper bound on `batch.len()` enforced by `check_batch_len`.
const MAX_BATCH_SIZE: usize = 5000;

/// Name of the keyspace in which every namespace table lives.
const KEYSPACE: &str = "kv";
103
/// A ScyllaDB session bound to one namespace (table), together with the
/// statements prepared against that table.
struct ScyllaDbClient {
    /// The driver session used for every query.
    session: Session,
    /// The table name inside `KEYSPACE`.
    namespace: String,
    /// `SELECT v` for a single `(root_key, k)`.
    read_value: PreparedStatement,
    /// `SELECT WRITETIME(v)` for a single `(root_key, k)`.
    read_writetime: PreparedStatement,
    /// `SELECT root_key` for a single `(root_key, k)`; used as an existence test.
    contains_key: PreparedStatement,
    /// Range delete `k >= ?` (no upper bound).
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// Range delete `k >= ? AND k < ?`.
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// Delete of a single key.
    write_batch_deletion: PreparedStatement,
    /// Insert of a single key/value pair.
    write_batch_insertion: PreparedStatement,
    /// Same as `write_batch_delete_prefix_unbounded`, with an explicit `USING TIMESTAMP`.
    write_batch_delete_prefix_unbounded_ts: PreparedStatement,
    /// Same as `write_batch_delete_prefix_bounded`, with an explicit `USING TIMESTAMP`.
    write_batch_delete_prefix_bounded_ts: PreparedStatement,
    /// Same as `write_batch_deletion`, with an explicit `USING TIMESTAMP`.
    write_batch_deletion_ts: PreparedStatement,
    /// Same as `write_batch_insertion`, with an explicit `USING TIMESTAMP`.
    write_batch_insertion_ts: PreparedStatement,
    /// `SELECT k` for `k >= ?`.
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k` for `k >= ? AND k < ?`.
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v` for `k >= ?`.
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v` for `k >= ? AND k < ?`.
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Cache of `SELECT k,v ... k IN (...)` statements, keyed by the number of markers.
    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
    /// Cache of `SELECT k ... k IN (...)` statements, keyed by the number of markers.
    multi_keys: papaya::HashMap<usize, PreparedStatement>,
}
131
132impl ScyllaDbClient {
/// Creates a client for the table `namespace` in `KEYSPACE`, preparing all
/// fixed-shape statements up front.
///
/// The multi-key statement caches start empty and are filled on demand.
///
/// # Errors
///
/// Returns an error as soon as any statement fails to prepare.
async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
    let namespace = namespace.to_string();
    // Single-key reads.
    let read_value = session
        .prepare(format!(
            "SELECT v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
        ))
        .await?;

    let read_writetime = session
        .prepare(format!(
            "SELECT WRITETIME(v) FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
        ))
        .await?;

    let contains_key = session
        .prepare(format!(
            "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
        ))
        .await?;

    // Writes that let the server assign the timestamp.
    let write_batch_delete_prefix_unbounded = session
        .prepare(format!(
            "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
        ))
        .await?;

    let write_batch_delete_prefix_bounded = session
        .prepare(format!(
            "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
        ))
        .await?;

    let write_batch_deletion = session
        .prepare(format!(
            "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
        ))
        .await?;

    let write_batch_insertion = session
        .prepare(format!(
            "INSERT INTO {KEYSPACE}.\"{namespace}\" (root_key, k, v) VALUES (?, ?, ?)"
        ))
        .await?;

    // Writes with an explicit timestamp. Note the marker order: the timestamp
    // comes first for deletes and last for inserts.
    let write_batch_delete_prefix_unbounded_ts = session
        .prepare(format!(
            "DELETE FROM {KEYSPACE}.\"{namespace}\" USING TIMESTAMP ? WHERE root_key = ? AND k >= ?"
        ))
        .await?;

    let write_batch_delete_prefix_bounded_ts = session
        .prepare(format!(
            "DELETE FROM {KEYSPACE}.\"{namespace}\" USING TIMESTAMP ? \
             WHERE root_key = ? AND k >= ? AND k < ?"
        ))
        .await?;

    let write_batch_deletion_ts = session
        .prepare(format!(
            "DELETE FROM {KEYSPACE}.\"{namespace}\" USING TIMESTAMP ? WHERE root_key = ? AND k = ?"
        ))
        .await?;

    let write_batch_insertion_ts = session
        .prepare(format!(
            "INSERT INTO {KEYSPACE}.\"{namespace}\" (root_key, k, v) VALUES (?, ?, ?) \
             USING TIMESTAMP ?"
        ))
        .await?;

    // Range scans.
    let find_keys_by_prefix_unbounded = session
        .prepare(format!(
            "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
        ))
        .await?;

    let find_keys_by_prefix_bounded = session
        .prepare(format!(
            "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
        ))
        .await?;

    let find_key_values_by_prefix_unbounded = session
        .prepare(format!(
            "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
        ))
        .await?;

    let find_key_values_by_prefix_bounded = session
        .prepare(format!(
            "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
        ))
        .await?;

    Ok(Self {
        session,
        namespace,
        read_value,
        read_writetime,
        contains_key,
        write_batch_delete_prefix_unbounded,
        write_batch_delete_prefix_bounded,
        write_batch_deletion,
        write_batch_insertion,
        write_batch_delete_prefix_unbounded_ts,
        write_batch_delete_prefix_bounded_ts,
        write_batch_deletion_ts,
        write_batch_insertion_ts,
        find_keys_by_prefix_unbounded,
        find_keys_by_prefix_bounded,
        find_key_values_by_prefix_unbounded,
        find_key_values_by_prefix_bounded,
        multi_key_values: papaya::HashMap::new(),
        multi_keys: papaya::HashMap::new(),
    })
}
253
254 fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
255 DefaultPolicy::builder().token_aware(true).build()
256 }
257
258 fn build_default_execution_profile_handle(
259 policy: Arc<dyn LoadBalancingPolicy>,
260 ) -> ExecutionProfileHandle {
261 let default_profile = ExecutionProfile::builder()
262 .load_balancing_policy(policy)
263 .retry_policy(Arc::new(DefaultRetryPolicy::new()))
264 .consistency(Consistency::LocalQuorum)
265 .build();
266 default_profile.into_handle()
267 }
268
269 async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
270 SessionBuilder::new()
273 .known_node(uri)
274 .default_execution_profile_handle(Self::build_default_execution_profile_handle(
275 Self::build_default_policy(),
276 ))
277 .build()
278 .boxed_sync()
279 .await
280 .map_err(Into::into)
281 }
282
283 async fn get_multi_key_values_statement(
284 &self,
285 num_markers: usize,
286 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
287 if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
288 return Ok(prepared_statement.clone());
289 }
290 let markers = std::iter::repeat_n("?", num_markers)
291 .collect::<Vec<_>>()
292 .join(",");
293 let prepared_statement = self
294 .session
295 .prepare(format!(
296 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
297 KEYSPACE, self.namespace, markers
298 ))
299 .await?;
300 self.multi_key_values
301 .pin()
302 .insert(num_markers, prepared_statement.clone());
303 Ok(prepared_statement)
304 }
305
306 async fn get_multi_keys_statement(
307 &self,
308 num_markers: usize,
309 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
310 if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
311 return Ok(prepared_statement.clone());
312 };
313 let markers = std::iter::repeat_n("?", num_markers)
314 .collect::<Vec<_>>()
315 .join(",");
316 let prepared_statement = self
317 .session
318 .prepare(format!(
319 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
320 KEYSPACE, self.namespace, markers
321 ))
322 .await?;
323 self.multi_keys
324 .pin()
325 .insert(num_markers, prepared_statement.clone());
326 Ok(prepared_statement)
327 }
328
329 fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
330 ensure!(
331 key.len() <= MAX_KEY_SIZE,
332 ScyllaDbStoreInternalError::KeyTooLong
333 );
334 Ok(())
335 }
336
337 fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
338 ensure!(
339 value.len() <= RAW_MAX_VALUE_SIZE,
340 ScyllaDbStoreInternalError::ValueTooLong
341 );
342 Ok(())
343 }
344
345 fn check_batch_key(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
351 Self::check_key_size(key)?;
352 ensure!(!key.is_empty(), ScyllaDbStoreInternalError::ZeroLengthKey);
353 Ok(())
354 }
355
356 fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
357 ensure!(
358 batch.len() <= MAX_BATCH_SIZE,
359 ScyllaDbStoreInternalError::BatchTooLong
360 );
361 Ok(())
362 }
363
364 async fn read_value_internal(
365 &self,
366 root_key: &[u8],
367 key: Vec<u8>,
368 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
369 Self::check_key_size(&key)?;
370 let session = &self.session;
371 let values = (root_key.to_vec(), key);
373
374 let (result, _) = session
375 .execute_single_page(&self.read_value, &values, PagingState::start())
376 .await
377 .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
378 let rows = result.into_rows_result()?;
379 let mut rows = rows.rows::<(Vec<u8>,)>()?;
380 Ok(match rows.next() {
381 Some(row) => Some(row?.0),
382 None => None,
383 })
384 }
385
386 fn get_occurrences_map(
387 keys: Vec<Vec<u8>>,
388 ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
389 let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
390 for (i_key, key) in keys.into_iter().enumerate() {
391 Self::check_key_size(&key)?;
392 map.entry(key).or_default().push(i_key);
393 }
394 Ok(map)
395 }
396
397 async fn read_multi_values_internal(
398 &self,
399 root_key: &[u8],
400 keys: Vec<Vec<u8>>,
401 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
402 let mut values = vec![None; keys.len()];
403 let map = Self::get_occurrences_map(keys)?;
404 let statement = self.get_multi_key_values_statement(map.len()).await?;
405 let mut inputs = vec![root_key.to_vec()];
406 inputs.extend(map.keys().cloned());
407 let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
408 .await?
409 .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
410
411 while let Some(row) = rows.next().await {
412 let (key, value) = row?;
413 if let Some((&last, rest)) = map[&key].split_last() {
414 for position in rest {
415 values[*position] = Some(value.clone());
416 }
417 values[last] = Some(value);
418 }
419 }
420 Ok(values)
421 }
422
423 async fn contains_keys_internal(
424 &self,
425 root_key: &[u8],
426 keys: Vec<Vec<u8>>,
427 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
428 let mut values = vec![false; keys.len()];
429 let map = Self::get_occurrences_map(keys)?;
430 let statement = self.get_multi_keys_statement(map.len()).await?;
431 let mut inputs = vec![root_key.to_vec()];
432 inputs.extend(map.keys().cloned());
433 let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
434 .await?
435 .rows_stream::<(Vec<u8>,)>()?;
436
437 while let Some(row) = rows.next().await {
438 let (key,) = row?;
439 for i_key in &map[&key] {
440 values[*i_key] = true;
441 }
442 }
443
444 Ok(values)
445 }
446
447 async fn contains_key_internal(
448 &self,
449 root_key: &[u8],
450 key: Vec<u8>,
451 ) -> Result<bool, ScyllaDbStoreInternalError> {
452 Self::check_key_size(&key)?;
453 let session = &self.session;
454 let values = (root_key.to_vec(), key);
456
457 let (result, _) = session
458 .execute_single_page(&self.contains_key, &values, PagingState::start())
459 .await
460 .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
461 let rows = result.into_rows_result()?;
462 let mut rows = rows.rows::<(Vec<u8>,)>()?;
463 Ok(rows.next().is_some())
464 }
465
466 async fn read_writetime_internal(
469 &self,
470 root_key: &[u8],
471 key: Vec<u8>,
472 ) -> Result<Option<i64>, ScyllaDbStoreInternalError> {
473 Self::check_key_size(&key)?;
474 let session = &self.session;
475 let values = (root_key.to_vec(), key);
476 let (result, _) = session
477 .execute_single_page(&self.read_writetime, &values, PagingState::start())
478 .await
479 .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
480 let rows = result.into_rows_result()?;
481 let mut rows = rows.rows::<(Option<i64>,)>()?;
482 Ok(match rows.next() {
483 Some(row) => row?.0,
484 None => None,
485 })
486 }
487
488 async fn write_batch_prefix_deletes(
491 &self,
492 root_key: &[u8],
493 key_prefix_deletions: Vec<Vec<u8>>,
494 ) -> Result<(), ScyllaDbStoreInternalError> {
495 if key_prefix_deletions.is_empty() {
496 return Ok(());
497 }
498 let session = &self.session;
499 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
500 let mut batch_values = Vec::new();
501 let q_unbounded = &self.write_batch_delete_prefix_unbounded;
502 let q_bounded = &self.write_batch_delete_prefix_bounded;
503 for key_prefix in key_prefix_deletions {
504 Self::check_key_size(&key_prefix)?;
505 match get_upper_bound_option(&key_prefix) {
506 None => {
507 batch_values.push(vec![root_key.to_vec(), key_prefix]);
508 batch_query.append_statement(q_unbounded.clone());
509 }
510 Some(upper_bound) => {
511 batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]);
512 batch_query.append_statement(q_bounded.clone());
513 }
514 }
515 }
516 session
517 .batch(&batch_query, batch_values)
518 .await
519 .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?;
520 Ok(())
521 }
522
523 async fn write_simple_batch(
526 &self,
527 root_key: &[u8],
528 batch: SimpleUnorderedBatch,
529 ) -> Result<(), ScyllaDbStoreInternalError> {
530 if batch.deletions.is_empty() && batch.insertions.is_empty() {
531 return Ok(());
532 }
533 let session = &self.session;
534 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
535 let mut batch_values = Vec::new();
536 let q_deletion = &self.write_batch_deletion;
537 for key in batch.deletions {
538 Self::check_batch_key(&key)?;
539 batch_values.push(vec![root_key.to_vec(), key]);
540 batch_query.append_statement(q_deletion.clone());
541 }
542 let q_insertion = &self.write_batch_insertion;
543 for (key, value) in batch.insertions {
544 Self::check_batch_key(&key)?;
545 Self::check_value_size(&value)?;
546 batch_values.push(vec![root_key.to_vec(), key, value]);
547 batch_query.append_statement(q_insertion.clone());
548 }
549 session
550 .batch(&batch_query, batch_values)
551 .await
552 .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?;
553 Ok(())
554 }
555
/// Writes the whole `batch` (prefix deletions, point deletions and
/// insertions) under `root_key` as ONE unlogged batch with explicit
/// timestamps.
///
/// Prefix deletions are stamped with `t`; point deletions, insertions and
/// the sentinel row are stamped with `t + 1`.
// NOTE(review): the `t` / `t + 1` split presumably keeps this batch's own
// insertions from being shadowed by its range tombstones — confirm.
///
/// A row with key `WRITETIME_SENTINEL_KEY` and an empty value is always
/// appended, stamped with `t + 1`.
///
/// # Errors
///
/// Returns `KeyTooLong`, `ZeroLengthKey` or `ValueTooLong` for invalid
/// entries (before anything is sent), or `WriteBatchExecutionError` if the
/// batch fails.
async fn write_batch_exclusive(
    &self,
    root_key: &[u8],
    batch: UnorderedBatch,
    t: i64,
) -> Result<(), ScyllaDbStoreInternalError> {
    let UnorderedBatch {
        key_prefix_deletions,
        simple_unordered_batch:
            SimpleUnorderedBatch {
                deletions,
                insertions,
            },
    } = batch;
    let session = &self.session;
    let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
    let mut batch_values = Vec::new();

    // Range deletions, stamped with `t`. For DELETE statements the timestamp
    // is the first bound value.
    for key_prefix in key_prefix_deletions {
        Self::check_key_size(&key_prefix)?;
        match get_upper_bound_option(&key_prefix) {
            None => {
                batch_values.push(vec![
                    CqlValue::BigInt(t),
                    CqlValue::Blob(root_key.to_vec()),
                    CqlValue::Blob(key_prefix),
                ]);
                batch_query
                    .append_statement(self.write_batch_delete_prefix_unbounded_ts.clone());
            }
            Some(upper_bound) => {
                batch_values.push(vec![
                    CqlValue::BigInt(t),
                    CqlValue::Blob(root_key.to_vec()),
                    CqlValue::Blob(key_prefix),
                    CqlValue::Blob(upper_bound),
                ]);
                batch_query.append_statement(self.write_batch_delete_prefix_bounded_ts.clone());
            }
        }
    }

    // Point operations are stamped one microsecond-unit later than the range deletions.
    let t_data = t + 1;
    for key in deletions {
        Self::check_batch_key(&key)?;
        batch_values.push(vec![
            CqlValue::BigInt(t_data),
            CqlValue::Blob(root_key.to_vec()),
            CqlValue::Blob(key),
        ]);
        batch_query.append_statement(self.write_batch_deletion_ts.clone());
    }
    // For INSERT statements the timestamp is the last bound value.
    for (key, value) in insertions {
        Self::check_batch_key(&key)?;
        Self::check_value_size(&value)?;
        batch_values.push(vec![
            CqlValue::Blob(root_key.to_vec()),
            CqlValue::Blob(key),
            CqlValue::Blob(value),
            CqlValue::BigInt(t_data),
        ]);
        batch_query.append_statement(self.write_batch_insertion_ts.clone());
    }
    // Sentinel row whose write time is read back by `ensure_ts_seeded`.
    batch_values.push(vec![
        CqlValue::Blob(root_key.to_vec()),
        CqlValue::Blob(WRITETIME_SENTINEL_KEY.to_vec()),
        CqlValue::Blob(Vec::new()),
        CqlValue::BigInt(t_data),
    ]);
    batch_query.append_statement(self.write_batch_insertion_ts.clone());

    session
        .batch(&batch_query, batch_values)
        .await
        .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?;
    Ok(())
}
646
647 async fn find_keys_by_prefix_internal(
648 &self,
649 root_key: &[u8],
650 key_prefix: Vec<u8>,
651 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
652 Self::check_key_size(&key_prefix)?;
653 let session = &self.session;
654 let len = key_prefix.len();
656 let query_unbounded = &self.find_keys_by_prefix_unbounded;
657 let query_bounded = &self.find_keys_by_prefix_bounded;
658 let rows = match get_upper_bound_option(&key_prefix) {
659 None => {
660 let values = (root_key.to_vec(), key_prefix.clone());
661 Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
662 }
663 Some(upper_bound) => {
664 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
665 Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
666 }
667 };
668 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
669 let mut keys = Vec::new();
670 while let Some(row) = rows.next().await {
671 let (key,) = row?;
672 if key == WRITETIME_SENTINEL_KEY {
676 continue;
677 }
678 let short_key = key[len..].to_vec();
679 keys.push(short_key);
680 }
681 Ok(keys)
682 }
683
684 async fn find_key_values_by_prefix_internal(
685 &self,
686 root_key: &[u8],
687 key_prefix: Vec<u8>,
688 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
689 Self::check_key_size(&key_prefix)?;
690 let session = &self.session;
691 let len = key_prefix.len();
693 let query_unbounded = &self.find_key_values_by_prefix_unbounded;
694 let query_bounded = &self.find_key_values_by_prefix_bounded;
695 let rows = match get_upper_bound_option(&key_prefix) {
696 None => {
697 let values = (root_key.to_vec(), key_prefix.clone());
698 Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
699 }
700 Some(upper_bound) => {
701 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
702 Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
703 }
704 };
705 let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
706 let mut key_values = Vec::new();
707 while let Some(row) = rows.next().await {
708 let (key, value) = row?;
709 if key == WRITETIME_SENTINEL_KEY {
711 continue;
712 }
713 let short_key = key[len..].to_vec();
714 key_values.push((short_key, value));
715 }
716 Ok(key_values)
717 }
718}
719
/// A store over one root key (partition) of a ScyllaDB namespace.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// The shared client holding the session and prepared statements.
    store: Arc<ScyllaDbClient>,
    /// Optional semaphore, built from `max_concurrent_queries` in `connect`.
    // NOTE(review): presumably taken by `acquire()` to cap concurrent queries — confirm.
    semaphore: Option<Arc<Semaphore>>,
    /// Value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    /// The root key as stored in the table: the user root key with a leading `0` byte.
    root_key: Vec<u8>,
    /// When `true`, batches are written by `write_batch_exclusive` with explicit timestamps.
    is_exclusive: bool,
    /// Floor for the timestamps handed out by `next_batch_ts`; `0` means "not seeded yet".
    ts_floor: Arc<AtomicI64>,
}
737
/// A connection to one ScyllaDB namespace, from which per-root-key stores are opened.
#[derive(Clone)]
pub struct ScyllaDbDatabaseInternal {
    /// The shared client holding the session and prepared statements.
    store: Arc<ScyllaDbClient>,
    /// Optional semaphore, built from `max_concurrent_queries` in `connect`.
    semaphore: Option<Arc<Semaphore>>,
    /// Copied into every store opened from this database.
    max_stream_queries: usize,
}
745
/// The database reports failures as `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbDatabaseInternal {
    type Error = ScyllaDbStoreInternalError;
}
749
/// The error type for `ScyllaDbStoreInternal` and `ScyllaDbDatabaseInternal`.
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A row could not be deserialized into the requested Rust type.
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// The driver failed to produce a row iterator.
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// A query result could not be converted into a rows result.
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// The requested row type does not match the result's column types.
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// A paged query (`execute_iter`) failed.
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// A statement could not be prepared.
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// A single-page query (`execute_single_page`) failed.
    #[error(transparent)]
    ExecutionError(ExecutionError),

    /// A batch write (`Session::batch`) failed. This is the only variant for
    /// which `must_reload_view` returns `true`.
    #[error(transparent)]
    WriteBatchExecutionError(ExecutionError),

    /// A session could not be created.
    #[error(transparent)]
    NewSessionError(#[from] NewSessionError),

    /// Fetching the next row of a row stream failed.
    #[error(transparent)]
    NextRowError(#[from] NextRowError),

    /// The namespace was rejected by `check_namespace`.
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// The key is longer than `MAX_KEY_SIZE`.
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// The value is longer than `RAW_MAX_VALUE_SIZE`.
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// The batch has more than `MAX_BATCH_SIZE` entries.
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// A batch tried to write or delete the empty key, which `check_batch_key` rejects.
    #[error("The key must be of nonzero length")]
    ZeroLengthKey,
}
818
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    /// Backend identifier reported for this error type.
    const BACKEND: &'static str = "scylla_db";

    /// Returns `true` only for `WriteBatchExecutionError`, i.e. when a batch
    /// write failed at execution time.
    // NOTE(review): presumably because such a batch may have been partially
    // applied, so cached views can no longer be trusted — confirm.
    fn must_reload_view(&self) -> bool {
        matches!(self, Self::WriteBatchExecutionError(_))
    }
}
828
/// The store reports failures as `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
832
833impl ReadableKeyValueStore for ScyllaDbStoreInternal {
834 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
835
836 fn max_stream_queries(&self) -> usize {
837 self.max_stream_queries
838 }
839
840 fn root_key(&self) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
841 Ok(self.root_key[1..].to_vec())
842 }
843
844 async fn read_value_bytes(
845 &self,
846 key: &[u8],
847 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
848 let store = self.store.deref();
849 let _guard = self.acquire().await;
850 Box::pin(store.read_value_internal(&self.root_key, key.to_vec())).await
851 }
852
853 async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
854 let store = self.store.deref();
855 let _guard = self.acquire().await;
856 Box::pin(store.contains_key_internal(&self.root_key, key.to_vec())).await
857 }
858
859 async fn contains_keys(
860 &self,
861 keys: &[Vec<u8>],
862 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
863 if keys.is_empty() {
864 return Ok(Vec::new());
865 }
866 let store = self.store.deref();
867 let _guard = self.acquire().await;
868 let handles = keys
869 .chunks(MAX_MULTI_KEYS)
870 .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
871 let results: Vec<_> = join_all(handles)
872 .await
873 .into_iter()
874 .collect::<Result<_, _>>()?;
875 Ok(results.into_iter().flatten().collect())
876 }
877
878 async fn read_multi_values_bytes(
879 &self,
880 keys: &[Vec<u8>],
881 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
882 if keys.is_empty() {
883 return Ok(Vec::new());
884 }
885 let store = self.store.deref();
886 let _guard = self.acquire().await;
887 let handles = keys
888 .chunks(MAX_MULTI_KEYS)
889 .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
890 let results: Vec<_> = join_all(handles)
891 .await
892 .into_iter()
893 .collect::<Result<_, _>>()?;
894 Ok(results.into_iter().flatten().collect())
895 }
896
897 async fn find_keys_by_prefix(
898 &self,
899 key_prefix: &[u8],
900 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
901 let store = self.store.deref();
902 let _guard = self.acquire().await;
903 Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await
904 }
905
906 async fn find_key_values_by_prefix(
907 &self,
908 key_prefix: &[u8],
909 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
910 let store = self.store.deref();
911 let _guard = self.acquire().await;
912 Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec()))
913 .await
914 }
915}
916
917impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
918 const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
919 const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
920 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
921
922 type Batch = UnorderedBatch;
935
936 async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
937 let store = self.store.deref();
938 let _guard = self.acquire().await;
939 ScyllaDbClient::check_batch_len(&batch)?;
940 if self.is_exclusive {
941 let t = self.next_batch_ts().await?;
943 store.write_batch_exclusive(&self.root_key, batch, t).await
944 } else {
945 store
946 .write_batch_prefix_deletes(&self.root_key, batch.key_prefix_deletions)
947 .await?;
948 store
949 .write_simple_batch(&self.root_key, batch.simple_unordered_batch)
950 .await?;
951 Ok(())
952 }
953 }
954}
955
956impl ScyllaDbStoreInternal {
957 async fn ensure_ts_seeded(&self) -> Result<(), ScyllaDbStoreInternalError> {
963 if self.ts_floor.load(Ordering::Relaxed) > 0 {
964 return Ok(());
965 }
966 let writetime = self
967 .store
968 .read_writetime_internal(&self.root_key, WRITETIME_SENTINEL_KEY.to_vec())
969 .await?
970 .unwrap_or(0);
971 let now_us = SystemTime::now()
972 .duration_since(UNIX_EPOCH)
973 .ok()
974 .and_then(|d| i64::try_from(d.as_micros()).ok())
975 .unwrap_or(0);
976 let seed = now_us.max(writetime);
979 if self
980 .ts_floor
981 .compare_exchange(0, seed, Ordering::Relaxed, Ordering::Relaxed)
982 .is_err()
983 {
984 }
986 Ok(())
987 }
988
989 async fn next_batch_ts(&self) -> Result<i64, ScyllaDbStoreInternalError> {
993 self.ensure_ts_seeded().await?;
994 loop {
995 let prev = self.ts_floor.load(Ordering::Relaxed);
996 let now_us = SystemTime::now()
997 .duration_since(UNIX_EPOCH)
998 .ok()
999 .and_then(|d| i64::try_from(d.as_micros()).ok())
1000 .unwrap_or(prev);
1001 let next = std::cmp::max(now_us, prev + 1);
1002 if self
1005 .ts_floor
1006 .compare_exchange_weak(prev, next + 1, Ordering::Relaxed, Ordering::Relaxed)
1007 .is_ok()
1008 {
1009 return Ok(next);
1010 }
1011 }
1012 }
1013}
1014
/// Builds the root key as stored in the table: a single `0` byte followed by
/// the bytes of `root_key`.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    [&[0u8][..], root_key].concat()
}
1021
/// Key of the sentinel row that every exclusive batch rewrites. Its write time
/// is read back by `ensure_ts_seeded`. It is the empty key, which
/// `check_batch_key` rejects for regular entries and which prefix scans skip.
const WRITETIME_SENTINEL_KEY: &[u8] = &[];
1028
/// Configuration for connecting to ScyllaDB.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// Address of the node handed to the session builder as a known node.
    pub uri: String,
    /// If set, the number of permits of the semaphore created in `connect`.
    pub max_concurrent_queries: Option<usize>,
    /// Value reported by the store's `max_stream_queries()`.
    pub max_stream_queries: usize,
    /// Replication factor used when the keyspace is created.
    pub replication_factor: u32,
}
1041
1042impl KeyValueDatabase for ScyllaDbDatabaseInternal {
1043 type Config = ScyllaDbStoreInternalConfig;
1044 type Store = ScyllaDbStoreInternal;
1045
/// Returns the human-readable name of this database backend.
fn get_name() -> String {
    String::from("scylladb internal")
}
1049
1050 async fn connect(
1051 config: &Self::Config,
1052 namespace: &str,
1053 ) -> Result<Self, ScyllaDbStoreInternalError> {
1054 Self::check_namespace(namespace)?;
1055 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1056 let store = ScyllaDbClient::new(session, namespace).await?;
1057 let store = Arc::new(store);
1058 let semaphore = config
1059 .max_concurrent_queries
1060 .map(|n| Arc::new(Semaphore::new(n)));
1061 let max_stream_queries = config.max_stream_queries;
1062 Ok(Self {
1063 store,
1064 semaphore,
1065 max_stream_queries,
1066 })
1067 }
1068
1069 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
1070 let store = self.store.clone();
1071 let semaphore = self.semaphore.clone();
1072 let max_stream_queries = self.max_stream_queries;
1073 let root_key = get_big_root_key(root_key);
1074 Ok(ScyllaDbStoreInternal {
1075 store,
1076 semaphore,
1077 max_stream_queries,
1078 root_key,
1079 is_exclusive: false,
1080 ts_floor: Arc::new(AtomicI64::new(0)),
1081 })
1082 }
1083
1084 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
1085 let store = self.store.clone();
1086 let semaphore = self.semaphore.clone();
1087 let max_stream_queries = self.max_stream_queries;
1088 let root_key = get_big_root_key(root_key);
1089 Ok(ScyllaDbStoreInternal {
1090 store,
1091 semaphore,
1092 max_stream_queries,
1093 root_key,
1094 is_exclusive: true,
1095 ts_floor: Arc::new(AtomicI64::new(0)),
1096 })
1097 }
1098
1099 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
1100 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1101 let statement = session
1102 .prepare(format!("DESCRIBE KEYSPACE {KEYSPACE}"))
1103 .await?;
1104 let result = Box::pin(session.execute_iter(statement, &[])).await;
1105 let miss_msg = format!("'{KEYSPACE}' not found in keyspaces");
1106 let result = match result {
1107 Ok(result) => result,
1108 Err(error) => {
1109 let invalid_or_keyspace_not_found = match &error {
1110 PagerExecutionError::NextPageError(NextPageError::RequestFailure(
1111 RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
1112 )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
1113 _ => false,
1114 };
1115 if invalid_or_keyspace_not_found {
1116 return Ok(Vec::new());
1117 } else {
1118 return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
1119 }
1120 }
1121 };
1122 let mut namespaces = Vec::new();
1123 let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
1124 while let Some(row) = rows_stream.next().await {
1125 let (_, object_kind, name, _) = row?;
1126 if object_kind == "table" {
1127 namespaces.push(name);
1128 }
1129 }
1130 Ok(namespaces)
1131 }
1132
1133 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
1134 let statement = self
1135 .store
1136 .session
1137 .prepare(format!(
1138 "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
1139 KEYSPACE, self.store.namespace
1140 ))
1141 .await?;
1142
1143 let rows = Box::pin(self.store.session.execute_iter(statement, &[])).await?;
1145 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
1146 let mut root_keys = BTreeSet::new();
1147 while let Some(row) = rows.next().await {
1148 let (root_key,) = row?;
1149 let root_key = root_key[1..].to_vec();
1150 root_keys.insert(root_key);
1151 }
1152 Ok(root_keys.into_iter().collect::<Vec<_>>())
1153 }
1154
1155 async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
1156 let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
1157 let statement = session
1158 .prepare(format!("DROP KEYSPACE IF EXISTS {KEYSPACE}"))
1159 .await?;
1160
1161 session
1162 .execute_single_page(&statement, &[], PagingState::start())
1163 .await
1164 .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1165 Ok(())
1166 }
1167
1168 async fn exists(
1169 config: &Self::Config,
1170 namespace: &str,
1171 ) -> Result<bool, ScyllaDbStoreInternalError> {
1172 Self::check_namespace(namespace)?;
1173 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1174
1175 let result = session
1177 .prepare(format!(
1178 "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" LIMIT 1 ALLOW FILTERING"
1179 ))
1180 .await;
1181
1182 let miss_msg1 = format!("unconfigured table {namespace}");
1184 let miss_msg1 = miss_msg1.as_str();
1185 let miss_msg2 = "Undefined name root_key in selection clause";
1186 let miss_msg3 = format!("Keyspace {KEYSPACE} does not exist");
1187 let Err(error) = result else {
1188 return Ok(true);
1190 };
1191 let missing_table = match &error {
1192 PrepareError::AllAttemptsFailed {
1193 first_attempt: RequestAttemptError::DbError(db_error, msg),
1194 } => {
1195 if *db_error != DbError::Invalid {
1196 false
1197 } else {
1198 msg.as_str() == miss_msg1
1199 || msg.as_str() == miss_msg2
1200 || msg.as_str() == miss_msg3
1201 }
1202 }
1203 _ => false,
1204 };
1205 if missing_table {
1206 Ok(false)
1207 } else {
1208 Err(ScyllaDbStoreInternalError::PrepareError(error))
1209 }
1210 }
1211
1212 async fn create(
1213 config: &Self::Config,
1214 namespace: &str,
1215 ) -> Result<(), ScyllaDbStoreInternalError> {
1216 Self::check_namespace(namespace)?;
1217 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1218
1219 let statement = session
1221 .prepare(format!(
1222 "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
1223 'class' : 'NetworkTopologyStrategy', \
1224 'replication_factor' : {} \
1225 }}",
1226 KEYSPACE, config.replication_factor
1227 ))
1228 .await?;
1229 session
1230 .execute_single_page(&statement, &[], PagingState::start())
1231 .await
1232 .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1233
1234 let statement = session
1237 .prepare(format!(
1238 "CREATE TABLE {KEYSPACE}.\"{namespace}\" (\
1239 root_key blob, \
1240 k blob, \
1241 v blob, \
1242 PRIMARY KEY (root_key, k) \
1243 ) \
1244 WITH compaction = {{ \
1245 'class' : 'LeveledCompactionStrategy', \
1246 'sstable_size_in_mb' : 160 \
1247 }} \
1248 AND compression = {{ \
1249 'sstable_compression': 'LZ4Compressor', \
1250 'chunk_length_in_kb':'4' \
1251 }} \
1252 AND caching = {{ \
1253 'enabled': 'true' \
1254 }} \
1255 AND gc_grace_seconds = 0 \
1256 AND tombstone_gc = {{'mode': 'immediate'}}"
1257 ))
1258 .await?;
1259 session
1260 .execute_single_page(&statement, &[], PagingState::start())
1261 .await
1262 .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1263 Ok(())
1264 }
1265
1266 async fn delete(
1267 config: &Self::Config,
1268 namespace: &str,
1269 ) -> Result<(), ScyllaDbStoreInternalError> {
1270 Self::check_namespace(namespace)?;
1271 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1272 let statement = session
1273 .prepare(format!("DROP TABLE IF EXISTS {KEYSPACE}.\"{namespace}\";"))
1274 .await?;
1275 session
1276 .execute_single_page(&statement, &[], PagingState::start())
1277 .await
1278 .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1279 Ok(())
1280 }
1281}
1282
1283impl ScyllaDbStoreInternal {
1284 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
1286 match &self.semaphore {
1287 None => None,
1288 Some(count) => Some(count.acquire().await),
1289 }
1290 }
1291}
1292
1293impl ScyllaDbDatabaseInternal {
1294 fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
1295 if !namespace.is_empty()
1296 && namespace.len() <= 48
1297 && namespace
1298 .chars()
1299 .all(|c| c.is_ascii_alphanumeric() || c == '_')
1300 {
1301 return Ok(());
1302 }
1303 Err(ScyllaDbStoreInternalError::InvalidNamespace)
1304 }
1305}
1306
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    /// Builds the configuration used by tests.
    ///
    /// The configuration targets `localhost:9042`, allows up to 10 concurrent
    /// queries and 10 stream queries, and uses a replication factor of 1.
    /// This function never returns an error.
    async fn new_test_config(
    ) -> Result<ScyllaDbStoreInternalConfig, JournalingError<ScyllaDbStoreInternalError>> {
        let config = ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
1321
/// The composed ScyllaDB database type when metrics are enabled.
///
/// From the inside out, `ScyllaDbDatabaseInternal` is wrapped in
/// `JournalingKeyValueDatabase`, then `ValueSplittingDatabase`, then
/// `LruCachingDatabase`, and each of those three layers is itself wrapped in a
/// `MeteredDatabase`.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;
1333
/// The composed ScyllaDB database type when metrics are disabled.
///
/// From the inside out, `ScyllaDbDatabaseInternal` is wrapped in
/// `JournalingKeyValueDatabase`, then `ValueSplittingDatabase`, then
/// `LruCachingDatabase`.
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;
1339
/// The configuration type for `ScyllaDbDatabase`: an `LruCachingConfig`
/// parameterized by `ScyllaDbStoreInternalConfig`.
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1342
/// The error type for `ScyllaDbDatabase`: a `ScyllaDbStoreInternalError`
/// nested inside `JournalingError`, nested inside `ValueSplittingError`.
pub type ScyllaDbStoreError = ValueSplittingError<JournalingError<ScyllaDbStoreInternalError>>;