linera_views/backends/
scylla_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the ScyllaDB database.
5//!
6//! The current connection is done via a Session and a corresponding primary key called
7//! "namespace". The maximum number of concurrent queries is controlled by
8//! `max_concurrent_queries`.
9
10use std::{
11    collections::{BTreeSet, HashMap},
12    ops::Deref,
13    sync::Arc,
14};
15
16use async_lock::{Semaphore, SemaphoreGuard};
17use futures::{future::join_all, StreamExt as _};
18use linera_base::{ensure, util::future::FutureSyncExt as _};
19use scylla::{
20    client::{
21        execution_profile::{ExecutionProfile, ExecutionProfileHandle},
22        session::Session,
23        session_builder::SessionBuilder,
24    },
25    deserialize::{DeserializationError, TypeCheckError},
26    errors::{
27        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
28        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
29    },
30    policies::{
31        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
32        retry::DefaultRetryPolicy,
33    },
34    response::PagingState,
35    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
36};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45    batch::UnorderedBatch,
46    common::{get_uleb128_size, get_upper_bound_option},
47    journaling::{JournalingError, JournalingKeyValueDatabase},
48    lru_caching::{LruCachingConfig, LruCachingDatabase},
49    store::{
50        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51        WithError,
52    },
53    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54};
55
/// Fundamental constant in ScyllaDB: the maximum number of keys in a multi-key query.
/// The limit is in reality 100, but we need one entry for the root key.
const MAX_MULTI_KEYS: usize = 100 - 1;
59
/// The maximal size of an operation on ScyllaDB seems to be 16 MiB
/// https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/
/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once
///  at the database at any particular time"
/// So, we set up the maximal size of 16 MiB - 10 KiB for the values and 10 KiB for the keys.
/// We also arbitrarily decrease the size by 4000 bytes because some space is
/// taken internally by the database.
const RAW_MAX_VALUE_SIZE: usize = 16 * 1024 * 1024 - 10 * 1024 - 4000;
/// The maximal size of a key, in bytes (10 KiB).
const MAX_KEY_SIZE: usize = 10 * 1024;
/// The maximal total size of a batch: one maximal raw value plus one maximal key.
const MAX_BATCH_TOTAL_SIZE: usize = RAW_MAX_VALUE_SIZE + MAX_KEY_SIZE;
70
/// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage.
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`.
/// Therefore the actual `MAX_VALUE_SIZE` is lower.
/// At the maximum the key size is `MAX_KEY_SIZE = 10240` bytes and we pack just one entry.
/// So if the key has 10240 bytes this gets us the inequality
/// `1 + 1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= RAW_MAX_VALUE_SIZE`,
/// and so this simplifies to `1 + 1 + 1 + (2 + 10240) + (4 + x) <= RAW_MAX_VALUE_SIZE`.
/// Note on the above formula:
/// * We write 4 because `get_uleb128_size(RAW_MAX_VALUE_SIZE) = 4`.
/// * We write 2 because `get_uleb128_size(MAX_KEY_SIZE) = 2`.
/// * We write `1 + 1 + 1` because the `UnorderedBatch` has three entries.
///
/// This gets us to a maximal value of 16752727.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 3;
88
/// The upper bound on `batch.len()` accepted when writing a batch.
///
/// The constant 14000 is an empirical constant that was found to be necessary
/// to make the ScyllaDB system work. We have not been able to find this or
/// a similar constant in the source code or the documentation.
/// An experimental approach gets us that 14796 is the largest value that is
/// correct.
///
/// NOTE(review): the value actually used below is 5000, not 14000 — presumably a more
/// conservative limit was chosen later; confirm and update this comment accordingly.
const MAX_BATCH_SIZE: usize = 5000;
95
/// The keyspace to use for the ScyllaDB database. Every namespace is addressed as the
/// table `KEYSPACE."<namespace>"` in the statements of this file.
const KEYSPACE: &str = "kv";
98
/// The client for ScyllaDB:
/// * The session allows to pass queries
/// * The namespace that is being assigned to the database
/// * The prepared queries used for implementing the features of `KeyValueStore`.
struct ScyllaDbClient {
    /// The session through which every statement is prepared and executed.
    session: Session,
    /// The namespace; it is used as the quoted table name inside `KEYSPACE`.
    namespace: String,
    /// `SELECT v ... WHERE root_key = ? AND k = ?`
    read_value: PreparedStatement,
    /// `SELECT root_key ... WHERE root_key = ? AND k = ?`
    contains_key: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ?`
    write_batch_delete_prefix_unbounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k >= ? AND k < ?`
    write_batch_delete_prefix_bounded: PreparedStatement,
    /// `DELETE ... WHERE root_key = ? AND k = ?`
    write_batch_deletion: PreparedStatement,
    /// `INSERT INTO ... (root_key, k, v) VALUES (?, ?, ?)`
    write_batch_insertion: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ?`
    find_keys_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_keys_by_prefix_bounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ?`
    find_key_values_by_prefix_unbounded: PreparedStatement,
    /// `SELECT k,v ... WHERE root_key = ? AND k >= ? AND k < ?`
    find_key_values_by_prefix_bounded: PreparedStatement,
    /// Cache of `SELECT k,v ... k IN (?,...,?)` statements, keyed by the number of markers.
    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
    /// Cache of `SELECT k ... k IN (?,...,?)` statements, keyed by the number of markers.
    multi_keys: papaya::HashMap<usize, PreparedStatement>,
}
119
120impl ScyllaDbClient {
121    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
122        let namespace = namespace.to_string();
123        let read_value = session
124            .prepare(format!(
125                "SELECT v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
126            ))
127            .await?;
128
129        let contains_key = session
130            .prepare(format!(
131                "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
132            ))
133            .await?;
134
135        let write_batch_delete_prefix_unbounded = session
136            .prepare(format!(
137                "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
138            ))
139            .await?;
140
141        let write_batch_delete_prefix_bounded = session
142            .prepare(format!(
143                "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
144            ))
145            .await?;
146
147        let write_batch_deletion = session
148            .prepare(format!(
149                "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
150            ))
151            .await?;
152
153        let write_batch_insertion = session
154            .prepare(format!(
155                "INSERT INTO {KEYSPACE}.\"{namespace}\" (root_key, k, v) VALUES (?, ?, ?)"
156            ))
157            .await?;
158
159        let find_keys_by_prefix_unbounded = session
160            .prepare(format!(
161                "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
162            ))
163            .await?;
164
165        let find_keys_by_prefix_bounded = session
166            .prepare(format!(
167                "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
168            ))
169            .await?;
170
171        let find_key_values_by_prefix_unbounded = session
172            .prepare(format!(
173                "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
174            ))
175            .await?;
176
177        let find_key_values_by_prefix_bounded = session
178            .prepare(format!(
179                "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
180            ))
181            .await?;
182
183        Ok(Self {
184            session,
185            namespace,
186            read_value,
187            contains_key,
188            write_batch_delete_prefix_unbounded,
189            write_batch_delete_prefix_bounded,
190            write_batch_deletion,
191            write_batch_insertion,
192            find_keys_by_prefix_unbounded,
193            find_keys_by_prefix_bounded,
194            find_key_values_by_prefix_unbounded,
195            find_key_values_by_prefix_bounded,
196            multi_key_values: papaya::HashMap::new(),
197            multi_keys: papaya::HashMap::new(),
198        })
199    }
200
201    fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
202        DefaultPolicy::builder().token_aware(true).build()
203    }
204
205    fn build_default_execution_profile_handle(
206        policy: Arc<dyn LoadBalancingPolicy>,
207    ) -> ExecutionProfileHandle {
208        let default_profile = ExecutionProfile::builder()
209            .load_balancing_policy(policy)
210            .retry_policy(Arc::new(DefaultRetryPolicy::new()))
211            .consistency(Consistency::LocalQuorum)
212            .build();
213        default_profile.into_handle()
214    }
215
216    async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
217        // This explicitly sets a lot of default parameters for clarity and for making future changes
218        // easier.
219        SessionBuilder::new()
220            .known_node(uri)
221            .default_execution_profile_handle(Self::build_default_execution_profile_handle(
222                Self::build_default_policy(),
223            ))
224            .build()
225            .boxed_sync()
226            .await
227            .map_err(Into::into)
228    }
229
230    async fn get_multi_key_values_statement(
231        &self,
232        num_markers: usize,
233    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
234        if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
235            return Ok(prepared_statement.clone());
236        }
237        let markers = std::iter::repeat_n("?", num_markers)
238            .collect::<Vec<_>>()
239            .join(",");
240        let prepared_statement = self
241            .session
242            .prepare(format!(
243                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
244                KEYSPACE, self.namespace, markers
245            ))
246            .await?;
247        self.multi_key_values
248            .pin()
249            .insert(num_markers, prepared_statement.clone());
250        Ok(prepared_statement)
251    }
252
253    async fn get_multi_keys_statement(
254        &self,
255        num_markers: usize,
256    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
257        if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
258            return Ok(prepared_statement.clone());
259        };
260        let markers = std::iter::repeat_n("?", num_markers)
261            .collect::<Vec<_>>()
262            .join(",");
263        let prepared_statement = self
264            .session
265            .prepare(format!(
266                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
267                KEYSPACE, self.namespace, markers
268            ))
269            .await?;
270        self.multi_keys
271            .pin()
272            .insert(num_markers, prepared_statement.clone());
273        Ok(prepared_statement)
274    }
275
    /// Checks that `key` has at most `MAX_KEY_SIZE` bytes, reporting
    /// `ScyllaDbStoreInternalError::KeyTooLong` otherwise.
    fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
        // `ensure!` comes from `linera_base`; presumably it returns the given error when
        // the condition is false — confirm against the macro definition.
        ensure!(
            key.len() <= MAX_KEY_SIZE,
            ScyllaDbStoreInternalError::KeyTooLong
        );
        Ok(())
    }
283
    /// Checks that `value` has at most `RAW_MAX_VALUE_SIZE` bytes, reporting
    /// `ScyllaDbStoreInternalError::ValueTooLong` otherwise.
    fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
        // `ensure!` comes from `linera_base`; presumably it returns the given error when
        // the condition is false — confirm against the macro definition.
        ensure!(
            value.len() <= RAW_MAX_VALUE_SIZE,
            ScyllaDbStoreInternalError::ValueTooLong
        );
        Ok(())
    }
291
    /// Checks that `batch.len()` is at most `MAX_BATCH_SIZE`, reporting
    /// `ScyllaDbStoreInternalError::BatchTooLong` otherwise.
    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
        // `ensure!` comes from `linera_base`; presumably it returns the given error when
        // the condition is false — confirm against the macro definition.
        ensure!(
            batch.len() <= MAX_BATCH_SIZE,
            ScyllaDbStoreInternalError::BatchTooLong
        );
        Ok(())
    }
299
300    async fn read_value_internal(
301        &self,
302        root_key: &[u8],
303        key: Vec<u8>,
304    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
305        Self::check_key_size(&key)?;
306        let session = &self.session;
307        // Read the value of a key
308        let values = (root_key.to_vec(), key);
309
310        let (result, _) = session
311            .execute_single_page(&self.read_value, &values, PagingState::start())
312            .await?;
313        let rows = result.into_rows_result()?;
314        let mut rows = rows.rows::<(Vec<u8>,)>()?;
315        Ok(match rows.next() {
316            Some(row) => Some(row?.0),
317            None => None,
318        })
319    }
320
321    fn get_occurrences_map(
322        keys: Vec<Vec<u8>>,
323    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
324        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
325        for (i_key, key) in keys.into_iter().enumerate() {
326            Self::check_key_size(&key)?;
327            map.entry(key).or_default().push(i_key);
328        }
329        Ok(map)
330    }
331
332    async fn read_multi_values_internal(
333        &self,
334        root_key: &[u8],
335        keys: Vec<Vec<u8>>,
336    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
337        let mut values = vec![None; keys.len()];
338        let map = Self::get_occurrences_map(keys)?;
339        let statement = self.get_multi_key_values_statement(map.len()).await?;
340        let mut inputs = vec![root_key.to_vec()];
341        inputs.extend(map.keys().cloned());
342        let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
343            .await?
344            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
345
346        while let Some(row) = rows.next().await {
347            let (key, value) = row?;
348            if let Some((&last, rest)) = map[&key].split_last() {
349                for position in rest {
350                    values[*position] = Some(value.clone());
351                }
352                values[last] = Some(value);
353            }
354        }
355        Ok(values)
356    }
357
358    async fn contains_keys_internal(
359        &self,
360        root_key: &[u8],
361        keys: Vec<Vec<u8>>,
362    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
363        let mut values = vec![false; keys.len()];
364        let map = Self::get_occurrences_map(keys)?;
365        let statement = self.get_multi_keys_statement(map.len()).await?;
366        let mut inputs = vec![root_key.to_vec()];
367        inputs.extend(map.keys().cloned());
368        let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
369            .await?
370            .rows_stream::<(Vec<u8>,)>()?;
371
372        while let Some(row) = rows.next().await {
373            let (key,) = row?;
374            for i_key in &map[&key] {
375                values[*i_key] = true;
376            }
377        }
378
379        Ok(values)
380    }
381
382    async fn contains_key_internal(
383        &self,
384        root_key: &[u8],
385        key: Vec<u8>,
386    ) -> Result<bool, ScyllaDbStoreInternalError> {
387        Self::check_key_size(&key)?;
388        let session = &self.session;
389        // Read the value of a key
390        let values = (root_key.to_vec(), key);
391
392        let (result, _) = session
393            .execute_single_page(&self.contains_key, &values, PagingState::start())
394            .await?;
395        let rows = result.into_rows_result()?;
396        let mut rows = rows.rows::<(Vec<u8>,)>()?;
397        Ok(rows.next().is_some())
398    }
399
400    async fn write_batch_internal(
401        &self,
402        root_key: &[u8],
403        batch: UnorderedBatch,
404    ) -> Result<(), ScyllaDbStoreInternalError> {
405        let session = &self.session;
406        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
407        let mut batch_values = Vec::new();
408        let query1 = &self.write_batch_delete_prefix_unbounded;
409        let query2 = &self.write_batch_delete_prefix_bounded;
410        Self::check_batch_len(&batch)?;
411        for key_prefix in batch.key_prefix_deletions {
412            Self::check_key_size(&key_prefix)?;
413            match get_upper_bound_option(&key_prefix) {
414                None => {
415                    let values = vec![root_key.to_vec(), key_prefix];
416                    batch_values.push(values);
417                    batch_query.append_statement(query1.clone());
418                }
419                Some(upper_bound) => {
420                    let values = vec![root_key.to_vec(), key_prefix, upper_bound];
421                    batch_values.push(values);
422                    batch_query.append_statement(query2.clone());
423                }
424            }
425        }
426        let query3 = &self.write_batch_deletion;
427        for key in batch.simple_unordered_batch.deletions {
428            Self::check_key_size(&key)?;
429            let values = vec![root_key.to_vec(), key];
430            batch_values.push(values);
431            batch_query.append_statement(query3.clone());
432        }
433        let query4 = &self.write_batch_insertion;
434        for (key, value) in batch.simple_unordered_batch.insertions {
435            Self::check_key_size(&key)?;
436            Self::check_value_size(&value)?;
437            let values = vec![root_key.to_vec(), key, value];
438            batch_values.push(values);
439            batch_query.append_statement(query4.clone());
440        }
441        session.batch(&batch_query, batch_values).await?;
442        Ok(())
443    }
444
445    async fn find_keys_by_prefix_internal(
446        &self,
447        root_key: &[u8],
448        key_prefix: Vec<u8>,
449    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
450        Self::check_key_size(&key_prefix)?;
451        let session = &self.session;
452        // Read the value of a key
453        let len = key_prefix.len();
454        let query_unbounded = &self.find_keys_by_prefix_unbounded;
455        let query_bounded = &self.find_keys_by_prefix_bounded;
456        let rows = match get_upper_bound_option(&key_prefix) {
457            None => {
458                let values = (root_key.to_vec(), key_prefix.clone());
459                Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
460            }
461            Some(upper_bound) => {
462                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
463                Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
464            }
465        };
466        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
467        let mut keys = Vec::new();
468        while let Some(row) = rows.next().await {
469            let (key,) = row?;
470            let short_key = key[len..].to_vec();
471            keys.push(short_key);
472        }
473        Ok(keys)
474    }
475
476    async fn find_key_values_by_prefix_internal(
477        &self,
478        root_key: &[u8],
479        key_prefix: Vec<u8>,
480    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
481        Self::check_key_size(&key_prefix)?;
482        let session = &self.session;
483        // Read the value of a key
484        let len = key_prefix.len();
485        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
486        let query_bounded = &self.find_key_values_by_prefix_bounded;
487        let rows = match get_upper_bound_option(&key_prefix) {
488            None => {
489                let values = (root_key.to_vec(), key_prefix.clone());
490                Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
491            }
492            Some(upper_bound) => {
493                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
494                Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
495            }
496        };
497        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
498        let mut key_values = Vec::new();
499        while let Some(row) = rows.next().await {
500            let (key, value) = row?;
501            let short_key = key[len..].to_vec();
502            key_values.push((short_key, value));
503        }
504        Ok(key_values)
505    }
506}
507
/// The client itself and the keeping of the count of active connections.
#[derive(Clone)]
pub struct ScyllaDbStoreInternal {
    /// The shared client holding the session and the prepared statements.
    store: Arc<ScyllaDbClient>,
    /// The semaphore limiting the concurrent queries; `None` when
    /// `max_concurrent_queries` is not set in the configuration.
    semaphore: Option<Arc<Semaphore>>,
    /// The value returned by `max_stream_queries()`.
    max_stream_queries: usize,
    /// The root key as stored in the database, i.e. with the leading zero byte added by
    /// `get_big_root_key`.
    root_key: Vec<u8>,
}
516
/// Database-level connection to ScyllaDB for managing namespaces and partitions.
#[derive(Clone)]
pub struct ScyllaDbDatabaseInternal {
    /// The shared client holding the session and the prepared statements.
    store: Arc<ScyllaDbClient>,
    /// The semaphore limiting the concurrent queries; `None` when
    /// `max_concurrent_queries` is not set in the configuration.
    semaphore: Option<Arc<Semaphore>>,
    /// Copied from the configuration and handed to every store opened from this database.
    max_stream_queries: usize,
}
524
// Database-level operations report `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbDatabaseInternal {
    type Error = ScyllaDbStoreInternalError;
}
528
/// The error type for [`ScyllaDbStoreInternal`]
#[derive(Error, Debug)]
pub enum ScyllaDbStoreInternalError {
    /// BCS serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The key must have at most `MAX_KEY_SIZE` bytes
    #[error("The key must have at most MAX_KEY_SIZE")]
    KeyTooLong,

