1use std::{
11 collections::{BTreeSet, HashMap},
12 ops::Deref,
13 sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::{ensure, util::future::FutureSyncExt as _};
19use scylla::{
20 client::{
21 execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22 session::Session,
23 session_builder::SessionBuilder,
24 },
25 deserialize::{DeserializationError, TypeCheckError},
26 errors::{
27 DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28 PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29 },
30 policies::{
31 load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32 retry::DefaultRetryPolicy,
33 },
34 response::PagingState,
35 statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45 batch::UnorderedBatch,
46 common::{get_uleb128_size, get_upper_bound_option},
47 journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48 lru_caching::{LruCachingConfig, LruCachingDatabase},
49 store::{
50 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51 WithError,
52 },
53 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54};
55
56const MAX_MULTI_KEYS: usize = 100 - 1;
59
60const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
68const MAX_KEY_SIZE: usize = 10 * 1024;
69const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
70
71const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
84 - MAX_KEY_SIZE
85 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
86 - get_uleb128_size(MAX_KEY_SIZE)
87 - 3;
88
89const MAX_BATCH_SIZE: usize = 5000;
95
96const KEYSPACE: &str = "kv";
98
99struct ScyllaDbClient {
104 session: Session,
105 namespace: String,
106 read_value: PreparedStatement,
107 contains_key: PreparedStatement,
108 write_batch_delete_prefix_unbounded: PreparedStatement,
109 write_batch_delete_prefix_bounded: PreparedStatement,
110 write_batch_deletion: PreparedStatement,
111 write_batch_insertion: PreparedStatement,
112 find_keys_by_prefix_unbounded: PreparedStatement,
113 find_keys_by_prefix_bounded: PreparedStatement,
114 find_key_values_by_prefix_unbounded: PreparedStatement,
115 find_key_values_by_prefix_bounded: PreparedStatement,
116 multi_key_values: papaya::HashMap<usize, PreparedStatement>,
117 multi_keys: papaya::HashMap<usize, PreparedStatement>,
118}
119
120impl ScyllaDbClient {
121 async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
122 let namespace = namespace.to_string();
123 let read_value = session
124 .prepare(format!(
125 "SELECT v FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
126 KEYSPACE, namespace
127 ))
128 .await?;
129
130 let contains_key = session
131 .prepare(format!(
132 "SELECT root_key FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
133 KEYSPACE, namespace
134 ))
135 .await?;
136
137 let write_batch_delete_prefix_unbounded = session
138 .prepare(format!(
139 "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
140 KEYSPACE, namespace
141 ))
142 .await?;
143
144 let write_batch_delete_prefix_bounded = session
145 .prepare(format!(
146 "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
147 KEYSPACE, namespace
148 ))
149 .await?;
150
151 let write_batch_deletion = session
152 .prepare(format!(
153 "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
154 KEYSPACE, namespace
155 ))
156 .await?;
157
158 let write_batch_insertion = session
159 .prepare(format!(
160 "INSERT INTO {}.\"{}\" (root_key, k, v) VALUES (?, ?, ?)",
161 KEYSPACE, namespace
162 ))
163 .await?;
164
165 let find_keys_by_prefix_unbounded = session
166 .prepare(format!(
167 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
168 KEYSPACE, namespace
169 ))
170 .await?;
171
172 let find_keys_by_prefix_bounded = session
173 .prepare(format!(
174 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
175 KEYSPACE, namespace
176 ))
177 .await?;
178
179 let find_key_values_by_prefix_unbounded = session
180 .prepare(format!(
181 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
182 KEYSPACE, namespace
183 ))
184 .await?;
185
186 let find_key_values_by_prefix_bounded = session
187 .prepare(format!(
188 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
189 KEYSPACE, namespace
190 ))
191 .await?;
192
193 Ok(Self {
194 session,
195 namespace,
196 read_value,
197 contains_key,
198 write_batch_delete_prefix_unbounded,
199 write_batch_delete_prefix_bounded,
200 write_batch_deletion,
201 write_batch_insertion,
202 find_keys_by_prefix_unbounded,
203 find_keys_by_prefix_bounded,
204 find_key_values_by_prefix_unbounded,
205 find_key_values_by_prefix_bounded,
206 multi_key_values: papaya::HashMap::new(),
207 multi_keys: papaya::HashMap::new(),
208 })
209 }
210
211 fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
212 DefaultPolicy::builder().token_aware(true).build()
213 }
214
215 fn build_default_execution_profile_handle(
216 policy: Arc<dyn LoadBalancingPolicy>,
217 ) -> ExecutionProfileHandle {
218 let default_profile = ExecutionProfile::builder()
219 .load_balancing_policy(policy)
220 .retry_policy(Arc::new(DefaultRetryPolicy::new()))
221 .consistency(Consistency::LocalQuorum)
222 .build();
223 default_profile.into_handle()
224 }
225
226 async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
227 SessionBuilder::new()
230 .known_node(uri)
231 .default_execution_profile_handle(Self::build_default_execution_profile_handle(
232 Self::build_default_policy(),
233 ))
234 .build()
235 .boxed_sync()
236 .await
237 .map_err(Into::into)
238 }
239
240 async fn get_multi_key_values_statement(
241 &self,
242 num_markers: usize,
243 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
244 if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
245 return Ok(prepared_statement.clone());
246 }
247 let markers = std::iter::repeat_n("?", num_markers)
248 .collect::<Vec<_>>()
249 .join(",");
250 let prepared_statement = self
251 .session
252 .prepare(format!(
253 "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
254 KEYSPACE, self.namespace, markers
255 ))
256 .await?;
257 self.multi_key_values
258 .pin()
259 .insert(num_markers, prepared_statement.clone());
260 Ok(prepared_statement)
261 }
262
263 async fn get_multi_keys_statement(
264 &self,
265 num_markers: usize,
266 ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
267 if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
268 return Ok(prepared_statement.clone());
269 };
270 let markers = std::iter::repeat_n("?", num_markers)
271 .collect::<Vec<_>>()
272 .join(",");
273 let prepared_statement = self
274 .session
275 .prepare(format!(
276 "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
277 KEYSPACE, self.namespace, markers
278 ))
279 .await?;
280 self.multi_keys
281 .pin()
282 .insert(num_markers, prepared_statement.clone());
283 Ok(prepared_statement)
284 }
285
286 fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
287 ensure!(
288 key.len() <= MAX_KEY_SIZE,
289 ScyllaDbStoreInternalError::KeyTooLong
290 );
291 Ok(())
292 }
293
294 fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
295 ensure!(
296 value.len() <= RAW_MAX_VALUE_SIZE,
297 ScyllaDbStoreInternalError::ValueTooLong
298 );
299 Ok(())
300 }
301
302 fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
303 ensure!(
304 batch.len() <= MAX_BATCH_SIZE,
305 ScyllaDbStoreInternalError::BatchTooLong
306 );
307 Ok(())
308 }
309
310 async fn read_value_internal(
311 &self,
312 root_key: &[u8],
313 key: Vec<u8>,
314 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
315 Self::check_key_size(&key)?;
316 let session = &self.session;
317 let values = (root_key.to_vec(), key);
319
320 let (result, _) = session
321 .execute_single_page(&self.read_value, &values, PagingState::start())
322 .await?;
323 let rows = result.into_rows_result()?;
324 let mut rows = rows.rows::<(Vec<u8>,)>()?;
325 Ok(match rows.next() {
326 Some(row) => Some(row?.0),
327 None => None,
328 })
329 }
330
331 fn get_occurrences_map(
332 keys: Vec<Vec<u8>>,
333 ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
334 let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
335 for (i_key, key) in keys.into_iter().enumerate() {
336 Self::check_key_size(&key)?;
337 map.entry(key).or_default().push(i_key);
338 }
339 Ok(map)
340 }
341
342 async fn read_multi_values_internal(
343 &self,
344 root_key: &[u8],
345 keys: Vec<Vec<u8>>,
346 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
347 let mut values = vec![None; keys.len()];
348 let map = Self::get_occurrences_map(keys)?;
349 let statement = self.get_multi_key_values_statement(map.len()).await?;
350 let mut inputs = vec![root_key.to_vec()];
351 inputs.extend(map.keys().cloned());
352 let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
353 .await?
354 .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
355
356 while let Some(row) = rows.next().await {
357 let (key, value) = row?;
358 if let Some((&last, rest)) = map[&key].split_last() {
359 for position in rest {
360 values[*position] = Some(value.clone());
361 }
362 values[last] = Some(value);
363 }
364 }
365 Ok(values)
366 }
367
368 async fn contains_keys_internal(
369 &self,
370 root_key: &[u8],
371 keys: Vec<Vec<u8>>,
372 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
373 let mut values = vec![false; keys.len()];
374 let map = Self::get_occurrences_map(keys)?;
375 let statement = self.get_multi_keys_statement(map.len()).await?;
376 let mut inputs = vec![root_key.to_vec()];
377 inputs.extend(map.keys().cloned());
378 let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
379 .await?
380 .rows_stream::<(Vec<u8>,)>()?;
381
382 while let Some(row) = rows.next().await {
383 let (key,) = row?;
384 for i_key in &map[&key] {
385 values[*i_key] = true;
386 }
387 }
388
389 Ok(values)
390 }
391
392 async fn contains_key_internal(
393 &self,
394 root_key: &[u8],
395 key: Vec<u8>,
396 ) -> Result<bool, ScyllaDbStoreInternalError> {
397 Self::check_key_size(&key)?;
398 let session = &self.session;
399 let values = (root_key.to_vec(), key);
401
402 let (result, _) = session
403 .execute_single_page(&self.contains_key, &values, PagingState::start())
404 .await?;
405 let rows = result.into_rows_result()?;
406 let mut rows = rows.rows::<(Vec<u8>,)>()?;
407 Ok(rows.next().is_some())
408 }
409
410 async fn write_batch_internal(
411 &self,
412 root_key: &[u8],
413 batch: UnorderedBatch,
414 ) -> Result<(), ScyllaDbStoreInternalError> {
415 let session = &self.session;
416 let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
417 let mut batch_values = Vec::new();
418 let query1 = &self.write_batch_delete_prefix_unbounded;
419 let query2 = &self.write_batch_delete_prefix_bounded;
420 Self::check_batch_len(&batch)?;
421 for key_prefix in batch.key_prefix_deletions {
422 Self::check_key_size(&key_prefix)?;
423 match get_upper_bound_option(&key_prefix) {
424 None => {
425 let values = vec![root_key.to_vec(), key_prefix];
426 batch_values.push(values);
427 batch_query.append_statement(query1.clone());
428 }
429 Some(upper_bound) => {
430 let values = vec![root_key.to_vec(), key_prefix, upper_bound];
431 batch_values.push(values);
432 batch_query.append_statement(query2.clone());
433 }
434 }
435 }
436 let query3 = &self.write_batch_deletion;
437 for key in batch.simple_unordered_batch.deletions {
438 Self::check_key_size(&key)?;
439 let values = vec![root_key.to_vec(), key];
440 batch_values.push(values);
441 batch_query.append_statement(query3.clone());
442 }
443 let query4 = &self.write_batch_insertion;
444 for (key, value) in batch.simple_unordered_batch.insertions {
445 Self::check_key_size(&key)?;
446 Self::check_value_size(&value)?;
447 let values = vec![root_key.to_vec(), key, value];
448 batch_values.push(values);
449 batch_query.append_statement(query4.clone());
450 }
451 session.batch(&batch_query, batch_values).await?;
452 Ok(())
453 }
454
455 async fn find_keys_by_prefix_internal(
456 &self,
457 root_key: &[u8],
458 key_prefix: Vec<u8>,
459 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
460 Self::check_key_size(&key_prefix)?;
461 let session = &self.session;
462 let len = key_prefix.len();
464 let query_unbounded = &self.find_keys_by_prefix_unbounded;
465 let query_bounded = &self.find_keys_by_prefix_bounded;
466 let rows = match get_upper_bound_option(&key_prefix) {
467 None => {
468 let values = (root_key.to_vec(), key_prefix.clone());
469 Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
470 }
471 Some(upper_bound) => {
472 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
473 Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
474 }
475 };
476 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
477 let mut keys = Vec::new();
478 while let Some(row) = rows.next().await {
479 let (key,) = row?;
480 let short_key = key[len..].to_vec();
481 keys.push(short_key);
482 }
483 Ok(keys)
484 }
485
486 async fn find_key_values_by_prefix_internal(
487 &self,
488 root_key: &[u8],
489 key_prefix: Vec<u8>,
490 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
491 Self::check_key_size(&key_prefix)?;
492 let session = &self.session;
493 let len = key_prefix.len();
495 let query_unbounded = &self.find_key_values_by_prefix_unbounded;
496 let query_bounded = &self.find_key_values_by_prefix_bounded;
497 let rows = match get_upper_bound_option(&key_prefix) {
498 None => {
499 let values = (root_key.to_vec(), key_prefix.clone());
500 Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
501 }
502 Some(upper_bound) => {
503 let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
504 Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
505 }
506 };
507 let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
508 let mut key_values = Vec::new();
509 while let Some(row) = rows.next().await {
510 let (key, value) = row?;
511 let short_key = key[len..].to_vec();
512 key_values.push((short_key, value));
513 }
514 Ok(key_values)
515 }
516}
517
518#[derive(Clone)]
520pub struct ScyllaDbStoreInternal {
521 store: Arc<ScyllaDbClient>,
522 semaphore: Option<Arc<Semaphore>>,
523 max_stream_queries: usize,
524 root_key: Vec<u8>,
525}
526
527#[derive(Clone)]
529pub struct ScyllaDbDatabaseInternal {
530 store: Arc<ScyllaDbClient>,
531 semaphore: Option<Arc<Semaphore>>,
532 max_stream_queries: usize,
533}
534
535impl WithError for ScyllaDbDatabaseInternal {
536 type Error = ScyllaDbStoreInternalError;
537}
538
539#[derive(Error, Debug)]
541pub enum ScyllaDbStoreInternalError {
542 #[error(transparent)]
544 BcsError(#[from] bcs::Error),
545
546 #[error("The key must have at most MAX_KEY_SIZE")]
548 KeyTooLong,
549
550 #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
552 ValueTooLong,
553
554 #[error(transparent)]
556 DeserializationError(#[from] DeserializationError),
557
558 #[error(transparent)]
560 RowsError(#[from] RowsError),
561
562 #[error(transparent)]
564 IntoRowsResultError(#[from] IntoRowsResultError),
565
566 #[error(transparent)]
568 TypeCheckError(#[from] TypeCheckError),
569
570 #[error(transparent)]
572 PagerExecutionError(#[from] PagerExecutionError),
573
574 #[error(transparent)]
576 ScyllaDbNewSessionError(#[from] NewSessionError),
577
578 #[error("Namespace contains forbidden characters")]
580 InvalidNamespace,
581
582 #[error(transparent)]
584 JournalConsistencyError(#[from] JournalConsistencyError),
585
586 #[error("The batch is too long to be written")]
588 BatchTooLong,
589
590 #[error(transparent)]
592 PrepareError(#[from] PrepareError),
593
594 #[error(transparent)]
596 ExecutionError(#[from] ExecutionError),
597
598 #[error(transparent)]
600 NextRowError(#[from] NextRowError),
601}
602
603impl KeyValueStoreError for ScyllaDbStoreInternalError {
604 const BACKEND: &'static str = "scylla_db";
605}
606
607impl WithError for ScyllaDbStoreInternal {
608 type Error = ScyllaDbStoreInternalError;
609}
610
611impl ReadableKeyValueStore for ScyllaDbStoreInternal {
612 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
613
614 fn max_stream_queries(&self) -> usize {
615 self.max_stream_queries
616 }
617
618 fn root_key(&self) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
619 Ok(self.root_key[1..].to_vec())
620 }
621
622 async fn read_value_bytes(
623 &self,
624 key: &[u8],
625 ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
626 let store = self.store.deref();
627 let _guard = self.acquire().await;
628 Box::pin(store.read_value_internal(&self.root_key, key.to_vec())).await
629 }
630
631 async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
632 let store = self.store.deref();
633 let _guard = self.acquire().await;
634 Box::pin(store.contains_key_internal(&self.root_key, key.to_vec())).await
635 }
636
637 async fn contains_keys(
638 &self,
639 keys: &[Vec<u8>],
640 ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
641 if keys.is_empty() {
642 return Ok(Vec::new());
643 }
644 let store = self.store.deref();
645 let _guard = self.acquire().await;
646 let handles = keys
647 .chunks(MAX_MULTI_KEYS)
648 .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
649 let results: Vec<_> = join_all(handles)
650 .await
651 .into_iter()
652 .collect::<Result<_, _>>()?;
653 Ok(results.into_iter().flatten().collect())
654 }
655
656 async fn read_multi_values_bytes(
657 &self,
658 keys: &[Vec<u8>],
659 ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
660 if keys.is_empty() {
661 return Ok(Vec::new());
662 }
663 let store = self.store.deref();
664 let _guard = self.acquire().await;
665 let handles = keys
666 .chunks(MAX_MULTI_KEYS)
667 .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
668 let results: Vec<_> = join_all(handles)
669 .await
670 .into_iter()
671 .collect::<Result<_, _>>()?;
672 Ok(results.into_iter().flatten().collect())
673 }
674
675 async fn find_keys_by_prefix(
676 &self,
677 key_prefix: &[u8],
678 ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
679 let store = self.store.deref();
680 let _guard = self.acquire().await;
681 Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await
682 }
683
684 async fn find_key_values_by_prefix(
685 &self,
686 key_prefix: &[u8],
687 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
688 let store = self.store.deref();
689 let _guard = self.acquire().await;
690 Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec()))
691 .await
692 }
693}
694
695impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
696 const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
697 const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
698 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
699
700 type Batch = UnorderedBatch;
706
707 async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
708 let store = self.store.deref();
709 let _guard = self.acquire().await;
710 store.write_batch_internal(&self.root_key, batch).await
711 }
712}
713
/// Returns `root_key` with a single `0` byte prepended.
///
/// This is the form stored in the `root_key` column. `ReadableKeyValueStore::root_key`
/// and `list_root_keys` strip the byte again.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    let mut big_key = Vec::with_capacity(root_key.len() + 1);
    big_key.push(0);
    big_key.extend_from_slice(root_key);
    big_key
}
720
721#[derive(Clone, Debug, Deserialize, Serialize)]
723pub struct ScyllaDbStoreInternalConfig {
724 pub uri: String,
726 pub max_concurrent_queries: Option<usize>,
728 pub max_stream_queries: usize,
730 pub replication_factor: u32,
732}
733
734impl KeyValueDatabase for ScyllaDbDatabaseInternal {
735 type Config = ScyllaDbStoreInternalConfig;
736 type Store = ScyllaDbStoreInternal;
737
738 fn get_name() -> String {
739 "scylladb internal".to_string()
740 }
741
742 async fn connect(
743 config: &Self::Config,
744 namespace: &str,
745 ) -> Result<Self, ScyllaDbStoreInternalError> {
746 Self::check_namespace(namespace)?;
747 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
748 let store = ScyllaDbClient::new(session, namespace).await?;
749 let store = Arc::new(store);
750 let semaphore = config
751 .max_concurrent_queries
752 .map(|n| Arc::new(Semaphore::new(n)));
753 let max_stream_queries = config.max_stream_queries;
754 Ok(Self {
755 store,
756 semaphore,
757 max_stream_queries,
758 })
759 }
760
761 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
762 let store = self.store.clone();
763 let semaphore = self.semaphore.clone();
764 let max_stream_queries = self.max_stream_queries;
765 let root_key = get_big_root_key(root_key);
766 Ok(ScyllaDbStoreInternal {
767 store,
768 semaphore,
769 max_stream_queries,
770 root_key,
771 })
772 }
773
774 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
775 self.open_shared(root_key)
776 }
777
778 async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
779 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
780 let statement = session
781 .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
782 .await?;
783 let result = Box::pin(session.execute_iter(statement, &[])).await;
784 let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
785 let result = match result {
786 Ok(result) => result,
787 Err(error) => {
788 let invalid_or_keyspace_not_found = match &error {
789 PagerExecutionError::NextPageError(NextPageError::RequestFailure(
790 RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
791 )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
792 _ => false,
793 };
794 if invalid_or_keyspace_not_found {
795 return Ok(Vec::new());
796 } else {
797 return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
798 }
799 }
800 };
801 let mut namespaces = Vec::new();
802 let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
803 while let Some(row) = rows_stream.next().await {
804 let (_, object_kind, name, _) = row?;
805 if object_kind == "table" {
806 namespaces.push(name);
807 }
808 }
809 Ok(namespaces)
810 }
811
812 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
813 let statement = self
814 .store
815 .session
816 .prepare(format!(
817 "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
818 KEYSPACE, self.store.namespace
819 ))
820 .await?;
821
822 let rows = Box::pin(self.store.session.execute_iter(statement, &[])).await?;
824 let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
825 let mut root_keys = BTreeSet::new();
826 while let Some(row) = rows.next().await {
827 let (root_key,) = row?;
828 let root_key = root_key[1..].to_vec();
829 root_keys.insert(root_key);
830 }
831 Ok(root_keys.into_iter().collect::<Vec<_>>())
832 }
833
834 async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
835 let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
836 let statement = session
837 .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
838 .await?;
839
840 session
841 .execute_single_page(&statement, &[], PagingState::start())
842 .await?;
843 Ok(())
844 }
845
846 async fn exists(
847 config: &Self::Config,
848 namespace: &str,
849 ) -> Result<bool, ScyllaDbStoreInternalError> {
850 Self::check_namespace(namespace)?;
851 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
852
853 let result = session
855 .prepare(format!(
856 "SELECT root_key FROM {}.\"{}\" LIMIT 1 ALLOW FILTERING",
857 KEYSPACE, namespace
858 ))
859 .await;
860
861 let miss_msg1 = format!("unconfigured table {}", namespace);
863 let miss_msg1 = miss_msg1.as_str();
864 let miss_msg2 = "Undefined name root_key in selection clause";
865 let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
866 let Err(error) = result else {
867 return Ok(true);
869 };
870 let missing_table = match &error {
871 PrepareError::AllAttemptsFailed {
872 first_attempt: RequestAttemptError::DbError(db_error, msg),
873 } => {
874 if *db_error != DbError::Invalid {
875 false
876 } else {
877 msg.as_str() == miss_msg1
878 || msg.as_str() == miss_msg2
879 || msg.as_str() == miss_msg3
880 }
881 }
882 _ => false,
883 };
884 if missing_table {
885 Ok(false)
886 } else {
887 Err(ScyllaDbStoreInternalError::PrepareError(error))
888 }
889 }
890
891 async fn create(
892 config: &Self::Config,
893 namespace: &str,
894 ) -> Result<(), ScyllaDbStoreInternalError> {
895 Self::check_namespace(namespace)?;
896 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
897
898 let statement = session
900 .prepare(format!(
901 "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
902 'class' : 'NetworkTopologyStrategy', \
903 'replication_factor' : {} \
904 }}",
905 KEYSPACE, config.replication_factor
906 ))
907 .await?;
908 session
909 .execute_single_page(&statement, &[], PagingState::start())
910 .await?;
911
912 let statement = session
915 .prepare(format!(
916 "CREATE TABLE {}.\"{}\" (\
917 root_key blob, \
918 k blob, \
919 v blob, \
920 PRIMARY KEY (root_key, k) \
921 ) \
922 WITH compaction = {{ \
923 'class' : 'SizeTieredCompactionStrategy', \
924 'min_sstable_size' : 52428800, \
925 'bucket_low' : 0.5, \
926 'bucket_high' : 1.5, \
927 'min_threshold' : 4, \
928 'max_threshold' : 32 \
929 }} \
930 AND compression = {{ \
931 'sstable_compression': 'LZ4Compressor', \
932 'chunk_length_in_kb':'4' \
933 }} \
934 AND caching = {{ \
935 'enabled': 'true' \
936 }}",
937 KEYSPACE, namespace
938 ))
939 .await?;
940 session
941 .execute_single_page(&statement, &[], PagingState::start())
942 .await?;
943 Ok(())
944 }
945
946 async fn delete(
947 config: &Self::Config,
948 namespace: &str,
949 ) -> Result<(), ScyllaDbStoreInternalError> {
950 Self::check_namespace(namespace)?;
951 let session = ScyllaDbClient::build_default_session(&config.uri).await?;
952 let statement = session
953 .prepare(format!(
954 "DROP TABLE IF EXISTS {}.\"{}\";",
955 KEYSPACE, namespace
956 ))
957 .await?;
958 session
959 .execute_single_page(&statement, &[], PagingState::start())
960 .await?;
961 Ok(())
962 }
963}
964
965impl ScyllaDbStoreInternal {
966 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
968 match &self.semaphore {
969 None => None,
970 Some(count) => Some(count.acquire().await),
971 }
972 }
973}
974
975impl ScyllaDbDatabaseInternal {
976 fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
977 if !namespace.is_empty()
978 && namespace.len() <= 48
979 && namespace
980 .chars()
981 .all(|c| c.is_ascii_alphanumeric() || c == '_')
982 {
983 return Ok(());
984 }
985 Err(ScyllaDbStoreInternalError::InvalidNamespace)
986 }
987}
988
/// Test configuration pointing at a local single-node ScyllaDB.
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        Ok(ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        })
    }
}
1002
1003#[cfg(with_metrics)]
1005pub type ScyllaDbDatabase = MeteredDatabase<
1006 LruCachingDatabase<
1007 MeteredDatabase<
1008 ValueSplittingDatabase<
1009 MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
1010 >,
1011 >,
1012 >,
1013>;
1014
1015#[cfg(not(with_metrics))]
1017pub type ScyllaDbDatabase = LruCachingDatabase<
1018 ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
1019>;
1020
1021pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1023
1024pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;