1use std::{
11 collections::{BTreeSet, HashMap},
12 ops::Deref,
13 sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::ensure;
19use scylla::{
20 client::{
21 execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22 session::Session,
23 session_builder::SessionBuilder,
24 },
25 deserialize::{DeserializationError, TypeCheckError},
26 errors::{
27 DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29 },
30 policies::{
31 load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32 retry::DefaultRetryPolicy,
33 },
34 response::PagingState,
35 statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45 batch::UnorderedBatch,
46 common::{get_uleb128_size, get_upper_bound_option},
47 journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48 lru_caching::{LruCachingConfig, LruCachingDatabase},
49 store::{
50 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51 WithError,
52 },
53 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54 FutureSyncExt as _,
55};
56
/// Maximum number of keys sent in one multi-key (`k IN (...)`) query; longer key lists
/// are split into chunks of this size by `contains_keys` and `read_multi_values_bytes`.
// NOTE(review): the `100 - 1` presumably reserves one of 100 bind markers for the root
// key — confirm against the ScyllaDB limit.
const MAX_MULTI_KEYS: usize = 100 - 1;

/// Largest value, in bytes, accepted by `check_value_size` when writing a batch.
// NOTE(review): looks like 16 MiB minus a maximal key and some protocol overhead —
// confirm where the 4000 comes from.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// Largest key or key prefix, in bytes, accepted by `check_key_size`.
const MAX_KEY_SIZE: usize = 10 * 1024;
/// Total batch size exposed as `DirectWritableKeyValueStore::MAX_BATCH_TOTAL_SIZE`.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;

/// Value size exposed as `DirectWritableKeyValueStore::MAX_VALUE_SIZE`: the raw limit
/// minus a maximal key, the ULEB128 sizes of both limits, and three further bytes.
// NOTE(review): the deductions presumably cover serialization overhead added by the
// wrapping layers — confirm.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;

/// Maximum number of operations in one batch, enforced by `check_batch_len`.
const MAX_BATCH_SIZE: usize = 5000;

/// Name of the keyspace holding every table (one table per namespace).
const KEYSPACE: &str = "kv";
99
/// A ScyllaDB session bound to one table (`KEYSPACE.namespace`), together with the
/// statements prepared against that table.
struct ScyllaDbClient {
    /// The driver session used for every query.
    session: Session,
    /// Table name inside `KEYSPACE`.
    namespace: String,
    /// `SELECT v ... WHERE root_key = ? AND k = ?`
    read_value: PreparedStatement,
    /// `SELECT root_key ... WHERE root_key = ? AND k = ?`
    contains_key: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ?`
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ? AND k < ?`
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k = ?`
    write_batch_deletion: PreparedStatement,
    /// `INSERT INTO ... (root_key, k, v) VALUES (?, ?, ?)`
    write_batch_insertion: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ?`
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ?`
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Lazily filled cache of `SELECT k,v ... k IN (?,...)` statements, keyed by the
    /// number of key markers.
    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
    /// Lazily filled cache of `SELECT k ... k IN (?,...)` statements, keyed by the
    /// number of key markers.
    multi_keys: papaya::HashMap<usize, PreparedStatement>,
}
120
121impl ScyllaDbClient {
122 async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
123 let namespace = namespace.to_string();
124 let read_value = session
125 .prepare(format!(
126 "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?",
127 KEYSPACE, namespace
128 ))
129 .await?;
130
131 let contains_key = session
132 .prepare(format!(
133 "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?",
134 KEYSPACE, namespace
135 ))
136 .await?;
137
138 let write_batch_delete_prefix_unbounded = session
139 .prepare(format!(
140 "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?",
141 KEYSPACE, namespace
142 ))
143 .await?;
144
145 let write_batch_delete_prefix_bounded = session
146 .prepare(format!(
147 "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
148 KEYSPACE, namespace
149 ))
150 .await?;
151
152 let write_batch_deletion = session
153 .prepare(format!(
154 "DELETE FROM {}.{} WHERE root_key = ? AND k = ?",
155 KEYSPACE, namespace
156 ))
157 .await?;
158
159 let write_batch_insertion = session
160 .prepare(format!(
161 "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)",
162 KEYSPACE, namespace
163 ))
164 .await?;
165
166 let find_keys_by_prefix_unbounded = session
167 .prepare(format!(
168 "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?",
169 KEYSPACE, namespace
170 ))
171 .await?;
172
173 let find_keys_by_prefix_bounded = session
174 .prepare(format!(
175 "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
176 KEYSPACE, namespace
177 ))
178 .await?;
179
180 let find_key_values_by_prefix_unbounded = session
181 .prepare(format!(
182 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?",
183 KEYSPACE, namespace
184 ))
185 .await?;
186
187 let find_key_values_by_prefix_bounded = session
188 .prepare(format!(
189 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
190 KEYSPACE, namespace
191 ))
192 .await?;
193
194 Ok(Self {
195 session,
196 namespace,
197 read_value,
198 contains_key,
199 write_batch_delete_prefix_unbounded,
200 write_batch_delete_prefix_bounded,
201 write_batch_deletion,
202 write_batch_insertion,
203 find_keys_by_prefix_unbounded,
204 find_keys_by_prefix_bounded,
205 find_key_values_by_prefix_unbounded,
206 find_key_values_by_prefix_bounded,
207 multi_key_values: papaya::HashMap::new(),
208 multi_keys: papaya::HashMap::new(),
209 })
210 }
211
212 fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
213 DefaultPolicy::builder().token_aware(true).build()
214 }
215
216 fn build_default_execution_profile_handle(
217 policy: Arc<dyn LoadBalancingPolicy>,
218 ) -> ExecutionProfileHandle {
219 let default_profile = ExecutionProfile::builder()
220 .load_balancing_policy(policy)
221 .retry_policy(Arc::new(DefaultRetryPolicy::new()))
222 .consistency(Consistency::LocalQuorum)
223 .build();
224 default_profile.into_handle()
225 }
226
227 async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
228 SessionBuilder::new()
231 .known_node(uri)
232 .default_execution_profile_handle(Self::build_default_execution_profile_handle(
233 Self::build_default_policy(),
234 ))
235 .build()
236 .boxed_sync()
237 .await
238 .map_err(Into::into)
239 }
240
241 async fn get_multi_key_values_statement(
242 &self,
243 num_markers: usize,
244 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
245 if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
246 return Ok(prepared_statement.clone());
247 }
248 let markers = std::iter::repeat_n("?", num_markers)
249 .collect::<Vec<_>>()
250 .join(",");
251 let prepared_statement = self
252 .session
253 .prepare(format!(
254 "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})",
255 KEYSPACE, self.namespace, markers
256 ))
257 .await?;
258 self.multi_key_values
259 .pin()
260 .insert(num_markers, prepared_statement.clone());
261 Ok(prepared_statement)
262 }
263
264 async fn get_multi_keys_statement(
265 &self,
266 num_markers: usize,
267 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
268 if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
269 return Ok(prepared_statement.clone());
270 };
271 let markers = std::iter::repeat_n("?", num_markers)
272 .collect::<Vec<_>>()
273 .join(",");
274 let prepared_statement = self
275 .session
276 .prepare(format!(
277 "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})",
278 KEYSPACE, self.namespace, markers
279 ))
280 .await?;
281 self.multi_keys
282 .pin()
283 .insert(num_markers, prepared_statement.clone());
284 Ok(prepared_statement)
285 }
286
287 fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
288 ensure!(
289 key.len() <= MAX_KEY_SIZE,
290 ScyllaDbStoreInternalError::KeyTooLong
291 );
292 Ok(())
293 }
294
295 fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
296 ensure!(
297 value.len() <= RAW_MAX_VALUE_SIZE,
298 ScyllaDbStoreInternalError::ValueTooLong
299 );
300 Ok(())
301 }
302
303 fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
304 ensure!(
305 batch.len() <= MAX_BATCH_SIZE,
306 ScyllaDbStoreInternalError::BatchTooLong
307 );
308 Ok(())
309 }
310
311 async fn read_value_internal(
312 &self,
313 root_key: &[u8],
314 key: Vec<u8>,
315 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
316 Self::check_key_size(&key)?;
317 let session = &self.session;
318 let values = (root_key.to_vec(), key);
320
321 let (result, _) = session
322 .execute_single_page(&self.read_value, &values, PagingState::start())
323 .await?;
324 let rows = result.into_rows_result()?;
325 let mut rows = rows.rows::<(Vec<u8>,)>()?;
326 Ok(match rows.next() {
327 Some(row) => Some(row?.0),
328 None => None,
329 })
330 }
331
332 fn get_occurrences_map(
333 keys: Vec<Vec<u8>>,
334 ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
335 let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
336 for (i_key, key) in keys.into_iter().enumerate() {
337 Self::check_key_size(&key)?;
338 map.entry(key).or_default().push(i_key);
339 }
340 Ok(map)
341 }
342
343 async fn read_multi_values_internal(
344 &self,
345 root_key: &[u8],
346 keys: Vec<Vec<u8>>,
347 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
348 let mut values = vec![None; keys.len()];
349 let map = Self::get_occurrences_map(keys)?;
350 let statement = self.get_multi_key_values_statement(map.len()).await?;
351 let mut inputs = vec![root_key.to_vec()];
352 inputs.extend(map.keys().cloned());
353 let mut rows = self
354 .session
355 .execute_iter(statement, &inputs)
356 .await?
357 .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
358
359 while let Some(row) = rows.next().await {
360 let (key, value) = row?;
361 for i_key in &map[&key] {
362 values[*i_key] = Some(value.clone());
363 }
364 }
365 Ok(values)
366 }
367
368 async fn contains_keys_internal(
369 &self,
370 root_key: &[u8],
371 keys: Vec<Vec<u8>>,
372 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
373 let mut values = vec![false; keys.len()];
374 let map = Self::get_occurrences_map(keys)?;
375 let statement = self.get_multi_keys_statement(map.len()).await?;
376 let mut inputs = vec![root_key.to_vec()];
377 inputs.extend(map.keys().cloned());
378 let mut rows = self
379 .session
380 .execute_iter(statement, &inputs)
381 .await?
382 .rows_stream::<(Vec<u8>,)>()?;
383
384 while let Some(row) = rows.next().await {
385 let (key,) = row?;
386 for i_key in &map[&key] {
387 values[*i_key] = true;
388 }
389 }
390
391 Ok(values)
392 }
393
394 async fn contains_key_internal(
395 &self,
396 root_key: &[u8],
397 key: Vec<u8>,
398 ) -> Result<bool, ScyllaDbStoreInternalError> {
399 Self::check_key_size(&key)?;
400 let session = &self.session;
401 let values = (root_key.to_vec(), key);
403
404 let (result, _) = session
405 .execute_single_page(&self.contains_key, &values, PagingState::start())
406 .await?;
407 let rows = result.into_rows_result()?;
408 let mut rows = rows.rows::<(Vec<u8>,)>()?;
409 Ok(rows.next().is_some())
410 }
411
412 async fn write_batch_internal(
413 &self,
414 root_key: &[u8],
415 batch: UnorderedBatch,
416 ) -> Result<(), ScyllaDbStoreInternalError> {
417 let session = &self.session;
418 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
419 let mut batch_values = Vec::new();
420 let query1 = &self.write_batch_delete_prefix_unbounded;
421 let query2 = &self.write_batch_delete_prefix_bounded;
422 Self::check_batch_len(&batch)?;
423 for key_prefix in batch.key_prefix_deletions {
424 Self::check_key_size(&key_prefix)?;
425 match get_upper_bound_option(&key_prefix) {
426 None => {
427 let values = vec![root_key.to_vec(), key_prefix];
428 batch_values.push(values);
429 batch_query.append_statement(query1.clone());
430 }
431 Some(upper_bound) => {
432 let values = vec![root_key.to_vec(), key_prefix, upper_bound];
433 batch_values.push(values);
434 batch_query.append_statement(query2.clone());
435 }
436 }
437 }
438 let query3 = &self.write_batch_deletion;
439 for key in batch.simple_unordered_batch.deletions {
440 Self::check_key_size(&key)?;
441 let values = vec![root_key.to_vec(), key];
442 batch_values.push(values);
443 batch_query.append_statement(query3.clone());
444 }
445 let query4 = &self.write_batch_insertion;
446 for (key, value) in batch.simple_unordered_batch.insertions {
447 Self::check_key_size(&key)?;
448 Self::check_value_size(&value)?;
449 let values = vec![root_key.to_vec(), key, value];
450 batch_values.push(values);
451 batch_query.append_statement(query4.clone());
452 }
453 session.batch(&batch_query, batch_values).await?;
454 Ok(())
455 }
456
457 async fn find_keys_by_prefix_internal(
458 &self,
459 root_key: &[u8],
460 key_prefix: Vec<u8>,
461 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
462 Self::check_key_size(&key_prefix)?;
463 let session = &self.session;
464 let len = key_prefix.len();
466 let query_unbounded = &self.find_keys_by_prefix_unbounded;
467 let query_bounded = &self.find_keys_by_prefix_bounded;
468 let rows = match get_upper_bound_option(&key_prefix) {
469 None => {
470 let values = (root_key.to_vec(), key_prefix.clone());
471 session
472 .execute_iter(query_unbounded.clone(), values)
473 .await?
474 }
475 Some(upper_bound) => {
476 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
477 session.execute_iter(query_bounded.clone(), values).await?
478 }
479 };
480 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
481 let mut keys = Vec::new();
482 while let Some(row) = rows.next().await {
483 let (key,) = row?;
484 let short_key = key[len..].to_vec();
485 keys.push(short_key);
486 }
487 Ok(keys)
488 }
489
490 async fn find_key_values_by_prefix_internal(
491 &self,
492 root_key: &[u8],
493 key_prefix: Vec<u8>,
494 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
495 Self::check_key_size(&key_prefix)?;
496 let session = &self.session;
497 let len = key_prefix.len();
499 let query_unbounded = &self.find_key_values_by_prefix_unbounded;
500 let query_bounded = &self.find_key_values_by_prefix_bounded;
501 let rows = match get_upper_bound_option(&key_prefix) {
502 None => {
503 let values = (root_key.to_vec(), key_prefix.clone());
504 session
505 .execute_iter(query_unbounded.clone(), values)
506 .await?
507 }
508 Some(upper_bound) => {
509 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
510 session.execute_iter(query_bounded.clone(), values).await?
511 }
512 };
513 let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
514 let mut key_values = Vec::new();
515 while let Some(row) = rows.next().await {
516 let (key, value) = row?;
517 let short_key = key[len..].to_vec();
518 key_values.push((short_key, value));
519 }
520 Ok(key_values)
521 }
522}
523
/// A store scoped to one root key of a ScyllaDB table.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// The shared client holding the session and prepared statements.
    store: Arc<ScyllaDbClient>,
    /// Optional semaphore acquired before each operation; `None` means no limit.
    semaphore: Option<Arc<Semaphore>>,
    /// Value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Root key as stored in the table (the caller's root key behind a leading `0` byte).
    root_key: Vec<u8>,
}
532
/// A connection to one ScyllaDB table (namespace), from which per-root-key stores are opened.
#[derive(Clone)]
pub struct ScyllaDbDatabaseInternal {
    /// The shared client holding the session and prepared statements.
    store: Arc<ScyllaDbClient>,
    /// Optional semaphore handed to every opened store; `None` means no limit.
    semaphore: Option<Arc<Semaphore>>,
    /// Stream-query limit handed to every opened store.
    max_stream_queries: usize,
}
540
// Database-level operations report `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbDatabaseInternal {
    type Error = ScyllaDbStoreInternalError;
}
544
/// The errors that can be produced by the ScyllaDB store and database.
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A key or key prefix is longer than `MAX_KEY_SIZE`.
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// A value is longer than `RAW_MAX_VALUE_SIZE`.
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// The driver failed to deserialize a row.
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// The driver failed to produce a row iterator.
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// The driver could not convert a query result into rows.
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// The requested row type does not match the result columns.
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// The driver failed while executing a paged query.
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// The driver failed to create a session.
    #[error(transparent)]
    ScyllaDbNewSessionError(#[from] NewSessionError),

    /// The namespace was rejected by `check_namespace`.
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// An inconsistency reported by the journaling layer.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The batch holds more than `MAX_BATCH_SIZE` operations.
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// The driver failed to prepare a statement.
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// The driver failed to execute a statement.
    #[error(transparent)]
    ExecutionError(#[from] ExecutionError),

    /// The driver failed to fetch the next row of a stream.
    #[error(transparent)]
    NextRowError(#[from] NextRowError),
}
608
// Identifies this backend as "scylla_db" wherever errors are reported by backend name.
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    const BACKEND: &'static str = "scylla_db";
}
612
// Store-level operations report `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
616
617impl ReadableKeyValueStore for ScyllaDbStoreInternal {
618 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
619
620 fn max_stream_queries(&self) -> usize {
621 self.max_stream_queries
622 }
623
624 async fn read_value_bytes(
625 &self,
626 key: &[u8],
627 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
628 let store = self.store.deref();
629 let _guard = self.acquire().await;
630 store
631 .read_value_internal(&self.root_key, key.to_vec())
632 .await
633 }
634
635 async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
636 let store = self.store.deref();
637 let _guard = self.acquire().await;
638 store
639 .contains_key_internal(&self.root_key, key.to_vec())
640 .await
641 }
642
643 async fn contains_keys(
644 &self,
645 keys: Vec<Vec<u8>>,
646 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
647 if keys.is_empty() {
648 return Ok(Vec::new());
649 }
650 let store = self.store.deref();
651 let _guard = self.acquire().await;
652 let handles = keys
653 .chunks(MAX_MULTI_KEYS)
654 .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
655 let results: Vec<_> = join_all(handles)
656 .await
657 .into_iter()
658 .collect::<Result<_, _>>()?;
659 Ok(results.into_iter().flatten().collect())
660 }
661
662 async fn read_multi_values_bytes(
663 &self,
664 keys: Vec<Vec<u8>>,
665 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
666 if keys.is_empty() {
667 return Ok(Vec::new());
668 }
669 let store = self.store.deref();
670 let _guard = self.acquire().await;
671 let handles = keys
672 .chunks(MAX_MULTI_KEYS)
673 .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
674 let results: Vec<_> = join_all(handles)
675 .await
676 .into_iter()
677 .collect::<Result<_, _>>()?;
678 Ok(results.into_iter().flatten().collect())
679 }
680
681 async fn find_keys_by_prefix(
682 &self,
683 key_prefix: &[u8],
684 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
685 let store = self.store.deref();
686 let _guard = self.acquire().await;
687 store
688 .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
689 .await
690 }
691
692 async fn find_key_values_by_prefix(
693 &self,
694 key_prefix: &[u8],
695 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
696 let store = self.store.deref();
697 let _guard = self.acquire().await;
698 store
699 .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
700 .await
701 }
702}
703
704impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
705 const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
706 const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
707 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
708
709 type Batch = UnorderedBatch;
715
716 async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
717 let store = self.store.deref();
718 let _guard = self.acquire().await;
719 store.write_batch_internal(&self.root_key, batch).await
720 }
721}
722
/// Returns `root_key` with a single `0` byte prepended; this is the form under which
/// root keys are stored in the table.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    let mut big_key = Vec::with_capacity(root_key.len() + 1);
    big_key.push(0);
    big_key.extend_from_slice(root_key);
    big_key
}
729
/// The configuration needed to connect to ScyllaDB.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// Address of the node used to open the session.
    pub uri: String,
    /// Number of permits of the semaphore acquired by each store operation; `None`
    /// disables the semaphore.
    pub max_concurrent_queries: Option<usize>,
    /// Value reported by the store's `max_stream_queries()`.
    pub max_stream_queries: usize,
    /// Replication factor used when the keyspace is created.
    pub replication_factor: u32,
}
742
743impl KeyValueDatabase for ScyllaDbDatabaseInternal {
744 type Config = ScyllaDbStoreInternalConfig;
745 type Store = ScyllaDbStoreInternal;
746
747 fn get_name() -> String {
748 "scylladb internal".to_string()
749 }
750
751 async fn connect(
752 config: &Self::Config,
753 namespace: &str,
754 ) -> Result<Self, ScyllaDbStoreInternalError> {
755 Self::check_namespace(namespace)?;
756 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
757 let store = ScyllaDbClient::new(session, namespace).await?;
758 let store = Arc::new(store);
759 let semaphore = config
760 .max_concurrent_queries
761 .map(|n| Arc::new(Semaphore::new(n)));
762 let max_stream_queries = config.max_stream_queries;
763 Ok(Self {
764 store,
765 semaphore,
766 max_stream_queries,
767 })
768 }
769
770 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
771 let store = self.store.clone();
772 let semaphore = self.semaphore.clone();
773 let max_stream_queries = self.max_stream_queries;
774 let root_key = get_big_root_key(root_key);
775 Ok(ScyllaDbStoreInternal {
776 store,
777 semaphore,
778 max_stream_queries,
779 root_key,
780 })
781 }
782
783 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
784 self.open_shared(root_key)
785 }
786
787 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
788 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
789 let statement = session
790 .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
791 .await?;
792 let result = session.execute_iter(statement, &[]).await;
793 let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
794 let result = match result {
795 Ok(result) => result,
796 Err(error) => {
797 let invalid_or_keyspace_not_found = match &error {
798 PagerExecutionError::NextPageError(NextPageError::RequestFailure(
799 RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
800 )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
801 _ => false,
802 };
803 if invalid_or_keyspace_not_found {
804 return Ok(Vec::new());
805 } else {
806 return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
807 }
808 }
809 };
810 let mut namespaces = Vec::new();
811 let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
812 while let Some(row) = rows_stream.next().await {
813 let (_, object_kind, name, _) = row?;
814 if object_kind == "table" {
815 namespaces.push(name);
816 }
817 }
818 Ok(namespaces)
819 }
820
821 async fn list_root_keys(
822 config: &Self::Config,
823 namespace: &str,
824 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
825 Self::check_namespace(namespace)?;
826 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
827 let statement = session
828 .prepare(format!(
829 "SELECT root_key FROM {}.{} ALLOW FILTERING",
830 KEYSPACE, namespace
831 ))
832 .await?;
833
834 let rows = session.execute_iter(statement, &[]).await?;
836 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
837 let mut root_keys = BTreeSet::new();
838 while let Some(row) = rows.next().await {
839 let (root_key,) = row?;
840 let root_key = root_key[1..].to_vec();
841 root_keys.insert(root_key);
842 }
843 Ok(root_keys.into_iter().collect::<Vec<_>>())
844 }
845
846 async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
847 let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
848 let statement = session
849 .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
850 .await?;
851
852 session
853 .execute_single_page(&statement, &[], PagingState::start())
854 .await?;
855 Ok(())
856 }
857
858 async fn exists(
859 config: &Self::Config,
860 namespace: &str,
861 ) -> Result<bool, ScyllaDbStoreInternalError> {
862 Self::check_namespace(namespace)?;
863 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
864
865 let result = session
867 .prepare(format!(
868 "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
869 KEYSPACE, namespace
870 ))
871 .await;
872
873 let miss_msg1 = format!("unconfigured table {}", namespace);
875 let miss_msg1 = miss_msg1.as_str();
876 let miss_msg2 = "Undefined name root_key in selection clause";
877 let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
878 let Err(error) = result else {
879 return Ok(true);
881 };
882 let missing_table = match &error {
883 PrepareError::AllAttemptsFailed {
884 first_attempt: RequestAttemptError::DbError(db_error, msg),
885 } => {
886 if *db_error != DbError::Invalid {
887 false
888 } else {
889 msg.as_str() == miss_msg1
890 || msg.as_str() == miss_msg2
891 || msg.as_str() == miss_msg3
892 }
893 }
894 _ => false,
895 };
896 if missing_table {
897 Ok(false)
898 } else {
899 Err(ScyllaDbStoreInternalError::PrepareError(error))
900 }
901 }
902
903 async fn create(
904 config: &Self::Config,
905 namespace: &str,
906 ) -> Result<(), ScyllaDbStoreInternalError> {
907 Self::check_namespace(namespace)?;
908 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
909
910 let statement = session
912 .prepare(format!(
913 "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
914 'class' : 'NetworkTopologyStrategy', \
915 'replication_factor' : {} \
916 }}",
917 KEYSPACE, config.replication_factor
918 ))
919 .await?;
920 session
921 .execute_single_page(&statement, &[], PagingState::start())
922 .await?;
923
924 let statement = session
927 .prepare(format!(
928 "CREATE TABLE {}.{} (\
929 root_key blob, \
930 k blob, \
931 v blob, \
932 PRIMARY KEY (root_key, k) \
933 ) \
934 WITH compaction = {{ \
935 'class' : 'SizeTieredCompactionStrategy', \
936 'min_sstable_size' : 52428800, \
937 'bucket_low' : 0.5, \
938 'bucket_high' : 1.5, \
939 'min_threshold' : 4, \
940 'max_threshold' : 32 \
941 }} \
942 AND compression = {{ \
943 'sstable_compression': 'LZ4Compressor', \
944 'chunk_length_in_kb':'4' \
945 }} \
946 AND caching = {{ \
947 'enabled': 'true' \
948 }}",
949 KEYSPACE, namespace
950 ))
951 .await?;
952 session
953 .execute_single_page(&statement, &[], PagingState::start())
954 .await?;
955 Ok(())
956 }
957
958 async fn delete(
959 config: &Self::Config,
960 namespace: &str,
961 ) -> Result<(), ScyllaDbStoreInternalError> {
962 Self::check_namespace(namespace)?;
963 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
964 let statement = session
965 .prepare(format!("DROP TABLE IF EXISTS {}.{};", KEYSPACE, namespace))
966 .await?;
967 session
968 .execute_single_page(&statement, &[], PagingState::start())
969 .await?;
970 Ok(())
971 }
972}
973
974impl ScyllaDbStoreInternal {
975 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
977 match &self.semaphore {
978 None => None,
979 Some(count) => Some(count.acquire().await),
980 }
981 }
982}
983
984impl ScyllaDbDatabaseInternal {
985 fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
986 if !namespace.is_empty()
987 && namespace.len() <= 48
988 && namespace
989 .chars()
990 .all(|c| c.is_ascii_alphanumeric() || c == '_')
991 {
992 return Ok(());
993 }
994 Err(ScyllaDbStoreInternalError::InvalidNamespace)
995 }
996}
997
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    /// Returns the configuration used by tests: a node at `localhost:9042`, at most ten
    /// concurrent queries, ten stream queries and a replication factor of one.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        Ok(ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        })
    }
}
1011
/// The ScyllaDB database as used by clients: the internal database wrapped, from the
/// inside out, in journaling, value splitting and LRU caching, with a metering layer
/// around each of the three outer stages.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;

/// The ScyllaDB database as used by clients: the internal database wrapped, from the
/// inside out, in journaling, value splitting and LRU caching (no metering).
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;

/// The configuration of `ScyllaDbDatabase`: the internal configuration wrapped in the
/// LRU caching configuration.
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;

/// The error type of `ScyllaDbDatabase`: the internal error wrapped by the
/// value-splitting layer's error.
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;