linera_views/backends/
scylla_db.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Implements [`crate::store::KeyValueStore`] for the ScyllaDB database.
//!
//! All queries go through a ScyllaDB `Session`. Each database ("namespace") is stored in
//! its own table, named after the namespace, inside the `kv` keyspace. The maximum number
//! of concurrent queries is controlled by `max_concurrent_queries`.
9
10use std::{
11    collections::{BTreeSet, HashMap},
12    ops::Deref,
13    sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::{ensure, util::future::FutureSyncExt as _};
19use scylla::{
20    client::{
21        execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22        session::Session,
23        session_builder::SessionBuilder,
24    },
25    deserialize::{DeserializationError, TypeCheckError},
26    errors::{
27        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29    },
30    policies::{
31        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32        retry::DefaultRetryPolicy,
33    },
34    response::PagingState,
35    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45    batch::UnorderedBatch,
46    common::{get_uleb128_size, get_upper_bound_option},
47    journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48    lru_caching::{LruCachingConfig, LruCachingDatabase},
49    store::{
50        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51        WithError,
52    },
53    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54};
55
/// Maximum number of keys in a single multi-key (`IN (...)`) query.
///
/// ScyllaDB accepts at most 100 entries in such a query, and one of them is always
/// taken by the root key, which leaves 99 for the user keys.
const MAX_MULTI_KEYS: usize = 99;
59
/// Maximum size of a key, in bytes (10 KiB).
const MAX_KEY_SIZE: usize = 10 * 1024;

/// Maximum size of a value as stored in ScyllaDB, in bytes.
///
/// ScyllaDB appears to limit a single operation to 16 MiB; see
/// <https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/>:
/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once
/// at the database at any particular time".
/// The budget is split into `MAX_KEY_SIZE` (10 KiB) for the key and the rest for the
/// value. An extra, arbitrary 4000 bytes are subtracted for the database's internal use.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - MAX_KEY_SIZE - 4000;

/// Maximum total size of a batch: one maximal value plus one maximal key.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
70
71/// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage.
72/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`
73/// Therefore the actual `MAX_VALUE_SIZE` is lower.
74/// At the maximum the key size is 1024 bytes (see below) and we pack just one entry.
75/// So if the key has 1024 bytes this gets us the inequality
76/// `1 + 1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= RAW_MAX_VALUE_SIZE`.
77/// and so this simplifies to `1 + 1 + 1 + (2 + 10240) + (4 + x) <= RAW_MAX_VALUE_SIZE`
78/// Note on the above formula:
79/// * We write 4 because `get_uleb128_size(RAW_MAX_VALUE_SIZE) = 4)`
80/// * We write `1 + 1 + 1`  because the `UnorderedBatch` has three entries.
81///
82/// This gets us to a maximal value of 16752727.
83const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
84    - MAX_KEY_SIZE
85    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
86    - get_uleb128_size(MAX_KEY_SIZE)
87    - 3;
88
/// Maximum number of operations accepted in a single batch.
///
/// This is an empirical limit: we have not found it, or a similar constant, in the
/// ScyllaDB source code or documentation. Earlier experiments found 14796 to be the
/// largest batch size that worked. The limit is set well below that, at 5000.
const MAX_BATCH_SIZE: usize = 5000;
95
/// Name of the ScyllaDB keyspace holding one table per namespace.
const KEYSPACE: &str = "kv";
98
99/// The client for ScyllaDB:
100/// * The session allows to pass queries
101/// * The namespace that is being assigned to the database
102/// * The prepared queries used for implementing the features of `KeyValueStore`.
103struct ScyllaDbClient {
104    session: Session,
105    namespace: String,
106    read_value: PreparedStatement,
107    contains_key: PreparedStatement,
108    write_batch_delete_prefix_unbounded: PreparedStatement,
109    write_batch_delete_prefix_bounded: PreparedStatement,
110    write_batch_deletion: PreparedStatement,
111    write_batch_insertion: PreparedStatement,
112    find_keys_by_prefix_unbounded: PreparedStatement,
113    find_keys_by_prefix_bounded: PreparedStatement,
114    find_key_values_by_prefix_unbounded: PreparedStatement,
115    find_key_values_by_prefix_bounded: PreparedStatement,
116    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
117    multi_keys: papaya::HashMap<usize, PreparedStatement>,
118}
119
120impl ScyllaDbClient {
121    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
122        let namespace = namespace.to_string();
123        let read_value = session
124            .prepare(format!(
125                "SELECT v FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
126                KEYSPACE, namespace
127            ))
128            .await?;
129
130        let contains_key = session
131            .prepare(format!(
132                "SELECT root_key FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
133                KEYSPACE, namespace
134            ))
135            .await?;
136
137        let write_batch_delete_prefix_unbounded = session
138            .prepare(format!(
139                "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
140                KEYSPACE, namespace
141            ))
142            .await?;
143
144        let write_batch_delete_prefix_bounded = session
145            .prepare(format!(
146                "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
147                KEYSPACE, namespace
148            ))
149            .await?;
150
151        let write_batch_deletion = session
152            .prepare(format!(
153                "DELETE FROM {}.\"{}\" WHERE root_key = ? AND k = ?",
154                KEYSPACE, namespace
155            ))
156            .await?;
157
158        let write_batch_insertion = session
159            .prepare(format!(
160                "INSERT INTO {}.\"{}\" (root_key, k, v) VALUES (?, ?, ?)",
161                KEYSPACE, namespace
162            ))
163            .await?;
164
165        let find_keys_by_prefix_unbounded = session
166            .prepare(format!(
167                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
168                KEYSPACE, namespace
169            ))
170            .await?;
171
172        let find_keys_by_prefix_bounded = session
173            .prepare(format!(
174                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
175                KEYSPACE, namespace
176            ))
177            .await?;
178
179        let find_key_values_by_prefix_unbounded = session
180            .prepare(format!(
181                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ?",
182                KEYSPACE, namespace
183            ))
184            .await?;
185
186        let find_key_values_by_prefix_bounded = session
187            .prepare(format!(
188                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k >= ? AND k < ?",
189                KEYSPACE, namespace
190            ))
191            .await?;
192
193        Ok(Self {
194            session,
195            namespace,
196            read_value,
197            contains_key,
198            write_batch_delete_prefix_unbounded,
199            write_batch_delete_prefix_bounded,
200            write_batch_deletion,
201            write_batch_insertion,
202            find_keys_by_prefix_unbounded,
203            find_keys_by_prefix_bounded,
204            find_key_values_by_prefix_unbounded,
205            find_key_values_by_prefix_bounded,
206            multi_key_values: papaya::HashMap::new(),
207            multi_keys: papaya::HashMap::new(),
208        })
209    }
210
211    fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
212        DefaultPolicy::builder().token_aware(true).build()
213    }
214
215    fn build_default_execution_profile_handle(
216        policy: Arc<dyn LoadBalancingPolicy>,
217    ) -> ExecutionProfileHandle {
218        let default_profile = ExecutionProfile::builder()
219            .load_balancing_policy(policy)
220            .retry_policy(Arc::new(DefaultRetryPolicy::new()))
221            .consistency(Consistency::LocalQuorum)
222            .build();
223        default_profile.into_handle()
224    }
225
226    async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
227        // This explicitly sets a lot of default parameters for clarity and for making future changes
228        // easier.
229        SessionBuilder::new()
230            .known_node(uri)
231            .default_execution_profile_handle(Self::build_default_execution_profile_handle(
232                Self::build_default_policy(),
233            ))
234            .build()
235            .boxed_sync()
236            .await
237            .map_err(Into::into)
238    }
239
240    async fn get_multi_key_values_statement(
241        &self,
242        num_markers: usize,
243    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
244        if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
245            return Ok(prepared_statement.clone());
246        }
247        let markers = std::iter::repeat_n("?", num_markers)
248            .collect::<Vec<_>>()
249            .join(",");
250        let prepared_statement = self
251            .session
252            .prepare(format!(
253                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
254                KEYSPACE, self.namespace, markers
255            ))
256            .await?;
257        self.multi_key_values
258            .pin()
259            .insert(num_markers, prepared_statement.clone());
260        Ok(prepared_statement)
261    }
262
263    async fn get_multi_keys_statement(
264        &self,
265        num_markers: usize,
266    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
267        if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
268            return Ok(prepared_statement.clone());
269        };
270        let markers = std::iter::repeat_n("?", num_markers)
271            .collect::<Vec<_>>()
272            .join(",");
273        let prepared_statement = self
274            .session
275            .prepare(format!(
276                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
277                KEYSPACE, self.namespace, markers
278            ))
279            .await?;
280        self.multi_keys
281            .pin()
282            .insert(num_markers, prepared_statement.clone());
283        Ok(prepared_statement)
284    }
285
286    fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
287        ensure!(
288            key.len() <= MAX_KEY_SIZE,
289            ScyllaDbStoreInternalError::KeyTooLong
290        );
291        Ok(())
292    }
293
294    fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
295        ensure!(
296            value.len() <= RAW_MAX_VALUE_SIZE,
297            ScyllaDbStoreInternalError::ValueTooLong
298        );
299        Ok(())
300    }
301
302    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
303        ensure!(
304            batch.len() <= MAX_BATCH_SIZE,
305            ScyllaDbStoreInternalError::BatchTooLong
306        );
307        Ok(())
308    }
309
310    async fn read_value_internal(
311        &self,
312        root_key: &[u8],
313        key: Vec<u8>,
314    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
315        Self::check_key_size(&key)?;
316        let session = &self.session;
317        // Read the value of a key
318        let values = (root_key.to_vec(), key);
319
320        let (result, _) = session
321            .execute_single_page(&self.read_value, &values, PagingState::start())
322            .await?;
323        let rows = result.into_rows_result()?;
324        let mut rows = rows.rows::<(Vec<u8>,)>()?;
325        Ok(match rows.next() {
326            Some(row) => Some(row?.0),
327            None => None,
328        })
329    }
330
331    fn get_occurrences_map(
332        keys: Vec<Vec<u8>>,
333    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
334        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
335        for (i_key, key) in keys.into_iter().enumerate() {
336            Self::check_key_size(&key)?;
337            map.entry(key).or_default().push(i_key);
338        }
339        Ok(map)
340    }
341
342    async fn read_multi_values_internal(
343        &self,
344        root_key: &[u8],
345        keys: Vec<Vec<u8>>,
346    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
347        let mut values = vec![None; keys.len()];
348        let map = Self::get_occurrences_map(keys)?;
349        let statement = self.get_multi_key_values_statement(map.len()).await?;
350        let mut inputs = vec![root_key.to_vec()];
351        inputs.extend(map.keys().cloned());
352        let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
353            .await?
354            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
355
356        while let Some(row) = rows.next().await {
357            let (key, value) = row?;
358            if let Some((&last, rest)) = map[&key].split_last() {
359                for position in rest {
360                    values[*position] = Some(value.clone());
361                }
362                values[last] = Some(value);
363            }
364        }
365        Ok(values)
366    }
367
368    async fn contains_keys_internal(
369        &self,
370        root_key: &[u8],
371        keys: Vec<Vec<u8>>,
372    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
373        let mut values = vec![false; keys.len()];
374        let map = Self::get_occurrences_map(keys)?;
375        let statement = self.get_multi_keys_statement(map.len()).await?;
376        let mut inputs = vec![root_key.to_vec()];
377        inputs.extend(map.keys().cloned());
378        let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
379            .await?
380            .rows_stream::<(Vec<u8>,)>()?;
381
382        while let Some(row) = rows.next().await {
383            let (key,) = row?;
384            for i_key in &map[&key] {
385                values[*i_key] = true;
386            }
387        }
388
389        Ok(values)
390    }
391
392    async fn contains_key_internal(
393        &self,
394        root_key: &[u8],
395        key: Vec<u8>,
396    ) -> Result<bool, ScyllaDbStoreInternalError> {
397        Self::check_key_size(&key)?;
398        let session = &self.session;
399        // Read the value of a key
400        let values = (root_key.to_vec(), key);
401
402        let (result, _) = session
403            .execute_single_page(&self.contains_key, &values, PagingState::start())
404            .await?;
405        let rows = result.into_rows_result()?;
406        let mut rows = rows.rows::<(Vec<u8>,)>()?;
407        Ok(rows.next().is_some())
408    }
409
410    async fn write_batch_internal(
411        &self,
412        root_key: &[u8],
413        batch: UnorderedBatch,
414    ) -> Result<(), ScyllaDbStoreInternalError> {
415        let session = &self.session;
416        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
417        let mut batch_values = Vec::new();
418        let query1 = &self.write_batch_delete_prefix_unbounded;
419        let query2 = &self.write_batch_delete_prefix_bounded;
420        Self::check_batch_len(&batch)?;
421        for key_prefix in batch.key_prefix_deletions {
422            Self::check_key_size(&key_prefix)?;
423            match get_upper_bound_option(&key_prefix) {
424                None => {
425                    let values = vec![root_key.to_vec(), key_prefix];
426                    batch_values.push(values);
427                    batch_query.append_statement(query1.clone());
428                }
429                Some(upper_bound) => {
430                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
431                    batch_values.push(values);
432                    batch_query.append_statement(query2.clone());
433                }
434            }
435        }
436        let query3 = &self.write_batch_deletion;
437        for key in batch.simple_unordered_batch.deletions {
438            Self::check_key_size(&key)?;
439            let values = vec![root_key.to_vec(), key];
440            batch_values.push(values);
441            batch_query.append_statement(query3.clone());
442        }
443        let query4 = &self.write_batch_insertion;
444        for (key, value) in batch.simple_unordered_batch.insertions {
445            Self::check_key_size(&key)?;
446            Self::check_value_size(&value)?;
447            let values = vec![root_key.to_vec(), key, value];
448            batch_values.push(values);
449            batch_query.append_statement(query4.clone());
450        }
451        session.batch(&batch_query, batch_values).await?;
452        Ok(())
453    }
454
455    async fn find_keys_by_prefix_internal(
456        &self,
457        root_key: &[u8],
458        key_prefix: Vec<u8>,
459    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
460        Self::check_key_size(&key_prefix)?;
461        let session = &self.session;
462        // Read the value of a key
463        let len = key_prefix.len();
464        let query_unbounded = &self.find_keys_by_prefix_unbounded;
465        let query_bounded = &self.find_keys_by_prefix_bounded;
466        let rows = match get_upper_bound_option(&key_prefix) {
467            None => {
468                let values = (root_key.to_vec(), key_prefix.clone());
469                Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
470            }
471            Some(upper_bound) => {
472                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
473                Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
474            }
475        };
476        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
477        let mut keys = Vec::new();
478        while let Some(row) = rows.next().await {
479            let (key,) = row?;
480            let short_key = key[len..].to_vec();
481            keys.push(short_key);
482        }
483        Ok(keys)
484    }
485
486    async fn find_key_values_by_prefix_internal(
487        &self,
488        root_key: &[u8],
489        key_prefix: Vec<u8>,
490    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
491        Self::check_key_size(&key_prefix)?;
492        let session = &self.session;
493        // Read the value of a key
494        let len = key_prefix.len();
495        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
496        let query_bounded = &self.find_key_values_by_prefix_bounded;
497        let rows = match get_upper_bound_option(&key_prefix) {
498            None => {
499                let values = (root_key.to_vec(), key_prefix.clone());
500                Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
501            }
502            Some(upper_bound) => {
503                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
504                Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
505            }
506        };
507        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
508        let mut key_values = Vec::new();
509        while let Some(row) = rows.next().await {
510            let (key, value) = row?;
511            let short_key = key[len..].to_vec();
512            key_values.push((short_key, value));
513        }
514        Ok(key_values)
515    }
516}
517
518/// The client itself and the keeping of the count of active connections.
519#[derive(Clone)]
520pub struct ScyllaDbStoreInternal {
521    store: Arc<ScyllaDbClient>,
522    semaphore: Option<Arc<Semaphore>>,
523    max_stream_queries: usize,
524    root_key: Vec<u8>,
525}
526
527/// Database-level connection to ScyllaDB for managing namespaces and partitions.
528#[derive(Clone)]
529pub struct ScyllaDbDatabaseInternal {
530    store: Arc<ScyllaDbClient>,
531    semaphore: Option<Arc<Semaphore>>,
532    max_stream_queries: usize,
533}
534
535impl WithError for ScyllaDbDatabaseInternal {
536    type Error = ScyllaDbStoreInternalError;
537}
538
539/// The error type for [`ScyllaDbStoreInternal`]
540#[derive(Error, Debug)]
541pub enum ScyllaDbStoreInternalError {
542    /// BCS serialization error.
543    #[error(transparent)]
544    BcsError(#[from] bcs::Error),
545
546    /// The key must have at most `MAX_KEY_SIZE` bytes
547    #[error("The key must have at most MAX_KEY_SIZE")]
548    KeyTooLong,
549
550    /// The value must have at most `RAW_MAX_VALUE_SIZE` bytes
551    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
552    ValueTooLong,
553
554    /// A deserialization error in ScyllaDB
555    #[error(transparent)]
556    DeserializationError(#[from] DeserializationError),
557
558    /// A row error in ScyllaDB
559    #[error(transparent)]
560    RowsError(#[from] RowsError),
561
562    /// A type error in the accessed data in ScyllaDB
563    #[error(transparent)]
564    IntoRowsResultError(#[from] IntoRowsResultError),
565
566    /// A type check error in ScyllaDB
567    #[error(transparent)]
568    TypeCheckError(#[from] TypeCheckError),
569
570    /// A query error in ScyllaDB
571    #[error(transparent)]
572    PagerExecutionError(#[from] PagerExecutionError),
573
574    /// A query error in ScyllaDB
575    #[error(transparent)]
576    ScyllaDbNewSessionError(#[from] NewSessionError),
577
578    /// Namespace contains forbidden characters
579    #[error("Namespace contains forbidden characters")]
580    InvalidNamespace,
581
582    /// The journal is not coherent
583    #[error(transparent)]
584    JournalConsistencyError(#[from] JournalConsistencyError),
585
586    /// The batch is too long to be written
587    #[error("The batch is too long to be written")]
588    BatchTooLong,
589
590    /// A prepare error in ScyllaDB
591    #[error(transparent)]
592    PrepareError(#[from] PrepareError),
593
594    /// An execution error in ScyllaDB
595    #[error(transparent)]
596    ExecutionError(#[from] ExecutionError),
597
598    /// A next row error in ScyllaDB
599    #[error(transparent)]
600    NextRowError(#[from] NextRowError),
601}
602
603impl KeyValueStoreError for ScyllaDbStoreInternalError {
604    const BACKEND: &'static str = "scylla_db";
605}
606
607impl WithError for ScyllaDbStoreInternal {
608    type Error = ScyllaDbStoreInternalError;
609}
610
611impl ReadableKeyValueStore for ScyllaDbStoreInternal {
612    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
613
614    fn max_stream_queries(&self) -> usize {
615        self.max_stream_queries
616    }
617
618    fn root_key(&self) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
619        Ok(self.root_key[1..].to_vec())
620    }
621
622    async fn read_value_bytes(
623        &self,
624        key: &[u8],
625    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
626        let store = self.store.deref();
627        let _guard = self.acquire().await;
628        Box::pin(store.read_value_internal(&self.root_key, key.to_vec())).await
629    }
630
631    async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
632        let store = self.store.deref();
633        let _guard = self.acquire().await;
634        Box::pin(store.contains_key_internal(&self.root_key, key.to_vec())).await
635    }
636
637    async fn contains_keys(
638        &self,
639        keys: &[Vec<u8>],
640    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
641        if keys.is_empty() {
642            return Ok(Vec::new());
643        }
644        let store = self.store.deref();
645        let _guard = self.acquire().await;
646        let handles = keys
647            .chunks(MAX_MULTI_KEYS)
648            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
649        let results: Vec<_> = join_all(handles)
650            .await
651            .into_iter()
652            .collect::<Result<_, _>>()?;
653        Ok(results.into_iter().flatten().collect())
654    }
655
656    async fn read_multi_values_bytes(
657        &self,
658        keys: &[Vec<u8>],
659    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
660        if keys.is_empty() {
661            return Ok(Vec::new());
662        }
663        let store = self.store.deref();
664        let _guard = self.acquire().await;
665        let handles = keys
666            .chunks(MAX_MULTI_KEYS)
667            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
668        let results: Vec<_> = join_all(handles)
669            .await
670            .into_iter()
671            .collect::<Result<_, _>>()?;
672        Ok(results.into_iter().flatten().collect())
673    }
674
675    async fn find_keys_by_prefix(
676        &self,
677        key_prefix: &[u8],
678    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
679        let store = self.store.deref();
680        let _guard = self.acquire().await;
681        Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await
682    }
683
684    async fn find_key_values_by_prefix(
685        &self,
686        key_prefix: &[u8],
687    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
688        let store = self.store.deref();
689        let _guard = self.acquire().await;
690        Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec()))
691            .await
692    }
693}
694
695impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
696    const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
697    const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
698    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
699
700    // ScyllaDB cannot take a `crate::batch::Batch` directly. Indeed, if a delete is
701    // followed by a write, then the delete takes priority. See the sentence "The first
702    // tie-breaking rule when two cells have the same write timestamp is that dead cells
703    // win over live cells" from
704    // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
705    type Batch = UnorderedBatch;
706
707    async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
708        let store = self.store.deref();
709        let _guard = self.acquire().await;
710        store.write_batch_internal(&self.root_key, batch).await
711    }
712}
713
/// Returns `root_key` prefixed with a single `0` byte.
///
/// ScyllaDB requires that the keys are non-empty, so even an empty user root key is
/// stored with this extra leading byte.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    std::iter::once(0u8)
        .chain(root_key.iter().copied())
        .collect()
}
720
721/// The type for building a new ScyllaDB Key Value Store
722#[derive(Clone, Debug, Deserialize, Serialize)]
723pub struct ScyllaDbStoreInternalConfig {
724    /// The URL to which the requests have to be sent
725    pub uri: String,
726    /// Maximum number of concurrent database queries allowed for this client.
727    pub max_concurrent_queries: Option<usize>,
728    /// Preferred buffer size for async streams.
729    pub max_stream_queries: usize,
730    /// The replication factor.
731    pub replication_factor: u32,
732}
733
734impl KeyValueDatabase for ScyllaDbDatabaseInternal {
735    type Config = ScyllaDbStoreInternalConfig;
736    type Store = ScyllaDbStoreInternal;
737
738    fn get_name() -> String {
739        "scylladb internal".to_string()
740    }
741
742    async fn connect(
743        config: &Self::Config,
744        namespace: &str,
745    ) -> Result<Self, ScyllaDbStoreInternalError> {
746        Self::check_namespace(namespace)?;
747        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
748        let store = ScyllaDbClient::new(session, namespace).await?;
749        let store = Arc::new(store);
750        let semaphore = config
751            .max_concurrent_queries
752            .map(|n| Arc::new(Semaphore::new(n)));
753        let max_stream_queries = config.max_stream_queries;
754        Ok(Self {
755            store,
756            semaphore,
757            max_stream_queries,
758        })
759    }
760
761    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
762        let store = self.store.clone();
763        let semaphore = self.semaphore.clone();
764        let max_stream_queries = self.max_stream_queries;
765        let root_key = get_big_root_key(root_key);
766        Ok(ScyllaDbStoreInternal {
767            store,
768            semaphore,
769            max_stream_queries,
770            root_key,
771        })
772    }
773
774    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
775        self.open_shared(root_key)
776    }
777
778    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
779        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
780        let statement = session
781            .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
782            .await?;
783        let result = Box::pin(session.execute_iter(statement, &[])).await;
784        let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
785        let result = match result {
786            Ok(result) => result,
787            Err(error) => {
788                let invalid_or_keyspace_not_found = match &error {
789                    PagerExecutionError::NextPageError(NextPageError::RequestFailure(
790                        RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
791                    )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
792                    _ => false,
793                };
794                if invalid_or_keyspace_not_found {
795                    return Ok(Vec::new());
796                } else {
797                    return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
798                }
799            }
800        };
801        let mut namespaces = Vec::new();
802        let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
803        while let Some(row) = rows_stream.next().await {
804            let (_, object_kind, name, _) = row?;
805            if object_kind == "table" {
806                namespaces.push(name);
807            }
808        }
809        Ok(namespaces)
810    }
811
812    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
813        let statement = self
814            .store
815            .session
816            .prepare(format!(
817                "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
818                KEYSPACE, self.store.namespace
819            ))
820            .await?;
821
822        // Execute the query
823        let rows = Box::pin(self.store.session.execute_iter(statement, &[])).await?;
824        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
825        let mut root_keys = BTreeSet::new();
826        while let Some(row) = rows.next().await {
827            let (root_key,) = row?;
828            let root_key = root_key[1..].to_vec();
829            root_keys.insert(root_key);
830        }
831        Ok(root_keys.into_iter().collect::<Vec<_>>())
832    }
833
834    async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
835        let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
836        let statement = session
837            .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
838            .await?;
839
840        session
841            .execute_single_page(&statement, &[], PagingState::start())
842            .await?;
843        Ok(())
844    }
845
846    async fn exists(
847        config: &Self::Config,
848        namespace: &str,
849    ) -> Result<bool, ScyllaDbStoreInternalError> {
850        Self::check_namespace(namespace)?;
851        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
852
853        // We check the way the test can fail. It can fail in different ways.
854        let result = session
855            .prepare(format!(
856                "SELECT root_key FROM {}.\"{}\" LIMIT 1 ALLOW FILTERING",
857                KEYSPACE, namespace
858            ))
859            .await;
860
861        // The missing table translates into a very specific error that we matched
862        let miss_msg1 = format!("unconfigured table {}", namespace);
863        let miss_msg1 = miss_msg1.as_str();
864        let miss_msg2 = "Undefined name root_key in selection clause";
865        let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
866        let Err(error) = result else {
867            // If OK, then the table exists
868            return Ok(true);
869        };
870        let missing_table = match &error {
871            PrepareError::AllAttemptsFailed {
872                first_attempt: RequestAttemptError::DbError(db_error, msg),
873            } => {
874                if *db_error != DbError::Invalid {
875                    false
876                } else {
877                    msg.as_str() == miss_msg1
878                        || msg.as_str() == miss_msg2
879                        || msg.as_str() == miss_msg3
880                }
881            }
882            _ => false,
883        };
884        if missing_table {
885            Ok(false)
886        } else {
887            Err(ScyllaDbStoreInternalError::PrepareError(error))
888        }
889    }
890
891    async fn create(
892        config: &Self::Config,
893        namespace: &str,
894    ) -> Result<(), ScyllaDbStoreInternalError> {
895        Self::check_namespace(namespace)?;
896        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
897
898        // Create a keyspace if it doesn't exist
899        let statement = session
900            .prepare(format!(
901                "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
902                    'class' : 'NetworkTopologyStrategy', \
903                    'replication_factor' : {} \
904                }}",
905                KEYSPACE, config.replication_factor
906            ))
907            .await?;
908        session
909            .execute_single_page(&statement, &[], PagingState::start())
910            .await?;
911
912        // This explicitly sets a lot of default parameters for clarity and for making future
913        // changes easier.
914        let statement = session
915            .prepare(format!(
916                "CREATE TABLE {}.\"{}\" (\
917                    root_key blob, \
918                    k blob, \
919                    v blob, \
920                    PRIMARY KEY (root_key, k) \
921                ) \
922                WITH compaction = {{ \
923                    'class'            : 'SizeTieredCompactionStrategy', \
924                    'min_sstable_size' : 52428800, \
925                    'bucket_low'       : 0.5, \
926                    'bucket_high'      : 1.5, \
927                    'min_threshold'    : 4, \
928                    'max_threshold'    : 32 \
929                }} \
930                AND compression = {{ \
931                    'sstable_compression': 'LZ4Compressor', \
932                    'chunk_length_in_kb':'4' \
933                }} \
934                AND caching = {{ \
935                    'enabled': 'true' \
936                }}",
937                KEYSPACE, namespace
938            ))
939            .await?;
940        session
941            .execute_single_page(&statement, &[], PagingState::start())
942            .await?;
943        Ok(())
944    }
945
946    async fn delete(
947        config: &Self::Config,
948        namespace: &str,
949    ) -> Result<(), ScyllaDbStoreInternalError> {
950        Self::check_namespace(namespace)?;
951        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
952        let statement = session
953            .prepare(format!(
954                "DROP TABLE IF EXISTS {}.\"{}\";",
955                KEYSPACE, namespace
956            ))
957            .await?;
958        session
959            .execute_single_page(&statement, &[], PagingState::start())
960            .await?;
961        Ok(())
962    }
963}
964
965impl ScyllaDbStoreInternal {
966    /// Obtains the semaphore lock on the database if needed.
967    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
968        match &self.semaphore {
969            None => None,
970            Some(count) => Some(count.acquire().await),
971        }
972    }
973}
974
975impl ScyllaDbDatabaseInternal {
976    fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
977        if !namespace.is_empty()
978            && namespace.len() <= 48
979            && namespace
980                .chars()
981                .all(|c| c.is_ascii_alphanumeric() || c == '_')
982        {
983            return Ok(());
984        }
985        Err(ScyllaDbStoreInternalError::InvalidNamespace)
986    }
987}
988
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    /// Returns a configuration for a ScyllaDB node on `localhost:9042`.
    ///
    /// Both query limits are set to 10 and the replication factor to 1, which suits a
    /// single local test node.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        // TODO(#4114): Read the port from an environment variable.
        const TEST_URI: &str = "localhost:9042";
        let config = ScyllaDbStoreInternalConfig {
            uri: TEST_URI.to_owned(),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(config)
    }
}
1002
/// The `ScyllaDbDatabase` composed type, with metrics.
///
/// The stack, from outermost to innermost, is: LRU caching, value splitting,
/// journaling, and the raw backend `ScyllaDbDatabaseInternal`. Each of the caching,
/// value-splitting and journaling layers is wrapped in its own `MeteredDatabase`.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;

/// The `ScyllaDbDatabase` composed type, without metrics.
///
/// Same layer stack as the metered variant (LRU caching over value splitting over
/// journaling over `ScyllaDbDatabaseInternal`), minus the `MeteredDatabase` wrappers.
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;

/// The configuration type for `ScyllaDbDatabase`: the backend's
/// `ScyllaDbStoreInternalConfig` wrapped in `LruCachingConfig` (which presumably adds
/// the LRU cache settings).
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;

/// The combined error type for the `ScyllaDbDatabase`: backend
/// `ScyllaDbStoreInternalError`s wrapped in `ValueSplittingError`.
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;
1023
1024/// The combined error type for the `ScyllaDbDatabase`.
1025pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;