linera_views/backends/
scylla_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! Implements [`crate::store::KeyValueStore`] for the ScyllaDB database.
//!
//! The connection goes through a single ScyllaDB `Session`. Each namespace is stored as a
//! table of the `kv` keyspace, whose rows are addressed by a root key and a key. The
//! maximum number of concurrent queries is controlled by `max_concurrent_queries`.
9
10use std::{
11    collections::{BTreeSet, HashMap},
12    ops::Deref,
13    sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use dashmap::DashMap;
18use futures::{future::join_all, StreamExt as _};
19use linera_base::ensure;
20use scylla::{
21    client::{
22        execution_profile::{ExecutionProfile, ExecutionProfileHandle},
23        session::Session,
24        session_builder::SessionBuilder,
25    },
26    deserialize::{DeserializationError, TypeCheckError},
27    errors::{
28        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
29        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
30    },
31    policies::{
32        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
33        retry::DefaultRetryPolicy,
34    },
35    response::PagingState,
36    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
37};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40
41#[cfg(with_metrics)]
42use crate::metering::MeteredStore;
43#[cfg(with_testing)]
44use crate::store::TestKeyValueStore;
45use crate::{
46    batch::UnorderedBatch,
47    common::{get_uleb128_size, get_upper_bound_option},
48    journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
49    lru_caching::{LruCachingConfig, LruCachingStore},
50    store::{AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError},
51    value_splitting::{ValueSplittingError, ValueSplittingStore},
52    FutureSyncExt as _,
53};
54
/// The maximum number of keys in a multi-key query (`k IN (...)`).
///
/// ScyllaDB limits such a query to 100 entries, and one entry is needed for the root key.
const MAX_MULTI_KEYS: usize = 100 - 1;

/// The maximum size of a key, in bytes (10 KiB).
const MAX_KEY_SIZE: usize = 10 * 1024;

/// The maximum size of a value stored in ScyllaDB, in bytes.
///
/// The maximal size of an operation on ScyllaDB seems to be 16 MiB, see
/// <https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/>:
/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once
/// at the database at any particular time". This budget is split between the value and a
/// key of up to `MAX_KEY_SIZE` bytes. We also arbitrarily remove 4000 bytes, since some
/// space is taken internally by the database.
const RAW_MAX_VALUE_SIZE: usize = (16 << 20) - MAX_KEY_SIZE - 4000;

/// The maximum total size of a batch: one maximal value plus one maximal key.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
69
70/// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage.
71/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`
72/// Therefore the actual `MAX_VALUE_SIZE` is lower.
73/// At the maximum the key size is 1024 bytes (see below) and we pack just one entry.
74/// So if the key has 1024 bytes this gets us the inequality
75/// `1 + 1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= RAW_MAX_VALUE_SIZE`.
76/// and so this simplifies to `1 + 1 + 1 + (2 + 10240) + (4 + x) <= RAW_MAX_VALUE_SIZE`
77/// Note on the above formula:
78/// * We write 4 because `get_uleb128_size(RAW_MAX_VALUE_SIZE) = 4)`
79/// * We write `1 + 1 + 1`  because the `UnorderedBatch` has three entries.
80///
81/// This gets us to a maximal value of 16752727.
82const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
83    - MAX_KEY_SIZE
84    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
85    - get_uleb128_size(MAX_KEY_SIZE)
86    - 3;
87
/// The maximum number of operations in a single batch written to ScyllaDB.
///
/// This is an empirical limit: we have not been able to find this or a similar constant
/// in the ScyllaDB source code or documentation. Earlier experiments found 14796 to be the
/// largest value that works; the value used here is lower than that.
const MAX_BATCH_SIZE: usize = 5000;

/// The keyspace to use for the ScyllaDB database.
const KEYSPACE: &str = "kv";
97
/// The client for ScyllaDB, shared (behind an `Arc`) by all the stores opened from the
/// same connection:
/// * `session` is the ScyllaDB session through which all queries are sent.
/// * `namespace` is the name of the table, inside the `KEYSPACE` keyspace, holding the data.
/// * The `PreparedStatement` fields are the fixed-shape queries implementing the features of
///   `KeyValueStore`; they are prepared once by `ScyllaDbClient::new`.
/// * `multi_key_values` and `multi_keys` cache the `k IN (...)` queries, keyed by their
///   number of key markers, since that number varies from call to call.
struct ScyllaDbClient {
    session: Session,
    namespace: String,
    read_value: PreparedStatement,
    contains_key: PreparedStatement,
    write_batch_delete_prefix_unbounded: PreparedStatement,
    write_batch_delete_prefix_bounded: PreparedStatement,
    write_batch_deletion: PreparedStatement,
    write_batch_insertion: PreparedStatement,
    find_keys_by_prefix_unbounded: PreparedStatement,
    find_keys_by_prefix_bounded: PreparedStatement,
    find_key_values_by_prefix_unbounded: PreparedStatement,
    find_key_values_by_prefix_bounded: PreparedStatement,
    // Lazily prepared `SELECT k,v ... k IN (...)` statements, keyed by marker count.
    multi_key_values: DashMap<usize, PreparedStatement>,
    // Lazily prepared `SELECT k ... k IN (...)` statements, keyed by marker count.
    multi_keys: DashMap<usize, PreparedStatement>,
}
118
119impl ScyllaDbClient {
120    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
121        let namespace = namespace.to_string();
122        let read_value = session
123            .prepare(format!(
124                "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?",
125                KEYSPACE, namespace
126            ))
127            .await?;
128
129        let contains_key = session
130            .prepare(format!(
131                "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?",
132                KEYSPACE, namespace
133            ))
134            .await?;
135
136        let write_batch_delete_prefix_unbounded = session
137            .prepare(format!(
138                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?",
139                KEYSPACE, namespace
140            ))
141            .await?;
142
143        let write_batch_delete_prefix_bounded = session
144            .prepare(format!(
145                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
146                KEYSPACE, namespace
147            ))
148            .await?;
149
150        let write_batch_deletion = session
151            .prepare(format!(
152                "DELETE FROM {}.{} WHERE root_key = ? AND k = ?",
153                KEYSPACE, namespace
154            ))
155            .await?;
156
157        let write_batch_insertion = session
158            .prepare(format!(
159                "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)",
160                KEYSPACE, namespace
161            ))
162            .await?;
163
164        let find_keys_by_prefix_unbounded = session
165            .prepare(format!(
166                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?",
167                KEYSPACE, namespace
168            ))
169            .await?;
170
171        let find_keys_by_prefix_bounded = session
172            .prepare(format!(
173                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
174                KEYSPACE, namespace
175            ))
176            .await?;
177
178        let find_key_values_by_prefix_unbounded = session
179            .prepare(format!(
180                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?",
181                KEYSPACE, namespace
182            ))
183            .await?;
184
185        let find_key_values_by_prefix_bounded = session
186            .prepare(format!(
187                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
188                KEYSPACE, namespace
189            ))
190            .await?;
191
192        Ok(Self {
193            session,
194            namespace,
195            read_value,
196            contains_key,
197            write_batch_delete_prefix_unbounded,
198            write_batch_delete_prefix_bounded,
199            write_batch_deletion,
200            write_batch_insertion,
201            find_keys_by_prefix_unbounded,
202            find_keys_by_prefix_bounded,
203            find_key_values_by_prefix_unbounded,
204            find_key_values_by_prefix_bounded,
205            multi_key_values: DashMap::new(),
206            multi_keys: DashMap::new(),
207        })
208    }
209
210    fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
211        DefaultPolicy::builder().token_aware(true).build()
212    }
213
214    fn build_default_execution_profile_handle(
215        policy: Arc<dyn LoadBalancingPolicy>,
216    ) -> ExecutionProfileHandle {
217        let default_profile = ExecutionProfile::builder()
218            .load_balancing_policy(policy)
219            .retry_policy(Arc::new(DefaultRetryPolicy::new()))
220            .consistency(Consistency::LocalQuorum)
221            .build();
222        default_profile.into_handle()
223    }
224
225    async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
226        // This explicitly sets a lot of default parameters for clarity and for making future changes
227        // easier.
228        SessionBuilder::new()
229            .known_node(uri)
230            .default_execution_profile_handle(Self::build_default_execution_profile_handle(
231                Self::build_default_policy(),
232            ))
233            .build()
234            .boxed_sync()
235            .await
236            .map_err(Into::into)
237    }
238
239    async fn get_multi_key_values_statement(
240        &self,
241        num_markers: usize,
242    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
243        if let Some(prepared_statement) = self.multi_key_values.get(&num_markers) {
244            return Ok(prepared_statement.clone());
245        }
246        let markers = std::iter::repeat_n("?", num_markers)
247            .collect::<Vec<_>>()
248            .join(",");
249        let prepared_statement = self
250            .session
251            .prepare(format!(
252                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})",
253                KEYSPACE, self.namespace, markers
254            ))
255            .await?;
256        self.multi_key_values
257            .insert(num_markers, prepared_statement.clone());
258        Ok(prepared_statement)
259    }
260
261    async fn get_multi_keys_statement(
262        &self,
263        num_markers: usize,
264    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
265        if let Some(prepared_statement) = self.multi_keys.get(&num_markers) {
266            return Ok(prepared_statement.clone());
267        };
268        let markers = std::iter::repeat_n("?", num_markers)
269            .collect::<Vec<_>>()
270            .join(",");
271        let prepared_statement = self
272            .session
273            .prepare(format!(
274                "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})",
275                KEYSPACE, self.namespace, markers
276            ))
277            .await?;
278        self.multi_keys
279            .insert(num_markers, prepared_statement.clone());
280        Ok(prepared_statement)
281    }
282
283    fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
284        ensure!(
285            key.len() <= MAX_KEY_SIZE,
286            ScyllaDbStoreInternalError::KeyTooLong
287        );
288        Ok(())
289    }
290
291    fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
292        ensure!(
293            value.len() <= RAW_MAX_VALUE_SIZE,
294            ScyllaDbStoreInternalError::ValueTooLong
295        );
296        Ok(())
297    }
298
299    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
300        ensure!(
301            batch.len() <= MAX_BATCH_SIZE,
302            ScyllaDbStoreInternalError::BatchTooLong
303        );
304        Ok(())
305    }
306
307    async fn read_value_internal(
308        &self,
309        root_key: &[u8],
310        key: Vec<u8>,
311    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
312        Self::check_key_size(&key)?;
313        let session = &self.session;
314        // Read the value of a key
315        let values = (root_key.to_vec(), key);
316
317        let (result, _) = session
318            .execute_single_page(&self.read_value, &values, PagingState::start())
319            .await?;
320        let rows = result.into_rows_result()?;
321        let mut rows = rows.rows::<(Vec<u8>,)>()?;
322        Ok(match rows.next() {
323            Some(row) => Some(row?.0),
324            None => None,
325        })
326    }
327
328    fn get_occurrences_map(
329        keys: Vec<Vec<u8>>,
330    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
331        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
332        for (i_key, key) in keys.into_iter().enumerate() {
333            Self::check_key_size(&key)?;
334            map.entry(key).or_default().push(i_key);
335        }
336        Ok(map)
337    }
338
339    async fn read_multi_values_internal(
340        &self,
341        root_key: &[u8],
342        keys: Vec<Vec<u8>>,
343    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
344        let mut values = vec![None; keys.len()];
345        let map = Self::get_occurrences_map(keys)?;
346        let statement = self.get_multi_key_values_statement(map.len()).await?;
347        let mut inputs = vec![root_key.to_vec()];
348        inputs.extend(map.keys().cloned());
349        let mut rows = self
350            .session
351            .execute_iter(statement, &inputs)
352            .await?
353            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
354
355        while let Some(row) = rows.next().await {
356            let (key, value) = row?;
357            for i_key in &map[&key] {
358                values[*i_key] = Some(value.clone());
359            }
360        }
361        Ok(values)
362    }
363
364    async fn contains_keys_internal(
365        &self,
366        root_key: &[u8],
367        keys: Vec<Vec<u8>>,
368    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
369        let mut values = vec![false; keys.len()];
370        let map = Self::get_occurrences_map(keys)?;
371        let statement = self.get_multi_keys_statement(map.len()).await?;
372        let mut inputs = vec![root_key.to_vec()];
373        inputs.extend(map.keys().cloned());
374        let mut rows = self
375            .session
376            .execute_iter(statement, &inputs)
377            .await?
378            .rows_stream::<(Vec<u8>,)>()?;
379
380        while let Some(row) = rows.next().await {
381            let (key,) = row?;
382            for i_key in &map[&key] {
383                values[*i_key] = true;
384            }
385        }
386
387        Ok(values)
388    }
389
390    async fn contains_key_internal(
391        &self,
392        root_key: &[u8],
393        key: Vec<u8>,
394    ) -> Result<bool, ScyllaDbStoreInternalError> {
395        Self::check_key_size(&key)?;
396        let session = &self.session;
397        // Read the value of a key
398        let values = (root_key.to_vec(), key);
399
400        let (result, _) = session
401            .execute_single_page(&self.contains_key, &values, PagingState::start())
402            .await?;
403        let rows = result.into_rows_result()?;
404        let mut rows = rows.rows::<(Vec<u8>,)>()?;
405        Ok(rows.next().is_some())
406    }
407
408    async fn write_batch_internal(
409        &self,
410        root_key: &[u8],
411        batch: UnorderedBatch,
412    ) -> Result<(), ScyllaDbStoreInternalError> {
413        let session = &self.session;
414        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
415        let mut batch_values = Vec::new();
416        let query1 = &self.write_batch_delete_prefix_unbounded;
417        let query2 = &self.write_batch_delete_prefix_bounded;
418        Self::check_batch_len(&batch)?;
419        for key_prefix in batch.key_prefix_deletions {
420            Self::check_key_size(&key_prefix)?;
421            match get_upper_bound_option(&key_prefix) {
422                None => {
423                    let values = vec![root_key.to_vec(), key_prefix];
424                    batch_values.push(values);
425                    batch_query.append_statement(query1.clone());
426                }
427                Some(upper_bound) => {
428                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
429                    batch_values.push(values);
430                    batch_query.append_statement(query2.clone());
431                }
432            }
433        }
434        let query3 = &self.write_batch_deletion;
435        for key in batch.simple_unordered_batch.deletions {
436            Self::check_key_size(&key)?;
437            let values = vec![root_key.to_vec(), key];
438            batch_values.push(values);
439            batch_query.append_statement(query3.clone());
440        }
441        let query4 = &self.write_batch_insertion;
442        for (key, value) in batch.simple_unordered_batch.insertions {
443            Self::check_key_size(&key)?;
444            Self::check_value_size(&value)?;
445            let values = vec![root_key.to_vec(), key, value];
446            batch_values.push(values);
447            batch_query.append_statement(query4.clone());
448        }
449        session.batch(&batch_query, batch_values).await?;
450        Ok(())
451    }
452
453    async fn find_keys_by_prefix_internal(
454        &self,
455        root_key: &[u8],
456        key_prefix: Vec<u8>,
457    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
458        Self::check_key_size(&key_prefix)?;
459        let session = &self.session;
460        // Read the value of a key
461        let len = key_prefix.len();
462        let query_unbounded = &self.find_keys_by_prefix_unbounded;
463        let query_bounded = &self.find_keys_by_prefix_bounded;
464        let rows = match get_upper_bound_option(&key_prefix) {
465            None => {
466                let values = (root_key.to_vec(), key_prefix.clone());
467                session
468                    .execute_iter(query_unbounded.clone(), values)
469                    .await?
470            }
471            Some(upper_bound) => {
472                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
473                session.execute_iter(query_bounded.clone(), values).await?
474            }
475        };
476        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
477        let mut keys = Vec::new();
478        while let Some(row) = rows.next().await {
479            let (key,) = row?;
480            let short_key = key[len..].to_vec();
481            keys.push(short_key);
482        }
483        Ok(keys)
484    }
485
486    async fn find_key_values_by_prefix_internal(
487        &self,
488        root_key: &[u8],
489        key_prefix: Vec<u8>,
490    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
491        Self::check_key_size(&key_prefix)?;
492        let session = &self.session;
493        // Read the value of a key
494        let len = key_prefix.len();
495        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
496        let query_bounded = &self.find_key_values_by_prefix_bounded;
497        let rows = match get_upper_bound_option(&key_prefix) {
498            None => {
499                let values = (root_key.to_vec(), key_prefix.clone());
500                session
501                    .execute_iter(query_unbounded.clone(), values)
502                    .await?
503            }
504            Some(upper_bound) => {
505                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
506                session.execute_iter(query_bounded.clone(), values).await?
507            }
508        };
509        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
510        let mut key_values = Vec::new();
511        while let Some(row) = rows.next().await {
512            let (key, value) = row?;
513            let short_key = key[len..].to_vec();
514            key_values.push((short_key, value));
515        }
516        Ok(key_values)
517    }
518}
519
520/// The client itself and the keeping of the count of active connections.
521#[derive(Clone)]
522pub struct ScyllaDbStoreInternal {
523    store: Arc<ScyllaDbClient>,
524    semaphore: Option<Arc<Semaphore>>,
525    max_stream_queries: usize,
526    root_key: Vec<u8>,
527}
528
529/// The error type for [`ScyllaDbStoreInternal`]
530#[derive(Error, Debug)]
531pub enum ScyllaDbStoreInternalError {
532    /// BCS serialization error.
533    #[error(transparent)]
534    BcsError(#[from] bcs::Error),
535
536    /// The key must have at most `MAX_KEY_SIZE` bytes
537    #[error("The key must have at most MAX_KEY_SIZE")]
538    KeyTooLong,
539
540    /// The value must have at most `RAW_MAX_VALUE_SIZE` bytes
541    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
542    ValueTooLong,
543
544    /// A deserialization error in ScyllaDB
545    #[error(transparent)]
546    DeserializationError(#[from] DeserializationError),
547
548    /// A row error in ScyllaDB
549    #[error(transparent)]
550    RowsError(#[from] RowsError),
551
552    /// A type error in the accessed data in ScyllaDB
553    #[error(transparent)]
554    IntoRowsResultError(#[from] IntoRowsResultError),
555
556    /// A type check error in ScyllaDB
557    #[error(transparent)]
558    TypeCheckError(#[from] TypeCheckError),
559
560    /// A query error in ScyllaDB
561    #[error(transparent)]
562    PagerExecutionError(#[from] PagerExecutionError),
563
564    /// A query error in ScyllaDB
565    #[error(transparent)]
566    ScyllaDbNewSessionError(#[from] NewSessionError),
567
568    /// Namespace contains forbidden characters
569    #[error("Namespace contains forbidden characters")]
570    InvalidNamespace,
571
572    /// The journal is not coherent
573    #[error(transparent)]
574    JournalConsistencyError(#[from] JournalConsistencyError),
575
576    /// The batch is too long to be written
577    #[error("The batch is too long to be written")]
578    BatchTooLong,
579
580    /// A prepare error in ScyllaDB
581    #[error(transparent)]
582    PrepareError(#[from] PrepareError),
583
584    /// An execution error in ScyllaDB
585    #[error(transparent)]
586    ExecutionError(#[from] ExecutionError),
587
588    /// A next row error in ScyllaDB
589    #[error(transparent)]
590    NextRowError(#[from] NextRowError),
591}
592
// Tags `ScyllaDbStoreInternalError` as a key-value store error of the ScyllaDB backend.
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    /// The backend name associated with these errors.
    const BACKEND: &'static str = "scylla_db";
}

// `ScyllaDbStoreInternal` reports its failures as `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
600
601impl ReadableKeyValueStore for ScyllaDbStoreInternal {
602    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
603
604    fn max_stream_queries(&self) -> usize {
605        self.max_stream_queries
606    }
607
608    async fn read_value_bytes(
609        &self,
610        key: &[u8],
611    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
612        let store = self.store.deref();
613        let _guard = self.acquire().await;
614        store
615            .read_value_internal(&self.root_key, key.to_vec())
616            .await
617    }
618
619    async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
620        let store = self.store.deref();
621        let _guard = self.acquire().await;
622        store
623            .contains_key_internal(&self.root_key, key.to_vec())
624            .await
625    }
626
627    async fn contains_keys(
628        &self,
629        keys: Vec<Vec<u8>>,
630    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
631        if keys.is_empty() {
632            return Ok(Vec::new());
633        }
634        let store = self.store.deref();
635        let _guard = self.acquire().await;
636        let handles = keys
637            .chunks(MAX_MULTI_KEYS)
638            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
639        let results: Vec<_> = join_all(handles)
640            .await
641            .into_iter()
642            .collect::<Result<_, _>>()?;
643        Ok(results.into_iter().flatten().collect())
644    }
645
646    async fn read_multi_values_bytes(
647        &self,
648        keys: Vec<Vec<u8>>,
649    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
650        if keys.is_empty() {
651            return Ok(Vec::new());
652        }
653        let store = self.store.deref();
654        let _guard = self.acquire().await;
655        let handles = keys
656            .chunks(MAX_MULTI_KEYS)
657            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
658        let results: Vec<_> = join_all(handles)
659            .await
660            .into_iter()
661            .collect::<Result<_, _>>()?;
662        Ok(results.into_iter().flatten().collect())
663    }
664
665    async fn find_keys_by_prefix(
666        &self,
667        key_prefix: &[u8],
668    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
669        let store = self.store.deref();
670        let _guard = self.acquire().await;
671        store
672            .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
673            .await
674    }
675
676    async fn find_key_values_by_prefix(
677        &self,
678        key_prefix: &[u8],
679    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
680        let store = self.store.deref();
681        let _guard = self.acquire().await;
682        store
683            .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
684            .await
685    }
686}
687
688impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
689    const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
690    const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
691    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
692
693    // ScyllaDB cannot take a `crate::batch::Batch` directly. Indeed, if a delete is
694    // followed by a write, then the delete takes priority. See the sentence "The first
695    // tie-breaking rule when two cells have the same write timestamp is that dead cells
696    // win over live cells" from
697    // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
698    type Batch = UnorderedBatch;
699
700    async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
701        let store = self.store.deref();
702        let _guard = self.acquire().await;
703        store.write_batch_internal(&self.root_key, batch).await
704    }
705}
706
/// Prefixes `root_key` with a single `0` byte.
///
/// ScyllaDB requires that the keys are non-empty, so every root key is stored with this
/// one-byte prefix; in particular the empty root key becomes `[0]`.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    [&[0u8][..], root_key].concat()
}
713
/// The configuration for building a new ScyllaDB key-value store.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// The URI of the ScyllaDB node to which the requests are sent (registered as the
    /// session's known node).
    pub uri: String,
    /// Maximum number of concurrent database queries allowed for this client.
    /// When `None`, no semaphore is created for the store.
    pub max_concurrent_queries: Option<usize>,
    /// Preferred buffer size for async streams, reported by `max_stream_queries()`.
    pub max_stream_queries: usize,
    // NOTE(review): not read by the code visible in this section; presumably used when
    // creating the keyspace — confirm.
    /// The replication factor.
    pub replication_factor: u32,
}
726
727impl AdminKeyValueStore for ScyllaDbStoreInternal {
728    type Config = ScyllaDbStoreInternalConfig;
729
730    fn get_name() -> String {
731        "scylladb internal".to_string()
732    }
733
734    async fn connect(
735        config: &Self::Config,
736        namespace: &str,
737    ) -> Result<Self, ScyllaDbStoreInternalError> {
738        Self::check_namespace(namespace)?;
739        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
740        let store = ScyllaDbClient::new(session, namespace).await?;
741        let store = Arc::new(store);
742        let semaphore = config
743            .max_concurrent_queries
744            .map(|n| Arc::new(Semaphore::new(n)));
745        let max_stream_queries = config.max_stream_queries;
746        let root_key = get_big_root_key(&[]);
747        Ok(Self {
748            store,
749            semaphore,
750            max_stream_queries,
751            root_key,
752        })
753    }
754
755    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, ScyllaDbStoreInternalError> {
756        let store = self.store.clone();
757        let semaphore = self.semaphore.clone();
758        let max_stream_queries = self.max_stream_queries;
759        let root_key = get_big_root_key(root_key);
760        Ok(Self {
761            store,
762            semaphore,
763            max_stream_queries,
764            root_key,
765        })
766    }
767
768    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
769        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
770        let statement = session
771            .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
772            .await?;
773        let result = session.execute_iter(statement, &[]).await;
774        let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
775        let result = match result {
776            Ok(result) => result,
777            Err(error) => {
778                let invalid_or_keyspace_not_found = match &error {
779                    PagerExecutionError::NextPageError(NextPageError::RequestFailure(
780                        RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
781                    )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
782                    _ => false,
783                };
784                if invalid_or_keyspace_not_found {
785                    return Ok(Vec::new());
786                } else {
787                    return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
788                }
789            }
790        };
791        let mut namespaces = Vec::new();
792        let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
793        while let Some(row) = rows_stream.next().await {
794            let (_, object_kind, name, _) = row?;
795            if object_kind == "table" {
796                namespaces.push(name);
797            }
798        }
799        Ok(namespaces)
800    }
801
802    async fn list_root_keys(
803        config: &Self::Config,
804        namespace: &str,
805    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
806        Self::check_namespace(namespace)?;
807        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
808        let statement = session
809            .prepare(format!(
810                "SELECT root_key FROM {}.{} ALLOW FILTERING",
811                KEYSPACE, namespace
812            ))
813            .await?;
814
815        // Execute the query
816        let rows = session.execute_iter(statement, &[]).await?;
817        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
818        let mut root_keys = BTreeSet::new();
819        while let Some(row) = rows.next().await {
820            let (root_key,) = row?;
821            let root_key = root_key[1..].to_vec();
822            root_keys.insert(root_key);
823        }
824        Ok(root_keys.into_iter().collect::<Vec<_>>())
825    }
826
827    async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
828        let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
829        let statement = session
830            .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
831            .await?;
832
833        session
834            .execute_single_page(&statement, &[], PagingState::start())
835            .await?;
836        Ok(())
837    }
838
839    async fn exists(
840        config: &Self::Config,
841        namespace: &str,
842    ) -> Result<bool, ScyllaDbStoreInternalError> {
843        Self::check_namespace(namespace)?;
844        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
845
846        // We check the way the test can fail. It can fail in different ways.
847        let result = session
848            .prepare(format!(
849                "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
850                KEYSPACE, namespace
851            ))
852            .await;
853
854        // The missing table translates into a very specific error that we matched
855        let miss_msg1 = format!("unconfigured table {}", namespace);
856        let miss_msg1 = miss_msg1.as_str();
857        let miss_msg2 = "Undefined name root_key in selection clause";
858        let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
859        let Err(error) = result else {
860            // If OK, then the table exists
861            return Ok(true);
862        };
863        let missing_table = match &error {
864            PrepareError::AllAttemptsFailed {
865                first_attempt: RequestAttemptError::DbError(db_error, msg),
866            } => {
867                if *db_error != DbError::Invalid {
868                    false
869                } else {
870                    msg.as_str() == miss_msg1
871                        || msg.as_str() == miss_msg2
872                        || msg.as_str() == miss_msg3
873                }
874            }
875            _ => false,
876        };
877        if missing_table {
878            Ok(false)
879        } else {
880            Err(ScyllaDbStoreInternalError::PrepareError(error))
881        }
882    }
883
884    async fn create(
885        config: &Self::Config,
886        namespace: &str,
887    ) -> Result<(), ScyllaDbStoreInternalError> {
888        Self::check_namespace(namespace)?;
889        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
890
891        // Create a keyspace if it doesn't exist
892        let statement = session
893            .prepare(format!(
894                "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
895                    'class' : 'NetworkTopologyStrategy', \
896                    'replication_factor' : {} \
897                }}",
898                KEYSPACE, config.replication_factor
899            ))
900            .await?;
901        session
902            .execute_single_page(&statement, &[], PagingState::start())
903            .await?;
904
905        // This explicitly sets a lot of default parameters for clarity and for making future
906        // changes easier.
907        let statement = session
908            .prepare(format!(
909                "CREATE TABLE {}.{} (\
910                    root_key blob, \
911                    k blob, \
912                    v blob, \
913                    PRIMARY KEY (root_key, k) \
914                ) \
915                WITH compaction = {{ \
916                    'class'            : 'SizeTieredCompactionStrategy', \
917                    'min_sstable_size' : 52428800, \
918                    'bucket_low'       : 0.5, \
919                    'bucket_high'      : 1.5, \
920                    'min_threshold'    : 4, \
921                    'max_threshold'    : 32 \
922                }} \
923                AND compression = {{ \
924                    'sstable_compression': 'LZ4Compressor', \
925                    'chunk_length_in_kb':'4' \
926                }} \
927                AND caching = {{ \
928                    'enabled': 'true' \
929                }}",
930                KEYSPACE, namespace
931            ))
932            .await?;
933        session
934            .execute_single_page(&statement, &[], PagingState::start())
935            .await?;
936        Ok(())
937    }
938
939    async fn delete(
940        config: &Self::Config,
941        namespace: &str,
942    ) -> Result<(), ScyllaDbStoreInternalError> {
943        Self::check_namespace(namespace)?;
944        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
945        let statement = session
946            .prepare(format!("DROP TABLE IF EXISTS {}.{};", KEYSPACE, namespace))
947            .await?;
948        session
949            .execute_single_page(&statement, &[], PagingState::start())
950            .await?;
951        Ok(())
952    }
953}
954
955impl ScyllaDbStoreInternal {
956    /// Obtains the semaphore lock on the database if needed.
957    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
958        match &self.semaphore {
959            None => None,
960            Some(count) => Some(count.acquire().await),
961        }
962    }
963
964    fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
965        if !namespace.is_empty()
966            && namespace.len() <= 48
967            && namespace
968                .chars()
969                .all(|c| c.is_ascii_alphanumeric() || c == '_')
970        {
971            return Ok(());
972        }
973        Err(ScyllaDbStoreInternalError::InvalidNamespace)
974    }
975}
976
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<ScyllaDbStoreInternal> {
    /// Returns a configuration pointing at a local ScyllaDB node on the default CQL port,
    /// with small concurrency limits and a replication factor of one.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        // TODO(#4114): Read the port from an environment variable.
        let config = ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
990
/// The `ScyllaDbStore` composed type with metrics.
///
/// Layers, from the outside in: metering, LRU caching, metering, value splitting, metering,
/// journaling, and finally the raw [`ScyllaDbStoreInternal`] backend. Each [`MeteredStore`]
/// wraps the layer directly beneath it.
#[cfg(with_metrics)]
pub type ScyllaDbStore = MeteredStore<
    LruCachingStore<
        MeteredStore<
            ValueSplittingStore<MeteredStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>,
        >,
    >,
>;

/// The `ScyllaDbStore` composed type without metrics.
///
/// Layers, from the outside in: LRU caching, value splitting, journaling, and finally the raw
/// `ScyllaDbStoreInternal` backend.
#[cfg(not(with_metrics))]
pub type ScyllaDbStore =
    LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>;

/// The configuration accepted by `ScyllaDbStore`: the LRU-caching configuration wrapper around
/// the backend's [`ScyllaDbStoreInternalConfig`].
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;

/// The combined error type for the `ScyllaDbStore`: the value-splitting layer's error type
/// wrapping the backend's [`ScyllaDbStoreInternalError`].
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;