linera_views/backends/
scylla_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the ScyllaDB database.
5//!
//! The connection is made through a `Session`. Each "namespace" is stored as a table
//! of the same name, and the rows of that table are addressed by a `root_key` and a
//! key `k`. The maximum number of concurrent queries is controlled by
//! `max_concurrent_queries`.
9
10use std::{
11    collections::{BTreeSet, HashMap},
12    ops::Deref,
13    sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::ensure;
19use scylla::{
20    client::{
21        execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22        session::Session,
23        session_builder::SessionBuilder,
24    },
25    deserialize::{DeserializationError, TypeCheckError},
26    errors::{
27        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29    },
30    policies::{
31        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32        retry::DefaultRetryPolicy,
33    },
34    response::PagingState,
35    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45    batch::UnorderedBatch,
46    common::{get_uleb128_size, get_upper_bound_option},
47    journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48    lru_caching::{LruCachingConfig, LruCachingDatabase},
49    store::{
50        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51        WithError,
52    },
53    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54    FutureSyncExt as _,
55};
56
/// Fundamental constant in ScyllaDB: the maximum size of a multi-key query.
/// The limit is in reality 100, but we need one entry for the root key.
const MAX_MULTI_KEYS: usize = 100 - 1;
60
/// The maximal size of an operation on ScyllaDB seems to be 16 MiB
/// https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/
/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once
///  at the database at any particular time"
/// So, we set up the maximal size of 16 MiB - 10 KiB for the values and 10 KiB for the keys.
/// We also arbitrarily decrease the size by 4000 bytes because some space is
/// taken internally by the database.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// The maximal size of a key: 10 KiB.
const MAX_KEY_SIZE: usize = 10 * 1024;
/// The maximal total size of a batch: one maximal value plus one maximal key.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
71
/// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage.
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`.
/// Therefore the actual `MAX_VALUE_SIZE` is lower.
/// At the maximum the key size is `MAX_KEY_SIZE` = 10240 bytes and we pack just one entry.
/// So if the key has 10240 bytes this gets us the inequality
/// `1 + 1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= RAW_MAX_VALUE_SIZE`
/// and so this simplifies to `1 + 1 + 1 + (2 + 10240) + (4 + x) <= RAW_MAX_VALUE_SIZE`.
/// Note on the above formula:
/// * We write 2 because `get_uleb128_size(MAX_KEY_SIZE) = 2`.
/// * We write 4 because `get_uleb128_size(RAW_MAX_VALUE_SIZE) = 4`.
/// * We write `1 + 1 + 1` because the `UnorderedBatch` has three entries.
///
/// This gets us to a maximal value of 16752727.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;
89
/// The maximal length of a batch accepted by this backend (see `check_batch_len`).
/// The limit is an empirical constant that was found to be necessary to make the
/// ScyllaDB system work. We have not been able to find this or a similar constant
/// in the source code or the documentation.
/// An experimental approach gets us that 14796 is the largest value that is
/// correct.
/// NOTE(review): this comment used to refer to a constant of 14000 while the value
/// in use is 5000 — presumably a safety margin below the experimental bound; confirm.
const MAX_BATCH_SIZE: usize = 5000;
96
/// The keyspace to use for the ScyllaDB database. Every namespace is a table of
/// this keyspace.
const KEYSPACE: &str = "kv";
99
/// The client for ScyllaDB:
/// * The session through which the queries are sent.
/// * The namespace that is being assigned to the database.
/// * The prepared queries used for implementing the features of `KeyValueStore`.
///
/// All the statements target the table `KEYSPACE."<namespace>"`.
struct ScyllaDbClient {
    /// The session used to prepare and execute every statement.
    session: Session,
    /// The namespace; it is used as the (quoted) table name in the queries.
    namespace: String,
    /// `SELECT v ... WHERE root_key = ? AND k = ?`
    read_value: PreparedStatement,
    /// `SELECT root_key ... WHERE root_key = ? AND k = ?`
    contains_key: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ?`
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ? AND k < ?`
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k = ?`
    write_batch_deletion: PreparedStatement,
    /// `INSERT INTO ... (root_key, k, v) VALUES (?, ?, ?)`
    write_batch_insertion: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ?`
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ?`
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Cache of the `SELECT k,v ... AND k IN (?,...,?)` statements, indexed by the
    /// number of markers in the `IN` clause. Filled lazily.
    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
    /// Cache of the `SELECT k ... AND k IN (?,...,?)` statements, indexed by the
    /// number of markers in the `IN` clause. Filled lazily.
    multi_keys: papaya::HashMap<usize, PreparedStatement>,
}
120
121impl ScyllaDbClient {
122    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
123        let namespace = namespace.to_string();
124        let read_value = session
125            .prepare(format!(
126                "SELECT v FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
127                KEYSPACE, namespace
128            ))
129            .await?;
130
131        let contains_key = session
132            .prepare(format!(
133                "SELECT root_key FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
134                KEYSPACE, namespace
135            ))
136            .await?;
137
138        let write_batch_delete_prefix_unbounded = session
139            .prepare(format!(
140                "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
141                KEYSPACE, namespace
142            ))
143            .await?;
144
145        let write_batch_delete_prefix_bounded = session
146            .prepare(format!(
147                "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
148                KEYSPACE, namespace
149            ))
150            .await?;
151
152        let write_batch_deletion = session
153            .prepare(format!(
154                "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
155                KEYSPACE, namespace
156            ))
157            .await?;
158
159        let write_batch_insertion = session
160            .prepare(format!(
161                "INSERT INTO {}.\"{}\" (root_key, k, v) VALUES (?, ?, ?)",
162                KEYSPACE, namespace
163            ))
164            .await?;
165
166        let find_keys_by_prefix_unbounded = session
167            .prepare(format!(
168                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
169                KEYSPACE, namespace
170            ))
171            .await?;
172
173        let find_keys_by_prefix_bounded = session
174            .prepare(format!(
175                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
176                KEYSPACE, namespace
177            ))
178            .await?;
179
180        let find_key_values_by_prefix_unbounded = session
181            .prepare(format!(
182                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
183                KEYSPACE, namespace
184            ))
185            .await?;
186
187        let find_key_values_by_prefix_bounded = session
188            .prepare(format!(
189                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
190                KEYSPACE, namespace
191            ))
192            .await?;
193
194        Ok(Self {
195            session,
196            namespace,
197            read_value,
198            contains_key,
199            write_batch_delete_prefix_unbounded,
200            write_batch_delete_prefix_bounded,
201            write_batch_deletion,
202            write_batch_insertion,
203            find_keys_by_prefix_unbounded,
204            find_keys_by_prefix_bounded,
205            find_key_values_by_prefix_unbounded,
206            find_key_values_by_prefix_bounded,
207            multi_key_values: papaya::HashMap::new(),
208            multi_keys: papaya::HashMap::new(),
209        })
210    }
211
212    fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
213        DefaultPolicy::builder().token_aware(true).build()
214    }
215
216    fn build_default_execution_profile_handle(
217        policy: Arc<dyn LoadBalancingPolicy>,
218    ) -> ExecutionProfileHandle {
219        let default_profile = ExecutionProfile::builder()
220            .load_balancing_policy(policy)
221            .retry_policy(Arc::new(DefaultRetryPolicy::new()))
222            .consistency(Consistency::LocalQuorum)
223            .build();
224        default_profile.into_handle()
225    }
226
227    async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
228        // This explicitly sets a lot of default parameters for clarity and for making future changes
229        // easier.
230        SessionBuilder::new()
231            .known_node(uri)
232            .default_execution_profile_handle(Self::build_default_execution_profile_handle(
233                Self::build_default_policy(),
234            ))
235            .build()
236            .boxed_sync()
237            .await
238            .map_err(Into::into)
239    }
240
241    async fn get_multi_key_values_statement(
242        &self,
243        num_markers: usize,
244    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
245        if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
246            return Ok(prepared_statement.clone());
247        }
248        let markers = std::iter::repeat_n("?", num_markers)
249            .collect::<Vec<_>>()
250            .join(",");
251        let prepared_statement = self
252            .session
253            .prepare(format!(
254                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
255                KEYSPACE, self.namespace, markers
256            ))
257            .await?;
258        self.multi_key_values
259            .pin()
260            .insert(num_markers, prepared_statement.clone());
261        Ok(prepared_statement)
262    }
263
264    async fn get_multi_keys_statement(
265        &self,
266        num_markers: usize,
267    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
268        if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
269            return Ok(prepared_statement.clone());
270        };
271        let markers = std::iter::repeat_n("?", num_markers)
272            .collect::<Vec<_>>()
273            .join(",");
274        let prepared_statement = self
275            .session
276            .prepare(format!(
277                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
278                KEYSPACE, self.namespace, markers
279            ))
280            .await?;
281        self.multi_keys
282            .pin()
283            .insert(num_markers, prepared_statement.clone());
284        Ok(prepared_statement)
285    }
286
287    fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
288        ensure!(
289            key.len() <= MAX_KEY_SIZE,
290            ScyllaDbStoreInternalError::KeyTooLong
291        );
292        Ok(())
293    }
294
295    fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
296        ensure!(
297            value.len() <= RAW_MAX_VALUE_SIZE,
298            ScyllaDbStoreInternalError::ValueTooLong
299        );
300        Ok(())
301    }
302
303    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
304        ensure!(
305            batch.len() <= MAX_BATCH_SIZE,
306            ScyllaDbStoreInternalError::BatchTooLong
307        );
308        Ok(())
309    }
310
311    async fn read_value_internal(
312        &self,
313        root_key: &[u8],
314        key: Vec<u8>,
315    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
316        Self::check_key_size(&key)?;
317        let session = &self.session;
318        // Read the value of a key
319        let values = (root_key.to_vec(), key);
320
321        let (result, _) = session
322            .execute_single_page(&self.read_value, &values, PagingState::start())
323            .await?;
324        let rows = result.into_rows_result()?;
325        let mut rows = rows.rows::<(Vec<u8>,)>()?;
326        Ok(match rows.next() {
327            Some(row) => Some(row?.0),
328            None => None,
329        })
330    }
331
332    fn get_occurrences_map(
333        keys: Vec<Vec<u8>>,
334    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
335        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
336        for (i_key, key) in keys.into_iter().enumerate() {
337            Self::check_key_size(&key)?;
338            map.entry(key).or_default().push(i_key);
339        }
340        Ok(map)
341    }
342
343    async fn read_multi_values_internal(
344        &self,
345        root_key: &[u8],
346        keys: Vec<Vec<u8>>,
347    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
348        let mut values = vec![None; keys.len()];
349        let map = Self::get_occurrences_map(keys)?;
350        let statement = self.get_multi_key_values_statement(map.len()).await?;
351        let mut inputs = vec![root_key.to_vec()];
352        inputs.extend(map.keys().cloned());
353        let mut rows = self
354            .session
355            .execute_iter(statement, &inputs)
356            .await?
357            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
358
359        while let Some(row) = rows.next().await {
360            let (key, value) = row?;
361            if let Some((&last, rest)) = map[&key].split_last() {
362                for position in rest {
363                    values[*position] = Some(value.clone());
364                }
365                values[last] = Some(value);
366            }
367        }
368        Ok(values)
369    }
370
371    async fn contains_keys_internal(
372        &self,
373        root_key: &[u8],
374        keys: Vec<Vec<u8>>,
375    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
376        let mut values = vec![false; keys.len()];
377        let map = Self::get_occurrences_map(keys)?;
378        let statement = self.get_multi_keys_statement(map.len()).await?;
379        let mut inputs = vec![root_key.to_vec()];
380        inputs.extend(map.keys().cloned());
381        let mut rows = self
382            .session
383            .execute_iter(statement, &inputs)
384            .await?
385            .rows_stream::<(Vec<u8>,)>()?;
386
387        while let Some(row) = rows.next().await {
388            let (key,) = row?;
389            for i_key in &map[&key] {
390                values[*i_key] = true;
391            }
392        }
393
394        Ok(values)
395    }
396
397    async fn contains_key_internal(
398        &self,
399        root_key: &[u8],
400        key: Vec<u8>,
401    ) -> Result<bool, ScyllaDbStoreInternalError> {
402        Self::check_key_size(&key)?;
403        let session = &self.session;
404        // Read the value of a key
405        let values = (root_key.to_vec(), key);
406
407        let (result, _) = session
408            .execute_single_page(&self.contains_key, &values, PagingState::start())
409            .await?;
410        let rows = result.into_rows_result()?;
411        let mut rows = rows.rows::<(Vec<u8>,)>()?;
412        Ok(rows.next().is_some())
413    }
414
415    async fn write_batch_internal(
416        &self,
417        root_key: &[u8],
418        batch: UnorderedBatch,
419    ) -> Result<(), ScyllaDbStoreInternalError> {
420        let session = &self.session;
421        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
422        let mut batch_values = Vec::new();
423        let query1 = &self.write_batch_delete_prefix_unbounded;
424        let query2 = &self.write_batch_delete_prefix_bounded;
425        Self::check_batch_len(&batch)?;
426        for key_prefix in batch.key_prefix_deletions {
427            Self::check_key_size(&key_prefix)?;
428            match get_upper_bound_option(&key_prefix) {
429                None => {
430                    let values = vec![root_key.to_vec(), key_prefix];
431                    batch_values.push(values);
432                    batch_query.append_statement(query1.clone());
433                }
434                Some(upper_bound) => {
435                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
436                    batch_values.push(values);
437                    batch_query.append_statement(query2.clone());
438                }
439            }
440        }
441        let query3 = &self.write_batch_deletion;
442        for key in batch.simple_unordered_batch.deletions {
443            Self::check_key_size(&key)?;
444            let values = vec![root_key.to_vec(), key];
445            batch_values.push(values);
446            batch_query.append_statement(query3.clone());
447        }
448        let query4 = &self.write_batch_insertion;
449        for (key, value) in batch.simple_unordered_batch.insertions {
450            Self::check_key_size(&key)?;
451            Self::check_value_size(&value)?;
452            let values = vec![root_key.to_vec(), key, value];
453            batch_values.push(values);
454            batch_query.append_statement(query4.clone());
455        }
456        session.batch(&batch_query, batch_values).await?;
457        Ok(())
458    }
459
460    async fn find_keys_by_prefix_internal(
461        &self,
462        root_key: &[u8],
463        key_prefix: Vec<u8>,
464    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
465        Self::check_key_size(&key_prefix)?;
466        let session = &self.session;
467        // Read the value of a key
468        let len = key_prefix.len();
469        let query_unbounded = &self.find_keys_by_prefix_unbounded;
470        let query_bounded = &self.find_keys_by_prefix_bounded;
471        let rows = match get_upper_bound_option(&key_prefix) {
472            None => {
473                let values = (root_key.to_vec(), key_prefix.clone());
474                session
475                    .execute_iter(query_unbounded.clone(), values)
476                    .await?
477            }
478            Some(upper_bound) => {
479                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
480                session.execute_iter(query_bounded.clone(), values).await?
481            }
482        };
483        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
484        let mut keys = Vec::new();
485        while let Some(row) = rows.next().await {
486            let (key,) = row?;
487            let short_key = key[len..].to_vec();
488            keys.push(short_key);
489        }
490        Ok(keys)
491    }
492
493    async fn find_key_values_by_prefix_internal(
494        &self,
495        root_key: &[u8],
496        key_prefix: Vec<u8>,
497    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
498        Self::check_key_size(&key_prefix)?;
499        let session = &self.session;
500        // Read the value of a key
501        let len = key_prefix.len();
502        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
503        let query_bounded = &self.find_key_values_by_prefix_bounded;
504        let rows = match get_upper_bound_option(&key_prefix) {
505            None => {
506                let values = (root_key.to_vec(), key_prefix.clone());
507                session
508                    .execute_iter(query_unbounded.clone(), values)
509                    .await?
510            }
511            Some(upper_bound) => {
512                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
513                session.execute_iter(query_bounded.clone(), values).await?
514            }
515        };
516        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
517        let mut key_values = Vec::new();
518        while let Some(row) = rows.next().await {
519            let (key, value) = row?;
520            let short_key = key[len..].to_vec();
521            key_values.push((short_key, value));
522        }
523        Ok(key_values)
524    }
525}
526
/// A store on one root key: the shared client together with the settings limiting
/// the concurrency of the queries.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// The client, shared with the database this store was opened from.
    store: Arc<ScyllaDbClient>,
    /// The semaphore built from `max_concurrent_queries`; `None` when no maximum
    /// was configured.
    semaphore: Option<Arc<Semaphore>>,
    /// The value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// The root key as stored in the database, i.e. prefixed by a zero byte
    /// (see `get_big_root_key`).
    root_key: Vec<u8>,
}
535
/// Database-level connection to ScyllaDB for managing namespaces and partitions.
#[derive(Clone)]
pub struct ScyllaDbDatabaseInternal {
    /// The client connected to the namespace; it is shared with every opened store.
    store: Arc<ScyllaDbClient>,
    /// The semaphore built from `max_concurrent_queries`; `None` when no maximum
    /// was configured.
    semaphore: Option<Arc<Semaphore>>,
    /// The `max_stream_queries` setting, handed over to every opened store.
    max_stream_queries: usize,
}
543
// The database reports the same error type as the stores it opens.
impl WithError for ScyllaDbDatabaseInternal {
    type Error = ScyllaDbStoreInternalError;
}
547
/// The error type for [`ScyllaDbStoreInternal`]
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The key must have at most `MAX_KEY_SIZE` bytes
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// The value must have at most `RAW_MAX_VALUE_SIZE` bytes
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// A deserialization error in ScyllaDB
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// A row error in ScyllaDB
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// An error when converting a query result into rows in ScyllaDB
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// A type check error in ScyllaDB
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// An error when executing a paged query in ScyllaDB
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// An error when creating a new session in ScyllaDB
    #[error(transparent)]
    ScyllaDbNewSessionError(#[from] NewSessionError),

    /// Namespace contains forbidden characters
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// The journal is not coherent
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The batch is too long to be written
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// A prepare error in ScyllaDB
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// An execution error in ScyllaDB
    #[error(transparent)]
    ExecutionError(#[from] ExecutionError),

    /// A next row error in ScyllaDB
    #[error(transparent)]
    NextRowError(#[from] NextRowError),
}
611
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    // The name under which this backend identifies itself.
    const BACKEND: &'static str = "scylla_db";
}
615
// Every operation of the store fails with a `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
619
620impl ReadableKeyValueStore for ScyllaDbStoreInternal {
621    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
622
623    fn max_stream_queries(&self) -> usize {
624        self.max_stream_queries
625    }
626
627    async fn read_value_bytes(
628        &self,
629        key: &[u8],
630    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
631        let store = self.store.deref();
632        let _guard = self.acquire().await;
633        store
634            .read_value_internal(&self.root_key, key.to_vec())
635            .await
636    }
637
638    async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
639        let store = self.store.deref();
640        let _guard = self.acquire().await;
641        store
642            .contains_key_internal(&self.root_key, key.to_vec())
643            .await
644    }
645
646    async fn contains_keys(
647        &self,
648        keys: Vec<Vec<u8>>,
649    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
650        if keys.is_empty() {
651            return Ok(Vec::new());
652        }
653        let store = self.store.deref();
654        let _guard = self.acquire().await;
655        let handles = keys
656            .chunks(MAX_MULTI_KEYS)
657            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
658        let results: Vec<_> = join_all(handles)
659            .await
660            .into_iter()
661            .collect::<Result<_, _>>()?;
662        Ok(results.into_iter().flatten().collect())
663    }
664
665    async fn read_multi_values_bytes(
666        &self,
667        keys: Vec<Vec<u8>>,
668    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
669        if keys.is_empty() {
670            return Ok(Vec::new());
671        }
672        let store = self.store.deref();
673        let _guard = self.acquire().await;
674        let handles = keys
675            .chunks(MAX_MULTI_KEYS)
676            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
677        let results: Vec<_> = join_all(handles)
678            .await
679            .into_iter()
680            .collect::<Result<_, _>>()?;
681        Ok(results.into_iter().flatten().collect())
682    }
683
684    async fn find_keys_by_prefix(
685        &self,
686        key_prefix: &[u8],
687    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
688        let store = self.store.deref();
689        let _guard = self.acquire().await;
690        store
691            .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
692            .await
693    }
694
695    async fn find_key_values_by_prefix(
696        &self,
697        key_prefix: &[u8],
698    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
699        let store = self.store.deref();
700        let _guard = self.acquire().await;
701        store
702            .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
703            .await
704    }
705}
706
707impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
708    const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
709    const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
710    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
711
712    // ScyllaDB cannot take a `crate::batch::Batch` directly. Indeed, if a delete is
713    // followed by a write, then the delete takes priority. See the sentence "The first
714    // tie-breaking rule when two cells have the same write timestamp is that dead cells
715    // win over live cells" from
716    // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
717    type Batch = UnorderedBatch;
718
719    async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
720        let store = self.store.deref();
721        let _guard = self.acquire().await;
722        store.write_batch_internal(&self.root_key, batch).await
723    }
724}
725
// ScyllaDB requires that the keys are non-empty, so every root key is stored with a
// leading zero byte.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    [&[0u8][..], root_key].concat()
}
732
/// The type for building a new ScyllaDB Key Value Store
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// The URL to which the requests have to be sent. It is passed to the session
    /// builder as the known node.
    pub uri: String,
    /// Maximum number of concurrent database queries allowed for this client.
    /// With `None`, no semaphore is created.
    pub max_concurrent_queries: Option<usize>,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
    /// The replication factor.
    pub replication_factor: u32,
}
745
746impl KeyValueDatabase for ScyllaDbDatabaseInternal {
747    type Config = ScyllaDbStoreInternalConfig;
748    type Store = ScyllaDbStoreInternal;
749
750    fn get_name() -> String {
751        "scylladb internal".to_string()
752    }
753
754    async fn connect(
755        config: &Self::Config,
756        namespace: &str,
757    ) -> Result<Self, ScyllaDbStoreInternalError> {
758        Self::check_namespace(namespace)?;
759        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
760        let store = ScyllaDbClient::new(session, namespace).await?;
761        let store = Arc::new(store);
762        let semaphore = config
763            .max_concurrent_queries
764            .map(|n| Arc::new(Semaphore::new(n)));
765        let max_stream_queries = config.max_stream_queries;
766        Ok(Self {
767            store,
768            semaphore,
769            max_stream_queries,
770        })
771    }
772
773    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
774        let store = self.store.clone();
775        let semaphore = self.semaphore.clone();
776        let max_stream_queries = self.max_stream_queries;
777        let root_key = get_big_root_key(root_key);
778        Ok(ScyllaDbStoreInternal {
779            store,
780            semaphore,
781            max_stream_queries,
782            root_key,
783        })
784    }
785
786    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
787        self.open_shared(root_key)
788    }
789
790    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
791        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
792        let statement = session
793            .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
794            .await?;
795        let result = session.execute_iter(statement, &[]).await;
796        let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
797        let result = match result {
798            Ok(result) => result,
799            Err(error) => {
800                let invalid_or_keyspace_not_found = match &error {
801                    PagerExecutionError::NextPageError(NextPageError::RequestFailure(
802                        RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
803                    )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
804                    _ => false,
805                };
806                if invalid_or_keyspace_not_found {
807                    return Ok(Vec::new());
808                } else {
809                    return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
810                }
811            }
812        };
813        let mut namespaces = Vec::new();
814        let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
815        while let Some(row) = rows_stream.next().await {
816            let (_, object_kind, name, _) = row?;
817            if object_kind == "table" {
818                namespaces.push(name);
819            }
820        }
821        Ok(namespaces)
822    }
823
824    async fn list_root_keys(
825        config: &Self::Config,
826        namespace: &str,
827    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
828        Self::check_namespace(namespace)?;
829        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
830        let statement = session
831            .prepare(format!(
832                "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
833                KEYSPACE, namespace
834            ))
835            .await?;
836
837        // Execute the query
838        let rows = session.execute_iter(statement, &[]).await?;
839        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
840        let mut root_keys = BTreeSet::new();
841        while let Some(row) = rows.next().await {
842            let (root_key,) = row?;
843            let root_key = root_key[1..].to_vec();
844            root_keys.insert(root_key);
845        }
846        Ok(root_keys.into_iter().collect::<Vec<_>>())
847    }
848
849    async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
850        let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
851        let statement = session
852            .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
853            .await?;
854
855        session
856            .execute_single_page(&statement, &[], PagingState::start())
857            .await?;
858        Ok(())
859    }
860
861    async fn exists(
862        config: &Self::Config,
863        namespace: &str,
864    ) -> Result<bool, ScyllaDbStoreInternalError> {
865        Self::check_namespace(namespace)?;
866        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
867
868        // We check the way the test can fail. It can fail in different ways.
869        let result = session
870            .prepare(format!(
871                "SELECT root_key FROM {}.\"{}\" LIMIT 1 ALLOW FILTERING",
872                KEYSPACE, namespace
873            ))
874            .await;
875
876        // The missing table translates into a very specific error that we matched
877        let miss_msg1 = format!("unconfigured table {}", namespace);
878        let miss_msg1 = miss_msg1.as_str();
879        let miss_msg2 = "Undefined name root_key in selection clause";
880        let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
881        let Err(error) = result else {
882            // If OK, then the table exists
883            return Ok(true);
884        };
885        let missing_table = match &error {
886            PrepareError::AllAttemptsFailed {
887                first_attempt: RequestAttemptError::DbError(db_error, msg),
888            } => {
889                if *db_error != DbError::Invalid {
890                    false
891                } else {
892                    msg.as_str() == miss_msg1
893                        || msg.as_str() == miss_msg2
894                        || msg.as_str() == miss_msg3
895                }
896            }
897            _ => false,
898        };
899        if missing_table {
900            Ok(false)
901        } else {
902            Err(ScyllaDbStoreInternalError::PrepareError(error))
903        }
904    }
905
906    async fn create(
907        config: &Self::Config,
908        namespace: &str,
909    ) -> Result<(), ScyllaDbStoreInternalError> {
910        Self::check_namespace(namespace)?;
911        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
912
913        // Create a keyspace if it doesn't exist
914        let statement = session
915            .prepare(format!(
916                "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
917                    'class' : 'NetworkTopologyStrategy', \
918                    'replication_factor' : {} \
919                }}",
920                KEYSPACE, config.replication_factor
921            ))
922            .await?;
923        session
924            .execute_single_page(&statement, &[], PagingState::start())
925            .await?;
926
927        // This explicitly sets a lot of default parameters for clarity and for making future
928        // changes easier.
929        let statement = session
930            .prepare(format!(
931                "CREATE TABLE {}.\"{}\" (\
932                    root_key blob, \
933                    k blob, \
934                    v blob, \
935                    PRIMARY KEY (root_key, k) \
936                ) \
937                WITH compaction = {{ \
938                    'class'            : 'SizeTieredCompactionStrategy', \
939                    'min_sstable_size' : 52428800, \
940                    'bucket_low'       : 0.5, \
941                    'bucket_high'      : 1.5, \
942                    'min_threshold'    : 4, \
943                    'max_threshold'    : 32 \
944                }} \
945                AND compression = {{ \
946                    'sstable_compression': 'LZ4Compressor', \
947                    'chunk_length_in_kb':'4' \
948                }} \
949                AND caching = {{ \
950                    'enabled': 'true' \
951                }}",
952                KEYSPACE, namespace
953            ))
954            .await?;
955        session
956            .execute_single_page(&statement, &[], PagingState::start())
957            .await?;
958        Ok(())
959    }
960
961    async fn delete(
962        config: &Self::Config,
963        namespace: &str,
964    ) -> Result<(), ScyllaDbStoreInternalError> {
965        Self::check_namespace(namespace)?;
966        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
967        let statement = session
968            .prepare(format!(
969                "DROP TABLE IF EXISTS {}.\"{}\";",
970                KEYSPACE, namespace
971            ))
972            .await?;
973        session
974            .execute_single_page(&statement, &[], PagingState::start())
975            .await?;
976        Ok(())
977    }
978}
979
980impl ScyllaDbStoreInternal {
981    /// Obtains the semaphore lock on the database if needed.
982    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
983        match &self.semaphore {
984            None => None,
985            Some(count) => Some(count.acquire().await),
986        }
987    }
988}
989
990impl ScyllaDbDatabaseInternal {
991    fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
992        if !namespace.is_empty()
993            && namespace.len() <= 48
994            && namespace
995                .chars()
996                .all(|c| c.is_ascii_alphanumeric() || c == '_')
997        {
998            return Ok(());
999        }
1000        Err(ScyllaDbStoreInternalError::InvalidNamespace)
1001    }
1002}
1003
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    /// Builds the configuration used by the tests.
    ///
    /// The database is expected at `localhost:9042`, with at most 10 concurrent queries,
    /// 10 stream queries and a replication factor of 1. This never fails.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        // TODO(#4114): Read the port from an environment variable.
        let config = ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
1017
/// The `ScyllaDbDatabase` composed type with metrics.
///
/// From the innermost layer outwards, `ScyllaDbDatabaseInternal` is wrapped in
/// `JournalingKeyValueDatabase`, `ValueSplittingDatabase` and `LruCachingDatabase`, and a
/// `MeteredDatabase` is placed around each of these three layers.
///
/// Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;
1029
/// The `ScyllaDbDatabase` composed type.
///
/// From the innermost layer outwards, `ScyllaDbDatabaseInternal` is wrapped in
/// `JournalingKeyValueDatabase`, `ValueSplittingDatabase` and `LruCachingDatabase`.
///
/// Only compiled when `with_metrics` is not set.
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;
1035
/// The `ScyllaDbStoreConfig` input type: a `ScyllaDbStoreInternalConfig` wrapped in an
/// `LruCachingConfig`.
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1038
/// The combined error type for the `ScyllaDbDatabase`: a `ScyllaDbStoreInternalError`
/// wrapped in a `ValueSplittingError`.
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;