1use std::{
11 collections::{BTreeSet, HashMap},
12 ops::Deref,
13 sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use dashmap::{mapref::entry::Entry, DashMap};
18use futures::{future::join_all, StreamExt as _};
19use linera_base::ensure;
20use scylla::{
21 client::{
22 execution_profile::{ExecutionProfile, ExecutionProfileHandle},
23 session::Session,
24 session_builder::SessionBuilder,
25 },
26 deserialize::{DeserializationError, TypeCheckError},
27 errors::{
28 DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
29 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
30 },
31 policies::{
32 load_balancing::{DefaultPolicy, LoadBalancingPolicy},
33 retry::DefaultRetryPolicy,
34 },
35 response::PagingState,
36 statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
37};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40
41#[cfg(with_metrics)]
42use crate::metering::MeteredStore;
43#[cfg(with_testing)]
44use crate::store::TestKeyValueStore;
45use crate::{
46 batch::UnorderedBatch,
47 common::{get_uleb128_size, get_upper_bound_option},
48 journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
49 lru_caching::{LruCachingConfig, LruCachingStore},
50 store::{
51 AdminKeyValueStore, CommonStoreInternalConfig, KeyValueStoreError, ReadableKeyValueStore,
52 WithError,
53 },
54 value_splitting::{ValueSplittingError, ValueSplittingStore},
55 FutureSyncExt as _,
56};
57
58const MAX_MULTI_KEYS: usize = 100 - 1;
61
62const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
70const MAX_KEY_SIZE: usize = 10 * 1024;
71const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
72
73const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
86 - MAX_KEY_SIZE
87 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
88 - get_uleb128_size(MAX_KEY_SIZE)
89 - 3;
90
91const MAX_BATCH_SIZE: usize = 5000;
97
98const KEYSPACE: &str = "kv";
100
101struct ScyllaDbClient {
106 session: Session,
107 namespace: String,
108 read_value: PreparedStatement,
109 contains_key: PreparedStatement,
110 write_batch_delete_prefix_unbounded: PreparedStatement,
111 write_batch_delete_prefix_bounded: PreparedStatement,
112 write_batch_deletion: PreparedStatement,
113 write_batch_insertion: PreparedStatement,
114 find_keys_by_prefix_unbounded: PreparedStatement,
115 find_keys_by_prefix_bounded: PreparedStatement,
116 find_key_values_by_prefix_unbounded: PreparedStatement,
117 find_key_values_by_prefix_bounded: PreparedStatement,
118 multi_key_values: DashMap<usize, PreparedStatement>,
119 multi_keys: DashMap<usize, PreparedStatement>,
120}
121
122impl ScyllaDbClient {
123 async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
124 let namespace = namespace.to_string();
125 let read_value = session
126 .prepare(format!(
127 "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?",
128 KEYSPACE, namespace
129 ))
130 .await?;
131
132 let contains_key = session
133 .prepare(format!(
134 "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?",
135 KEYSPACE, namespace
136 ))
137 .await?;
138
139 let write_batch_delete_prefix_unbounded = session
140 .prepare(format!(
141 "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?",
142 KEYSPACE, namespace
143 ))
144 .await?;
145
146 let write_batch_delete_prefix_bounded = session
147 .prepare(format!(
148 "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
149 KEYSPACE, namespace
150 ))
151 .await?;
152
153 let write_batch_deletion = session
154 .prepare(format!(
155 "DELETE FROM {}.{} WHERE root_key = ? AND k = ?",
156 KEYSPACE, namespace
157 ))
158 .await?;
159
160 let write_batch_insertion = session
161 .prepare(format!(
162 "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)",
163 KEYSPACE, namespace
164 ))
165 .await?;
166
167 let find_keys_by_prefix_unbounded = session
168 .prepare(format!(
169 "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?",
170 KEYSPACE, namespace
171 ))
172 .await?;
173
174 let find_keys_by_prefix_bounded = session
175 .prepare(format!(
176 "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
177 KEYSPACE, namespace
178 ))
179 .await?;
180
181 let find_key_values_by_prefix_unbounded = session
182 .prepare(format!(
183 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?",
184 KEYSPACE, namespace
185 ))
186 .await?;
187
188 let find_key_values_by_prefix_bounded = session
189 .prepare(format!(
190 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
191 KEYSPACE, namespace
192 ))
193 .await?;
194
195 Ok(Self {
196 session,
197 namespace,
198 read_value,
199 contains_key,
200 write_batch_delete_prefix_unbounded,
201 write_batch_delete_prefix_bounded,
202 write_batch_deletion,
203 write_batch_insertion,
204 find_keys_by_prefix_unbounded,
205 find_keys_by_prefix_bounded,
206 find_key_values_by_prefix_unbounded,
207 find_key_values_by_prefix_bounded,
208 multi_key_values: DashMap::new(),
209 multi_keys: DashMap::new(),
210 })
211 }
212
213 fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
214 DefaultPolicy::builder().token_aware(true).build()
215 }
216
217 fn build_default_execution_profile_handle(
218 policy: Arc<dyn LoadBalancingPolicy>,
219 ) -> ExecutionProfileHandle {
220 let default_profile = ExecutionProfile::builder()
221 .load_balancing_policy(policy)
222 .retry_policy(Arc::new(DefaultRetryPolicy::new()))
223 .consistency(Consistency::LocalQuorum)
224 .build();
225 default_profile.into_handle()
226 }
227
228 async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
229 SessionBuilder::new()
232 .known_node(uri)
233 .default_execution_profile_handle(Self::build_default_execution_profile_handle(
234 Self::build_default_policy(),
235 ))
236 .build()
237 .boxed_sync()
238 .await
239 .map_err(Into::into)
240 }
241
242 async fn get_multi_key_values_statement(
243 &self,
244 num_markers: usize,
245 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
246 let entry = self.multi_key_values.entry(num_markers);
247 match entry {
248 Entry::Occupied(entry) => Ok(entry.get().clone()),
249 Entry::Vacant(entry) => {
250 let markers = std::iter::repeat_n("?", num_markers)
251 .collect::<Vec<_>>()
252 .join(",");
253 let prepared_statement = self
254 .session
255 .prepare(format!(
256 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})",
257 KEYSPACE, self.namespace, markers
258 ))
259 .await?;
260 entry.insert(prepared_statement.clone());
261 Ok(prepared_statement)
262 }
263 }
264 }
265
266 async fn get_multi_keys_statement(
267 &self,
268 num_markers: usize,
269 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
270 let entry = self.multi_keys.entry(num_markers);
271 match entry {
272 Entry::Occupied(entry) => Ok(entry.get().clone()),
273 Entry::Vacant(entry) => {
274 let markers = std::iter::repeat_n("?", num_markers)
275 .collect::<Vec<_>>()
276 .join(",");
277 let prepared_statement = self
278 .session
279 .prepare(format!(
280 "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})",
281 KEYSPACE, self.namespace, markers
282 ))
283 .await?;
284 entry.insert(prepared_statement.clone());
285 Ok(prepared_statement)
286 }
287 }
288 }
289
290 fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
291 ensure!(
292 key.len() <= MAX_KEY_SIZE,
293 ScyllaDbStoreInternalError::KeyTooLong
294 );
295 Ok(())
296 }
297
298 fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
299 ensure!(
300 value.len() <= RAW_MAX_VALUE_SIZE,
301 ScyllaDbStoreInternalError::ValueTooLong
302 );
303 Ok(())
304 }
305
306 fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
307 ensure!(
308 batch.len() <= MAX_BATCH_SIZE,
309 ScyllaDbStoreInternalError::BatchTooLong
310 );
311 Ok(())
312 }
313
314 async fn read_value_internal(
315 &self,
316 root_key: &[u8],
317 key: Vec<u8>,
318 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
319 Self::check_key_size(&key)?;
320 let session = &self.session;
321 let values = (root_key.to_vec(), key);
323
324 let (result, _) = session
325 .execute_single_page(&self.read_value, &values, PagingState::start())
326 .await?;
327 let rows = result.into_rows_result()?;
328 let mut rows = rows.rows::<(Vec<u8>,)>()?;
329 Ok(match rows.next() {
330 Some(row) => Some(row?.0),
331 None => None,
332 })
333 }
334
335 fn get_occurrences_map(
336 keys: Vec<Vec<u8>>,
337 ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
338 let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
339 for (i_key, key) in keys.into_iter().enumerate() {
340 Self::check_key_size(&key)?;
341 map.entry(key).or_default().push(i_key);
342 }
343 Ok(map)
344 }
345
346 async fn read_multi_values_internal(
347 &self,
348 root_key: &[u8],
349 keys: Vec<Vec<u8>>,
350 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
351 let mut values = vec![None; keys.len()];
352 let map = Self::get_occurrences_map(keys)?;
353 let statement = self.get_multi_key_values_statement(map.len()).await?;
354 let mut inputs = vec![root_key.to_vec()];
355 inputs.extend(map.keys().cloned());
356 let mut rows = self
357 .session
358 .execute_iter(statement, &inputs)
359 .await?
360 .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
361
362 while let Some(row) = rows.next().await {
363 let (key, value) = row?;
364 for i_key in &map[&key] {
365 values[*i_key] = Some(value.clone());
366 }
367 }
368 Ok(values)
369 }
370
371 async fn contains_keys_internal(
372 &self,
373 root_key: &[u8],
374 keys: Vec<Vec<u8>>,
375 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
376 let mut values = vec![false; keys.len()];
377 let map = Self::get_occurrences_map(keys)?;
378 let statement = self.get_multi_keys_statement(map.len()).await?;
379 let mut inputs = vec![root_key.to_vec()];
380 inputs.extend(map.keys().cloned());
381 let mut rows = self
382 .session
383 .execute_iter(statement, &inputs)
384 .await?
385 .rows_stream::<(Vec<u8>,)>()?;
386
387 while let Some(row) = rows.next().await {
388 let (key,) = row?;
389 for i_key in &map[&key] {
390 values[*i_key] = true;
391 }
392 }
393
394 Ok(values)
395 }
396
397 async fn contains_key_internal(
398 &self,
399 root_key: &[u8],
400 key: Vec<u8>,
401 ) -> Result<bool, ScyllaDbStoreInternalError> {
402 Self::check_key_size(&key)?;
403 let session = &self.session;
404 let values = (root_key.to_vec(), key);
406
407 let (result, _) = session
408 .execute_single_page(&self.contains_key, &values, PagingState::start())
409 .await?;
410 let rows = result.into_rows_result()?;
411 let mut rows = rows.rows::<(Vec<u8>,)>()?;
412 Ok(rows.next().is_some())
413 }
414
415 async fn write_batch_internal(
416 &self,
417 root_key: &[u8],
418 batch: UnorderedBatch,
419 ) -> Result<(), ScyllaDbStoreInternalError> {
420 let session = &self.session;
421 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
422 let mut batch_values = Vec::new();
423 let query1 = &self.write_batch_delete_prefix_unbounded;
424 let query2 = &self.write_batch_delete_prefix_bounded;
425 Self::check_batch_len(&batch)?;
426 for key_prefix in batch.key_prefix_deletions {
427 Self::check_key_size(&key_prefix)?;
428 match get_upper_bound_option(&key_prefix) {
429 None => {
430 let values = vec![root_key.to_vec(), key_prefix];
431 batch_values.push(values);
432 batch_query.append_statement(query1.clone());
433 }
434 Some(upper_bound) => {
435 let values = vec![root_key.to_vec(), key_prefix, upper_bound];
436 batch_values.push(values);
437 batch_query.append_statement(query2.clone());
438 }
439 }
440 }
441 let query3 = &self.write_batch_deletion;
442 for key in batch.simple_unordered_batch.deletions {
443 Self::check_key_size(&key)?;
444 let values = vec![root_key.to_vec(), key];
445 batch_values.push(values);
446 batch_query.append_statement(query3.clone());
447 }
448 let query4 = &self.write_batch_insertion;
449 for (key, value) in batch.simple_unordered_batch.insertions {
450 Self::check_key_size(&key)?;
451 Self::check_value_size(&value)?;
452 let values = vec![root_key.to_vec(), key, value];
453 batch_values.push(values);
454 batch_query.append_statement(query4.clone());
455 }
456 session.batch(&batch_query, batch_values).await?;
457 Ok(())
458 }
459
460 async fn find_keys_by_prefix_internal(
461 &self,
462 root_key: &[u8],
463 key_prefix: Vec<u8>,
464 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
465 Self::check_key_size(&key_prefix)?;
466 let session = &self.session;
467 let len = key_prefix.len();
469 let query_unbounded = &self.find_keys_by_prefix_unbounded;
470 let query_bounded = &self.find_keys_by_prefix_bounded;
471 let rows = match get_upper_bound_option(&key_prefix) {
472 None => {
473 let values = (root_key.to_vec(), key_prefix.clone());
474 session
475 .execute_iter(query_unbounded.clone(), values)
476 .await?
477 }
478 Some(upper_bound) => {
479 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
480 session.execute_iter(query_bounded.clone(), values).await?
481 }
482 };
483 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
484 let mut keys = Vec::new();
485 while let Some(row) = rows.next().await {
486 let (key,) = row?;
487 let short_key = key[len..].to_vec();
488 keys.push(short_key);
489 }
490 Ok(keys)
491 }
492
493 async fn find_key_values_by_prefix_internal(
494 &self,
495 root_key: &[u8],
496 key_prefix: Vec<u8>,
497 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
498 Self::check_key_size(&key_prefix)?;
499 let session = &self.session;
500 let len = key_prefix.len();
502 let query_unbounded = &self.find_key_values_by_prefix_unbounded;
503 let query_bounded = &self.find_key_values_by_prefix_bounded;
504 let rows = match get_upper_bound_option(&key_prefix) {
505 None => {
506 let values = (root_key.to_vec(), key_prefix.clone());
507 session
508 .execute_iter(query_unbounded.clone(), values)
509 .await?
510 }
511 Some(upper_bound) => {
512 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
513 session.execute_iter(query_bounded.clone(), values).await?
514 }
515 };
516 let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
517 let mut key_values = Vec::new();
518 while let Some(row) = rows.next().await {
519 let (key, value) = row?;
520 let short_key = key[len..].to_vec();
521 key_values.push((short_key, value));
522 }
523 Ok(key_values)
524 }
525}
526
527#[derive(Clone)]
529pub struct ScyllaDbStoreInternal {
530 store: Arc<ScyllaDbClient>,
531 semaphore: Option<Arc<Semaphore>>,
532 max_stream_queries: usize,
533 root_key: Vec<u8>,
534}
535
536#[derive(Error, Debug)]
538pub enum ScyllaDbStoreInternalError {
539 #[error(transparent)]
541 BcsError(#[from] bcs::Error),
542
543 #[error("The key must have at most MAX_KEY_SIZE")]
545 KeyTooLong,
546
547 #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
549 ValueTooLong,
550
551 #[error(transparent)]
553 DeserializationError(#[from] DeserializationError),
554
555 #[error(transparent)]
557 RowsError(#[from] RowsError),
558
559 #[error(transparent)]
561 IntoRowsResultError(#[from] IntoRowsResultError),
562
563 #[error(transparent)]
565 TypeCheckError(#[from] TypeCheckError),
566
567 #[error(transparent)]
569 PagerExecutionError(#[from] PagerExecutionError),
570
571 #[error(transparent)]
573 ScyllaDbNewSessionError(#[from] NewSessionError),
574
575 #[error("Namespace contains forbidden characters")]
577 InvalidNamespace,
578
579 #[error(transparent)]
581 JournalConsistencyError(#[from] JournalConsistencyError),
582
583 #[error("The batch is too long to be written")]
585 BatchTooLong,
586
587 #[error(transparent)]
589 PrepareError(#[from] PrepareError),
590
591 #[error(transparent)]
593 ExecutionError(#[from] ExecutionError),
594
595 #[error(transparent)]
597 NextRowError(#[from] NextRowError),
598}
599
600impl KeyValueStoreError for ScyllaDbStoreInternalError {
601 const BACKEND: &'static str = "scylla_db";
602}
603
604impl WithError for ScyllaDbStoreInternal {
605 type Error = ScyllaDbStoreInternalError;
606}
607
608impl ReadableKeyValueStore for ScyllaDbStoreInternal {
609 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
610 type Keys = Vec<Vec<u8>>;
611 type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
612
613 fn max_stream_queries(&self) -> usize {
614 self.max_stream_queries
615 }
616
617 async fn read_value_bytes(
618 &self,
619 key: &[u8],
620 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
621 let store = self.store.deref();
622 let _guard = self.acquire().await;
623 store
624 .read_value_internal(&self.root_key, key.to_vec())
625 .await
626 }
627
628 async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
629 let store = self.store.deref();
630 let _guard = self.acquire().await;
631 store
632 .contains_key_internal(&self.root_key, key.to_vec())
633 .await
634 }
635
636 async fn contains_keys(
637 &self,
638 keys: Vec<Vec<u8>>,
639 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
640 if keys.is_empty() {
641 return Ok(Vec::new());
642 }
643 let store = self.store.deref();
644 let _guard = self.acquire().await;
645 let handles = keys
646 .chunks(MAX_MULTI_KEYS)
647 .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
648 let results: Vec<_> = join_all(handles)
649 .await
650 .into_iter()
651 .collect::<Result<_, _>>()?;
652 Ok(results.into_iter().flatten().collect())
653 }
654
655 async fn read_multi_values_bytes(
656 &self,
657 keys: Vec<Vec<u8>>,
658 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
659 if keys.is_empty() {
660 return Ok(Vec::new());
661 }
662 let store = self.store.deref();
663 let _guard = self.acquire().await;
664 let handles = keys
665 .chunks(MAX_MULTI_KEYS)
666 .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
667 let results: Vec<_> = join_all(handles)
668 .await
669 .into_iter()
670 .collect::<Result<_, _>>()?;
671 Ok(results.into_iter().flatten().collect())
672 }
673
674 async fn find_keys_by_prefix(
675 &self,
676 key_prefix: &[u8],
677 ) -> Result<Self::Keys, ScyllaDbStoreInternalError> {
678 let store = self.store.deref();
679 let _guard = self.acquire().await;
680 store
681 .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
682 .await
683 }
684
685 async fn find_key_values_by_prefix(
686 &self,
687 key_prefix: &[u8],
688 ) -> Result<Self::KeyValues, ScyllaDbStoreInternalError> {
689 let store = self.store.deref();
690 let _guard = self.acquire().await;
691 store
692 .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
693 .await
694 }
695}
696
697impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
698 const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
699 const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
700 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
701
702 type Batch = UnorderedBatch;
708
709 async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
710 let store = self.store.deref();
711 let _guard = self.acquire().await;
712 store.write_batch_internal(&self.root_key, batch).await
713 }
714}
715
/// Builds the value stored in the `root_key` column: a leading `0` byte followed by
/// `root_key`.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    let mut tagged = Vec::with_capacity(root_key.len() + 1);
    tagged.push(0);
    tagged.extend_from_slice(root_key);
    tagged
}
722
723#[derive(Clone, Debug, Deserialize, Serialize)]
725pub struct ScyllaDbStoreInternalConfig {
726 pub uri: String,
728 common_config: CommonStoreInternalConfig,
730}
731
732impl AdminKeyValueStore for ScyllaDbStoreInternal {
733 type Config = ScyllaDbStoreInternalConfig;
734
735 fn get_name() -> String {
736 "scylladb internal".to_string()
737 }
738
739 async fn connect(
740 config: &Self::Config,
741 namespace: &str,
742 ) -> Result<Self, ScyllaDbStoreInternalError> {
743 Self::check_namespace(namespace)?;
744 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
745 let store = ScyllaDbClient::new(session, namespace).await?;
746 let store = Arc::new(store);
747 let semaphore = config
748 .common_config
749 .max_concurrent_queries
750 .map(|n| Arc::new(Semaphore::new(n)));
751 let max_stream_queries = config.common_config.max_stream_queries;
752 let root_key = get_big_root_key(&[]);
753 Ok(Self {
754 store,
755 semaphore,
756 max_stream_queries,
757 root_key,
758 })
759 }
760
761 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, ScyllaDbStoreInternalError> {
762 let store = self.store.clone();
763 let semaphore = self.semaphore.clone();
764 let max_stream_queries = self.max_stream_queries;
765 let root_key = get_big_root_key(root_key);
766 Ok(Self {
767 store,
768 semaphore,
769 max_stream_queries,
770 root_key,
771 })
772 }
773
774 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
775 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
776 let statement = session
777 .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
778 .await?;
779 let result = session.execute_iter(statement, &[]).await;
780 let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
781 let result = match result {
782 Ok(result) => result,
783 Err(error) => {
784 let invalid_or_keyspace_not_found = match &error {
785 PagerExecutionError::NextPageError(NextPageError::RequestFailure(
786 RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
787 )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
788 _ => false,
789 };
790 if invalid_or_keyspace_not_found {
791 return Ok(Vec::new());
792 } else {
793 return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
794 }
795 }
796 };
797 let mut namespaces = Vec::new();
798 let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
799 while let Some(row) = rows_stream.next().await {
800 let (_, object_kind, name, _) = row?;
801 if object_kind == "table" {
802 namespaces.push(name);
803 }
804 }
805 Ok(namespaces)
806 }
807
808 async fn list_root_keys(
809 config: &Self::Config,
810 namespace: &str,
811 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
812 Self::check_namespace(namespace)?;
813 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
814 let statement = session
815 .prepare(format!(
816 "SELECT root_key FROM {}.{} ALLOW FILTERING",
817 KEYSPACE, namespace
818 ))
819 .await?;
820
821 let rows = session.execute_iter(statement, &[]).await?;
823 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
824 let mut root_keys = BTreeSet::new();
825 while let Some(row) = rows.next().await {
826 let (root_key,) = row?;
827 let root_key = root_key[1..].to_vec();
828 root_keys.insert(root_key);
829 }
830 Ok(root_keys.into_iter().collect::<Vec<_>>())
831 }
832
833 async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
834 let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
835 let statement = session
836 .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
837 .await?;
838
839 session
840 .execute_single_page(&statement, &[], PagingState::start())
841 .await?;
842 Ok(())
843 }
844
845 async fn exists(
846 config: &Self::Config,
847 namespace: &str,
848 ) -> Result<bool, ScyllaDbStoreInternalError> {
849 Self::check_namespace(namespace)?;
850 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
851
852 let result = session
854 .prepare(format!(
855 "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
856 KEYSPACE, namespace
857 ))
858 .await;
859
860 let miss_msg1 = format!("unconfigured table {}", namespace);
862 let miss_msg1 = miss_msg1.as_str();
863 let miss_msg2 = "Undefined name root_key in selection clause";
864 let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
865 let Err(error) = result else {
866 return Ok(true);
868 };
869 let missing_table = match &error {
870 PrepareError::AllAttemptsFailed {
871 first_attempt: RequestAttemptError::DbError(db_error, msg),
872 } => {
873 if *db_error != DbError::Invalid {
874 false
875 } else {
876 msg.as_str() == miss_msg1
877 || msg.as_str() == miss_msg2
878 || msg.as_str() == miss_msg3
879 }
880 }
881 _ => false,
882 };
883 if missing_table {
884 Ok(false)
885 } else {
886 Err(ScyllaDbStoreInternalError::PrepareError(error))
887 }
888 }
889
890 async fn create(
891 config: &Self::Config,
892 namespace: &str,
893 ) -> Result<(), ScyllaDbStoreInternalError> {
894 Self::check_namespace(namespace)?;
895 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
896
897 let statement = session
899 .prepare(format!(
900 "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
901 'class' : 'NetworkTopologyStrategy', \
902 'replication_factor' : {} \
903 }}",
904 KEYSPACE, config.common_config.replication_factor
905 ))
906 .await?;
907 session
908 .execute_single_page(&statement, &[], PagingState::start())
909 .await?;
910
911 let statement = session
914 .prepare(format!(
915 "CREATE TABLE {}.{} (\
916 root_key blob, \
917 k blob, \
918 v blob, \
919 PRIMARY KEY (root_key, k) \
920 ) \
921 WITH compaction = {{ \
922 'class' : 'SizeTieredCompactionStrategy', \
923 'min_sstable_size' : 52428800, \
924 'bucket_low' : 0.5, \
925 'bucket_high' : 1.5, \
926 'min_threshold' : 4, \
927 'max_threshold' : 32 \
928 }} \
929 AND compression = {{ \
930 'sstable_compression': 'LZ4Compressor', \
931 'chunk_length_in_kb':'4' \
932 }} \
933 AND caching = {{ \
934 'enabled': 'true' \
935 }}",
936 KEYSPACE, namespace
937 ))
938 .await?;
939 session
940 .execute_single_page(&statement, &[], PagingState::start())
941 .await?;
942 Ok(())
943 }
944
945 async fn delete(
946 config: &Self::Config,
947 namespace: &str,
948 ) -> Result<(), ScyllaDbStoreInternalError> {
949 Self::check_namespace(namespace)?;
950 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
951 let statement = session
952 .prepare(format!("DROP TABLE IF EXISTS {}.{};", KEYSPACE, namespace))
953 .await?;
954 session
955 .execute_single_page(&statement, &[], PagingState::start())
956 .await?;
957 Ok(())
958 }
959}
960
961impl ScyllaDbStoreInternal {
962 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
964 match &self.semaphore {
965 None => None,
966 Some(count) => Some(count.acquire().await),
967 }
968 }
969
970 fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
971 if !namespace.is_empty()
972 && namespace.len() <= 48
973 && namespace
974 .chars()
975 .all(|c| c.is_ascii_alphanumeric() || c == '_')
976 {
977 return Ok(());
978 }
979 Err(ScyllaDbStoreInternalError::InvalidNamespace)
980 }
981}
982
/// Concurrent-query limit used by the test configuration.
#[cfg(with_testing)]
const TEST_SCYLLA_DB_MAX_CONCURRENT_QUERIES: usize = 10;

/// `max_stream_queries` value used by the test configuration.
#[cfg(with_testing)]
const TEST_SCYLLA_DB_MAX_STREAM_QUERIES: usize = 10;
990
/// Test configuration: a local node at `localhost:9042` with replication factor 1.
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<ScyllaDbStoreInternal> {
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        Ok(ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            common_config: CommonStoreInternalConfig {
                max_concurrent_queries: Some(TEST_SCYLLA_DB_MAX_CONCURRENT_QUERIES),
                max_stream_queries: TEST_SCYLLA_DB_MAX_STREAM_QUERIES,
                replication_factor: 1,
            },
        })
    }
}
1003
1004#[cfg(with_metrics)]
1006pub type ScyllaDbStore = MeteredStore<
1007 LruCachingStore<
1008 MeteredStore<
1009 ValueSplittingStore<MeteredStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>,
1010 >,
1011 >,
1012>;
1013
1014#[cfg(not(with_metrics))]
1016pub type ScyllaDbStore =
1017 LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>;
1018
1019pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1021
1022impl ScyllaDbStoreConfig {
1023 pub fn new(uri: String, common_config: crate::store::CommonStoreConfig) -> ScyllaDbStoreConfig {
1025 let inner_config = ScyllaDbStoreInternalConfig {
1026 uri,
1027 common_config: common_config.reduced(),
1028 };
1029 ScyllaDbStoreConfig {
1030 inner_config,
1031 storage_cache_config: common_config.storage_cache_config,
1032 }
1033 }
1034}
1035
1036pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;