linera_views/backends/
scylla_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the ScyllaDB database.
5//!
6//! The current connection is done via a Session and a corresponding primary key called
7//! "namespace". The maximum number of concurrent queries is controlled by
8//! `max_concurrent_queries`.
9
10use std::{
11    collections::{BTreeSet, HashMap},
12    ops::Deref,
13    sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use dashmap::{mapref::entry::Entry, DashMap};
18use futures::{future::join_all, StreamExt as _};
19use linera_base::ensure;
20use scylla::{
21    client::{
22        execution_profile::{ExecutionProfile, ExecutionProfileHandle},
23        session::Session,
24        session_builder::SessionBuilder,
25    },
26    deserialize::{DeserializationError, TypeCheckError},
27    errors::{
28        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
29        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
30    },
31    policies::{
32        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
33        retry::DefaultRetryPolicy,
34    },
35    response::PagingState,
36    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
37};
38use serde::{Deserialize, Serialize};
39use thiserror::Error;
40
41#[cfg(with_metrics)]
42use crate::metering::MeteredStore;
43#[cfg(with_testing)]
44use crate::store::TestKeyValueStore;
45use crate::{
46    batch::UnorderedBatch,
47    common::{get_uleb128_size, get_upper_bound_option},
48    journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
49    lru_caching::{LruCachingConfig, LruCachingStore},
50    store::{
51        AdminKeyValueStore, CommonStoreInternalConfig, KeyValueStoreError, ReadableKeyValueStore,
52        WithError,
53    },
54    value_splitting::{ValueSplittingError, ValueSplittingStore},
55    FutureSyncExt as _,
56};
57
/// Fundamental constant in ScyllaDB: the maximum number of keys in a multi-key query.
/// The limit is in reality 100, but we need one entry for the root key.
const MAX_MULTI_KEYS: usize = 100 - 1;
61
/// The maximal size of an operation on ScyllaDB seems to be 16 MiB
/// https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/
/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once
///  at the database at any particular time"
/// So, we set up the maximal size of 16 MiB - 10 KiB for the values and 10 KiB for the keys.
/// We also arbitrarily decrease the size by 4000 bytes because some space is
/// taken internally by the database.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// The maximal size of a key: 10 KiB.
const MAX_KEY_SIZE: usize = 10 * 1024;
/// The maximal total size of a batch: the largest raw value plus the largest key.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
72
/// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage.
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`.
/// Therefore the actual `MAX_VALUE_SIZE` is lower.
/// At the maximum the key size is `MAX_KEY_SIZE` = 10240 bytes (see above) and we pack
/// just one entry.
/// So if the key has 10240 bytes this gets us the inequality
/// `1 + 1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= RAW_MAX_VALUE_SIZE`
/// and so this simplifies to `1 + 1 + 1 + (2 + 10240) + (4 + x) <= RAW_MAX_VALUE_SIZE`.
/// Note on the above formula:
/// * We write 4 because `get_uleb128_size(RAW_MAX_VALUE_SIZE) = 4`.
/// * We write 2 because `get_uleb128_size(MAX_KEY_SIZE) = 2`.
/// * We write `1 + 1 + 1` because the `UnorderedBatch` has three entries.
///
/// This gets us to a maximal value of 16752727.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;
90
/// The maximal number of operations accepted in a single batch.
///
/// The constant 14000 is an empirical constant that was found to be necessary
/// to make the ScyllaDB system work. We have not been able to find this or
/// a similar constant in the source code or the documentation.
/// An experimental approach gets us that 14796 is the latest value that is
/// correct.
///
/// NOTE(review): the value actually used below is 5000, not the 14000 discussed
/// above; the reason for the lower value is not visible here -- confirm and
/// update this comment accordingly.
const MAX_BATCH_SIZE: usize = 5000;
97
/// The keyspace to use for the ScyllaDB database. Every table (one per namespace)
/// is addressed as `KEYSPACE.namespace` in the statements of this file.
const KEYSPACE: &str = "kv";
100
/// The client for ScyllaDB:
/// * The session allows to pass queries
/// * The namespace that is being assigned to the database
/// * The prepared queries used for implementing the features of `KeyValueStore`.
struct ScyllaDbClient {
    /// The session through which every statement is prepared and executed.
    session: Session,
    /// The table name (inside `KEYSPACE`) targeted by every statement.
    namespace: String,
    /// `SELECT v ... WHERE root_key = ? AND k = ?`
    read_value: PreparedStatement,
    /// `SELECT root_key ... WHERE root_key = ? AND k = ?`
    contains_key: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ?`
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ? AND k < ?`
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k = ?`
    write_batch_deletion: PreparedStatement,
    /// `INSERT INTO ... (root_key, k, v) VALUES (?, ?, ?)`
    write_batch_insertion: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ?`
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ?`
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Cache of `SELECT k,v ... AND k IN (?,...)` statements, keyed by the number of markers.
    multi_key_values: DashMap<usize, PreparedStatement>,
    /// Cache of `SELECT k ... AND k IN (?,...)` statements, keyed by the number of markers.
    multi_keys: DashMap<usize, PreparedStatement>,
}
121
122impl ScyllaDbClient {
123    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
124        let namespace = namespace.to_string();
125        let read_value = session
126            .prepare(format!(
127                "SELECT v FROM {}.{} WHERE root_key = ? AND k = ?",
128                KEYSPACE, namespace
129            ))
130            .await?;
131
132        let contains_key = session
133            .prepare(format!(
134                "SELECT root_key FROM {}.{} WHERE root_key = ? AND k = ?",
135                KEYSPACE, namespace
136            ))
137            .await?;
138
139        let write_batch_delete_prefix_unbounded = session
140            .prepare(format!(
141                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ?",
142                KEYSPACE, namespace
143            ))
144            .await?;
145
146        let write_batch_delete_prefix_bounded = session
147            .prepare(format!(
148                "DELETE FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
149                KEYSPACE, namespace
150            ))
151            .await?;
152
153        let write_batch_deletion = session
154            .prepare(format!(
155                "DELETE FROM {}.{} WHERE root_key = ? AND k = ?",
156                KEYSPACE, namespace
157            ))
158            .await?;
159
160        let write_batch_insertion = session
161            .prepare(format!(
162                "INSERT INTO {}.{} (root_key, k, v) VALUES (?, ?, ?)",
163                KEYSPACE, namespace
164            ))
165            .await?;
166
167        let find_keys_by_prefix_unbounded = session
168            .prepare(format!(
169                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ?",
170                KEYSPACE, namespace
171            ))
172            .await?;
173
174        let find_keys_by_prefix_bounded = session
175            .prepare(format!(
176                "SELECT k FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
177                KEYSPACE, namespace
178            ))
179            .await?;
180
181        let find_key_values_by_prefix_unbounded = session
182            .prepare(format!(
183                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ?",
184                KEYSPACE, namespace
185            ))
186            .await?;
187
188        let find_key_values_by_prefix_bounded = session
189            .prepare(format!(
190                "SELECT k,v FROM {}.{} WHERE root_key = ? AND k >= ? AND k < ?",
191                KEYSPACE, namespace
192            ))
193            .await?;
194
195        Ok(Self {
196            session,
197            namespace,
198            read_value,
199            contains_key,
200            write_batch_delete_prefix_unbounded,
201            write_batch_delete_prefix_bounded,
202            write_batch_deletion,
203            write_batch_insertion,
204            find_keys_by_prefix_unbounded,
205            find_keys_by_prefix_bounded,
206            find_key_values_by_prefix_unbounded,
207            find_key_values_by_prefix_bounded,
208            multi_key_values: DashMap::new(),
209            multi_keys: DashMap::new(),
210        })
211    }
212
213    fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
214        DefaultPolicy::builder().token_aware(true).build()
215    }
216
217    fn build_default_execution_profile_handle(
218        policy: Arc<dyn LoadBalancingPolicy>,
219    ) -> ExecutionProfileHandle {
220        let default_profile = ExecutionProfile::builder()
221            .load_balancing_policy(policy)
222            .retry_policy(Arc::new(DefaultRetryPolicy::new()))
223            .consistency(Consistency::LocalQuorum)
224            .build();
225        default_profile.into_handle()
226    }
227
228    async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
229        // This explicitly sets a lot of default parameters for clarity and for making future changes
230        // easier.
231        SessionBuilder::new()
232            .known_node(uri)
233            .default_execution_profile_handle(Self::build_default_execution_profile_handle(
234                Self::build_default_policy(),
235            ))
236            .build()
237            .boxed_sync()
238            .await
239            .map_err(Into::into)
240    }
241
242    async fn get_multi_key_values_statement(
243        &self,
244        num_markers: usize,
245    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
246        let entry = self.multi_key_values.entry(num_markers);
247        match entry {
248            Entry::Occupied(entry) => Ok(entry.get().clone()),
249            Entry::Vacant(entry) => {
250                let markers = std::iter::repeat_n("?", num_markers)
251                    .collect::<Vec<_>>()
252                    .join(",");
253                let prepared_statement = self
254                    .session
255                    .prepare(format!(
256                        "SELECT k,v FROM {}.{} WHERE root_key = ? AND k IN ({})",
257                        KEYSPACE, self.namespace, markers
258                    ))
259                    .await?;
260                entry.insert(prepared_statement.clone());
261                Ok(prepared_statement)
262            }
263        }
264    }
265
266    async fn get_multi_keys_statement(
267        &self,
268        num_markers: usize,
269    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
270        let entry = self.multi_keys.entry(num_markers);
271        match entry {
272            Entry::Occupied(entry) => Ok(entry.get().clone()),
273            Entry::Vacant(entry) => {
274                let markers = std::iter::repeat_n("?", num_markers)
275                    .collect::<Vec<_>>()
276                    .join(",");
277                let prepared_statement = self
278                    .session
279                    .prepare(format!(
280                        "SELECT k FROM {}.{} WHERE root_key = ? AND k IN ({})",
281                        KEYSPACE, self.namespace, markers
282                    ))
283                    .await?;
284                entry.insert(prepared_statement.clone());
285                Ok(prepared_statement)
286            }
287        }
288    }
289
290    fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
291        ensure!(
292            key.len() <= MAX_KEY_SIZE,
293            ScyllaDbStoreInternalError::KeyTooLong
294        );
295        Ok(())
296    }
297
298    fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
299        ensure!(
300            value.len() <= RAW_MAX_VALUE_SIZE,
301            ScyllaDbStoreInternalError::ValueTooLong
302        );
303        Ok(())
304    }
305
306    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
307        ensure!(
308            batch.len() <= MAX_BATCH_SIZE,
309            ScyllaDbStoreInternalError::BatchTooLong
310        );
311        Ok(())
312    }
313
314    async fn read_value_internal(
315        &self,
316        root_key: &[u8],
317        key: Vec<u8>,
318    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
319        Self::check_key_size(&key)?;
320        let session = &self.session;
321        // Read the value of a key
322        let values = (root_key.to_vec(), key);
323
324        let (result, _) = session
325            .execute_single_page(&self.read_value, &values, PagingState::start())
326            .await?;
327        let rows = result.into_rows_result()?;
328        let mut rows = rows.rows::<(Vec<u8>,)>()?;
329        Ok(match rows.next() {
330            Some(row) => Some(row?.0),
331            None => None,
332        })
333    }
334
335    fn get_occurrences_map(
336        keys: Vec<Vec<u8>>,
337    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
338        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
339        for (i_key, key) in keys.into_iter().enumerate() {
340            Self::check_key_size(&key)?;
341            map.entry(key).or_default().push(i_key);
342        }
343        Ok(map)
344    }
345
346    async fn read_multi_values_internal(
347        &self,
348        root_key: &[u8],
349        keys: Vec<Vec<u8>>,
350    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
351        let mut values = vec![None; keys.len()];
352        let map = Self::get_occurrences_map(keys)?;
353        let statement = self.get_multi_key_values_statement(map.len()).await?;
354        let mut inputs = vec![root_key.to_vec()];
355        inputs.extend(map.keys().cloned());
356        let mut rows = self
357            .session
358            .execute_iter(statement, &inputs)
359            .await?
360            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
361
362        while let Some(row) = rows.next().await {
363            let (key, value) = row?;
364            for i_key in &map[&key] {
365                values[*i_key] = Some(value.clone());
366            }
367        }
368        Ok(values)
369    }
370
371    async fn contains_keys_internal(
372        &self,
373        root_key: &[u8],
374        keys: Vec<Vec<u8>>,
375    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
376        let mut values = vec![false; keys.len()];
377        let map = Self::get_occurrences_map(keys)?;
378        let statement = self.get_multi_keys_statement(map.len()).await?;
379        let mut inputs = vec![root_key.to_vec()];
380        inputs.extend(map.keys().cloned());
381        let mut rows = self
382            .session
383            .execute_iter(statement, &inputs)
384            .await?
385            .rows_stream::<(Vec<u8>,)>()?;
386
387        while let Some(row) = rows.next().await {
388            let (key,) = row?;
389            for i_key in &map[&key] {
390                values[*i_key] = true;
391            }
392        }
393
394        Ok(values)
395    }
396
397    async fn contains_key_internal(
398        &self,
399        root_key: &[u8],
400        key: Vec<u8>,
401    ) -> Result<bool, ScyllaDbStoreInternalError> {
402        Self::check_key_size(&key)?;
403        let session = &self.session;
404        // Read the value of a key
405        let values = (root_key.to_vec(), key);
406
407        let (result, _) = session
408            .execute_single_page(&self.contains_key, &values, PagingState::start())
409            .await?;
410        let rows = result.into_rows_result()?;
411        let mut rows = rows.rows::<(Vec<u8>,)>()?;
412        Ok(rows.next().is_some())
413    }
414
415    async fn write_batch_internal(
416        &self,
417        root_key: &[u8],
418        batch: UnorderedBatch,
419    ) -> Result<(), ScyllaDbStoreInternalError> {
420        let session = &self.session;
421        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
422        let mut batch_values = Vec::new();
423        let query1 = &self.write_batch_delete_prefix_unbounded;
424        let query2 = &self.write_batch_delete_prefix_bounded;
425        Self::check_batch_len(&batch)?;
426        for key_prefix in batch.key_prefix_deletions {
427            Self::check_key_size(&key_prefix)?;
428            match get_upper_bound_option(&key_prefix) {
429                None => {
430                    let values = vec![root_key.to_vec(), key_prefix];
431                    batch_values.push(values);
432                    batch_query.append_statement(query1.clone());
433                }
434                Some(upper_bound) => {
435                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
436                    batch_values.push(values);
437                    batch_query.append_statement(query2.clone());
438                }
439            }
440        }
441        let query3 = &self.write_batch_deletion;
442        for key in batch.simple_unordered_batch.deletions {
443            Self::check_key_size(&key)?;
444            let values = vec![root_key.to_vec(), key];
445            batch_values.push(values);
446            batch_query.append_statement(query3.clone());
447        }
448        let query4 = &self.write_batch_insertion;
449        for (key, value) in batch.simple_unordered_batch.insertions {
450            Self::check_key_size(&key)?;
451            Self::check_value_size(&value)?;
452            let values = vec![root_key.to_vec(), key, value];
453            batch_values.push(values);
454            batch_query.append_statement(query4.clone());
455        }
456        session.batch(&batch_query, batch_values).await?;
457        Ok(())
458    }
459
460    async fn find_keys_by_prefix_internal(
461        &self,
462        root_key: &[u8],
463        key_prefix: Vec<u8>,
464    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
465        Self::check_key_size(&key_prefix)?;
466        let session = &self.session;
467        // Read the value of a key
468        let len = key_prefix.len();
469        let query_unbounded = &self.find_keys_by_prefix_unbounded;
470        let query_bounded = &self.find_keys_by_prefix_bounded;
471        let rows = match get_upper_bound_option(&key_prefix) {
472            None => {
473                let values = (root_key.to_vec(), key_prefix.clone());
474                session
475                    .execute_iter(query_unbounded.clone(), values)
476                    .await?
477            }
478            Some(upper_bound) => {
479                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
480                session.execute_iter(query_bounded.clone(), values).await?
481            }
482        };
483        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
484        let mut keys = Vec::new();
485        while let Some(row) = rows.next().await {
486            let (key,) = row?;
487            let short_key = key[len..].to_vec();
488            keys.push(short_key);
489        }
490        Ok(keys)
491    }
492
493    async fn find_key_values_by_prefix_internal(
494        &self,
495        root_key: &[u8],
496        key_prefix: Vec<u8>,
497    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
498        Self::check_key_size(&key_prefix)?;
499        let session = &self.session;
500        // Read the value of a key
501        let len = key_prefix.len();
502        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
503        let query_bounded = &self.find_key_values_by_prefix_bounded;
504        let rows = match get_upper_bound_option(&key_prefix) {
505            None => {
506                let values = (root_key.to_vec(), key_prefix.clone());
507                session
508                    .execute_iter(query_unbounded.clone(), values)
509                    .await?
510            }
511            Some(upper_bound) => {
512                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
513                session.execute_iter(query_bounded.clone(), values).await?
514            }
515        };
516        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
517        let mut key_values = Vec::new();
518        while let Some(row) = rows.next().await {
519            let (key, value) = row?;
520            let short_key = key[len..].to_vec();
521            key_values.push((short_key, value));
522        }
523        Ok(key_values)
524    }
525}
526
/// A handle on a ScyllaDB table restricted to one root key.
///
/// Cloning is cheap: the client and the semaphore are shared through `Arc`.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// The shared client holding the session and the prepared statements.
    store: Arc<ScyllaDbClient>,
    /// Built from `max_concurrent_queries` of the configuration; `None` when that
    /// option is not set. Presumably used by `acquire` to bound the number of
    /// concurrent queries -- `acquire` is not visible in this part of the file.
    semaphore: Option<Arc<Semaphore>>,
    /// The value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// The root key bound to every statement; always starts with the extra byte
    /// added by `get_big_root_key`.
    root_key: Vec<u8>,
}
535
/// The error type for [`ScyllaDbStoreInternal`]
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The key must have at most `MAX_KEY_SIZE` bytes
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// The value must have at most `RAW_MAX_VALUE_SIZE` bytes
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// A deserialization error in ScyllaDB (decoding a row into Rust values)
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// A row error in ScyllaDB (obtaining the typed rows of a result)
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// An error converting a query result into a rows result in ScyllaDB
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// A type check error in ScyllaDB
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// An error executing a paged query (`execute_iter`) in ScyllaDB
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// An error creating a new session in ScyllaDB
    #[error(transparent)]
    ScyllaDbNewSessionError(#[from] NewSessionError),

    /// Namespace contains forbidden characters
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// The journal is not coherent
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The batch is too long to be written
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// A prepare error in ScyllaDB
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// An execution error in ScyllaDB
    #[error(transparent)]
    ExecutionError(#[from] ExecutionError),

    /// A next row error in ScyllaDB (fetching the next row of a row stream)
    #[error(transparent)]
    NextRowError(#[from] NextRowError),
}
599
// Identifies this backend as "scylla_db" in the `KeyValueStoreError` trait.
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    const BACKEND: &'static str = "scylla_db";
}
603
// All store operations of `ScyllaDbStoreInternal` report `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
607
608impl ReadableKeyValueStore for ScyllaDbStoreInternal {
609    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
610    type Keys = Vec<Vec<u8>>;
611    type KeyValues = Vec<(Vec<u8>, Vec<u8>)>;
612
613    fn max_stream_queries(&self) -> usize {
614        self.max_stream_queries
615    }
616
617    async fn read_value_bytes(
618        &self,
619        key: &[u8],
620    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
621        let store = self.store.deref();
622        let _guard = self.acquire().await;
623        store
624            .read_value_internal(&self.root_key, key.to_vec())
625            .await
626    }
627
628    async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
629        let store = self.store.deref();
630        let _guard = self.acquire().await;
631        store
632            .contains_key_internal(&self.root_key, key.to_vec())
633            .await
634    }
635
636    async fn contains_keys(
637        &self,
638        keys: Vec<Vec<u8>>,
639    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
640        if keys.is_empty() {
641            return Ok(Vec::new());
642        }
643        let store = self.store.deref();
644        let _guard = self.acquire().await;
645        let handles = keys
646            .chunks(MAX_MULTI_KEYS)
647            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
648        let results: Vec<_> = join_all(handles)
649            .await
650            .into_iter()
651            .collect::<Result<_, _>>()?;
652        Ok(results.into_iter().flatten().collect())
653    }
654
655    async fn read_multi_values_bytes(
656        &self,
657        keys: Vec<Vec<u8>>,
658    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
659        if keys.is_empty() {
660            return Ok(Vec::new());
661        }
662        let store = self.store.deref();
663        let _guard = self.acquire().await;
664        let handles = keys
665            .chunks(MAX_MULTI_KEYS)
666            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
667        let results: Vec<_> = join_all(handles)
668            .await
669            .into_iter()
670            .collect::<Result<_, _>>()?;
671        Ok(results.into_iter().flatten().collect())
672    }
673
674    async fn find_keys_by_prefix(
675        &self,
676        key_prefix: &[u8],
677    ) -> Result<Self::Keys, ScyllaDbStoreInternalError> {
678        let store = self.store.deref();
679        let _guard = self.acquire().await;
680        store
681            .find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())
682            .await
683    }
684
685    async fn find_key_values_by_prefix(
686        &self,
687        key_prefix: &[u8],
688    ) -> Result<Self::KeyValues, ScyllaDbStoreInternalError> {
689        let store = self.store.deref();
690        let _guard = self.acquire().await;
691        store
692            .find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())
693            .await
694    }
695}
696
697impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
698    const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
699    const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
700    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
701
702    // ScyllaDB cannot take a `crate::batch::Batch` directly. Indeed, if a delete is
703    // followed by a write, then the delete takes priority. See the sentence "The first
704    // tie-breaking rule when two cells have the same write timestamp is that dead cells
705    // win over live cells" from
706    // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
707    type Batch = UnorderedBatch;
708
709    async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
710        let store = self.store.deref();
711        let _guard = self.acquire().await;
712        store.write_batch_internal(&self.root_key, batch).await
713    }
714}
715
// ScyllaDB requires that the keys are non-empty, so a zero byte is put in front of
// the root key: even the empty root key yields a one-byte key.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    let mut big_key = Vec::with_capacity(root_key.len() + 1);
    big_key.push(0);
    big_key.extend_from_slice(root_key);
    big_key
}
722
/// The type for building a new ScyllaDB Key Value Store
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// The URI of the ScyllaDB node to which the session connects
    pub uri: String,
    /// The common configuration of the key value store; `connect` reads
    /// `max_concurrent_queries` and `max_stream_queries` from it
    common_config: CommonStoreInternalConfig,
}
731
732impl AdminKeyValueStore for ScyllaDbStoreInternal {
733    type Config = ScyllaDbStoreInternalConfig;
734
735    fn get_name() -> String {
736        "scylladb internal".to_string()
737    }
738
739    async fn connect(
740        config: &Self::Config,
741        namespace: &str,
742    ) -> Result<Self, ScyllaDbStoreInternalError> {
743        Self::check_namespace(namespace)?;
744        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
745        let store = ScyllaDbClient::new(session, namespace).await?;
746        let store = Arc::new(store);
747        let semaphore = config
748            .common_config
749            .max_concurrent_queries
750            .map(|n| Arc::new(Semaphore::new(n)));
751        let max_stream_queries = config.common_config.max_stream_queries;
752        let root_key = get_big_root_key(&[]);
753        Ok(Self {
754            store,
755            semaphore,
756            max_stream_queries,
757            root_key,
758        })
759    }
760
761    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, ScyllaDbStoreInternalError> {
762        let store = self.store.clone();
763        let semaphore = self.semaphore.clone();
764        let max_stream_queries = self.max_stream_queries;
765        let root_key = get_big_root_key(root_key);
766        Ok(Self {
767            store,
768            semaphore,
769            max_stream_queries,
770            root_key,
771        })
772    }
773
774    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
775        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
776        let statement = session
777            .prepare(format!("DESCRIBE KEYSPACE {}", KEYSPACE))
778            .await?;
779        let result = session.execute_iter(statement, &[]).await;
780        let miss_msg = format!("'{}' not found in keyspaces", KEYSPACE);
781        let result = match result {
782            Ok(result) => result,
783            Err(error) => {
784                let invalid_or_keyspace_not_found = match &error {
785                    PagerExecutionError::NextPageError(NextPageError::RequestFailure(
786                        RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
787                    )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
788                    _ => false,
789                };
790                if invalid_or_keyspace_not_found {
791                    return Ok(Vec::new());
792                } else {
793                    return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
794                }
795            }
796        };
797        let mut namespaces = Vec::new();
798        let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
799        while let Some(row) = rows_stream.next().await {
800            let (_, object_kind, name, _) = row?;
801            if object_kind == "table" {
802                namespaces.push(name);
803            }
804        }
805        Ok(namespaces)
806    }
807
808    async fn list_root_keys(
809        config: &Self::Config,
810        namespace: &str,
811    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
812        Self::check_namespace(namespace)?;
813        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
814        let statement = session
815            .prepare(format!(
816                "SELECT root_key FROM {}.{} ALLOW FILTERING",
817                KEYSPACE, namespace
818            ))
819            .await?;
820
821        // Execute the query
822        let rows = session.execute_iter(statement, &[]).await?;
823        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
824        let mut root_keys = BTreeSet::new();
825        while let Some(row) = rows.next().await {
826            let (root_key,) = row?;
827            let root_key = root_key[1..].to_vec();
828            root_keys.insert(root_key);
829        }
830        Ok(root_keys.into_iter().collect::<Vec<_>>())
831    }
832
833    async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
834        let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
835        let statement = session
836            .prepare(format!("DROP KEYSPACE IF EXISTS {}", KEYSPACE))
837            .await?;
838
839        session
840            .execute_single_page(&statement, &[], PagingState::start())
841            .await?;
842        Ok(())
843    }
844
845    async fn exists(
846        config: &Self::Config,
847        namespace: &str,
848    ) -> Result<bool, ScyllaDbStoreInternalError> {
849        Self::check_namespace(namespace)?;
850        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
851
852        // We check the way the test can fail. It can fail in different ways.
853        let result = session
854            .prepare(format!(
855                "SELECT root_key FROM {}.{} LIMIT 1 ALLOW FILTERING",
856                KEYSPACE, namespace
857            ))
858            .await;
859
860        // The missing table translates into a very specific error that we matched
861        let miss_msg1 = format!("unconfigured table {}", namespace);
862        let miss_msg1 = miss_msg1.as_str();
863        let miss_msg2 = "Undefined name root_key in selection clause";
864        let miss_msg3 = format!("Keyspace {} does not exist", KEYSPACE);
865        let Err(error) = result else {
866            // If OK, then the table exists
867            return Ok(true);
868        };
869        let missing_table = match &error {
870            PrepareError::AllAttemptsFailed {
871                first_attempt: RequestAttemptError::DbError(db_error, msg),
872            } => {
873                if *db_error != DbError::Invalid {
874                    false
875                } else {
876                    msg.as_str() == miss_msg1
877                        || msg.as_str() == miss_msg2
878                        || msg.as_str() == miss_msg3
879                }
880            }
881            _ => false,
882        };
883        if missing_table {
884            Ok(false)
885        } else {
886            Err(ScyllaDbStoreInternalError::PrepareError(error))
887        }
888    }
889
890    async fn create(
891        config: &Self::Config,
892        namespace: &str,
893    ) -> Result<(), ScyllaDbStoreInternalError> {
894        Self::check_namespace(namespace)?;
895        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
896
897        // Create a keyspace if it doesn't exist
898        let statement = session
899            .prepare(format!(
900                "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
901                    'class' : 'NetworkTopologyStrategy', \
902                    'replication_factor' : {} \
903                }}",
904                KEYSPACE, config.common_config.replication_factor
905            ))
906            .await?;
907        session
908            .execute_single_page(&statement, &[], PagingState::start())
909            .await?;
910
911        // This explicitly sets a lot of default parameters for clarity and for making future
912        // changes easier.
913        let statement = session
914            .prepare(format!(
915                "CREATE TABLE {}.{} (\
916                    root_key blob, \
917                    k blob, \
918                    v blob, \
919                    PRIMARY KEY (root_key, k) \
920                ) \
921                WITH compaction = {{ \
922                    'class'            : 'SizeTieredCompactionStrategy', \
923                    'min_sstable_size' : 52428800, \
924                    'bucket_low'       : 0.5, \
925                    'bucket_high'      : 1.5, \
926                    'min_threshold'    : 4, \
927                    'max_threshold'    : 32 \
928                }} \
929                AND compression = {{ \
930                    'sstable_compression': 'LZ4Compressor', \
931                    'chunk_length_in_kb':'4' \
932                }} \
933                AND caching = {{ \
934                    'enabled': 'true' \
935                }}",
936                KEYSPACE, namespace
937            ))
938            .await?;
939        session
940            .execute_single_page(&statement, &[], PagingState::start())
941            .await?;
942        Ok(())
943    }
944
945    async fn delete(
946        config: &Self::Config,
947        namespace: &str,
948    ) -> Result<(), ScyllaDbStoreInternalError> {
949        Self::check_namespace(namespace)?;
950        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
951        let statement = session
952            .prepare(format!("DROP TABLE IF EXISTS {}.{};", KEYSPACE, namespace))
953            .await?;
954        session
955            .execute_single_page(&statement, &[], PagingState::start())
956            .await?;
957        Ok(())
958    }
959}
960
961impl ScyllaDbStoreInternal {
962    /// Obtains the semaphore lock on the database if needed.
963    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
964        match &self.semaphore {
965            None => None,
966            Some(count) => Some(count.acquire().await),
967        }
968    }
969
970    fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
971        if !namespace.is_empty()
972            && namespace.len() <= 48
973            && namespace
974                .chars()
975                .all(|c| c.is_ascii_alphanumeric() || c == '_')
976        {
977            return Ok(());
978        }
979        Err(ScyllaDbStoreInternalError::InvalidNamespace)
980    }
981}
982
/// The maximum number of concurrent queries allowed in tests; used as the
/// `max_concurrent_queries` value of the test configuration.
#[cfg(with_testing)]
const TEST_SCYLLA_DB_MAX_CONCURRENT_QUERIES: usize = 10;
986
/// The limit on stream queries in tests; used as the `max_stream_queries` value of
/// the test configuration.
#[cfg(with_testing)]
const TEST_SCYLLA_DB_MAX_STREAM_QUERIES: usize = 10;
990
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<ScyllaDbStoreInternal> {
    /// Builds the configuration used by tests: a server at `localhost:9042`, the test
    /// query limits defined in this file, and a replication factor of 1.
    async fn new_test_config() -> Result<ScyllaDbStoreInternalConfig, ScyllaDbStoreInternalError> {
        Ok(ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            common_config: CommonStoreInternalConfig {
                max_concurrent_queries: Some(TEST_SCYLLA_DB_MAX_CONCURRENT_QUERIES),
                max_stream_queries: TEST_SCYLLA_DB_MAX_STREAM_QUERIES,
                replication_factor: 1,
            },
        })
    }
}
1003
/// The `ScyllaDbStore` composed type with metrics.
///
/// From the inside out: the journaling store over `ScyllaDbStoreInternal`, a
/// value-splitting layer, and an LRU-caching layer, with a `MeteredStore` wrapped
/// around each of the three.
#[cfg(with_metrics)]
pub type ScyllaDbStore = MeteredStore<
    LruCachingStore<
        MeteredStore<
            ValueSplittingStore<MeteredStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>,
        >,
    >,
>;
1013
/// The `ScyllaDbStore` composed type without metrics.
///
/// From the inside out: the journaling store over `ScyllaDbStoreInternal`, a
/// value-splitting layer, and an LRU-caching layer.
#[cfg(not(with_metrics))]
pub type ScyllaDbStore =
    LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>;
1018
/// The `ScyllaDbStoreConfig` input type: an `LruCachingConfig` whose inner
/// configuration is a `ScyllaDbStoreInternalConfig`.
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1021
1022impl ScyllaDbStoreConfig {
1023    /// Creates a `ScyllaDbStoreConfig` from the inputs.
1024    pub fn new(uri: String, common_config: crate::store::CommonStoreConfig) -> ScyllaDbStoreConfig {
1025        let inner_config = ScyllaDbStoreInternalConfig {
1026            uri,
1027            common_config: common_config.reduced(),
1028        };
1029        ScyllaDbStoreConfig {
1030            inner_config,
1031            storage_cache_config: common_config.storage_cache_config,
1032        }
1033    }
1034}
1035
/// The combined error type for the `ScyllaDbStore`: a `ValueSplittingError` over
/// `ScyllaDbStoreInternalError`.
pub type ScyllaDbStoreError = ValueSplittingError<ScyllaDbStoreInternalError>;