1use std::{
11 collections::{BTreeSet, HashMap},
12 ops::Deref,
13 sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::{ensure, util::future::FutureSyncExt as _};
19use scylla::{
20 client::{
21 execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22 session::Session,
23 session_builder::SessionBuilder,
24 },
25 deserialize::{DeserializationError, TypeCheckError},
26 errors::{
27 DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29 },
30 policies::{
31 load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32 retry::DefaultRetryPolicy,
33 },
34 response::PagingState,
35 statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45 batch::UnorderedBatch,
46 common::{get_uleb128_size, get_upper_bound_option},
47 journaling::{JournalingError, JournalingKeyValueDatabase},
48 lru_caching::{LruCachingConfig, LruCachingDatabase},
49 store::{
50 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51 WithError,
52 },
53 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54};
55
56const MAX_MULTI_KEYS: usize = 100 - 1;
59
60const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
68const MAX_KEY_SIZE: usize = 10 * 1024;
69const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
70
71const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
84 - MAX_KEY_SIZE
85 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
86 - get_uleb128_size(MAX_KEY_SIZE)
87 - 3;
88
89const MAX_BATCH_SIZE: usize = 5000;
95
96const KEYSPACE: &str = "kv";
98
99struct ScyllaDbClient {
104 session: Session,
105 namespace: String,
106 read_value: PreparedStatement,
107 contains_key: PreparedStatement,
108 write_batch_delete_prefix_unbounded: PreparedStatement,
109 write_batch_delete_prefix_bounded: PreparedStatement,
110 write_batch_deletion: PreparedStatement,
111 write_batch_insertion: PreparedStatement,
112 find_keys_by_prefix_unbounded: PreparedStatement,
113 find_keys_by_prefix_bounded: PreparedStatement,
114 find_key_values_by_prefix_unbounded: PreparedStatement,
115 find_key_values_by_prefix_bounded: PreparedStatement,
116 multi_key_values: papaya::HashMap<usize, PreparedStatement>,
117 multi_keys: papaya::HashMap<usize, PreparedStatement>,
118}
119
120impl ScyllaDbClient {
121 async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
122 let namespace = namespace.to_string();
123 let read_value = session
124 .prepare(format!(
125 "SELECT v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
126 ))
127 .await?;
128
129 let contains_key = session
130 .prepare(format!(
131 "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
132 ))
133 .await?;
134
135 let write_batch_delete_prefix_unbounded = session
136 .prepare(format!(
137 "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
138 ))
139 .await?;
140
141 let write_batch_delete_prefix_bounded = session
142 .prepare(format!(
143 "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
144 ))
145 .await?;
146
147 let write_batch_deletion = session
148 .prepare(format!(
149 "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
150 ))
151 .await?;
152
153 let write_batch_insertion = session
154 .prepare(format!(
155 "INSERT INTO {KEYSPACE}.\"{namespace}\" (root_key, k, v) VALUES (?, ?, ?)"
156 ))
157 .await?;
158
159 let find_keys_by_prefix_unbounded = session
160 .prepare(format!(
161 "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
162 ))
163 .await?;
164
165 let find_keys_by_prefix_bounded = session
166 .prepare(format!(
167 "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
168 ))
169 .await?;
170
171 let find_key_values_by_prefix_unbounded = session
172 .prepare(format!(
173 "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
174 ))
175 .await?;
176
177 let find_key_values_by_prefix_bounded = session
178 .prepare(format!(
179 "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
180 ))
181 .await?;
182
183 Ok(Self {
184 session,
185 namespace,
186 read_value,
187 contains_key,
188 write_batch_delete_prefix_unbounded,
189 write_batch_delete_prefix_bounded,
190 write_batch_deletion,
191 write_batch_insertion,
192 find_keys_by_prefix_unbounded,
193 find_keys_by_prefix_bounded,
194 find_key_values_by_prefix_unbounded,
195 find_key_values_by_prefix_bounded,
196 multi_key_values: papaya::HashMap::new(),
197 multi_keys: papaya::HashMap::new(),
198 })
199 }
200
201 fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
202 DefaultPolicy::builder().token_aware(true).build()
203 }
204
205 fn build_default_execution_profile_handle(
206 policy: Arc<dyn LoadBalancingPolicy>,
207 ) -> ExecutionProfileHandle {
208 let default_profile = ExecutionProfile::builder()
209 .load_balancing_policy(policy)
210 .retry_policy(Arc::new(DefaultRetryPolicy::new()))
211 .consistency(Consistency::LocalQuorum)
212 .build();
213 default_profile.into_handle()
214 }
215
216 async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
217 SessionBuilder::new()
220 .known_node(uri)
221 .default_execution_profile_handle(Self::build_default_execution_profile_handle(
222 Self::build_default_policy(),
223 ))
224 .build()
225 .boxed_sync()
226 .await
227 .map_err(Into::into)
228 }
229
230 async fn get_multi_key_values_statement(
231 &self,
232 num_markers: usize,
233 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
234 if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
235 return Ok(prepared_statement.clone());
236 }
237 let markers = std::iter::repeat_n("?", num_markers)
238 .collect::<Vec<_>>()
239 .join(",");
240 let prepared_statement = self
241 .session
242 .prepare(format!(
243 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
244 KEYSPACE, self.namespace, markers
245 ))
246 .await?;
247 self.multi_key_values
248 .pin()
249 .insert(num_markers, prepared_statement.clone());
250 Ok(prepared_statement)
251 }
252
253 async fn get_multi_keys_statement(
254 &self,
255 num_markers: usize,
256 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
257 if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
258 return Ok(prepared_statement.clone());
259 };
260 let markers = std::iter::repeat_n("?", num_markers)
261 .collect::<Vec<_>>()
262 .join(",");
263 let prepared_statement = self
264 .session
265 .prepare(format!(
266 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
267 KEYSPACE, self.namespace, markers
268 ))
269 .await?;
270 self.multi_keys
271 .pin()
272 .insert(num_markers, prepared_statement.clone());
273 Ok(prepared_statement)
274 }
275
276 fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
277 ensure!(
278 key.len() <= MAX_KEY_SIZE,
279 ScyllaDbStoreInternalError::KeyTooLong
280 );
281 Ok(())
282 }
283
284 fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
285 ensure!(
286 value.len() <= RAW_MAX_VALUE_SIZE,
287 ScyllaDbStoreInternalError::ValueTooLong
288 );
289 Ok(())
290 }
291
292 fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
293 ensure!(
294 batch.len() <= MAX_BATCH_SIZE,
295 ScyllaDbStoreInternalError::BatchTooLong
296 );
297 Ok(())
298 }
299
300 async fn read_value_internal(
301 &self,
302 root_key: &[u8],
303 key: Vec<u8>,
304 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
305 Self::check_key_size(&key)?;
306 let session = &self.session;
307 let values = (root_key.to_vec(), key);
309
310 let (result, _) = session
311 .execute_single_page(&self.read_value, &values, PagingState::start())
312 .await?;
313 let rows = result.into_rows_result()?;
314 let mut rows = rows.rows::<(Vec<u8>,)>()?;
315 Ok(match rows.next() {
316 Some(row) => Some(row?.0),
317 None => None,
318 })
319 }
320
321 fn get_occurrences_map(
322 keys: Vec<Vec<u8>>,
323 ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
324 let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
325 for (i_key, key) in keys.into_iter().enumerate() {
326 Self::check_key_size(&key)?;
327 map.entry(key).or_default().push(i_key);
328 }
329 Ok(map)
330 }
331
332 async fn read_multi_values_internal(
333 &self,
334 root_key: &[u8],
335 keys: Vec<Vec<u8>>,
336 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
337 let mut values = vec![None; keys.len()];
338 let map = Self::get_occurrences_map(keys)?;
339 let statement = self.get_multi_key_values_statement(map.len()).await?;
340 let mut inputs = vec![root_key.to_vec()];
341 inputs.extend(map.keys().cloned());
342 let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
343 .await?
344 .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
345
346 while let Some(row) = rows.next().await {
347 let (key, value) = row?;
348 if let Some((&last, rest)) = map[&key].split_last() {
349 for position in rest {
350 values[*position] = Some(value.clone());
351 }
352 values[last] = Some(value);
353 }
354 }
355 Ok(values)
356 }
357
358 async fn contains_keys_internal(
359 &self,
360 root_key: &[u8],
361 keys: Vec<Vec<u8>>,
362 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
363 let mut values = vec![false; keys.len()];
364 let map = Self::get_occurrences_map(keys)?;
365 let statement = self.get_multi_keys_statement(map.len()).await?;
366 let mut inputs = vec![root_key.to_vec()];
367 inputs.extend(map.keys().cloned());
368 let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
369 .await?
370 .rows_stream::<(Vec<u8>,)>()?;
371
372 while let Some(row) = rows.next().await {
373 let (key,) = row?;
374 for i_key in &map[&key] {
375 values[*i_key] = true;
376 }
377 }
378
379 Ok(values)
380 }
381
382 async fn contains_key_internal(
383 &self,
384 root_key: &[u8],
385 key: Vec<u8>,
386 ) -> Result<bool, ScyllaDbStoreInternalError> {
387 Self::check_key_size(&key)?;
388 let session = &self.session;
389 let values = (root_key.to_vec(), key);
391
392 let (result, _) = session
393 .execute_single_page(&self.contains_key, &values, PagingState::start())
394 .await?;
395 let rows = result.into_rows_result()?;
396 let mut rows = rows.rows::<(Vec<u8>,)>()?;
397 Ok(rows.next().is_some())
398 }
399
400 async fn write_batch_internal(
401 &self,
402 root_key: &[u8],
403 batch: UnorderedBatch,
404 ) -> Result<(), ScyllaDbStoreInternalError> {
405 let session = &self.session;
406 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
407 let mut batch_values = Vec::new();
408 let query1 = &self.write_batch_delete_prefix_unbounded;
409 let query2 = &self.write_batch_delete_prefix_bounded;
410 Self::check_batch_len(&batch)?;
411 for key_prefix in batch.key_prefix_deletions {
412 Self::check_key_size(&key_prefix)?;
413 match get_upper_bound_option(&key_prefix) {
414 None => {
415 let values = vec![root_key.to_vec(), key_prefix];
416 batch_values.push(values);
417 batch_query.append_statement(query1.clone());
418 }
419 Some(upper_bound) => {
420 let values = vec![root_key.to_vec(), key_prefix, upper_bound];
421 batch_values.push(values);
422 batch_query.append_statement(query2.clone());
423 }
424 }
425 }
426 let query3 = &self.write_batch_deletion;
427 for key in batch.simple_unordered_batch.deletions {
428 Self::check_key_size(&key)?;
429 let values = vec![root_key.to_vec(), key];
430 batch_values.push(values);
431 batch_query.append_statement(query3.clone());
432 }
433 let query4 = &self.write_batch_insertion;
434 for (key, value) in batch.simple_unordered_batch.insertions {
435 Self::check_key_size(&key)?;
436 Self::check_value_size(&value)?;
437 let values = vec![root_key.to_vec(), key, value];
438 batch_values.push(values);
439 batch_query.append_statement(query4.clone());
440 }
441 session.batch(&batch_query, batch_values).await?;
442 Ok(())
443 }
444
445 async fn find_keys_by_prefix_internal(
446 &self,
447 root_key: &[u8],
448 key_prefix: Vec<u8>,
449 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
450 Self::check_key_size(&key_prefix)?;
451 let session = &self.session;
452 let len = key_prefix.len();
454 let query_unbounded = &self.find_keys_by_prefix_unbounded;
455 let query_bounded = &self.find_keys_by_prefix_bounded;
456 let rows = match get_upper_bound_option(&key_prefix) {
457 None => {
458 let values = (root_key.to_vec(), key_prefix.clone());
459 Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
460 }
461 Some(upper_bound) => {
462 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
463 Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
464 }
465 };
466 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
467 let mut keys = Vec::new();
468 while let Some(row) = rows.next().await {
469 let (key,) = row?;
470 let short_key = key[len..].to_vec();
471 keys.push(short_key);
472 }
473 Ok(keys)
474 }
475
476 async fn find_key_values_by_prefix_internal(
477 &self,
478 root_key: &[u8],
479 key_prefix: Vec<u8>,
480 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
481 Self::check_key_size(&key_prefix)?;
482 let session = &self.session;
483 let len = key_prefix.len();
485 let query_unbounded = &self.find_key_values_by_prefix_unbounded;
486 let query_bounded = &self.find_key_values_by_prefix_bounded;
487 let rows = match get_upper_bound_option(&key_prefix) {
488 None => {
489 let values = (root_key.to_vec(), key_prefix.clone());
490 Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
491 }
492 Some(upper_bound) => {
493 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
494 Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
495 }
496 };
497 let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
498 let mut key_values = Vec::new();
499 while let Some(row) = rows.next().await {
500 let (key, value) = row?;
501 let short_key = key[len..].to_vec();
502 key_values.push((short_key, value));
503 }
504 Ok(key_values)
505 }
506}
507
508#[derive(Clone)]
510pub struct ScyllaDbStoreInternal {
511 store: Arc<ScyllaDbClient>,
512 semaphore: Option<Arc<Semaphore>>,
513 max_stream_queries: usize,
514 root_key: Vec<u8>,
515}
516
517#[derive(Clone)]
519pub struct ScyllaDbDatabaseInternal {
520 store: Arc<ScyllaDbClient>,
521 semaphore: Option<Arc<Semaphore>>,
522 max_stream_queries: usize,
523}
524
525impl WithError for ScyllaDbDatabaseInternal {
526 type Error = ScyllaDbStoreInternalError;
527}
528
529#[derive(Error, Debug)]
531pub enum ScyllaDbStoreInternalError {
532 #[error(transparent)]
534 BcsError(#[from] bcs::Error),
535
536 #[error("The key must have at most MAX_KEY_SIZE")]
538 KeyTooLong,
539
540 #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
542 ValueTooLong,
543
544 #[error(transparent)]
546 DeserializationError(#[from] DeserializationError),
547
548 #[error(transparent)]
550 RowsError(#[from] RowsError),
551
552 #[error(transparent)]
554 IntoRowsResultError(#[from] IntoRowsResultError),
555
556 #[error(transparent)]
558 TypeCheckError(#[from] TypeCheckError),
559
560 #[error(transparent)]
562 PagerExecutionError(#[from] PagerExecutionError),
563
564 #[error(transparent)]
566 ScyllaDbNewSessionError(#[from] NewSessionError),
567
568 #[error("Namespace contains forbidden characters")]
570 InvalidNamespace,
571
572 #[error("The batch is too long to be written")]
574 BatchTooLong,
575
576 #[error(transparent)]
578 PrepareError(#[from] PrepareError),
579
580 #[error(transparent)]
582 ExecutionError(#[from] ExecutionError),
583
584 #[error(transparent)]
586 NextRowError(#[from] NextRowError),
587}
588
589impl KeyValueStoreError for ScyllaDbStoreInternalError {
590 const BACKEND: &'static str = "scylla_db";
591}
592
593impl WithError for ScyllaDbStoreInternal {
594 type Error = ScyllaDbStoreInternalError;
595}
596
597impl ReadableKeyValueStore for ScyllaDbStoreInternal {
598 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
599
600 fn max_stream_queries(&self) -> usize {
601 self.max_stream_queries
602 }
603
604 fn root_key(&self) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
605 Ok(self.root_key[1..].to_vec())
606 }
607
608 async fn read_value_bytes(
609 &self,
610 key: &[u8],
611 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
612 let store = self.store.deref();
613 let _guard = self.acquire().await;
614 Box::pin(store.read_value_internal(&self.root_key, key.to_vec())).await
615 }
616
617 async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
618 let store = self.store.deref();
619 let _guard = self.acquire().await;
620 Box::pin(store.contains_key_internal(&self.root_key, key.to_vec())).await
621 }
622
623 async fn contains_keys(
624 &self,
625 keys: &[Vec<u8>],
626 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
627 if keys.is_empty() {
628 return Ok(Vec::new());
629 }
630 let store = self.store.deref();
631 let _guard = self.acquire().await;
632 let handles = keys
633 .chunks(MAX_MULTI_KEYS)
634 .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
635 let results: Vec<_> = join_all(handles)
636 .await
637 .into_iter()
638 .collect::<Result<_, _>>()?;
639 Ok(results.into_iter().flatten().collect())
640 }
641
642 async fn read_multi_values_bytes(
643 &self,
644 keys: &[Vec<u8>],
645 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
646 if keys.is_empty() {
647 return Ok(Vec::new());
648 }
649 let store = self.store.deref();
650 let _guard = self.acquire().await;
651 let handles = keys
652 .chunks(MAX_MULTI_KEYS)
653 .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
654 let results: Vec<_> = join_all(handles)
655 .await
656 .into_iter()
657 .collect::<Result<_, _>>()?;
658 Ok(results.into_iter().flatten().collect())
659 }
660
661 async fn find_keys_by_prefix(
662 &self,
663 key_prefix: &[u8],
664 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
665 let store = self.store.deref();
666 let _guard = self.acquire().await;
667 Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await
668 }
669
670 async fn find_key_values_by_prefix(
671 &self,
672 key_prefix: &[u8],
673 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
674 let store = self.store.deref();
675 let _guard = self.acquire().await;
676 Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec()))
677 .await
678 }
679}
680
681impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
682 const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
683 const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
684 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
685
686 type Batch = UnorderedBatch;
692
693 async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
694 let store = self.store.deref();
695 let _guard = self.acquire().await;
696 store.write_batch_internal(&self.root_key, batch).await
697 }
698}
699
/// Builds the value stored in the `root_key` column: `root_key` prefixed with a `0` byte.
///
/// `ReadableKeyValueStore::root_key` and `list_root_keys` strip this byte again.
/// NOTE(review): the prefix presumably keeps the partition key non-empty even for an
/// empty user root key — confirm.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    [&[0u8][..], root_key].concat()
}
706
/// Connection settings for the internal ScyllaDB store.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// Address of a ScyllaDB node, passed to `SessionBuilder::known_node`
    /// (the test configuration uses `"localhost:9042"`).
    pub uri: String,
    /// Number of permits in the semaphore shared by every store of a database.
    /// Each store operation holds one permit; `None` disables the limit.
    pub max_concurrent_queries: Option<usize>,
    /// Value reported by `ReadableKeyValueStore::max_stream_queries`.
    pub max_stream_queries: usize,
    /// `replication_factor` used when `create` issues `CREATE KEYSPACE`.
    pub replication_factor: u32,
}
719
720impl KeyValueDatabase for ScyllaDbDatabaseInternal {
721 type Config = ScyllaDbStoreInternalConfig;
722 type Store = ScyllaDbStoreInternal;
723
724 fn get_name() -> String {
725 "scylladb internal".to_string()
726 }
727
728 async fn connect(
729 config: &Self::Config,
730 namespace: &str,
731 ) -> Result<Self, ScyllaDbStoreInternalError> {
732 Self::check_namespace(namespace)?;
733 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
734 let store = ScyllaDbClient::new(session, namespace).await?;
735 let store = Arc::new(store);
736 let semaphore = config
737 .max_concurrent_queries
738 .map(|n| Arc::new(Semaphore::new(n)));
739 let max_stream_queries = config.max_stream_queries;
740 Ok(Self {
741 store,
742 semaphore,
743 max_stream_queries,
744 })
745 }
746
747 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
748 let store = self.store.clone();
749 let semaphore = self.semaphore.clone();
750 let max_stream_queries = self.max_stream_queries;
751 let root_key = get_big_root_key(root_key);
752 Ok(ScyllaDbStoreInternal {
753 store,
754 semaphore,
755 max_stream_queries,
756 root_key,
757 })
758 }
759
760 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
761 self.open_shared(root_key)
762 }
763
764 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
765 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
766 let statement = session
767 .prepare(format!("DESCRIBE KEYSPACE {KEYSPACE}"))
768 .await?;
769 let result = Box::pin(session.execute_iter(statement, &[])).await;
770 let miss_msg = format!("'{KEYSPACE}' not found in keyspaces");
771 let result = match result {
772 Ok(result) => result,
773 Err(error) => {
774 let invalid_or_keyspace_not_found = match &error {
775 PagerExecutionError::NextPageError(NextPageError::RequestFailure(
776 RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
777 )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
778 _ => false,
779 };
780 if invalid_or_keyspace_not_found {
781 return Ok(Vec::new());
782 } else {
783 return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
784 }
785 }
786 };
787 let mut namespaces = Vec::new();
788 let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
789 while let Some(row) = rows_stream.next().await {
790 let (_, object_kind, name, _) = row?;
791 if object_kind == "table" {
792 namespaces.push(name);
793 }
794 }
795 Ok(namespaces)
796 }
797
798 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
799 let statement = self
800 .store
801 .session
802 .prepare(format!(
803 "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
804 KEYSPACE, self.store.namespace
805 ))
806 .await?;
807
808 let rows = Box::pin(self.store.session.execute_iter(statement, &[])).await?;
810 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
811 let mut root_keys = BTreeSet::new();
812 while let Some(row) = rows.next().await {
813 let (root_key,) = row?;
814 let root_key = root_key[1..].to_vec();
815 root_keys.insert(root_key);
816 }
817 Ok(root_keys.into_iter().collect::<Vec<_>>())
818 }
819
820 async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
821 let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
822 let statement = session
823 .prepare(format!("DROP KEYSPACE IF EXISTS {KEYSPACE}"))
824 .await?;
825
826 session
827 .execute_single_page(&statement, &[], PagingState::start())
828 .await?;
829 Ok(())
830 }
831
832 async fn exists(
833 config: &Self::Config,
834 namespace: &str,
835 ) -> Result<bool, ScyllaDbStoreInternalError> {
836 Self::check_namespace(namespace)?;
837 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
838
839 let result = session
841 .prepare(format!(
842 "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" LIMIT 1 ALLOW FILTERING"
843 ))
844 .await;
845
846 let miss_msg1 = format!("unconfigured table {namespace}");
848 let miss_msg1 = miss_msg1.as_str();
849 let miss_msg2 = "Undefined name root_key in selection clause";
850 let miss_msg3 = format!("Keyspace {KEYSPACE} does not exist");
851 let Err(error) = result else {
852 return Ok(true);
854 };
855 let missing_table = match &error {
856 PrepareError::AllAttemptsFailed {
857 first_attempt: RequestAttemptError::DbError(db_error, msg),
858 } => {
859 if *db_error != DbError::Invalid {
860 false
861 } else {
862 msg.as_str() == miss_msg1
863 || msg.as_str() == miss_msg2
864 || msg.as_str() == miss_msg3
865 }
866 }
867 _ => false,
868 };
869 if missing_table {
870 Ok(false)
871 } else {
872 Err(ScyllaDbStoreInternalError::PrepareError(error))
873 }
874 }
875
876 async fn create(
877 config: &Self::Config,
878 namespace: &str,
879 ) -> Result<(), ScyllaDbStoreInternalError> {
880 Self::check_namespace(namespace)?;
881 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
882
883 let statement = session
885 .prepare(format!(
886 "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
887 'class' : 'NetworkTopologyStrategy', \
888 'replication_factor' : {} \
889 }}",
890 KEYSPACE, config.replication_factor
891 ))
892 .await?;
893 session
894 .execute_single_page(&statement, &[], PagingState::start())
895 .await?;
896
897 let statement = session
900 .prepare(format!(
901 "CREATE TABLE {KEYSPACE}.\"{namespace}\" (\
902 root_key blob, \
903 k blob, \
904 v blob, \
905 PRIMARY KEY (root_key, k) \
906 ) \
907 WITH compaction = {{ \
908 'class' : 'LeveledCompactionStrategy', \
909 'sstable_size_in_mb' : 160 \
910 }} \
911 AND compression = {{ \
912 'sstable_compression': 'LZ4Compressor', \
913 'chunk_length_in_kb':'4' \
914 }} \
915 AND caching = {{ \
916 'enabled': 'true' \
917 }} \
918 AND gc_grace_seconds = 0 \
919 AND tombstone_gc = {{'mode': 'immediate'}}"
920 ))
921 .await?;
922 session
923 .execute_single_page(&statement, &[], PagingState::start())
924 .await?;
925 Ok(())
926 }
927
928 async fn delete(
929 config: &Self::Config,
930 namespace: &str,
931 ) -> Result<(), ScyllaDbStoreInternalError> {
932 Self::check_namespace(namespace)?;
933 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
934 let statement = session
935 .prepare(format!("DROP TABLE IF EXISTS {KEYSPACE}.\"{namespace}\";"))
936 .await?;
937 session
938 .execute_single_page(&statement, &[], PagingState::start())
939 .await?;
940 Ok(())
941 }
942}
943
944impl ScyllaDbStoreInternal {
945 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
947 match &self.semaphore {
948 None => None,
949 Some(count) => Some(count.acquire().await),
950 }
951 }
952}
953
954impl ScyllaDbDatabaseInternal {
955 fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
956 if !namespace.is_empty()
957 && namespace.len() <= 48
958 && namespace
959 .chars()
960 .all(|c| c.is_ascii_alphanumeric() || c == '_')
961 {
962 return Ok(());
963 }
964 Err(ScyllaDbStoreInternalError::InvalidNamespace)
965 }
966}
967
/// Test configuration: a local node with small concurrency limits and replication factor 1.
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    async fn new_test_config(
    ) -> Result<ScyllaDbStoreInternalConfig, JournalingError<ScyllaDbStoreInternalError>> {
        let config = ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
982
/// Full ScyllaDB database stack with metrics.
///
/// From outside in: metering, LRU caching, metering, value splitting, metering,
/// journaling, and finally the internal database.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;

/// Full ScyllaDB database stack without metrics.
///
/// From outside in: LRU caching, value splitting, journaling, and finally the
/// internal database.
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;

/// Configuration of the full stack: `LruCachingConfig` wrapping the internal config.
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;

/// Error type of the full stack: value-splitting over journaling over internal errors.
pub type ScyllaDbStoreError = ValueSplittingError<JournalingError<ScyllaDbStoreInternalError>>;