linera_views/backends/
scylla_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the ScyllaDB database.
5//!
6//! The current connection is done via a Session and a corresponding primary key called
7//! "namespace". The maximum number of concurrent queries is controlled by
8//! `max_concurrent_queries`.
9
10use std::{
11    collections::{BTreeSet, HashMap},
12    ops::Deref,
13    sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::ensure;
19use scylla::{
20    client::{
21        execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22        session::Session,
23        session_builder::SessionBuilder,
24    },
25    deserialize::{DeserializationError, TypeCheckError},
26    errors::{
27        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29    },
30    policies::{
31        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32        retry::DefaultRetryPolicy,
33    },
34    response::PagingState,
35    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45    batch::UnorderedBatch,
46    common::{get_uleb128_size, get_upper_bound_option},
47    journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48    lru_caching::{LruCachingConfig, LruCachingDatabase},
49    store::{
50        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51        WithError,
52    },
53    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54    FutureSyncExt as _,
55};
56
/// Fundamental constant in ScyllaDB: the maximum number of keys in a multi-key query.
/// The limit is in reality 100, but we need one entry for the root key.
const MAX_MULTI_KEYS: usize = 100 - 1;
60
/// The maximal size of an operation on ScyllaDB seems to be 16 MiB
/// https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/
/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once
///  at the database at any particular time"
/// So, we set up the maximal size of 16 MiB - 10 KiB for the values and 10 KiB for the keys.
/// We also arbitrarily decrease the size by 4000 bytes because some space is
/// taken internally by the database.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// The maximal size of a key: 10 KiB (see the size budget described above).
const MAX_KEY_SIZE: usize = 10 * 1024;
/// The bound exposed as the maximal total size of a batch: one maximal raw value
/// plus one maximal key.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
71
/// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage.
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`.
/// Therefore the actual `MAX_VALUE_SIZE` is lower.
/// At the maximum the key size is `MAX_KEY_SIZE = 10240` bytes (see above) and we pack
/// just one entry. So if the key has 10240 bytes this gets us the inequality
/// `1 + 1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= RAW_MAX_VALUE_SIZE`,
/// and so this simplifies to `1 + 1 + 1 + (2 + 10240) + (4 + x) <= RAW_MAX_VALUE_SIZE`.
/// Note on the above formula:
/// * We write 4 because `get_uleb128_size(RAW_MAX_VALUE_SIZE) = 4`.
/// * We write 2 because `get_uleb128_size(MAX_KEY_SIZE) = 2`.
/// * We write `1 + 1 + 1` because the `UnorderedBatch` has three entries.
///
/// This gets us to a maximal value of 16752727.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;
89
/// The maximal length (`batch.len()`) of a batch accepted by `check_batch_len`.
///
/// An empirical bound was found to be necessary to make the ScyllaDB system work.
/// We have not been able to find this or a similar constant in the source code
/// or the documentation. An experimental approach gets us that 14796 is the
/// largest value that is correct.
///
/// NOTE(review): this comment used to refer to "the constant 14000", whereas the
/// value actually used is 5000 — presumably a conservative margin below the
/// experimental limit; confirm.
const MAX_BATCH_SIZE: usize = 5000;
96
/// The keyspace to use for the ScyllaDB database. Each namespace is addressed as the
/// table `KEYSPACE.namespace` in the statements of this file.
const KEYSPACE: &str = "kv";
99
/// The client for ScyllaDB:
/// * The session allows to pass queries
/// * The namespace that is being assigned to the database
/// * The prepared queries used for implementing the features of `KeyValueStore`.
struct ScyllaDbClient {
    /// The session through which every statement is prepared and executed.
    session: Session,
    /// The namespace; it is used as the table name inside `KEYSPACE`.
    namespace: String,
    /// `SELECT v` for one `(root_key, k)` pair.
    read_value: PreparedStatement,
    /// `SELECT root_key` for one `(root_key, k)` pair; used as an existence test.
    contains_key: PreparedStatement,
    /// `DELETE` of all the keys `k >= ?` of a root key.
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// `DELETE` of all the keys with `? <= k < ?` of a root key.
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// `DELETE` of a single key of a root key.
    write_batch_deletion: PreparedStatement,
    /// `INSERT` of one `(root_key, k, v)` row.
    write_batch_insertion: PreparedStatement,
    /// `SELECT k` for all the keys `k >= ?` of a root key.
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k` for all the keys with `? <= k < ?` of a root key.
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v` for all the keys `k >= ?` of a root key.
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v` for all the keys with `? <= k < ?` of a root key.
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Cache of the `SELECT k,v ... k IN (?,...,?)` statements, indexed by the number of markers.
    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
    /// Cache of the `SELECT k ... k IN (?,...,?)` statements, indexed by the number of markers.
    multi_keys: papaya::HashMap<usize, PreparedStatement>,
}
120
121impl ScyllaDbClient {
122    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
123        let namespace = namespace.to_string();
124        let read_value = session
125            .prepare(format!(
126                "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?",
127                KEYSPACE, namespace
128            ))
129            .await?;
130
131        let contains_key = session
132            .prepare(format!(
133                "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?",
134                KEYSPACE, namespace
135            ))
136            .await?;
137
138        let write_batch_delete_prefix_unbounded = session
139            .prepare(format!(
140                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?",
141                KEYSPACE, namespace
142            ))
143            .await?;
144
145        let write_batch_delete_prefix_bounded = session
146            .prepare(format!(
147                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
148                KEYSPACE, namespace
149            ))
150            .await?;
151
152        let write_batch_deletion = session
153            .prepare(format!(
154                "DELETE FROM {}.{} WHERE root_key = ? AND k = ?",
155                KEYSPACE, namespace
156            ))
157            .await?;
158
159        let write_batch_insertion = session
160            .prepare(format!(
161                "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)",
162                KEYSPACE, namespace
163            ))
164            .await?;
165
166        let find_keys_by_prefix_unbounded = session
167            .prepare(format!(
168                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?",
169                KEYSPACE, namespace
170            ))
171            .await?;
172
173        let find_keys_by_prefix_bounded = session
174            .prepare(format!(
175                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
176                KEYSPACE, namespace
177            ))
178            .await?;
179
180        let find_key_values_by_prefix_unbounded = session
181            .prepare(format!(
182                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?",
183                KEYSPACE, namespace
184            ))
185            .await?;
186
187        let find_key_values_by_prefix_bounded = session
188            .prepare(format!(
189                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
190                KEYSPACE, namespace
191            ))
192            .await?;
193
194        Ok(Self {
195            session,
196            namespace,
197            read_value,
198            contains_key,
199            write_batch_delete_prefix_unbounded,
200            write_batch_delete_prefix_bounded,
201            write_batch_deletion,
202            write_batch_insertion,
203            find_keys_by_prefix_unbounded,
204            find_keys_by_prefix_bounded,
205            find_key_values_by_prefix_unbounded,
206            find_key_values_by_prefix_bounded,
207            multi_key_values: papaya::HashMap::new(),
208            multi_keys: papaya::HashMap::new(),
209        })
210    }
211
212    fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
213        DefaultPolicy::builder().token_aware(true).build()
214    }
215
216    fn build_default_execution_profile_handle(
217        policy: Arc<dyn LoadBalancingPolicy>,
218    ) -> ExecutionProfileHandle {
219        let default_profile = ExecutionProfile::builder()
220            .load_balancing_policy(policy)
221            .retry_policy(Arc::new(DefaultRetryPolicy::new()))
222            .consistency(Consistency::LocalQuorum)
223            .build();
224        default_profile.into_handle()
225    }
226
227    async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
228        // This explicitly sets a lot of default parameters for clarity and for making future changes
229        // easier.
230        SessionBuilder::new()
231            .known_node(uri)
232            .default_execution_profile_handle(Self::build_default_execution_profile_handle(
233                Self::build_default_policy(),
234            ))
235            .build()
236            .boxed_sync()
237            .await
238            .map_err(Into::into)
239    }
240
241    async fn get_multi_key_values_statement(
242        &self,
243        num_markers: usize,
244    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
245        if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
246            return Ok(prepared_statement.clone());
247        }
248        let markers = std::iter::repeat_n("?", num_markers)
249            .collect::<Vec<_>>()
250            .join(",");
251        let prepared_statement = self
252            .session
253            .prepare(format!(
254                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})",
255                KEYSPACE, self.namespace, markers
256            ))
257            .await?;
258        self.multi_key_values
259            .pin()
260            .insert(num_markers, prepared_statement.clone());
261        Ok(prepared_statement)
262    }
263
264    async fn get_multi_keys_statement(
265        &self,
266        num_markers: usize,
267    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
268        if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
269            return Ok(prepared_statement.clone());
270        };
271        let markers = std::iter::repeat_n("?", num_markers)
272            .collect::<Vec<_>>()
273            .join(",");
274        let prepared_statement = self
275            .session
276            .prepare(format!(
277                "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})",
278                KEYSPACE, self.namespace, markers
279            ))
280            .await?;
281        self.multi_keys
282            .pin()
283            .insert(num_markers, prepared_statement.clone());
284        Ok(prepared_statement)
285    }
286
287    fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
288        ensure!(
289            key.len() <= MAX_KEY_SIZE,
290            ScyllaDbStoreInternalError::KeyTooLong
291        );
292        Ok(())
293    }
294
295    fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
296        ensure!(
297            value.len() <= RAW_MAX_VALUE_SIZE,
298            ScyllaDbStoreInternalError::ValueTooLong
299        );
300        Ok(())
301    }
302
303    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
304        ensure!(
305            batch.len() <= MAX_BATCH_SIZE,
306            ScyllaDbStoreInternalError::BatchTooLong
307        );
308        Ok(())
309    }
310
311    async fn read_value_internal(
312        &self,
313        root_key: &[u8],
314        key: Vec<u8>,
315    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
316        Self::check_key_size(&key)?;
317        let session = &self.session;
318        // Read the value of a key
319        let values = (root_key.to_vec(), key);
320
321        let (result, _) = session
322            .execute_single_page(&self.read_value, &values, PagingState::start())
323            .await?;
324        let rows = result.into_rows_result()?;
325        let mut rows = rows.rows::<(Vec<u8>,)>()?;
326        Ok(match rows.next() {
327            Some(row) => Some(row?.0),
328            None => None,
329        })
330    }
331
332    fn get_occurrences_map(
333        keys: Vec<Vec<u8>>,
334    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
335        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
336        for (i_key, key) in keys.into_iter().enumerate() {
337            Self::check_key_size(&key)?;
338            map.entry(key).or_default().push(i_key);
339        }
340        Ok(map)
341    }
342
343    async fn read_multi_values_internal(
344        &self,
345        root_key: &[u8],
346        keys: Vec<Vec<u8>>,
347    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
348        let mut values = vec![None; keys.len()];
349        let map = Self::get_occurrences_map(keys)?;
350        let statement = self.get_multi_key_values_statement(map.len()).await?;
351        let mut inputs = vec![root_key.to_vec()];
352        inputs.extend(map.keys().cloned());
353        let mut rows = self
354            .session
355            .execute_iter(statement, &inputs)
356            .await?
357            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
358
359        while let Some(row) = rows.next().await {
360            let (key, value) = row?;
361            for i_key in &map[&key] {
362                values[*i_key] = Some(value.clone());
363            }
364        }
365        Ok(values)
366    }
367
368    async fn contains_keys_internal(
369        &self,
370        root_key: &[u8],
371        keys: Vec<Vec<u8>>,
372    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
373        let mut values = vec![false; keys.len()];
374        let map = Self::get_occurrences_map(keys)?;
375        let statement = self.get_multi_keys_statement(map.len()).await?;
376        let mut inputs = vec![root_key.to_vec()];
377        inputs.extend(map.keys().cloned());
378        let mut rows = self
379            .session
380            .execute_iter(statement, &inputs)
381            .await?
382            .rows_stream::<(Vec<u8>,)>()?;
383
384        while let Some(row) = rows.next().await {
385            let (key,) = row?;
386            for i_key in &map[&key] {
387                values[*i_key] = true;
388            }
389        }
390
391        Ok(values)
392    }
393
394    async fn contains_key_internal(
395        &self,
396        root_key: &[u8],
397        key: Vec<u8>,
398    ) -> Result<bool, ScyllaDbStoreInternalError> {
399        Self::check_key_size(&key)?;
400        let session = &self.session;
401        // Read the value of a key
402        let values = (root_key.to_vec(), key);
403
404        let (result, _) = session
405            .execute_single_page(&self.contains_key, &values, PagingState::start())
406            .await?;
407        let rows = result.into_rows_result()?;
408        let mut rows = rows.rows::<(Vec<u8>,)>()?;
409        Ok(rows.next().is_some())
410    }
411
412    async fn write_batch_internal(
413        &self,
414        root_key: &[u8],
415        batch: UnorderedBatch,
416    ) -> Result<(), ScyllaDbStoreInternalError> {
417        let session = &self.session;
418        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
419        let mut batch_values = Vec::new();
420        let query1 = &self.write_batch_delete_prefix_unbounded;
421        let query2 = &self.write_batch_delete_prefix_bounded;
422        Self::check_batch_len(&batch)?;
423        for key_prefix in batch.key_prefix_deletions {
424            Self::check_key_size(&key_prefix)?;
425            match get_upper_bound_option(&key_prefix) {
426                None => {
427                    let values = vec![root_key.to_vec(), key_prefix];
428                    batch_values.push(values);
429                    batch_query.append_statement(query1.clone());
430                }
431                Some(upper_bound) => {
432                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
433                    batch_values.push(values);
434                    batch_query.append_statement(query2.clone());
435                }
436            }
437        }
438        let query3 = &self.write_batch_deletion;
439        for key in batch.simple_unordered_batch.deletions {
440            Self::check_key_size(&key)?;
441            let values = vec![root_key.to_vec(), key];
442            batch_values.push(values);
443            batch_query.append_statement(query3.clone());
444        }
445        let query4 = &self.write_batch_insertion;
446        for (key, value) in batch.simple_unordered_batch.insertions {
447            Self::check_key_size(&key)?;
448            Self::check_value_size(&value)?;
449            let values = vec![root_key.to_vec(), key, value];
450            batch_values.push(values);
451            batch_query.append_statement(query4.clone());
452        }
453        session.batch(&batch_query, batch_values).await?;
454        Ok(())
455    }
456
457    async fn find_keys_by_prefix_internal(
458        &self,
459        root_key: &[u8],
460        key_prefix: Vec<u8>,
461    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
462        Self::check_key_size(&key_prefix)?;
463        let session = &self.session;
464        // Read the value of a key
465        let len = key_prefix.len();
466        let query_unbounded = &self.find_keys_by_prefix_unbounded;
467        let query_bounded = &self.find_keys_by_prefix_bounded;
468        let rows = match get_upper_bound_option(&key_prefix) {
469            None => {
470                let values = (root_key.to_vec(), key_prefix.clone());
471                session
472                    .execute_iter(query_unbounded.clone(), values)
473                    .await?
474            }
475            Some(upper_bound) => {
476                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
477                session.execute_iter(query_bounded.clone(), values).await?
478            }
479        };
480        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
481        let mut keys = Vec::new();
482        while let Some(row) = rows.next().await {
483            let (key,) = row?;
484            let short_key = key[len..].to_vec();
485            keys.push(short_key);
486        }
487        Ok(keys)
488    }
489
490    async fn find_key_values_by_prefix_internal(
491        &self,
492        root_key: &[u8],
493        key_prefix: Vec<u8>,
494    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
495        Self::check_key_size(&key_prefix)?;
496        let session = &self.session;
497        // Read the value of a key
498        let len = key_prefix.len();
499        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
500        let query_bounded = &self.find_key_values_by_prefix_bounded;
501        let rows = match get_upper_bound_option(&key_prefix) {
502            None => {
503                let values = (root_key.to_vec(), key_prefix.clone());
504                session
505                    .execute_iter(query_unbounded.clone(), values)
506                    .await?
507            }
508            Some(upper_bound) => {
509                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
510                session.execute_iter(query_bounded.clone(), values).await?
511            }
512        };
513        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
514        let mut key_values = Vec::new();
515        while let Some(row) = rows.next().await {
516            let (key, value) = row?;
517            let short_key = key[len..].to_vec();
518            key_values.push((short_key, value));
519        }
520        Ok(key_values)
521    }
522}
523
/// The client itself and the keeping of the count of active connections.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// The shared client holding the session and the prepared statements.
    store: Arc<ScyllaDbClient>,
    /// The optional semaphore shared with the database this store was opened from.
    semaphore: Option<Arc<Semaphore>>,
    /// The value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    /// The root key passed as `root_key` to every query of this store.
    root_key: Vec<u8>,
}
532
/// Database-level connection to ScyllaDB for managing namespaces and partitions.
#[derive(Clone)]
pub struct ScyllaDbDatabaseInternal {
    /// The shared client holding the session and the prepared statements.
    store: Arc<ScyllaDbClient>,
    /// A semaphore created when `max_concurrent_queries` is set in the configuration,
    /// `None` otherwise.
    semaphore: Option<Arc<Semaphore>>,
    /// The `max_stream_queries` value taken from the configuration.
    max_stream_queries: usize,
}
540
// The database reports the same error type as the stores it opens.
impl WithError for ScyllaDbDatabaseInternal {
    type Error = ScyllaDbStoreInternalError;
}
544
/// The error type for [`ScyllaDbStoreInternal`]
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The key must have at most `MAX_KEY_SIZE` bytes
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// The value must have at most `RAW_MAX_VALUE_SIZE` bytes
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// A deserialization error in ScyllaDB
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// A row error in ScyllaDB
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// An error when converting a query result into a rows result in ScyllaDB
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// A type check error in ScyllaDB
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// An error when executing a paged query in ScyllaDB
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// An error when creating a new session in ScyllaDB
    #[error(transparent)]
    ScyllaDbNewSessionError(#[from] NewSessionError),

    /// Namespace contains forbidden characters
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// The journal is not coherent
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The batch is too long to be written (its length exceeds `MAX_BATCH_SIZE`)
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// A prepare error in ScyllaDB
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// An execution error in ScyllaDB
    #[error(transparent)]
    ExecutionError(#[from] ExecutionError),

    /// A next row error in ScyllaDB
    #[error(transparent)]
    NextRowError(#[from] NextRowError),
}
608
// Identifies the backend that produced the error.
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    const BACKEND: &'static str = "scylla_db";
}
612
// Every operation of the store reports a `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
616
617impl ReadableKeyValueStore for ScyllaDbStoreInternal {
618    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
619
620    fn max_stream_queries(&self) -> usize {
621        self.max_stream_queries
622    }
623
624    async fn read_value_bytes(
625        &self,
626        key: &[u8],
627    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
628        let store = self.store.deref();
629        let _guard = self.acquire().await;
630        store
631            .read_value_internal(&self.root_key, key.to_vec())
632            .await
633    }
634
635    async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
636        let store = self.store.deref();
637        let _guard = self.acquire().await;
638        store
639            .contains_key_internal(&self.root_key, key.to_vec())
640            .await
641    }
642
643    async fn contains_keys(
644        &self,
645        keys: Vec<Vec<u8>>,
646    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
647        if keys.is_empty() {
648            return Ok(Vec::new());
649        }
650        let store = self.store.deref();
651        let _guard = self.acquire().await;
652        let handles = keys
653            .chunks(MAX_MULTI_KEYS)
654            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
655        let results: Vec<_> = join_all(handles)
656            .await
657            .into_iter()
658            .collect::<Result<_, _>>()?;
659        Ok(results.into_iter().flatten().collect())
660    }
661
662    async fn read_multi_values_bytes(
663        &self,
664        keys: Vec<Vec<u8>>,
665    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
666        if keys.is_empty() {
667            return Ok(Vec::new());
668        }
669        let store = self.store.deref();
670        let _guard = self.acquire().await;
671        let handles = keys
672            .chunks(MAX_MULTI_KEYS)
673            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
674        let results: Vec<_> = join_all(handles)
675            .await
676            .into_iter()
677            .collect::<Result<_, _>>()?;
678        Ok(results.into_iter().flatten().collect())
679    }
680
681    async fn find_keys_by_prefix(
682        &self,
683        key_prefix: &[u8],
684    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
685        let store = self.store.deref();
686        let _guard = self.acquire().await;
687        store
688            .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
689            .await
690    }
691
692    async fn find_key_values_by_prefix(
693        &self,
694        key_prefix: &[u8],
695    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
696        let store = self.store.deref();
697        let _guard = self.acquire().await;
698        store
699            .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
700            .await
701    }
702}
703
704impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
705    const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
706    const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
707    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
708
709    // ScyllaDB cannot take a `crate::batch::Batch` directly. Indeed, if a delete is
710    // followed by a write, then the delete takes priority. See the sentence "The first
711    // tie-breaking rule when two cells have the same write timestamp is that dead cells
712    // win over live cells" from
713    // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
714    type Batch = UnorderedBatch;
715
716    async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
717        let store = self.store.deref();
718        let _guard = self.acquire().await;
719        store.write_batch_internal(&self.root_key, batch).await
720    }
721}
722
// ScyllaDB requires that the keys are non-empty, so a zero byte is put in front of
// the root key: the result is never empty, even for an empty `root_key`.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    std::iter::once(0)
        .chain(root_key.iter().copied())
        .collect()
}
729
/// The type for building a new ScyllaDB Key Value Store
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// The URL to which the requests have to be sent; it is passed as the known node
    /// when a session is built.
    pub uri: String,
    /// Maximum number of concurrent database queries allowed for this client.
    /// With `None`, no semaphore is created.
    pub max_concurrent_queries: Option<usize>,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
    /// The replication factor.
    pub replication_factor: u32,
}
742
743impl KeyValueDatabase for ScyllaDbDatabaseInternal {
744    type Config = ScyllaDbStoreInternalConfig;
745    type Store = ScyllaDbStoreInternal;
746
747    fn get_name() -> String {
748        "scylladb internal".to_string()
749    }
750
751    async fn connect(
752        config: &Self::Config,
753        namespace: &str,
754    ) -> Result<Self, ScyllaDbStoreInternalError> {
755        Self::check_namespace(namespace)?;
756        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
757        let store = ScyllaDbClient::new(session, namespace).await?;
758        let store = Arc::new(store);
759        let semaphore = config
760            .max_concurrent_queries
761            .map(|n| Arc::new(Semaphore::new(n)));
762        let max_stream_queries = config.max_stream_queries;
763        Ok(Self {
764            store,
765            semaphore,
766            max_stream_queries,
767        })
768    }
769
770    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
771        let store = self.store.clone();
772        let semaphore = self.semaphore.clone();
773        let max_stream_queries = self.max_stream_queries;
774        let root_key = get_big_root_key(root_key);
775        Ok(ScyllaDbStoreInternal {
776            store,
777            semaphore,
778            max_stream_queries,
779            root_key,
780        })
781    }
782
783    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
784        self.open_shared(root_key)
785    }
786
787    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
788        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
789        let statement = session
790            .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
791            .await?;
792        let result = session.execute_iter(statement, &[]).await;
793        let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
794        let result = match result {
795            Ok(result) => result,
796            Err(error) => {
797                let invalid_or_keyspace_not_found = match &error {
798                    PagerExecutionError::NextPageError(NextPageError::RequestFailure(
799                        RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
800                    )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
801                    _ => false,
802                };
803                if invalid_or_keyspace_not_found {
804                    return Ok(Vec::new());
805                } else {
806                    return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
807                }
808            }
809        };
810        let mut namespaces = Vec::new();
811        let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
812        while let Some(row) = rows_stream.next().await {
813            let (_, object_kind, name, _) = row?;
814            if object_kind == "table" {
815                namespaces.push(name);
816            }
817        }
818        Ok(namespaces)
819    }
820
821    async fn list_root_keys(
822        config: &Self::Config,
823        namespace: &str,
824    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
825        Self::check_namespace(namespace)?;
826        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
827        let statement = session
828            .prepare(format!(
829                "SELECT root_key FROM {}.{} ALLOW FILTERING",
830                KEYSPACE, namespace
831            ))
832            .await?;
833
834        // Execute the query
835        let rows = session.execute_iter(statement, &[]).await?;
836        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
837        let mut root_keys = BTreeSet::new();
838        while let Some(row) = rows.next().await {
839            let (root_key,) = row?;
840            let root_key = root_key[1..].to_vec();
841            root_keys.insert(root_key);
842        }
843        Ok(root_keys.into_iter().collect::<Vec<_>>())
844    }
845
846    async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
847        let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
848        let statement = session
849            .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
850            .await?;
851
852        session
853            .execute_single_page(&statement, &[], PagingState::start())
854            .await?;
855        Ok(())
856    }
857
858    async fn exists(
859        config: &Self::Config,
860        namespace: &str,
861    ) -> Result<bool, ScyllaDbStoreInternalError> {
862        Self::check_namespace(namespace)?;
863        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
864
865        // We check the way the test can fail. It can fail in different ways.
866        let result = session
867            .prepare(format!(
868                "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
869                KEYSPACE, namespace
870            ))
871            .await;
872
873        // The missing table translates into a very specific error that we matched
874        let miss_msg1 = format!("unconfigured table {}", namespace);
875        let miss_msg1 = miss_msg1.as_str();
876        let miss_msg2 = "Undefined name root_key in selection clause";
877        let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
878        let Err(error) = result else {
879            // If OK, then the table exists
880            return Ok(true);
881        };
882        let missing_table = match &error {
883            PrepareError::AllAttemptsFailed {
884                first_attempt: RequestAttemptError::DbError(db_error, msg),
885            } => {
886                if *db_error != DbError::Invalid {
887                    false
888                } else {
889                    msg.as_str() == miss_msg1
890                        || msg.as_str() == miss_msg2
891                        || msg.as_str() == miss_msg3
892                }
893            }
894            _ => false,
895        };
896        if missing_table {
897            Ok(false)
898        } else {
899            Err(ScyllaDbStoreInternalError::PrepareError(error))
900        }
901    }
902
903    async fn create(
904        config: &Self::Config,
905        namespace: &str,
906    ) -> Result<(), ScyllaDbStoreInternalError> {
907        Self::check_namespace(namespace)?;
908        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
909
910        // Create a keyspace if it doesn't exist
911        let statement = session
912            .prepare(format!(
913                "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
914                    'class' : 'NetworkTopologyStrategy', \
915                    'replication_factor' : {} \
916                }}",
917                KEYSPACE, config.replication_factor
918            ))
919            .await?;
920        session
921            .execute_single_page(&statement, &[], PagingState::start())
922            .await?;
923
924        // This explicitly sets a lot of default parameters for clarity and for making future
925        // changes easier.
926        let statement = session
927            .prepare(format!(
928                "CREATE TABLE {}.{} (\
929                    root_key blob, \
930                    k blob, \
931                    v blob, \
932                    PRIMARY KEY (root_key, k) \
933                ) \
934                WITH compaction = {{ \
935                    'class'            : 'SizeTieredCompactionStrategy', \
936                    'min_sstable_size' : 52428800, \
937                    'bucket_low'       : 0.5, \
938                    'bucket_high'      : 1.5, \
939                    'min_threshold'    : 4, \
940                    'max_threshold'    : 32 \
941                }} \
942                AND compression = {{ \
943                    'sstable_compression': 'LZ4Compressor', \
944                    'chunk_length_in_kb':'4' \
945                }} \
946                AND caching = {{ \
947                    'enabled': 'true' \
948                }}",
949                KEYSPACE, namespace
950            ))
951            .await?;
952        session
953            .execute_single_page(&statement, &[], PagingState::start())
954            .await?;
955        Ok(())
956    }
957
958    async fn delete(
959        config: &Self::Config,
960        namespace: &str,
961    ) -> Result<(), ScyllaDbStoreInternalError> {
962        Self::check_namespace(namespace)?;
963        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
964        let statement = session
965            .prepare(format!("DROP TABLE IF EXISTS {}.{};", KEYSPACE, namespace))
966            .await?;
967        session
968            .execute_single_page(&statement, &[], PagingState::start())
969            .await?;
970        Ok(())
971    }
972}
973
974impl ScyllaDbStoreInternal {
975    /// Obtains the semaphore lock on the database if needed.
976    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
977        match &self.semaphore {
978            None => None,
979            Some(count) => Some(count.acquire().await),
980        }
981    }
982}
983
984impl ScyllaDbDatabaseInternal {
985    fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
986        if !namespace.is_empty()
987            && namespace.len() <= 48
988            && namespace
989                .chars()
990                .all(|c| c.is_ascii_alphanumeric() || c == '_')
991        {
992            return Ok(());
993        }
994        Err(ScyllaDbStoreInternalError::InvalidNamespace)
995    }
996}
997
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    /// Builds the configuration used by tests: a database at `localhost:9042`, at most
    /// 10 concurrent queries, 10 stream queries and a replication factor of 1.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        // TODO(#4114): Read the port from an environment variable.
        let config = ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
1011
/// The `ScyllaDbDatabase` composed type with metrics.
///
/// From the innermost layer outwards, [`ScyllaDbDatabaseInternal`] is wrapped in
/// [`JournalingKeyValueDatabase`], then [`ValueSplittingDatabase`], then
/// [`LruCachingDatabase`], with a [`MeteredDatabase`] layer around each of these three
/// wrappers. This variant is compiled only when `with_metrics` is set.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;
1023
/// The `ScyllaDbDatabase` composed type.
///
/// From the innermost layer outwards, [`ScyllaDbDatabaseInternal`] is wrapped in
/// [`JournalingKeyValueDatabase`], then [`ValueSplittingDatabase`], then
/// [`LruCachingDatabase`]. This variant is compiled only when `with_metrics` is not set.
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;
1029
/// The `ScyllaDbStoreConfig` input type: an [`LruCachingConfig`] parameterized by
/// [`ScyllaDbStoreInternalConfig`].
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1032
/// The combined error type for the `ScyllaDbDatabase`: a [`ValueSplittingError`]
/// parameterized by [`ScyllaDbStoreInternalError`].
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;