    /// The value must have at most `RAW_MAX_VALUE_SIZE` bytes
    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
    ValueTooLong,

    /// A deserialization error in ScyllaDB
    #[error(transparent)]
    DeserializationError(#[from] DeserializationError),

    /// A row error in ScyllaDB
    #[error(transparent)]
    RowsError(#[from] RowsError),

    /// An error converting a query result into a rows result in ScyllaDB
    #[error(transparent)]
    IntoRowsResultError(#[from] IntoRowsResultError),

    /// A type check error in ScyllaDB
    #[error(transparent)]
    TypeCheckError(#[from] TypeCheckError),

    /// An error executing a paged query in ScyllaDB
    #[error(transparent)]
    PagerExecutionError(#[from] PagerExecutionError),

    /// An error creating a new session in ScyllaDB
    #[error(transparent)]
    ScyllaDbNewSessionError(#[from] NewSessionError),

    /// Namespace contains forbidden characters
    #[error("Namespace contains forbidden characters")]
    InvalidNamespace,

    /// The batch is too long to be written
    #[error("The batch is too long to be written")]
    BatchTooLong,

    /// A prepare error in ScyllaDB
    #[error(transparent)]
    PrepareError(#[from] PrepareError),

    /// An execution error in ScyllaDB
    #[error(transparent)]
    ExecutionError(#[from] ExecutionError),

    /// A next row error in ScyllaDB
    #[error(transparent)]
    NextRowError(#[from] NextRowError),
}
588
// Identifies this backend as "scylla_db" to the `KeyValueStoreError` trait.
impl KeyValueStoreError for ScyllaDbStoreInternalError {
    const BACKEND: &'static str = "scylla_db";
}
592
// Store-level operations report `ScyllaDbStoreInternalError`.
impl WithError for ScyllaDbStoreInternal {
    type Error = ScyllaDbStoreInternalError;
}
596
597impl ReadableKeyValueStore for ScyllaDbStoreInternal {
598    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
599
600    fn max_stream_queries(&self) -> usize {
601        self.max_stream_queries
602    }
603
604    fn root_key(&self) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
605        Ok(self.root_key[1..].to_vec())
606    }
607
608    async fn read_value_bytes(
609        &self,
610        key: &[u8],
611    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
612        let store = self.store.deref();
613        let _guard = self.acquire().await;
614        Box::pin(store.read_value_internal(&self.root_key, key.to_vec())).await
615    }
616
617    async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
618        let store = self.store.deref();
619        let _guard = self.acquire().await;
620        Box::pin(store.contains_key_internal(&self.root_key, key.to_vec())).await
621    }
622
623    async fn contains_keys(
624        &self,
625        keys: &[Vec<u8>],
626    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
627        if keys.is_empty() {
628            return Ok(Vec::new());
629        }
630        let store = self.store.deref();
631        let _guard = self.acquire().await;
632        let handles = keys
633            .chunks(MAX_MULTI_KEYS)
634            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
635        let results: Vec<_> = join_all(handles)
636            .await
637            .into_iter()
638            .collect::<Result<_, _>>()?;
639        Ok(results.into_iter().flatten().collect())
640    }
641
642    async fn read_multi_values_bytes(
643        &self,
644        keys: &[Vec<u8>],
645    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
646        if keys.is_empty() {
647            return Ok(Vec::new());
648        }
649        let store = self.store.deref();
650        let _guard = self.acquire().await;
651        let handles = keys
652            .chunks(MAX_MULTI_KEYS)
653            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
654        let results: Vec<_> = join_all(handles)
655            .await
656            .into_iter()
657            .collect::<Result<_, _>>()?;
658        Ok(results.into_iter().flatten().collect())
659    }
660
661    async fn find_keys_by_prefix(
662        &self,
663        key_prefix: &[u8],
664    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
665        let store = self.store.deref();
666        let _guard = self.acquire().await;
667        Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await
668    }
669
670    async fn find_key_values_by_prefix(
671        &self,
672        key_prefix: &[u8],
673    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
674        let store = self.store.deref();
675        let _guard = self.acquire().await;
676        Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec()))
677            .await
678    }
679}
680
681impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
682    const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
683    const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
684    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
685
686    // ScyllaDB cannot take a `crate::batch::Batch` directly. Indeed, if a delete is
687    // followed by a write, then the delete takes priority. See the sentence "The first
688    // tie-breaking rule when two cells have the same write timestamp is that dead cells
689    // win over live cells" from
690    // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
691    type Batch = UnorderedBatch;
692
693    async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
694        let store = self.store.deref();
695        let _guard = self.acquire().await;
696        store.write_batch_internal(&self.root_key, batch).await
697    }
698}
699
// ScyllaDB requires that the keys are non-empty, so the root key is stored with a
// leading zero byte.
fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
    [&[0u8][..], root_key].concat()
}
706
/// The configuration for building a new ScyllaDB key-value store.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
    /// The URI of the ScyllaDB node to which the requests have to be sent.
    pub uri: String,
    /// Maximum number of concurrent database queries allowed for this client.
    /// With `None`, no limiting semaphore is created.
    pub max_concurrent_queries: Option<usize>,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
    /// The replication factor.
    pub replication_factor: u32,
}
719
720impl KeyValueDatabase for ScyllaDbDatabaseInternal {
721    type Config = ScyllaDbStoreInternalConfig;
722    type Store = ScyllaDbStoreInternal;
723
724    fn get_name() -> String {
725        "scylladb internal".to_string()
726    }
727
728    async fn connect(
729        config: &Self::Config,
730        namespace: &str,
731    ) -> Result<Self, ScyllaDbStoreInternalError> {
732        Self::check_namespace(namespace)?;
733        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
734        let store = ScyllaDbClient::new(session, namespace).await?;
735        let store = Arc::new(store);
736        let semaphore = config
737            .max_concurrent_queries
738            .map(|n| Arc::new(Semaphore::new(n)));
739        let max_stream_queries = config.max_stream_queries;
740        Ok(Self {
741            store,
742            semaphore,
743            max_stream_queries,
744        })
745    }
746
747    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
748        let store = self.store.clone();
749        let semaphore = self.semaphore.clone();
750        let max_stream_queries = self.max_stream_queries;
751        let root_key = get_big_root_key(root_key);
752        Ok(ScyllaDbStoreInternal {
753            store,
754            semaphore,
755            max_stream_queries,
756            root_key,
757        })
758    }
759
760    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
761        self.open_shared(root_key)
762    }
763
764    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
765        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
766        let statement = session
767            .prepare(format!("DESCRIBE KEYSPACE {KEYSPACE}"))
768            .await?;
769        let result = Box::pin(session.execute_iter(statement, &[])).await;
770        let miss_msg = format!("'{KEYSPACE}' not found in keyspaces");
771        let result = match result {
772            Ok(result) => result,
773            Err(error) => {
774                let invalid_or_keyspace_not_found = match &error {
775                    PagerExecutionError::NextPageError(NextPageError::RequestFailure(
776                        RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
777                    )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
778                    _ => false,
779                };
780                if invalid_or_keyspace_not_found {
781                    return Ok(Vec::new());
782                } else {
783                    return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
784                }
785            }
786        };
787        let mut namespaces = Vec::new();
788        let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
789        while let Some(row) = rows_stream.next().await {
790            let (_, object_kind, name, _) = row?;
791            if object_kind == "table" {
792                namespaces.push(name);
793            }
794        }
795        Ok(namespaces)
796    }
797
798    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
799        let statement = self
800            .store
801            .session
802            .prepare(format!(
803                "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
804                KEYSPACE, self.store.namespace
805            ))
806            .await?;
807
808        // Execute the query
809        let rows = Box::pin(self.store.session.execute_iter(statement, &[])).await?;
810        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
811        let mut root_keys = BTreeSet::new();
812        while let Some(row) = rows.next().await {
813            let (root_key,) = row?;
814            let root_key = root_key[1..].to_vec();
815            root_keys.insert(root_key);
816        }
817        Ok(root_keys.into_iter().collect::<Vec<_>>())
818    }
819
820    async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
821        let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
822        let statement = session
823            .prepare(format!("DROP KEYSPACE IF EXISTS {KEYSPACE}"))
824            .await?;
825
826        session
827            .execute_single_page(&statement, &[], PagingState::start())
828            .await?;
829        Ok(())
830    }
831
832    async fn exists(
833        config: &Self::Config,
834        namespace: &str,
835    ) -> Result<bool, ScyllaDbStoreInternalError> {
836        Self::check_namespace(namespace)?;
837        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
838
839        // We check the way the test can fail. It can fail in different ways.
840        let result = session
841            .prepare(format!(
842                "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" LIMIT 1 ALLOW FILTERING"
843            ))
844            .await;
845
846        // The missing table translates into a very specific error that we matched
847        let miss_msg1 = format!("unconfigured table {namespace}");
848        let miss_msg1 = miss_msg1.as_str();
849        let miss_msg2 = "Undefined name root_key in selection clause";
850        let miss_msg3 = format!("Keyspace {KEYSPACE} does not exist");
851        let Err(error) = result else {
852            // If OK, then the table exists
853            return Ok(true);
854        };
855        let missing_table = match &error {
856            PrepareError::AllAttemptsFailed {
857                first_attempt: RequestAttemptError::DbError(db_error, msg),
858            } => {
859                if *db_error != DbError::Invalid {
860                    false
861                } else {
862                    msg.as_str() == miss_msg1
863                        || msg.as_str() == miss_msg2
864                        || msg.as_str() == miss_msg3
865                }
866            }
867            _ => false,
868        };
869        if missing_table {
870            Ok(false)
871        } else {
872            Err(ScyllaDbStoreInternalError::PrepareError(error))
873        }
874    }
875
876    async fn create(
877        config: &Self::Config,
878        namespace: &str,
879    ) -> Result<(), ScyllaDbStoreInternalError> {
880        Self::check_namespace(namespace)?;
881        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
882
883        // Create a keyspace if it doesn't exist
884        let statement = session
885            .prepare(format!(
886                "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
887                    'class' : 'NetworkTopologyStrategy', \
888                    'replication_factor' : {} \
889                }}",
890                KEYSPACE, config.replication_factor
891            ))
892            .await?;
893        session
894            .execute_single_page(&statement, &[], PagingState::start())
895            .await?;
896
897        // This explicitly sets a lot of default parameters for clarity and for making future
898        // changes easier.
899        let statement = session
900            .prepare(format!(
901                "CREATE TABLE {KEYSPACE}.\"{namespace}\" (\
902                    root_key blob, \
903                    k blob, \
904                    v blob, \
905                    PRIMARY KEY (root_key, k) \
906                ) \
907                WITH compaction = {{ \
908                    'class'          : 'LeveledCompactionStrategy', \
909                    'sstable_size_in_mb' : 160 \
910                }} \
911                AND compression = {{ \
912                    'sstable_compression': 'LZ4Compressor', \
913                    'chunk_length_in_kb':'4' \
914                }} \
915                AND caching = {{ \
916                    'enabled': 'true' \
917                }} \
918                AND gc_grace_seconds = 0 \
919                AND tombstone_gc = {{'mode': 'immediate'}}"
920            ))
921            .await?;
922        session
923            .execute_single_page(&statement, &[], PagingState::start())
924            .await?;
925        Ok(())
926    }
927
928    async fn delete(
929        config: &Self::Config,
930        namespace: &str,
931    ) -> Result<(), ScyllaDbStoreInternalError> {
932        Self::check_namespace(namespace)?;
933        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
934        let statement = session
935            .prepare(format!("DROP TABLE IF EXISTS {KEYSPACE}.\"{namespace}\";"))
936            .await?;
937        session
938            .execute_single_page(&statement, &[], PagingState::start())
939            .await?;
940        Ok(())
941    }
942}
943
944impl ScyllaDbStoreInternal {
945    /// Obtains the semaphore lock on the database if needed.
946    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
947        match &self.semaphore {
948            None => None,
949            Some(count) => Some(count.acquire().await),
950        }
951    }
952}
953
954impl ScyllaDbDatabaseInternal {
955    fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
956        if !namespace.is_empty()
957            && namespace.len() <= 48
958            && namespace
959                .chars()
960                .all(|c| c.is_ascii_alphanumeric() || c == '_')
961        {
962            return Ok(());
963        }
964        Err(ScyllaDbStoreInternalError::InvalidNamespace)
965    }
966}
967
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
    /// Returns the fixed configuration used by tests: a server at `localhost:9042`,
    /// at most 10 concurrent queries, 10 stream queries and a replication factor of 1.
    async fn new_test_config(
    ) -> Result<ScyllaDbStoreInternalConfig, JournalingError<ScyllaDbStoreInternalError>> {
        // TODO(#4114): Read the port from an environment variable.
        let test_config = ScyllaDbStoreInternalConfig {
            uri: String::from("localhost:9042"),
            max_concurrent_queries: Some(10),
            max_stream_queries: 10,
            replication_factor: 1,
        };
        Ok(test_config)
    }
}
982
/// The `ScyllaDbDatabase` composed type with metrics.
///
/// From the innermost layer outwards: [`ScyllaDbDatabaseInternal`] is wrapped in
/// [`JournalingKeyValueDatabase`], then [`ValueSplittingDatabase`], then
/// [`LruCachingDatabase`]. A [`MeteredDatabase`] is placed around each of those three
/// wrappers.
#[cfg(with_metrics)]
pub type ScyllaDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
            >,
        >,
    >,
>;
994
/// The `ScyllaDbDatabase` composed type without metrics.
///
/// From the innermost layer outwards: [`ScyllaDbDatabaseInternal`] is wrapped in
/// [`JournalingKeyValueDatabase`], then [`ValueSplittingDatabase`], then
/// [`LruCachingDatabase`].
#[cfg(not(with_metrics))]
pub type ScyllaDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
>;
1000
/// The `ScyllaDbStoreConfig` input type: an [`LruCachingConfig`] wrapping the
/// [`ScyllaDbStoreInternalConfig`].
pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1003
/// The combined error type for the `ScyllaDbDatabase`: a
/// [`ScyllaDbStoreInternalError`] nested in a [`JournalingError`], itself nested in a
/// [`ValueSplittingError`].
pub type ScyllaDbStoreError = ValueSplittingError<JournalingError<ScyllaDbStoreInternalError>>;