1use std::{
11 collections::{BTreeSet, HashMap},
12 ops::Deref,
13 sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::ensure;
19use scylla::{
20 client::{
21 execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22 session::Session,
23 session_builder::SessionBuilder,
24 },
25 deserialize::{DeserializationError, TypeCheckError},
26 errors::{
27 DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29 },
30 policies::{
31 load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32 retry::DefaultRetryPolicy,
33 },
34 response::PagingState,
35 statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45 batch::UnorderedBatch,
46 common::{get_uleb128_size, get_upper_bound_option},
47 journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48 lru_caching::{LruCachingConfig, LruCachingDatabase},
49 store::{
50 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51 WithError,
52 },
53 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54 FutureSyncExt as _,
55};
56
/// Maximum number of keys sent in one multi-key (`k IN (...)`) query; larger
/// requests are split into chunks of this size.
/// NOTE(review): presumably a limit of 100 bind markers minus the one used by
/// `root_key` — confirm against the server configuration.
const MAX_MULTI_KEYS: usize = 100 - 1;

/// Largest value, in bytes, accepted by `check_value_size`: 16 MiB minus
/// 10 KiB minus 4000 bytes.
/// NOTE(review): the subtracted amounts look like head-room under a 16 MiB
/// request limit — confirm.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// Largest key (or key prefix), in bytes, accepted by `check_key_size`.
const MAX_KEY_SIZE: usize = 10 * 1024;
/// Total batch size advertised through `DirectWritableKeyValueStore`:
/// one maximal value plus one maximal key.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;

/// Value size advertised to the layers above (`MAX_VALUE_SIZE`): the raw limit
/// minus a maximal key, the ULEB128 length prefixes of both, and 3 extra bytes.
/// NOTE(review): presumably this reserves room for the serialization overhead
/// of the journaling layer — confirm.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;

/// Maximum number of entries in a batch, enforced by `check_batch_len`.
const MAX_BATCH_SIZE: usize = 5000;

/// Name of the keyspace that holds one table per namespace.
const KEYSPACE: &str = "kv";
99
/// A driver session bound to one namespace (table), together with the
/// statements prepared against that table.
struct ScyllaDbClient {
    /// Session used to prepare and execute every statement.
    session: Session,
    /// Name of the table inside `KEYSPACE`.
    namespace: String,
    /// `SELECT v ... WHERE root_key = ? AND k = ?`
    read_value: PreparedStatement,
    /// `SELECT root_key ... WHERE root_key = ? AND k = ?`
    contains_key: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ?`
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ? AND k < ?`
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k = ?`
    write_batch_deletion: PreparedStatement,
    /// `INSERT ... (root_key, k, v) VALUES (?, ?, ?)`
    write_batch_insertion: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ?`
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ?`
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Lazily prepared `SELECT k,v ... k IN (...)` statements, keyed by the
    /// number of markers in the `IN` list.
    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
    /// Lazily prepared `SELECT k ... k IN (...)` statements, keyed by the
    /// number of markers in the `IN` list.
    multi_keys: papaya::HashMap<usize, PreparedStatement>,
}
120
121impl ScyllaDbClient {
122 async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
123 let namespace = namespace.to_string();
124 let read_value = session
125 .prepare(format!(
126 "SELECT v FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
127 KEYSPACE, namespace
128 ))
129 .await?;
130
131 let contains_key = session
132 .prepare(format!(
133 "SELECT root_key FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
134 KEYSPACE, namespace
135 ))
136 .await?;
137
138 let write_batch_delete_prefix_unbounded = session
139 .prepare(format!(
140 "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
141 KEYSPACE, namespace
142 ))
143 .await?;
144
145 let write_batch_delete_prefix_bounded = session
146 .prepare(format!(
147 "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
148 KEYSPACE, namespace
149 ))
150 .await?;
151
152 let write_batch_deletion = session
153 .prepare(format!(
154 "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
155 KEYSPACE, namespace
156 ))
157 .await?;
158
159 let write_batch_insertion = session
160 .prepare(format!(
161 "INSERT INTO {}.\"{}\" (root_key, k, v) VALUES (?, ?, ?)",
162 KEYSPACE, namespace
163 ))
164 .await?;
165
166 let find_keys_by_prefix_unbounded = session
167 .prepare(format!(
168 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
169 KEYSPACE, namespace
170 ))
171 .await?;
172
173 let find_keys_by_prefix_bounded = session
174 .prepare(format!(
175 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
176 KEYSPACE, namespace
177 ))
178 .await?;
179
180 let find_key_values_by_prefix_unbounded = session
181 .prepare(format!(
182 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
183 KEYSPACE, namespace
184 ))
185 .await?;
186
187 let find_key_values_by_prefix_bounded = session
188 .prepare(format!(
189 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
190 KEYSPACE, namespace
191 ))
192 .await?;
193
194 Ok(Self {
195 session,
196 namespace,
197 read_value,
198 contains_key,
199 write_batch_delete_prefix_unbounded,
200 write_batch_delete_prefix_bounded,
201 write_batch_deletion,
202 write_batch_insertion,
203 find_keys_by_prefix_unbounded,
204 find_keys_by_prefix_bounded,
205 find_key_values_by_prefix_unbounded,
206 find_key_values_by_prefix_bounded,
207 multi_key_values: papaya::HashMap::new(),
208 multi_keys: papaya::HashMap::new(),
209 })
210 }
211
212 fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
213 DefaultPolicy::builder().token_aware(true).build()
214 }
215
216 fn build_default_execution_profile_handle(
217 policy: Arc<dyn LoadBalancingPolicy>,
218 ) -> ExecutionProfileHandle {
219 let default_profile = ExecutionProfile::builder()
220 .load_balancing_policy(policy)
221 .retry_policy(Arc::new(DefaultRetryPolicy::new()))
222 .consistency(Consistency::LocalQuorum)
223 .build();
224 default_profile.into_handle()
225 }
226
227 async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
228 SessionBuilder::new()
231 .known_node(uri)
232 .default_execution_profile_handle(Self::build_default_execution_profile_handle(
233 Self::build_default_policy(),
234 ))
235 .build()
236 .boxed_sync()
237 .await
238 .map_err(Into::into)
239 }
240
241 async fn get_multi_key_values_statement(
242 &self,
243 num_markers: usize,
244 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
245 if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
246 return Ok(prepared_statement.clone());
247 }
248 let markers = std::iter::repeat_n("?", num_markers)
249 .collect::<Vec<_>>()
250 .join(",");
251 let prepared_statement = self
252 .session
253 .prepare(format!(
254 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
255 KEYSPACE, self.namespace, markers
256 ))
257 .await?;
258 self.multi_key_values
259 .pin()
260 .insert(num_markers, prepared_statement.clone());
261 Ok(prepared_statement)
262 }
263
264 async fn get_multi_keys_statement(
265 &self,
266 num_markers: usize,
267 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
268 if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
269 return Ok(prepared_statement.clone());
270 };
271 let markers = std::iter::repeat_n("?", num_markers)
272 .collect::<Vec<_>>()
273 .join(",");
274 let prepared_statement = self
275 .session
276 .prepare(format!(
277 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
278 KEYSPACE, self.namespace, markers
279 ))
280 .await?;
281 self.multi_keys
282 .pin()
283 .insert(num_markers, prepared_statement.clone());
284 Ok(prepared_statement)
285 }
286
287 fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
288 ensure!(
289 key.len() <= MAX_KEY_SIZE,
290 ScyllaDbStoreInternalError::KeyTooLong
291 );
292 Ok(())
293 }
294
295 fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
296 ensure!(
297 value.len() <= RAW_MAX_VALUE_SIZE,
298 ScyllaDbStoreInternalError::ValueTooLong
299 );
300 Ok(())
301 }
302
303 fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
304 ensure!(
305 batch.len() <= MAX_BATCH_SIZE,
306 ScyllaDbStoreInternalError::BatchTooLong
307 );
308 Ok(())
309 }
310
311 async fn read_value_internal(
312 &self,
313 root_key: &[u8],
314 key: Vec<u8>,
315 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
316 Self::check_key_size(&key)?;
317 let session = &self.session;
318 let values = (root_key.to_vec(), key);
320
321 let (result, _) = session
322 .execute_single_page(&self.read_value, &values, PagingState::start())
323 .await?;
324 let rows = result.into_rows_result()?;
325 let mut rows = rows.rows::<(Vec<u8>,)>()?;
326 Ok(match rows.next() {
327 Some(row) => Some(row?.0),
328 None => None,
329 })
330 }
331
332 fn get_occurrences_map(
333 keys: Vec<Vec<u8>>,
334 ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
335 let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
336 for (i_key, key) in keys.into_iter().enumerate() {
337 Self::check_key_size(&key)?;
338 map.entry(key).or_default().push(i_key);
339 }
340 Ok(map)
341 }
342
343 async fn read_multi_values_internal(
344 &self,
345 root_key: &[u8],
346 keys: Vec<Vec<u8>>,
347 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
348 let mut values = vec![None; keys.len()];
349 let map = Self::get_occurrences_map(keys)?;
350 let statement = self.get_multi_key_values_statement(map.len()).await?;
351 let mut inputs = vec![root_key.to_vec()];
352 inputs.extend(map.keys().cloned());
353 let mut rows = self
354 .session
355 .execute_iter(statement, &inputs)
356 .await?
357 .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
358
359 while let Some(row) = rows.next().await {
360 let (key, value) = row?;
361 if let Some((&last, rest)) = map[&key].split_last() {
362 for position in rest {
363 values[*position] = Some(value.clone());
364 }
365 values[last] = Some(value);
366 }
367 }
368 Ok(values)
369 }
370
371 async fn contains_keys_internal(
372 &self,
373 root_key: &[u8],
374 keys: Vec<Vec<u8>>,
375 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
376 let mut values = vec![false; keys.len()];
377 let map = Self::get_occurrences_map(keys)?;
378 let statement = self.get_multi_keys_statement(map.len()).await?;
379 let mut inputs = vec![root_key.to_vec()];
380 inputs.extend(map.keys().cloned());
381 let mut rows = self
382 .session
383 .execute_iter(statement, &inputs)
384 .await?
385 .rows_stream::<(Vec<u8>,)>()?;
386
387 while let Some(row) = rows.next().await {
388 let (key,) = row?;
389 for i_key in &map[&key] {
390 values[*i_key] = true;
391 }
392 }
393
394 Ok(values)
395 }
396
397 async fn contains_key_internal(
398 &self,
399 root_key: &[u8],
400 key: Vec<u8>,
401 ) -> Result<bool, ScyllaDbStoreInternalError> {
402 Self::check_key_size(&key)?;
403 let session = &self.session;
404 let values = (root_key.to_vec(), key);
406
407 let (result, _) = session
408 .execute_single_page(&self.contains_key, &values, PagingState::start())
409 .await?;
410 let rows = result.into_rows_result()?;
411 let mut rows = rows.rows::<(Vec<u8>,)>()?;
412 Ok(rows.next().is_some())
413 }
414
415 async fn write_batch_internal(
416 &self,
417 root_key: &[u8],
418 batch: UnorderedBatch,
419 ) -> Result<(), ScyllaDbStoreInternalError> {
420 let session = &self.session;
421 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
422 let mut batch_values = Vec::new();
423 let query1 = &self.write_batch_delete_prefix_unbounded;
424 let query2 = &self.write_batch_delete_prefix_bounded;
425 Self::check_batch_len(&batch)?;
426 for key_prefix in batch.key_prefix_deletions {
427 Self::check_key_size(&key_prefix)?;
428 match get_upper_bound_option(&key_prefix) {
429 None => {
430 let values = vec![root_key.to_vec(), key_prefix];
431 batch_values.push(values);
432 batch_query.append_statement(query1.clone());
433 }
434 Some(upper_bound) => {
435 let values = vec![root_key.to_vec(), key_prefix, upper_bound];
436 batch_values.push(values);
437 batch_query.append_statement(query2.clone());
438 }
439 }
440 }
441 let query3 = &self.write_batch_deletion;
442 for key in batch.simple_unordered_batch.deletions {
443 Self::check_key_size(&key)?;
444 let values = vec![root_key.to_vec(), key];
445 batch_values.push(values);
446 batch_query.append_statement(query3.clone());
447 }
448 let query4 = &self.write_batch_insertion;
449 for (key, value) in batch.simple_unordered_batch.insertions {
450 Self::check_key_size(&key)?;
451 Self::check_value_size(&value)?;
452 let values = vec![root_key.to_vec(), key, value];
453 batch_values.push(values);
454 batch_query.append_statement(query4.clone());
455 }
456 session.batch(&batch_query, batch_values).await?;
457 Ok(())
458 }
459
460 async fn find_keys_by_prefix_internal(
461 &self,
462 root_key: &[u8],
463 key_prefix: Vec<u8>,
464 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
465 Self::check_key_size(&key_prefix)?;
466 let session = &self.session;
467 let len = key_prefix.len();
469 let query_unbounded = &self.find_keys_by_prefix_unbounded;
470 let query_bounded = &self.find_keys_by_prefix_bounded;
471 let rows = match get_upper_bound_option(&key_prefix) {
472 None => {
473 let values = (root_key.to_vec(), key_prefix.clone());
474 session
475 .execute_iter(query_unbounded.clone(), values)
476 .await?
477 }
478 Some(upper_bound) => {
479 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
480 session.execute_iter(query_bounded.clone(), values).await?
481 }
482 };
483 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
484 let mut keys = Vec::new();
485 while let Some(row) = rows.next().await {
486 let (key,) = row?;
487 let short_key = key[len..].to_vec();
488 keys.push(short_key);
489 }
490 Ok(keys)
491 }
492
493 async fn find_key_values_by_prefix_internal(
494 &self,
495 root_key: &[u8],
496 key_prefix: Vec<u8>,
497 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
498 Self::check_key_size(&key_prefix)?;
499 let session = &self.session;
500 let len = key_prefix.len();
502 let query_unbounded = &self.find_key_values_by_prefix_unbounded;
503 let query_bounded = &self.find_key_values_by_prefix_bounded;
504 let rows = match get_upper_bound_option(&key_prefix) {
505 None => {
506 let values = (root_key.to_vec(), key_prefix.clone());
507 session
508 .execute_iter(query_unbounded.clone(), values)
509 .await?
510 }
511 Some(upper_bound) => {
512 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
513 session.execute_iter(query_bounded.clone(), values).await?
514 }
515 };
516 let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
517 let mut key_values = Vec::new();
518 while let Some(row) = rows.next().await {
519 let (key, value) = row?;
520 let short_key = key[len..].to_vec();
521 key_values.push((short_key, value));
522 }
523 Ok(key_values)
524 }
525}
526
/// A view of one partition (`root_key`) of a namespace, sharing its client
/// with the database it was opened from.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// Shared client holding the session and prepared statements.
    store: Arc<ScyllaDbClient>,
    /// When set, a permit is acquired from it before each store operation.
    semaphore: Option<Arc<Semaphore>>,
    /// Value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Partition key sent with every query (the user root key prefixed by a
    /// zero byte, see `get_big_root_key`).
    root_key: Vec<u8>,
}
535
/// A connection to one namespace, from which per-root-key stores are opened.
#[derive(Clone)]
pub struct ScyllaDbDatabaseInternal {
    /// Shared client holding the session and prepared statements.
    store: Arc<ScyllaDbClient>,
    /// Optional semaphore handed to every opened store.
    semaphore: Option<Arc<Semaphore>>,
    /// Value handed to every opened store and reported by its `max_stream_queries()`.
    max_stream_queries: usize,
}
543
/// Database-level operations report `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbDatabaseInternal {
    type Error = ScyllaDbStoreInternalError;
}
547
/// The errors that can be produced by the ScyllaDB store and database.
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A key or key prefix is longer than `MAX_KEY_SIZE`.
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// A value is longer than `RAW_MAX_VALUE_SIZE`.
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// A row could not be deserialized.
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// Driver error while accessing the rows of a result.
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// A query result could not be converted into a rows result.
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// The requested row type does not match the result's column types.
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// A paged (`execute_iter`) query failed.
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// The session could not be created.
    #[error(transparent)]
    ScyllaDbNewSessionError(#[from] NewSessionError),

    /// The namespace was rejected by `check_namespace`.
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// Error reported by the journaling layer.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The batch has more than `MAX_BATCH_SIZE` entries.
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// A statement could not be prepared.
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// A statement or batch failed to execute.
    #[error(transparent)]
    ExecutionError(#[from] ExecutionError),

    /// Fetching the next row of a row stream failed.
    #[error(transparent)]
    NextRowError(#[from] NextRowError),
}
611
/// Identifies this backend as `"scylla_db"` to the generic store error machinery.
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    const BACKEND: &'static str = "scylla_db";
}
615
/// Store-level operations report `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
619
620impl ReadableKeyValueStore for ScyllaDbStoreInternal {
621 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
622
623 fn max_stream_queries(&self) -> usize {
624 self.max_stream_queries
625 }
626
627 async fn read_value_bytes(
628 &self,
629 key: &[u8],
630 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
631 let store = self.store.deref();
632 let _guard = self.acquire().await;
633 store
634 .read_value_internal(&self.root_key, key.to_vec())
635 .await
636 }
637
638 async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
639 let store = self.store.deref();
640 let _guard = self.acquire().await;
641 store
642 .contains_key_internal(&self.root_key, key.to_vec())
643 .await
644 }
645
646 async fn contains_keys(
647 &self,
648 keys: Vec<Vec<u8>>,
649 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
650 if keys.is_empty() {
651 return Ok(Vec::new());
652 }
653 let store = self.store.deref();
654 let _guard = self.acquire().await;
655 let handles = keys
656 .chunks(MAX_MULTI_KEYS)
657 .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
658 let results: Vec<_> = join_all(handles)
659 .await
660 .into_iter()
661 .collect::<Result<_, _>>()?;
662 Ok(results.into_iter().flatten().collect())
663 }
664
665 async fn read_multi_values_bytes(
666 &self,
667 keys: Vec<Vec<u8>>,
668 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
669 if keys.is_empty() {
670 return Ok(Vec::new());
671 }
672 let store = self.store.deref();
673 let _guard = self.acquire().await;
674 let handles = keys
675 .chunks(MAX_MULTI_KEYS)
676 .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
677 let results: Vec<_> = join_all(handles)
678 .await
679 .into_iter()
680 .collect::<Result<_, _>>()?;
681 Ok(results.into_iter().flatten().collect())
682 }
683
684 async fn find_keys_by_prefix(
685 &self,
686 key_prefix: &[u8],
687 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
688 let store = self.store.deref();
689 let _guard = self.acquire().await;
690 store
691 .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
692 .await
693 }
694
695 async fn find_key_values_by_prefix(
696 &self,
697 key_prefix: &[u8],
698 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
699 let store = self.store.deref();
700 let _guard = self.acquire().await;
701 store
702 .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
703 .await
704 }
705}
706
707impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
708 const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
709 const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
710 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
711
712 type Batch = UnorderedBatch;
718
719 async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
720 let store = self.store.deref();
721 let _guard = self.acquire().await;
722 store.write_batch_internal(&self.root_key, batch).await
723 }
724}
725
/// Returns `root_key` prefixed with a single zero byte; this is the partition
/// key actually stored in the table.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    [&[0u8][..], root_key].concat()
}
732
/// The configuration of a ScyllaDB database.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// Address of the node to connect to (passed to `known_node`).
    pub uri: String,
    /// When set, the number of permits of the semaphore guarding store operations.
    pub max_concurrent_queries: Option<usize>,
    /// Value reported by the stores' `max_stream_queries()`.
    pub max_stream_queries: usize,
    /// Replication factor used when the keyspace is created.
    pub replication_factor: u32,
}
745
746impl KeyValueDatabase for ScyllaDbDatabaseInternal {
747 type Config = ScyllaDbStoreInternalConfig;
748 type Store = ScyllaDbStoreInternal;
749
750 fn get_name() -> String {
751 "scylladb internal".to_string()
752 }
753
754 async fn connect(
755 config: &Self::Config,
756 namespace: &str,
757 ) -> Result<Self, ScyllaDbStoreInternalError> {
758 Self::check_namespace(namespace)?;
759 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
760 let store = ScyllaDbClient::new(session, namespace).await?;
761 let store = Arc::new(store);
762 let semaphore = config
763 .max_concurrent_queries
764 .map(|n| Arc::new(Semaphore::new(n)));
765 let max_stream_queries = config.max_stream_queries;
766 Ok(Self {
767 store,
768 semaphore,
769 max_stream_queries,
770 })
771 }
772
773 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
774 let store = self.store.clone();
775 let semaphore = self.semaphore.clone();
776 let max_stream_queries = self.max_stream_queries;
777 let root_key = get_big_root_key(root_key);
778 Ok(ScyllaDbStoreInternal {
779 store,
780 semaphore,
781 max_stream_queries,
782 root_key,
783 })
784 }
785
786 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
787 self.open_shared(root_key)
788 }
789
790 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
791 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
792 let statement = session
793 .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
794 .await?;
795 let result = session.execute_iter(statement, &[]).await;
796 let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
797 let result = match result {
798 Ok(result) => result,
799 Err(error) => {
800 let invalid_or_keyspace_not_found = match &error {
801 PagerExecutionError::NextPageError(NextPageError::RequestFailure(
802 RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
803 )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
804 _ => false,
805 };
806 if invalid_or_keyspace_not_found {
807 return Ok(Vec::new());
808 } else {
809 return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
810 }
811 }
812 };
813 let mut namespaces = Vec::new();
814 let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
815 while let Some(row) = rows_stream.next().await {
816 let (_, object_kind, name, _) = row?;
817 if object_kind == "table" {
818 namespaces.push(name);
819 }
820 }
821 Ok(namespaces)
822 }
823
824 async fn list_root_keys(
825 config: &Self::Config,
826 namespace: &str,
827 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
828 Self::check_namespace(namespace)?;
829 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
830 let statement = session
831 .prepare(format!(
832 "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
833 KEYSPACE, namespace
834 ))
835 .await?;
836
837 let rows = session.execute_iter(statement, &[]).await?;
839 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
840 let mut root_keys = BTreeSet::new();
841 while let Some(row) = rows.next().await {
842 let (root_key,) = row?;
843 let root_key = root_key[1..].to_vec();
844 root_keys.insert(root_key);
845 }
846 Ok(root_keys.into_iter().collect::<Vec<_>>())
847 }
848
849 async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
850 let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
851 let statement = session
852 .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
853 .await?;
854
855 session
856 .execute_single_page(&statement, &[], PagingState::start())
857 .await?;
858 Ok(())
859 }
860
861 async fn exists(
862 config: &Self::Config,
863 namespace: &str,
864 ) -> Result<bool, ScyllaDbStoreInternalError> {
865 Self::check_namespace(namespace)?;
866 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
867
868 let result = session
870 .prepare(format!(
871 "SELECT root_key FROM {}.\"{}\" LIMIT 1 ALLOW FILTERING",
872 KEYSPACE, namespace
873 ))
874 .await;
875
876 let miss_msg1 = format!("unconfigured table {}", namespace);
878 let miss_msg1 = miss_msg1.as_str();
879 let miss_msg2 = "Undefined name root_key in selection clause";
880 let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
881 let Err(error) = result else {
882 return Ok(true);
884 };
885 let missing_table = match &error {
886 PrepareError::AllAttemptsFailed {
887 first_attempt: RequestAttemptError::DbError(db_error, msg),
888 } => {
889 if *db_error != DbError::Invalid {
890 false
891 } else {
892 msg.as_str() == miss_msg1
893 || msg.as_str() == miss_msg2
894 || msg.as_str() == miss_msg3
895 }
896 }
897 _ => false,
898 };
899 if missing_table {
900 Ok(false)
901 } else {
902 Err(ScyllaDbStoreInternalError::PrepareError(error))
903 }
904 }
905
906 async fn create(
907 config: &Self::Config,
908 namespace: &str,
909 ) -> Result<(), ScyllaDbStoreInternalError> {
910 Self::check_namespace(namespace)?;
911 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
912
913 let statement = session
915 .prepare(format!(
916 "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
917 'class' : 'NetworkTopologyStrategy', \
918 'replication_factor' : {} \
919 }}",
920 KEYSPACE, config.replication_factor
921 ))
922 .await?;
923 session
924 .execute_single_page(&statement, &[], PagingState::start())
925 .await?;
926
927 let statement = session
930 .prepare(format!(
931 "CREATE TABLE {}.\"{}\" (\
932 root_key blob, \
933 k blob, \
934 v blob, \
935 PRIMARY KEY (root_key, k) \
936 ) \
937 WITH compaction = {{ \
938 'class' : 'SizeTieredCompactionStrategy', \
939 'min_sstable_size' : 52428800, \
940 'bucket_low' : 0.5, \
941 'bucket_high' : 1.5, \
942 'min_threshold' : 4, \
943 'max_threshold' : 32 \
944 }} \
945 AND compression = {{ \
946 'sstable_compression': 'LZ4Compressor', \
947 'chunk_length_in_kb':'4' \
948 }} \
949 AND caching = {{ \
950 'enabled': 'true' \
951 }}",
952 KEYSPACE, namespace
953 ))
954 .await?;
955 session
956 .execute_single_page(&statement, &[], PagingState::start())
957 .await?;
958 Ok(())
959 }
960
961 async fn delete(
962 config: &Self::Config,
963 namespace: &str,
964 ) -> Result<(), ScyllaDbStoreInternalError> {
965 Self::check_namespace(namespace)?;
966 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
967 let statement = session
968 .prepare(format!(
969 "DROP TABLE IF EXISTS {}.\"{}\";",
970 KEYSPACE, namespace
971 ))
972 .await?;
973 session
974 .execute_single_page(&statement, &[], PagingState::start())
975 .await?;
976 Ok(())
977 }
978}
979
980impl ScyllaDbStoreInternal {
981 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
983 match &self.semaphore {
984 None => None,
985 Some(count) => Some(count.acquire().await),
986 }
987 }
988}
989
990impl ScyllaDbDatabaseInternal {
991 fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
992 if !namespace.is_empty()
993 && namespace.len() <= 48
994 && namespace
995 .chars()
996 .all(|c| c.is_ascii_alphanumeric() || c == '_')
997 {
998 return Ok(());
999 }
1000 Err(ScyllaDbStoreInternalError::InvalidNamespace)
1001 }
1002}
1003
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    /// Returns the configuration used by tests: a node at `localhost:9042`,
    /// at most 10 concurrent queries, 10 stream queries, replication factor 1.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        let config = ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
1017
/// The public ScyllaDB database: the internal database wrapped, from the inside
/// out, in journaling, value splitting and LRU caching, with a metering layer
/// around the journaling, value-splitting and caching layers.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;

/// The public ScyllaDB database: the internal database wrapped, from the inside
/// out, in journaling, value splitting and LRU caching.
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;

/// The configuration of `ScyllaDbDatabase`: the internal configuration wrapped
/// in the LRU caching configuration.
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;

/// The error type of `ScyllaDbDatabase`: the internal error wrapped by the
/// value-splitting layer.
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;