1use std::{
11 collections::{BTreeSet, HashMap},
12 ops::Deref,
13 sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use dashmap::DashMap;
18use futures::{future::join_all, StreamExt as _};
19use linera_base::ensure;
20use scylla::{
21 client::{
22 execution_profile::{ExecutionProfile, ExecutionProfileHandle},
23 session::Session,
24 session_builder::SessionBuilder,
25 },
26 deserialize::{DeserializationError, TypeCheckError},
27 errors::{
28 DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
29 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
30 },
31 policies::{
32 load_balancing::{DefaultPolicy, LoadBalancingPolicy},
33 retry::DefaultRetryPolicy,
34 },
35 response::PagingState,
36 statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
37};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40
41#[cfg(with_metrics)]
42use crate::metering::MeteredStore;
43#[cfg(with_testing)]
44use crate::store::TestKeyValueStore;
45use crate::{
46 batch::UnorderedBatch,
47 common::{get_uleb128_size, get_upper_bound_option},
48 journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
49 lru_caching::{LruCachingConfig, LruCachingStore},
50 store::{AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError},
51 value_splitting::{ValueSplittingError, ValueSplittingStore},
52 FutureSyncExt as _,
53};
54
/// Maximum number of keys sent in a single multi-key (`k IN (...)`) query.
/// The bulk read methods split their input into chunks of this size.
// NOTE(review): written as `100 - 1`, presumably a backend limit of 100 bind
// markers minus the one used by `root_key` — confirm against ScyllaDB limits.
const MAX_MULTI_KEYS: usize = 100 - 1;

/// Largest value, in bytes, accepted for one insertion: 16 MiB minus 10 KiB
/// minus 4000 bytes. Enforced by `check_value_size`.
// NOTE(review): the subtracted margins presumably leave room for the key and
// protocol overhead inside a 16 MiB request — confirm.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// Largest key or key prefix, in bytes (10 KiB). Enforced by `check_key_size`.
const MAX_KEY_SIZE: usize = 10 * 1024;
/// Total batch payload size advertised through
/// `DirectWritableKeyValueStore::MAX_BATCH_TOTAL_SIZE`.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;

/// Value size advertised through `DirectWritableKeyValueStore::MAX_VALUE_SIZE`:
/// the raw limit minus the maximal key size, minus the ULEB128 sizes of both
/// limits, minus 3 bytes.
// NOTE(review): the deductions presumably cover the serialization overhead
// added by the journaling layer — confirm against `journaling`.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;

/// Maximum number of operations in one batch. Enforced by `check_batch_len`.
const MAX_BATCH_SIZE: usize = 5000;

/// Name of the keyspace that holds one table per namespace.
const KEYSPACE: &str = "kv";
97
/// A ScyllaDB session together with the statements prepared against one
/// namespace table (`KEYSPACE.namespace`).
struct ScyllaDbClient {
    /// Session used to prepare and execute every statement.
    session: Session,
    /// Name of the table, inside `KEYSPACE`, that all statements target.
    namespace: String,
    /// `SELECT v ... WHERE root_key = ? AND k = ?`.
    read_value: PreparedStatement,
    /// `SELECT root_key ... WHERE root_key = ? AND k = ?`.
    contains_key: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ?`.
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ? AND k < ?`.
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k = ?`.
    write_batch_deletion: PreparedStatement,
    /// `INSERT INTO ... (root_key, k, v) VALUES (?, ?, ?)`.
    write_batch_insertion: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ?`.
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ? AND k < ?`.
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ?`.
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ? AND k < ?`.
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Cache of `SELECT k,v ... k IN (...)` statements, keyed by the number of
    /// markers in the `IN` list; filled on first use.
    multi_key_values: DashMap<usize, PreparedStatement>,
    /// Cache of `SELECT k ... k IN (...)` statements, keyed by the number of
    /// markers in the `IN` list; filled on first use.
    multi_keys: DashMap<usize, PreparedStatement>,
}
118
119impl ScyllaDbClient {
120 async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
121 let namespace = namespace.to_string();
122 let read_value = session
123 .prepare(format!(
124 "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?",
125 KEYSPACE, namespace
126 ))
127 .await?;
128
129 let contains_key = session
130 .prepare(format!(
131 "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?",
132 KEYSPACE, namespace
133 ))
134 .await?;
135
136 let write_batch_delete_prefix_unbounded = session
137 .prepare(format!(
138 "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?",
139 KEYSPACE, namespace
140 ))
141 .await?;
142
143 let write_batch_delete_prefix_bounded = session
144 .prepare(format!(
145 "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
146 KEYSPACE, namespace
147 ))
148 .await?;
149
150 let write_batch_deletion = session
151 .prepare(format!(
152 "DELETE FROM {}.{} WHERE root_key = ? AND k = ?",
153 KEYSPACE, namespace
154 ))
155 .await?;
156
157 let write_batch_insertion = session
158 .prepare(format!(
159 "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)",
160 KEYSPACE, namespace
161 ))
162 .await?;
163
164 let find_keys_by_prefix_unbounded = session
165 .prepare(format!(
166 "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?",
167 KEYSPACE, namespace
168 ))
169 .await?;
170
171 let find_keys_by_prefix_bounded = session
172 .prepare(format!(
173 "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
174 KEYSPACE, namespace
175 ))
176 .await?;
177
178 let find_key_values_by_prefix_unbounded = session
179 .prepare(format!(
180 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?",
181 KEYSPACE, namespace
182 ))
183 .await?;
184
185 let find_key_values_by_prefix_bounded = session
186 .prepare(format!(
187 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
188 KEYSPACE, namespace
189 ))
190 .await?;
191
192 Ok(Self {
193 session,
194 namespace,
195 read_value,
196 contains_key,
197 write_batch_delete_prefix_unbounded,
198 write_batch_delete_prefix_bounded,
199 write_batch_deletion,
200 write_batch_insertion,
201 find_keys_by_prefix_unbounded,
202 find_keys_by_prefix_bounded,
203 find_key_values_by_prefix_unbounded,
204 find_key_values_by_prefix_bounded,
205 multi_key_values: DashMap::new(),
206 multi_keys: DashMap::new(),
207 })
208 }
209
210 fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
211 DefaultPolicy::builder().token_aware(true).build()
212 }
213
214 fn build_default_execution_profile_handle(
215 policy: Arc<dyn LoadBalancingPolicy>,
216 ) -> ExecutionProfileHandle {
217 let default_profile = ExecutionProfile::builder()
218 .load_balancing_policy(policy)
219 .retry_policy(Arc::new(DefaultRetryPolicy::new()))
220 .consistency(Consistency::LocalQuorum)
221 .build();
222 default_profile.into_handle()
223 }
224
225 async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
226 SessionBuilder::new()
229 .known_node(uri)
230 .default_execution_profile_handle(Self::build_default_execution_profile_handle(
231 Self::build_default_policy(),
232 ))
233 .build()
234 .boxed_sync()
235 .await
236 .map_err(Into::into)
237 }
238
239 async fn get_multi_key_values_statement(
240 &self,
241 num_markers: usize,
242 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
243 if let Some(prepared_statement) = self.multi_key_values.get(&num_markers) {
244 return Ok(prepared_statement.clone());
245 }
246 let markers = std::iter::repeat_n("?", num_markers)
247 .collect::<Vec<_>>()
248 .join(",");
249 let prepared_statement = self
250 .session
251 .prepare(format!(
252 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})",
253 KEYSPACE, self.namespace, markers
254 ))
255 .await?;
256 self.multi_key_values
257 .insert(num_markers, prepared_statement.clone());
258 Ok(prepared_statement)
259 }
260
261 async fn get_multi_keys_statement(
262 &self,
263 num_markers: usize,
264 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
265 if let Some(prepared_statement) = self.multi_keys.get(&num_markers) {
266 return Ok(prepared_statement.clone());
267 };
268 let markers = std::iter::repeat_n("?", num_markers)
269 .collect::<Vec<_>>()
270 .join(",");
271 let prepared_statement = self
272 .session
273 .prepare(format!(
274 "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})",
275 KEYSPACE, self.namespace, markers
276 ))
277 .await?;
278 self.multi_keys
279 .insert(num_markers, prepared_statement.clone());
280 Ok(prepared_statement)
281 }
282
283 fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
284 ensure!(
285 key.len() <= MAX_KEY_SIZE,
286 ScyllaDbStoreInternalError::KeyTooLong
287 );
288 Ok(())
289 }
290
291 fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
292 ensure!(
293 value.len() <= RAW_MAX_VALUE_SIZE,
294 ScyllaDbStoreInternalError::ValueTooLong
295 );
296 Ok(())
297 }
298
299 fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
300 ensure!(
301 batch.len() <= MAX_BATCH_SIZE,
302 ScyllaDbStoreInternalError::BatchTooLong
303 );
304 Ok(())
305 }
306
307 async fn read_value_internal(
308 &self,
309 root_key: &[u8],
310 key: Vec<u8>,
311 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
312 Self::check_key_size(&key)?;
313 let session = &self.session;
314 let values = (root_key.to_vec(), key);
316
317 let (result, _) = session
318 .execute_single_page(&self.read_value, &values, PagingState::start())
319 .await?;
320 let rows = result.into_rows_result()?;
321 let mut rows = rows.rows::<(Vec<u8>,)>()?;
322 Ok(match rows.next() {
323 Some(row) => Some(row?.0),
324 None => None,
325 })
326 }
327
328 fn get_occurrences_map(
329 keys: Vec<Vec<u8>>,
330 ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
331 let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
332 for (i_key, key) in keys.into_iter().enumerate() {
333 Self::check_key_size(&key)?;
334 map.entry(key).or_default().push(i_key);
335 }
336 Ok(map)
337 }
338
339 async fn read_multi_values_internal(
340 &self,
341 root_key: &[u8],
342 keys: Vec<Vec<u8>>,
343 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
344 let mut values = vec![None; keys.len()];
345 let map = Self::get_occurrences_map(keys)?;
346 let statement = self.get_multi_key_values_statement(map.len()).await?;
347 let mut inputs = vec![root_key.to_vec()];
348 inputs.extend(map.keys().cloned());
349 let mut rows = self
350 .session
351 .execute_iter(statement, &inputs)
352 .await?
353 .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
354
355 while let Some(row) = rows.next().await {
356 let (key, value) = row?;
357 for i_key in &map[&key] {
358 values[*i_key] = Some(value.clone());
359 }
360 }
361 Ok(values)
362 }
363
364 async fn contains_keys_internal(
365 &self,
366 root_key: &[u8],
367 keys: Vec<Vec<u8>>,
368 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
369 let mut values = vec![false; keys.len()];
370 let map = Self::get_occurrences_map(keys)?;
371 let statement = self.get_multi_keys_statement(map.len()).await?;
372 let mut inputs = vec![root_key.to_vec()];
373 inputs.extend(map.keys().cloned());
374 let mut rows = self
375 .session
376 .execute_iter(statement, &inputs)
377 .await?
378 .rows_stream::<(Vec<u8>,)>()?;
379
380 while let Some(row) = rows.next().await {
381 let (key,) = row?;
382 for i_key in &map[&key] {
383 values[*i_key] = true;
384 }
385 }
386
387 Ok(values)
388 }
389
390 async fn contains_key_internal(
391 &self,
392 root_key: &[u8],
393 key: Vec<u8>,
394 ) -> Result<bool, ScyllaDbStoreInternalError> {
395 Self::check_key_size(&key)?;
396 let session = &self.session;
397 let values = (root_key.to_vec(), key);
399
400 let (result, _) = session
401 .execute_single_page(&self.contains_key, &values, PagingState::start())
402 .await?;
403 let rows = result.into_rows_result()?;
404 let mut rows = rows.rows::<(Vec<u8>,)>()?;
405 Ok(rows.next().is_some())
406 }
407
408 async fn write_batch_internal(
409 &self,
410 root_key: &[u8],
411 batch: UnorderedBatch,
412 ) -> Result<(), ScyllaDbStoreInternalError> {
413 let session = &self.session;
414 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
415 let mut batch_values = Vec::new();
416 let query1 = &self.write_batch_delete_prefix_unbounded;
417 let query2 = &self.write_batch_delete_prefix_bounded;
418 Self::check_batch_len(&batch)?;
419 for key_prefix in batch.key_prefix_deletions {
420 Self::check_key_size(&key_prefix)?;
421 match get_upper_bound_option(&key_prefix) {
422 None => {
423 let values = vec![root_key.to_vec(), key_prefix];
424 batch_values.push(values);
425 batch_query.append_statement(query1.clone());
426 }
427 Some(upper_bound) => {
428 let values = vec![root_key.to_vec(), key_prefix, upper_bound];
429 batch_values.push(values);
430 batch_query.append_statement(query2.clone());
431 }
432 }
433 }
434 let query3 = &self.write_batch_deletion;
435 for key in batch.simple_unordered_batch.deletions {
436 Self::check_key_size(&key)?;
437 let values = vec![root_key.to_vec(), key];
438 batch_values.push(values);
439 batch_query.append_statement(query3.clone());
440 }
441 let query4 = &self.write_batch_insertion;
442 for (key, value) in batch.simple_unordered_batch.insertions {
443 Self::check_key_size(&key)?;
444 Self::check_value_size(&value)?;
445 let values = vec![root_key.to_vec(), key, value];
446 batch_values.push(values);
447 batch_query.append_statement(query4.clone());
448 }
449 session.batch(&batch_query, batch_values).await?;
450 Ok(())
451 }
452
453 async fn find_keys_by_prefix_internal(
454 &self,
455 root_key: &[u8],
456 key_prefix: Vec<u8>,
457 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
458 Self::check_key_size(&key_prefix)?;
459 let session = &self.session;
460 let len = key_prefix.len();
462 let query_unbounded = &self.find_keys_by_prefix_unbounded;
463 let query_bounded = &self.find_keys_by_prefix_bounded;
464 let rows = match get_upper_bound_option(&key_prefix) {
465 None => {
466 let values = (root_key.to_vec(), key_prefix.clone());
467 session
468 .execute_iter(query_unbounded.clone(), values)
469 .await?
470 }
471 Some(upper_bound) => {
472 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
473 session.execute_iter(query_bounded.clone(), values).await?
474 }
475 };
476 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
477 let mut keys = Vec::new();
478 while let Some(row) = rows.next().await {
479 let (key,) = row?;
480 let short_key = key[len..].to_vec();
481 keys.push(short_key);
482 }
483 Ok(keys)
484 }
485
486 async fn find_key_values_by_prefix_internal(
487 &self,
488 root_key: &[u8],
489 key_prefix: Vec<u8>,
490 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
491 Self::check_key_size(&key_prefix)?;
492 let session = &self.session;
493 let len = key_prefix.len();
495 let query_unbounded = &self.find_key_values_by_prefix_unbounded;
496 let query_bounded = &self.find_key_values_by_prefix_bounded;
497 let rows = match get_upper_bound_option(&key_prefix) {
498 None => {
499 let values = (root_key.to_vec(), key_prefix.clone());
500 session
501 .execute_iter(query_unbounded.clone(), values)
502 .await?
503 }
504 Some(upper_bound) => {
505 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
506 session.execute_iter(query_bounded.clone(), values).await?
507 }
508 };
509 let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
510 let mut key_values = Vec::new();
511 while let Some(row) = rows.next().await {
512 let (key, value) = row?;
513 let short_key = key[len..].to_vec();
514 key_values.push((short_key, value));
515 }
516 Ok(key_values)
517 }
518}
519
/// Handle on one root key (partition) of a ScyllaDB namespace table.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// Client holding the session and prepared statements; shared with the
    /// handles created by `open_exclusive`.
    store: Arc<ScyllaDbClient>,
    /// Optional semaphore; when present, every operation acquires a permit
    /// before running its queries.
    semaphore: Option<Arc<Semaphore>>,
    /// Value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Partition key bound to `root_key` in every statement: a `0` byte
    /// followed by the root key given by the caller.
    root_key: Vec<u8>,
}
528
/// Errors produced by [`ScyllaDbStoreInternal`].
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// An error from the `bcs` crate.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A key or key prefix is longer than `MAX_KEY_SIZE` bytes.
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// A value is longer than `RAW_MAX_VALUE_SIZE` bytes.
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// The driver's `DeserializationError`, e.g. raised while decoding a row.
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// The driver's `RowsError`.
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// The driver's `IntoRowsResultError`, raised when a query result cannot
    /// be turned into rows.
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// The driver's `TypeCheckError`.
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// The driver's `PagerExecutionError`, raised by paged (`execute_iter`)
    /// queries.
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// The driver's `NewSessionError`, raised while opening a session.
    #[error(transparent)]
    ScyllaDbNewSessionError(#[from] NewSessionError),

    /// The namespace is empty, longer than 48 characters, or contains a
    /// character other than ASCII alphanumerics and `_`.
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// An error from the journaling layer.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The batch holds more than `MAX_BATCH_SIZE` operations.
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// The driver's `PrepareError`, raised while preparing a statement.
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// The driver's `ExecutionError`, raised while executing a statement.
    #[error(transparent)]
    ExecutionError(#[from] ExecutionError),

    /// The driver's `NextRowError`, raised while streaming rows.
    #[error(transparent)]
    NextRowError(#[from] NextRowError),
}
592
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    /// Backend identifier attached to errors of this store.
    const BACKEND: &'static str = "scylla_db";
}
596
impl WithError for ScyllaDbStoreInternal {
    /// Error type returned by every operation of this store.
    type Error = ScyllaDbStoreInternalError;
}
600
601impl ReadableKeyValueStore for ScyllaDbStoreInternal {
602 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
603
604 fn max_stream_queries(&self) -> usize {
605 self.max_stream_queries
606 }
607
608 async fn read_value_bytes(
609 &self,
610 key: &[u8],
611 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
612 let store = self.store.deref();
613 let _guard = self.acquire().await;
614 store
615 .read_value_internal(&self.root_key, key.to_vec())
616 .await
617 }
618
619 async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
620 let store = self.store.deref();
621 let _guard = self.acquire().await;
622 store
623 .contains_key_internal(&self.root_key, key.to_vec())
624 .await
625 }
626
627 async fn contains_keys(
628 &self,
629 keys: Vec<Vec<u8>>,
630 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
631 if keys.is_empty() {
632 return Ok(Vec::new());
633 }
634 let store = self.store.deref();
635 let _guard = self.acquire().await;
636 let handles = keys
637 .chunks(MAX_MULTI_KEYS)
638 .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
639 let results: Vec<_> = join_all(handles)
640 .await
641 .into_iter()
642 .collect::<Result<_, _>>()?;
643 Ok(results.into_iter().flatten().collect())
644 }
645
646 async fn read_multi_values_bytes(
647 &self,
648 keys: Vec<Vec<u8>>,
649 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
650 if keys.is_empty() {
651 return Ok(Vec::new());
652 }
653 let store = self.store.deref();
654 let _guard = self.acquire().await;
655 let handles = keys
656 .chunks(MAX_MULTI_KEYS)
657 .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
658 let results: Vec<_> = join_all(handles)
659 .await
660 .into_iter()
661 .collect::<Result<_, _>>()?;
662 Ok(results.into_iter().flatten().collect())
663 }
664
665 async fn find_keys_by_prefix(
666 &self,
667 key_prefix: &[u8],
668 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
669 let store = self.store.deref();
670 let _guard = self.acquire().await;
671 store
672 .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
673 .await
674 }
675
676 async fn find_key_values_by_prefix(
677 &self,
678 key_prefix: &[u8],
679 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
680 let store = self.store.deref();
681 let _guard = self.acquire().await;
682 store
683 .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
684 .await
685 }
686}
687
688impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
689 const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
690 const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
691 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
692
693 type Batch = UnorderedBatch;
699
700 async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
701 let store = self.store.deref();
702 let _guard = self.acquire().await;
703 store.write_batch_internal(&self.root_key, batch).await
704 }
705}
706
/// Returns `root_key` prefixed with a single `0` byte.
///
/// The result is the partition key actually stored in the table, so it is
/// never empty even when `root_key` is.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    [&[0u8][..], root_key].concat()
}
713
/// Connection settings for [`ScyllaDbStoreInternal`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// Address of the known node given to the session builder (the test
    /// configuration uses `localhost:9042`).
    pub uri: String,
    /// Number of permits of the query semaphore; `None` disables the limit.
    pub max_concurrent_queries: Option<usize>,
    /// Value reported by `max_stream_queries()`.
    pub max_stream_queries: usize,
    /// Replication factor used when the keyspace is created.
    pub replication_factor: u32,
}
726
727impl AdminKeyValueStore for ScyllaDbStoreInternal {
728 type Config = ScyllaDbStoreInternalConfig;
729
730 fn get_name() -> String {
731 "scylladb internal".to_string()
732 }
733
734 async fn connect(
735 config: &Self::Config,
736 namespace: &str,
737 ) -> Result<Self, ScyllaDbStoreInternalError> {
738 Self::check_namespace(namespace)?;
739 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
740 let store = ScyllaDbClient::new(session, namespace).await?;
741 let store = Arc::new(store);
742 let semaphore = config
743 .max_concurrent_queries
744 .map(|n| Arc::new(Semaphore::new(n)));
745 let max_stream_queries = config.max_stream_queries;
746 let root_key = get_big_root_key(&[]);
747 Ok(Self {
748 store,
749 semaphore,
750 max_stream_queries,
751 root_key,
752 })
753 }
754
755 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, ScyllaDbStoreInternalError> {
756 let store = self.store.clone();
757 let semaphore = self.semaphore.clone();
758 let max_stream_queries = self.max_stream_queries;
759 let root_key = get_big_root_key(root_key);
760 Ok(Self {
761 store,
762 semaphore,
763 max_stream_queries,
764 root_key,
765 })
766 }
767
768 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
769 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
770 let statement = session
771 .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
772 .await?;
773 let result = session.execute_iter(statement, &[]).await;
774 let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
775 let result = match result {
776 Ok(result) => result,
777 Err(error) => {
778 let invalid_or_keyspace_not_found = match &error {
779 PagerExecutionError::NextPageError(NextPageError::RequestFailure(
780 RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
781 )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
782 _ => false,
783 };
784 if invalid_or_keyspace_not_found {
785 return Ok(Vec::new());
786 } else {
787 return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
788 }
789 }
790 };
791 let mut namespaces = Vec::new();
792 let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
793 while let Some(row) = rows_stream.next().await {
794 let (_, object_kind, name, _) = row?;
795 if object_kind == "table" {
796 namespaces.push(name);
797 }
798 }
799 Ok(namespaces)
800 }
801
802 async fn list_root_keys(
803 config: &Self::Config,
804 namespace: &str,
805 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
806 Self::check_namespace(namespace)?;
807 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
808 let statement = session
809 .prepare(format!(
810 "SELECT root_key FROM {}.{} ALLOW FILTERING",
811 KEYSPACE, namespace
812 ))
813 .await?;
814
815 let rows = session.execute_iter(statement, &[]).await?;
817 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
818 let mut root_keys = BTreeSet::new();
819 while let Some(row) = rows.next().await {
820 let (root_key,) = row?;
821 let root_key = root_key[1..].to_vec();
822 root_keys.insert(root_key);
823 }
824 Ok(root_keys.into_iter().collect::<Vec<_>>())
825 }
826
827 async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
828 let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
829 let statement = session
830 .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
831 .await?;
832
833 session
834 .execute_single_page(&statement, &[], PagingState::start())
835 .await?;
836 Ok(())
837 }
838
839 async fn exists(
840 config: &Self::Config,
841 namespace: &str,
842 ) -> Result<bool, ScyllaDbStoreInternalError> {
843 Self::check_namespace(namespace)?;
844 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
845
846 let result = session
848 .prepare(format!(
849 "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
850 KEYSPACE, namespace
851 ))
852 .await;
853
854 let miss_msg1 = format!("unconfigured table {}", namespace);
856 let miss_msg1 = miss_msg1.as_str();
857 let miss_msg2 = "Undefined name root_key in selection clause";
858 let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
859 let Err(error) = result else {
860 return Ok(true);
862 };
863 let missing_table = match &error {
864 PrepareError::AllAttemptsFailed {
865 first_attempt: RequestAttemptError::DbError(db_error, msg),
866 } => {
867 if *db_error != DbError::Invalid {
868 false
869 } else {
870 msg.as_str() == miss_msg1
871 || msg.as_str() == miss_msg2
872 || msg.as_str() == miss_msg3
873 }
874 }
875 _ => false,
876 };
877 if missing_table {
878 Ok(false)
879 } else {
880 Err(ScyllaDbStoreInternalError::PrepareError(error))
881 }
882 }
883
884 async fn create(
885 config: &Self::Config,
886 namespace: &str,
887 ) -> Result<(), ScyllaDbStoreInternalError> {
888 Self::check_namespace(namespace)?;
889 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
890
891 let statement = session
893 .prepare(format!(
894 "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
895 'class' : 'NetworkTopologyStrategy', \
896 'replication_factor' : {} \
897 }}",
898 KEYSPACE, config.replication_factor
899 ))
900 .await?;
901 session
902 .execute_single_page(&statement, &[], PagingState::start())
903 .await?;
904
905 let statement = session
908 .prepare(format!(
909 "CREATE TABLE {}.{} (\
910 root_key blob, \
911 k blob, \
912 v blob, \
913 PRIMARY KEY (root_key, k) \
914 ) \
915 WITH compaction = {{ \
916 'class' : 'SizeTieredCompactionStrategy', \
917 'min_sstable_size' : 52428800, \
918 'bucket_low' : 0.5, \
919 'bucket_high' : 1.5, \
920 'min_threshold' : 4, \
921 'max_threshold' : 32 \
922 }} \
923 AND compression = {{ \
924 'sstable_compression': 'LZ4Compressor', \
925 'chunk_length_in_kb':'4' \
926 }} \
927 AND caching = {{ \
928 'enabled': 'true' \
929 }}",
930 KEYSPACE, namespace
931 ))
932 .await?;
933 session
934 .execute_single_page(&statement, &[], PagingState::start())
935 .await?;
936 Ok(())
937 }
938
939 async fn delete(
940 config: &Self::Config,
941 namespace: &str,
942 ) -> Result<(), ScyllaDbStoreInternalError> {
943 Self::check_namespace(namespace)?;
944 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
945 let statement = session
946 .prepare(format!("DROP TABLE IF EXISTS {}.{};", KEYSPACE, namespace))
947 .await?;
948 session
949 .execute_single_page(&statement, &[], PagingState::start())
950 .await?;
951 Ok(())
952 }
953}
954
955impl ScyllaDbStoreInternal {
956 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
958 match &self.semaphore {
959 None => None,
960 Some(count) => Some(count.acquire().await),
961 }
962 }
963
964 fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
965 if !namespace.is_empty()
966 && namespace.len() <= 48
967 && namespace
968 .chars()
969 .all(|c| c.is_ascii_alphanumeric() || c == '_')
970 {
971 return Ok(());
972 }
973 Err(ScyllaDbStoreInternalError::InvalidNamespace)
974 }
975}
976
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<ScyllaDbStoreInternal> {
    /// Returns the configuration used by tests: a node at `localhost:9042`,
    /// 10 concurrent queries, 10 stream queries and a replication factor of 1.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        let config = ScyllaDbStoreInternalConfig {
            uri: "localhost:9042".to_string(),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
990
/// The ScyllaDB store exposed to users, with metrics enabled: the internal
/// store wrapped, from the inside out, in journaling, value splitting and
/// LRU caching, with a metering layer around the journaling, value-splitting
/// and caching layers.
#[cfg(with_metrics)]
pub type ScyllaDbStore = MeteredStore<
    LruCachingStore<
        MeteredStore<
            ValueSplittingStore<MeteredStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>,
        >,
    >,
>;

/// The ScyllaDB store exposed to users, without metrics: the internal store
/// wrapped, from the inside out, in journaling, value splitting and LRU
/// caching.
#[cfg(not(with_metrics))]
pub type ScyllaDbStore =
    LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>;

/// Configuration of [`ScyllaDbStore`]: the internal configuration wrapped in
/// the LRU caching configuration.
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;

/// Error type of [`ScyllaDbStore`]: the internal error wrapped in the
/// value-splitting error.
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;