Skip to main content

linera_views/backends/
scylla_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the ScyllaDB database.
5//!
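//! Each namespace is stored as its own table in the `kv` keyspace and is accessed through a
//! ScyllaDB `Session`; inside a table, rows are partitioned by `root_key` and addressed by the
//! clustering key `k`. The maximum number of concurrent queries is controlled by
//! `max_concurrent_queries`.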
9
10use std::{
11    collections::{BTreeSet, HashMap},
12    ops::Deref,
13    sync::{
14        atomic::{AtomicI64, Ordering},
15        Arc,
16    },
17    time::{SystemTime, UNIX_EPOCH},
18};
19
20use async_lock::{Semaphore, SemaphoreGuard};
21use futures::{future::join_all, StreamExt as _};
22use linera_base::{ensure, util::future::FutureSyncExt as _};
23use scylla::{
24    client::{
25        execution_profile::{ExecutionProfile, ExecutionProfileHandle},
26        session::Session,
27        session_builder::SessionBuilder,
28    },
29    deserialize::{DeserializationError, TypeCheckError},
30    errors::{
31        DbError, ExecutionError, IntoRowsResultError, NewSessionError, NextPageError, NextRowError,
32        PagerExecutionError, PrepareError, RequestAttemptError, RequestError, RowsError,
33    },
34    policies::{
35        load_balancing::{DefaultPolicy, LoadBalancingPolicy},
36        retry::DefaultRetryPolicy,
37    },
38    response::PagingState,
39    statement::{batch::BatchType, prepared::PreparedStatement, Consistency},
40    value::CqlValue,
41};
42use serde::{Deserialize, Serialize};
43use thiserror::Error;
44
45#[cfg(with_metrics)]
46use crate::metering::MeteredDatabase;
47#[cfg(with_testing)]
48use crate::store::TestKeyValueDatabase;
49use crate::{
50    batch::{SimpleUnorderedBatch, UnorderedBatch},
51    common::{get_uleb128_size, get_upper_bound_option},
52    journaling::{JournalingError, JournalingKeyValueDatabase},
53    lru_caching::{LruCachingConfig, LruCachingDatabase},
54    store::{
55        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
56        WithError,
57    },
58    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
59};
60
/// ScyllaDB accepts at most 100 entries in a multi-key query. One of them is
/// taken by the root key, which leaves 99 entries for the keys themselves.
const MAX_MULTI_KEYS: usize = 100 - 1;

/// Maximal size of a key: 10 KiB.
const MAX_KEY_SIZE: usize = 10 << 10;

/// ScyllaDB cannot receive more than 16 MiB in one operation, see
/// <https://www.scylladb.com/2019/03/27/best-practices-for-scylla-applications/>:
/// "There is a hard limit at 16 MiB, and nothing bigger than that can arrive at once
///  at the database at any particular time".
///
/// That budget is split into `MAX_KEY_SIZE` (10 KiB) for the key and the remainder for
/// the value. A further, arbitrarily chosen 4000 bytes are subtracted to leave room for
/// the space the database uses internally.
const RAW_MAX_VALUE_SIZE: usize = (16 << 20) - MAX_KEY_SIZE - 4000;

/// Maximal key size plus maximal raw value size, i.e. 16 MiB minus the 4000-byte
/// internal margin.
const MAX_BATCH_TOTAL_SIZE: usize = MAX_KEY_SIZE + RAW_MAX_VALUE_SIZE;
75
76/// The `RAW_MAX_VALUE_SIZE` is the maximum size on the ScyllaDB storage.
77/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`
78/// Therefore the actual `MAX_VALUE_SIZE` is lower.
79/// At the maximum the key size is 1024 bytes (see below) and we pack just one entry.
80/// So if the key has 1024 bytes this gets us the inequality
81/// `1 + 1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= RAW_MAX_VALUE_SIZE`.
82/// and so this simplifies to `1 + 1 + 1 + (2 + 10240) + (4 + x) <= RAW_MAX_VALUE_SIZE`
83/// Note on the above formula:
84/// * We write 4 because `get_uleb128_size(RAW_MAX_VALUE_SIZE) = 4)`
85/// * We write `1 + 1 + 1`  because the `UnorderedBatch` has three entries.
86///
87/// This gets us to a maximal value of 16752727.
88const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
89    - MAX_KEY_SIZE
90    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
91    - get_uleb128_size(MAX_KEY_SIZE)
92    - 3;
93
/// Maximal number of operations accepted in a single write batch.
///
/// A limit of this kind was empirically found to be necessary to make ScyllaDB
/// accept our batches. We have not been able to find the corresponding constant
/// in the ScyllaDB source code or documentation. Experiments showed that 14796
/// is the largest value that still works; the value used here, 5000, stays well
/// below that empirical ceiling.
const MAX_BATCH_SIZE: usize = 5000;

/// The keyspace to use for the ScyllaDB database.
const KEYSPACE: &str = "kv";
103
104/// The client for ScyllaDB:
105/// * The session allows to pass queries
106/// * The namespace that is being assigned to the database
107/// * The prepared queries used for implementing the features of `KeyValueStore`.
108struct ScyllaDbClient {
109    session: Session,
110    namespace: String,
111    read_value: PreparedStatement,
112    read_writetime: PreparedStatement,
113    contains_key: PreparedStatement,
114    write_batch_delete_prefix_unbounded: PreparedStatement,
115    write_batch_delete_prefix_bounded: PreparedStatement,
116    write_batch_deletion: PreparedStatement,
117    write_batch_insertion: PreparedStatement,
118    // Variants carrying an explicit `USING TIMESTAMP ?` marker, used by the
119    // single-batch exclusive-mode write path (`write_batch_exclusive`).
120    write_batch_delete_prefix_unbounded_ts: PreparedStatement,
121    write_batch_delete_prefix_bounded_ts: PreparedStatement,
122    write_batch_deletion_ts: PreparedStatement,
123    write_batch_insertion_ts: PreparedStatement,
124    find_keys_by_prefix_unbounded: PreparedStatement,
125    find_keys_by_prefix_bounded: PreparedStatement,
126    find_key_values_by_prefix_unbounded: PreparedStatement,
127    find_key_values_by_prefix_bounded: PreparedStatement,
128    multi_key_values: papaya::HashMap<usize, PreparedStatement>,
129    multi_keys: papaya::HashMap<usize, PreparedStatement>,
130}
131
132impl ScyllaDbClient {
133    async fn new(session: Session, namespace: &str) -> Result<Self, ScyllaDbStoreInternalError> {
134        let namespace = namespace.to_string();
135        let read_value = session
136            .prepare(format!(
137                "SELECT v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
138            ))
139            .await?;
140
141        let read_writetime = session
142            .prepare(format!(
143                "SELECT WRITETIME(v) FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
144            ))
145            .await?;
146
147        let contains_key = session
148            .prepare(format!(
149                "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
150            ))
151            .await?;
152
153        let write_batch_delete_prefix_unbounded = session
154            .prepare(format!(
155                "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
156            ))
157            .await?;
158
159        let write_batch_delete_prefix_bounded = session
160            .prepare(format!(
161                "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
162            ))
163            .await?;
164
165        let write_batch_deletion = session
166            .prepare(format!(
167                "DELETE FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k = ?"
168            ))
169            .await?;
170
171        let write_batch_insertion = session
172            .prepare(format!(
173                "INSERT INTO {KEYSPACE}.\"{namespace}\" (root_key, k, v) VALUES (?, ?, ?)"
174            ))
175            .await?;
176
177        // Timestamped variants used by the single-batch exclusive-mode path. The
178        // explicit `USING TIMESTAMP ?` lets prefix-deletions (`T`) and the
179        // insertions/deletions (`T + 1`) share one atomic batch without the range
180        // tombstone shadowing the inserts.
181        let write_batch_delete_prefix_unbounded_ts = session
182            .prepare(format!(
183                "DELETE FROM {KEYSPACE}.\"{namespace}\" USING TIMESTAMP ? WHERE root_key = ? AND k >= ?"
184            ))
185            .await?;
186
187        let write_batch_delete_prefix_bounded_ts = session
188            .prepare(format!(
189                "DELETE FROM {KEYSPACE}.\"{namespace}\" USING TIMESTAMP ? \
190                 WHERE root_key = ? AND k >= ? AND k < ?"
191            ))
192            .await?;
193
194        let write_batch_deletion_ts = session
195            .prepare(format!(
196                "DELETE FROM {KEYSPACE}.\"{namespace}\" USING TIMESTAMP ? WHERE root_key = ? AND k = ?"
197            ))
198            .await?;
199
200        let write_batch_insertion_ts = session
201            .prepare(format!(
202                "INSERT INTO {KEYSPACE}.\"{namespace}\" (root_key, k, v) VALUES (?, ?, ?) \
203                 USING TIMESTAMP ?"
204            ))
205            .await?;
206
207        let find_keys_by_prefix_unbounded = session
208            .prepare(format!(
209                "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
210            ))
211            .await?;
212
213        let find_keys_by_prefix_bounded = session
214            .prepare(format!(
215                "SELECT k FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
216            ))
217            .await?;
218
219        let find_key_values_by_prefix_unbounded = session
220            .prepare(format!(
221                "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ?"
222            ))
223            .await?;
224
225        let find_key_values_by_prefix_bounded = session
226            .prepare(format!(
227                "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" WHERE root_key = ? AND k >= ? AND k < ?"
228            ))
229            .await?;
230
231        Ok(Self {
232            session,
233            namespace,
234            read_value,
235            read_writetime,
236            contains_key,
237            write_batch_delete_prefix_unbounded,
238            write_batch_delete_prefix_bounded,
239            write_batch_deletion,
240            write_batch_insertion,
241            write_batch_delete_prefix_unbounded_ts,
242            write_batch_delete_prefix_bounded_ts,
243            write_batch_deletion_ts,
244            write_batch_insertion_ts,
245            find_keys_by_prefix_unbounded,
246            find_keys_by_prefix_bounded,
247            find_key_values_by_prefix_unbounded,
248            find_key_values_by_prefix_bounded,
249            multi_key_values: papaya::HashMap::new(),
250            multi_keys: papaya::HashMap::new(),
251        })
252    }
253
254    fn build_default_policy() -> Arc<dyn LoadBalancingPolicy> {
255        DefaultPolicy::builder().token_aware(true).build()
256    }
257
258    fn build_default_execution_profile_handle(
259        policy: Arc<dyn LoadBalancingPolicy>,
260    ) -> ExecutionProfileHandle {
261        let default_profile = ExecutionProfile::builder()
262            .load_balancing_policy(policy)
263            .retry_policy(Arc::new(DefaultRetryPolicy::new()))
264            .consistency(Consistency::LocalQuorum)
265            .build();
266        default_profile.into_handle()
267    }
268
269    async fn build_default_session(uri: &str) -> Result<Session, ScyllaDbStoreInternalError> {
270        // This explicitly sets a lot of default parameters for clarity and for making future changes
271        // easier.
272        SessionBuilder::new()
273            .known_node(uri)
274            .default_execution_profile_handle(Self::build_default_execution_profile_handle(
275                Self::build_default_policy(),
276            ))
277            .build()
278            .boxed_sync()
279            .await
280            .map_err(Into::into)
281    }
282
283    async fn get_multi_key_values_statement(
284        &self,
285        num_markers: usize,
286    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
287        if let Some(prepared_statement) = self.multi_key_values.pin().get(&num_markers) {
288            return Ok(prepared_statement.clone());
289        }
290        let markers = std::iter::repeat_n("?", num_markers)
291            .collect::<Vec<_>>()
292            .join(",");
293        let prepared_statement = self
294            .session
295            .prepare(format!(
296                "SELECT k,v FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
297                KEYSPACE, self.namespace, markers
298            ))
299            .await?;
300        self.multi_key_values
301            .pin()
302            .insert(num_markers, prepared_statement.clone());
303        Ok(prepared_statement)
304    }
305
306    async fn get_multi_keys_statement(
307        &self,
308        num_markers: usize,
309    ) -> Result<PreparedStatement, ScyllaDbStoreInternalError> {
310        if let Some(prepared_statement) = self.multi_keys.pin().get(&num_markers) {
311            return Ok(prepared_statement.clone());
312        };
313        let markers = std::iter::repeat_n("?", num_markers)
314            .collect::<Vec<_>>()
315            .join(",");
316        let prepared_statement = self
317            .session
318            .prepare(format!(
319                "SELECT k FROM {}.\"{}\" WHERE root_key = ? AND k IN ({})",
320                KEYSPACE, self.namespace, markers
321            ))
322            .await?;
323        self.multi_keys
324            .pin()
325            .insert(num_markers, prepared_statement.clone());
326        Ok(prepared_statement)
327    }
328
329    fn check_key_size(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
330        ensure!(
331            key.len() <= MAX_KEY_SIZE,
332            ScyllaDbStoreInternalError::KeyTooLong
333        );
334        Ok(())
335    }
336
337    fn check_value_size(value: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
338        ensure!(
339            value.len() <= RAW_MAX_VALUE_SIZE,
340            ScyllaDbStoreInternalError::ValueTooLong
341        );
342        Ok(())
343    }
344
345    /// Validates a key supplied by a caller's batch. Besides the size limit, the
346    /// key must be non-empty: the empty (zero-length) key is `WRITETIME_SENTINEL_KEY`,
347    /// reserved for the per-store timestamp sentinel that exclusive mode writes
348    /// internally. Prefix scans now deliberately hide that key, so any caller
349    /// content stored there would be silently invisible to reads.
350    fn check_batch_key(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> {
351        Self::check_key_size(key)?;
352        ensure!(!key.is_empty(), ScyllaDbStoreInternalError::ZeroLengthKey);
353        Ok(())
354    }
355
356    fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> {
357        ensure!(
358            batch.len() <= MAX_BATCH_SIZE,
359            ScyllaDbStoreInternalError::BatchTooLong
360        );
361        Ok(())
362    }
363
364    async fn read_value_internal(
365        &self,
366        root_key: &[u8],
367        key: Vec<u8>,
368    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
369        Self::check_key_size(&key)?;
370        let session = &self.session;
371        // Read the value of a key
372        let values = (root_key.to_vec(), key);
373
374        let (result, _) = session
375            .execute_single_page(&self.read_value, &values, PagingState::start())
376            .await
377            .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
378        let rows = result.into_rows_result()?;
379        let mut rows = rows.rows::<(Vec<u8>,)>()?;
380        Ok(match rows.next() {
381            Some(row) => Some(row?.0),
382            None => None,
383        })
384    }
385
386    fn get_occurrences_map(
387        keys: Vec<Vec<u8>>,
388    ) -> Result<HashMap<Vec<u8>, Vec<usize>>, ScyllaDbStoreInternalError> {
389        let mut map = HashMap::<Vec<u8>, Vec<usize>>::new();
390        for (i_key, key) in keys.into_iter().enumerate() {
391            Self::check_key_size(&key)?;
392            map.entry(key).or_default().push(i_key);
393        }
394        Ok(map)
395    }
396
397    async fn read_multi_values_internal(
398        &self,
399        root_key: &[u8],
400        keys: Vec<Vec<u8>>,
401    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
402        let mut values = vec![None; keys.len()];
403        let map = Self::get_occurrences_map(keys)?;
404        let statement = self.get_multi_key_values_statement(map.len()).await?;
405        let mut inputs = vec![root_key.to_vec()];
406        inputs.extend(map.keys().cloned());
407        let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
408            .await?
409            .rows_stream::<(Vec<u8>, Vec<u8>)>()?;
410
411        while let Some(row) = rows.next().await {
412            let (key, value) = row?;
413            if let Some((&last, rest)) = map[&key].split_last() {
414                for position in rest {
415                    values[*position] = Some(value.clone());
416                }
417                values[last] = Some(value);
418            }
419        }
420        Ok(values)
421    }
422
423    async fn contains_keys_internal(
424        &self,
425        root_key: &[u8],
426        keys: Vec<Vec<u8>>,
427    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
428        let mut values = vec![false; keys.len()];
429        let map = Self::get_occurrences_map(keys)?;
430        let statement = self.get_multi_keys_statement(map.len()).await?;
431        let mut inputs = vec![root_key.to_vec()];
432        inputs.extend(map.keys().cloned());
433        let mut rows = Box::pin(self.session.execute_iter(statement, &inputs))
434            .await?
435            .rows_stream::<(Vec<u8>,)>()?;
436
437        while let Some(row) = rows.next().await {
438            let (key,) = row?;
439            for i_key in &map[&key] {
440                values[*i_key] = true;
441            }
442        }
443
444        Ok(values)
445    }
446
447    async fn contains_key_internal(
448        &self,
449        root_key: &[u8],
450        key: Vec<u8>,
451    ) -> Result<bool, ScyllaDbStoreInternalError> {
452        Self::check_key_size(&key)?;
453        let session = &self.session;
454        // Read the value of a key
455        let values = (root_key.to_vec(), key);
456
457        let (result, _) = session
458            .execute_single_page(&self.contains_key, &values, PagingState::start())
459            .await
460            .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
461        let rows = result.into_rows_result()?;
462        let mut rows = rows.rows::<(Vec<u8>,)>()?;
463        Ok(rows.next().is_some())
464    }
465
466    /// Reads the write-time of a single row in microseconds since Unix epoch,
467    /// returning `None` if the row does not exist or carries no live value.
468    async fn read_writetime_internal(
469        &self,
470        root_key: &[u8],
471        key: Vec<u8>,
472    ) -> Result<Option<i64>, ScyllaDbStoreInternalError> {
473        Self::check_key_size(&key)?;
474        let session = &self.session;
475        let values = (root_key.to_vec(), key);
476        let (result, _) = session
477            .execute_single_page(&self.read_writetime, &values, PagingState::start())
478            .await
479            .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
480        let rows = result.into_rows_result()?;
481        let mut rows = rows.rows::<(Option<i64>,)>()?;
482        Ok(match rows.next() {
483            Some(row) => row?.0,
484            None => None,
485        })
486    }
487
488    /// Issues an unlogged batch that contains only prefix-delete statements,
489    /// letting the coordinator assign the write timestamp.
490    async fn write_batch_prefix_deletes(
491        &self,
492        root_key: &[u8],
493        key_prefix_deletions: Vec<Vec<u8>>,
494    ) -> Result<(), ScyllaDbStoreInternalError> {
495        if key_prefix_deletions.is_empty() {
496            return Ok(());
497        }
498        let session = &self.session;
499        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
500        let mut batch_values = Vec::new();
501        let q_unbounded = &self.write_batch_delete_prefix_unbounded;
502        let q_bounded = &self.write_batch_delete_prefix_bounded;
503        for key_prefix in key_prefix_deletions {
504            Self::check_key_size(&key_prefix)?;
505            match get_upper_bound_option(&key_prefix) {
506                None => {
507                    batch_values.push(vec![root_key.to_vec(), key_prefix]);
508                    batch_query.append_statement(q_unbounded.clone());
509                }
510                Some(upper_bound) => {
511                    batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]);
512                    batch_query.append_statement(q_bounded.clone());
513                }
514            }
515        }
516        session
517            .batch(&batch_query, batch_values)
518            .await
519            .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?;
520        Ok(())
521    }
522
523    /// Issues an unlogged batch containing the single-key deletions and the
524    /// insertions, letting the coordinator assign the write timestamp.
525    async fn write_simple_batch(
526        &self,
527        root_key: &[u8],
528        batch: SimpleUnorderedBatch,
529    ) -> Result<(), ScyllaDbStoreInternalError> {
530        if batch.deletions.is_empty() && batch.insertions.is_empty() {
531            return Ok(());
532        }
533        let session = &self.session;
534        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
535        let mut batch_values = Vec::new();
536        let q_deletion = &self.write_batch_deletion;
537        for key in batch.deletions {
538            Self::check_batch_key(&key)?;
539            batch_values.push(vec![root_key.to_vec(), key]);
540            batch_query.append_statement(q_deletion.clone());
541        }
542        let q_insertion = &self.write_batch_insertion;
543        for (key, value) in batch.insertions {
544            Self::check_batch_key(&key)?;
545            Self::check_value_size(&value)?;
546            batch_values.push(vec![root_key.to_vec(), key, value]);
547            batch_query.append_statement(q_insertion.clone());
548        }
549        session
550            .batch(&batch_query, batch_values)
551            .await
552            .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?;
553        Ok(())
554    }
555
556    /// Issues the whole write as a single atomic unlogged batch, used in
557    /// exclusive mode. Every statement carries an explicit `USING TIMESTAMP`:
558    /// the prefix-deletions use `t`, while the single-key deletions, the
559    /// insertions, and the sentinel write use `t + 1`. The higher timestamp on
560    /// the data ensures a range tombstone never shadows an insertion belonging
561    /// to the same logical batch (at equal timestamps, dead cells win over live
562    /// cells). Because the intended ordering is fixed by these timestamps rather
563    /// than by send order, the prefix-deletions and the data can — and must —
564    /// share one batch, preserving the atomicity that `write_batch` callers rely
565    /// on. The sentinel write at `WRITETIME_SENTINEL_KEY` lets a future process
566    /// recover this store's timestamp floor (see `ensure_ts_seeded`).
567    async fn write_batch_exclusive(
568        &self,
569        root_key: &[u8],
570        batch: UnorderedBatch,
571        t: i64,
572    ) -> Result<(), ScyllaDbStoreInternalError> {
573        let UnorderedBatch {
574            key_prefix_deletions,
575            simple_unordered_batch:
576                SimpleUnorderedBatch {
577                    deletions,
578                    insertions,
579                },
580        } = batch;
581        let session = &self.session;
582        let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged);
583        let mut batch_values = Vec::new();
584
585        // Prefix-deletions at timestamp `t`.
586        for key_prefix in key_prefix_deletions {
587            Self::check_key_size(&key_prefix)?;
588            match get_upper_bound_option(&key_prefix) {
589                None => {
590                    batch_values.push(vec![
591                        CqlValue::BigInt(t),
592                        CqlValue::Blob(root_key.to_vec()),
593                        CqlValue::Blob(key_prefix),
594                    ]);
595                    batch_query
596                        .append_statement(self.write_batch_delete_prefix_unbounded_ts.clone());
597                }
598                Some(upper_bound) => {
599                    batch_values.push(vec![
600                        CqlValue::BigInt(t),
601                        CqlValue::Blob(root_key.to_vec()),
602                        CqlValue::Blob(key_prefix),
603                        CqlValue::Blob(upper_bound),
604                    ]);
605                    batch_query.append_statement(self.write_batch_delete_prefix_bounded_ts.clone());
606                }
607            }
608        }
609
610        // Single-key deletions, insertions, and the sentinel at timestamp `t + 1`.
611        let t_data = t + 1;
612        for key in deletions {
613            Self::check_batch_key(&key)?;
614            batch_values.push(vec![
615                CqlValue::BigInt(t_data),
616                CqlValue::Blob(root_key.to_vec()),
617                CqlValue::Blob(key),
618            ]);
619            batch_query.append_statement(self.write_batch_deletion_ts.clone());
620        }
621        for (key, value) in insertions {
622            Self::check_batch_key(&key)?;
623            Self::check_value_size(&value)?;
624            batch_values.push(vec![
625                CqlValue::Blob(root_key.to_vec()),
626                CqlValue::Blob(key),
627                CqlValue::Blob(value),
628                CqlValue::BigInt(t_data),
629            ]);
630            batch_query.append_statement(self.write_batch_insertion_ts.clone());
631        }
632        batch_values.push(vec![
633            CqlValue::Blob(root_key.to_vec()),
634            CqlValue::Blob(WRITETIME_SENTINEL_KEY.to_vec()),
635            CqlValue::Blob(Vec::new()),
636            CqlValue::BigInt(t_data),
637        ]);
638        batch_query.append_statement(self.write_batch_insertion_ts.clone());
639
640        session
641            .batch(&batch_query, batch_values)
642            .await
643            .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?;
644        Ok(())
645    }
646
647    async fn find_keys_by_prefix_internal(
648        &self,
649        root_key: &[u8],
650        key_prefix: Vec<u8>,
651    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
652        Self::check_key_size(&key_prefix)?;
653        let session = &self.session;
654        // Read the value of a key
655        let len = key_prefix.len();
656        let query_unbounded = &self.find_keys_by_prefix_unbounded;
657        let query_bounded = &self.find_keys_by_prefix_bounded;
658        let rows = match get_upper_bound_option(&key_prefix) {
659            None => {
660                let values = (root_key.to_vec(), key_prefix.clone());
661                Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
662            }
663            Some(upper_bound) => {
664                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
665                Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
666            }
667        };
668        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
669        let mut keys = Vec::new();
670        while let Some(row) = rows.next().await {
671            let (key,) = row?;
672            // Skip the reserved timestamp sentinel (exclusive mode writes it at the
673            // empty clustering key). It is an internal implementation detail and must
674            // not surface to callers; it can only match an empty-prefix scan.
675            if key == WRITETIME_SENTINEL_KEY {
676                continue;
677            }
678            let short_key = key[len..].to_vec();
679            keys.push(short_key);
680        }
681        Ok(keys)
682    }
683
684    async fn find_key_values_by_prefix_internal(
685        &self,
686        root_key: &[u8],
687        key_prefix: Vec<u8>,
688    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
689        Self::check_key_size(&key_prefix)?;
690        let session = &self.session;
691        // Read the value of a key
692        let len = key_prefix.len();
693        let query_unbounded = &self.find_key_values_by_prefix_unbounded;
694        let query_bounded = &self.find_key_values_by_prefix_bounded;
695        let rows = match get_upper_bound_option(&key_prefix) {
696            None => {
697                let values = (root_key.to_vec(), key_prefix.clone());
698                Box::pin(session.execute_iter(query_unbounded.clone(), values)).await?
699            }
700            Some(upper_bound) => {
701                let values = (root_key.to_vec(), key_prefix.clone(), upper_bound);
702                Box::pin(session.execute_iter(query_bounded.clone(), values)).await?
703            }
704        };
705        let mut rows = rows.rows_stream::<(Vec<u8>, Vec<u8>)>()?;
706        let mut key_values = Vec::new();
707        while let Some(row) = rows.next().await {
708            let (key, value) = row?;
709            // Skip the reserved timestamp sentinel; see `find_keys_by_prefix_internal`.
710            if key == WRITETIME_SENTINEL_KEY {
711                continue;
712            }
713            let short_key = key[len..].to_vec();
714            key_values.push((short_key, value));
715        }
716        Ok(key_values)
717    }
718}
719
720/// The client itself and the keeping of the count of active connections.
721#[derive(Clone)]
722pub struct ScyllaDbStoreInternal {
723    store: Arc<ScyllaDbClient>,
724    semaphore: Option<Arc<Semaphore>>,
725    max_stream_queries: usize,
726    root_key: Vec<u8>,
727    /// Whether this store was opened with `open_exclusive`. When true, `write_batch`
728    /// resolves in-batch prefix/insert collisions via per-statement `USING TIMESTAMP`;
729    /// when false, it splits the batch into two sequential sub-batches with
730    /// server-side timestamps to preserve ordering across writers.
731    is_exclusive: bool,
732    /// Per-partition timestamp floor for exclusive-mode `USING TIMESTAMP` writes.
733    /// Value 0 means unseeded; populated lazily on first write by reading
734    /// `WRITETIME` of a sentinel row. Each batch reserves 2 µs (T and T+1).
735    ts_floor: Arc<AtomicI64>,
736}
737
738/// Database-level connection to ScyllaDB for managing namespaces and partitions.
739#[derive(Clone)]
740pub struct ScyllaDbDatabaseInternal {
741    store: Arc<ScyllaDbClient>,
742    semaphore: Option<Arc<Semaphore>>,
743    max_stream_queries: usize,
744}
745
746impl WithError for ScyllaDbDatabaseInternal {
747    type Error = ScyllaDbStoreInternalError;
748}
749
750/// The error type for [`ScyllaDbStoreInternal`]
751#[derive(Error, Debug)]
752pub enum ScyllaDbStoreInternalError {
753    /// BCS serialization error.
754    #[error(transparent)]
755    BcsError(#[from] bcs::Error),
756
757    /// A deserialization error
758    #[error(transparent)]
759    DeserializationError(#[from] DeserializationError),
760
761    /// A row error
762    #[error(transparent)]
763    RowsError(#[from] RowsError),
764
765    /// A conversion error in the accessed data
766    #[error(transparent)]
767    IntoRowsResultError(#[from] IntoRowsResultError),
768
769    /// A type check error
770    #[error(transparent)]
771    TypeCheckError(#[from] TypeCheckError),
772
773    /// A pager execution error
774    #[error(transparent)]
775    PagerExecutionError(#[from] PagerExecutionError),
776
777    /// A prepare error
778    #[error(transparent)]
779    PrepareError(#[from] PrepareError),
780
781    /// An execution error during a query (except write-batch).
782    #[error(transparent)]
783    ExecutionError(ExecutionError),
784
785    /// An execution error during a write-batch operation.
786    #[error(transparent)]
787    WriteBatchExecutionError(ExecutionError),
788
789    /// A session creation error
790    #[error(transparent)]
791    NewSessionError(#[from] NewSessionError),
792
793    /// A next row error in ScyllaDB
794    #[error(transparent)]
795    NextRowError(#[from] NextRowError),
796
797    /// Namespace contains forbidden characters
798    #[error("Namespace contains forbidden characters")]
799    InvalidNamespace,
800
801    /// The key must have at most `MAX_KEY_SIZE` bytes
802    #[error("The key must have at most MAX_KEY_SIZE")]
803    KeyTooLong,
804
805    /// The value must have at most `RAW_MAX_VALUE_SIZE` bytes
806    #[error("The value must have at most RAW_MAX_VALUE_SIZE")]
807    ValueTooLong,
808
809    /// The batch is too long to be written
810    #[error("The batch is too long to be written")]
811    BatchTooLong,
812
813    /// Keys have to be of nonzero length (the empty key is reserved for the
814    /// timestamp sentinel).
815    #[error("The key must be of nonzero length")]
816    ZeroLengthKey,
817}
818
819impl KeyValueStoreError for ScyllaDbStoreInternalError {
820    const BACKEND: &'static str = "scylla_db";
821
822    fn must_reload_view(&self) -> bool {
823        // Errors (notably timeouts) during a `write_batch` may leave the view in a
824        // undetermined state where the batch may or may not have happened.
825        matches!(self, Self::WriteBatchExecutionError(_))
826    }
827}
828
829impl WithError for ScyllaDbStoreInternal {
830    type Error = ScyllaDbStoreInternalError;
831}
832
833impl ReadableKeyValueStore for ScyllaDbStoreInternal {
834    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
835
836    fn max_stream_queries(&self) -> usize {
837        self.max_stream_queries
838    }
839
840    fn root_key(&self) -> Result<Vec<u8>, ScyllaDbStoreInternalError> {
841        Ok(self.root_key[1..].to_vec())
842    }
843
844    async fn read_value_bytes(
845        &self,
846        key: &[u8],
847    ) -> Result<Option<Vec<u8>>, ScyllaDbStoreInternalError> {
848        let store = self.store.deref();
849        let _guard = self.acquire().await;
850        Box::pin(store.read_value_internal(&self.root_key, key.to_vec())).await
851    }
852
853    async fn contains_key(&self, key: &[u8]) -> Result<bool, ScyllaDbStoreInternalError> {
854        let store = self.store.deref();
855        let _guard = self.acquire().await;
856        Box::pin(store.contains_key_internal(&self.root_key, key.to_vec())).await
857    }
858
859    async fn contains_keys(
860        &self,
861        keys: &[Vec<u8>],
862    ) -> Result<Vec<bool>, ScyllaDbStoreInternalError> {
863        if keys.is_empty() {
864            return Ok(Vec::new());
865        }
866        let store = self.store.deref();
867        let _guard = self.acquire().await;
868        let handles = keys
869            .chunks(MAX_MULTI_KEYS)
870            .map(|keys| store.contains_keys_internal(&self.root_key, keys.to_vec()));
871        let results: Vec<_> = join_all(handles)
872            .await
873            .into_iter()
874            .collect::<Result<_, _>>()?;
875        Ok(results.into_iter().flatten().collect())
876    }
877
878    async fn read_multi_values_bytes(
879        &self,
880        keys: &[Vec<u8>],
881    ) -> Result<Vec<Option<Vec<u8>>>, ScyllaDbStoreInternalError> {
882        if keys.is_empty() {
883            return Ok(Vec::new());
884        }
885        let store = self.store.deref();
886        let _guard = self.acquire().await;
887        let handles = keys
888            .chunks(MAX_MULTI_KEYS)
889            .map(|keys| store.read_multi_values_internal(&self.root_key, keys.to_vec()));
890        let results: Vec<_> = join_all(handles)
891            .await
892            .into_iter()
893            .collect::<Result<_, _>>()?;
894        Ok(results.into_iter().flatten().collect())
895    }
896
897    async fn find_keys_by_prefix(
898        &self,
899        key_prefix: &[u8],
900    ) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
901        let store = self.store.deref();
902        let _guard = self.acquire().await;
903        Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await
904    }
905
906    async fn find_key_values_by_prefix(
907        &self,
908        key_prefix: &[u8],
909    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ScyllaDbStoreInternalError> {
910        let store = self.store.deref();
911        let _guard = self.acquire().await;
912        Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec()))
913            .await
914    }
915}
916
917impl DirectWritableKeyValueStore for ScyllaDbStoreInternal {
918    const MAX_BATCH_SIZE: usize = MAX_BATCH_SIZE;
919    const MAX_BATCH_TOTAL_SIZE: usize = MAX_BATCH_TOTAL_SIZE;
920    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
921
922    // ScyllaDB cannot take a `crate::batch::Batch` directly. Indeed, if a delete is
923    // followed by a write, then the delete takes priority. See the sentence "The first
924    // tie-breaking rule when two cells have the same write timestamp is that dead cells
925    // win over live cells" from
926    // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md
927    //
928    // We therefore order the prefix-deletions strictly before the insertions:
929    //   * In exclusive mode we own the timestamps, so we issue a single atomic CQL
930    //     batch with explicit per-statement `USING TIMESTAMP` (`T` for the
931    //     prefix-deletions, `T + 1` for the data). See `write_batch_exclusive`.
932    //   * In shared mode the coordinator owns the timestamps, so we split the write
933    //     into two sequential CQL batches.
934    type Batch = UnorderedBatch;
935
936    async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> {
937        let store = self.store.deref();
938        let _guard = self.acquire().await;
939        ScyllaDbClient::check_batch_len(&batch)?;
940        if self.is_exclusive {
941            // A single atomic batch; ordering is pinned by the explicit timestamps.
942            let t = self.next_batch_ts().await?;
943            store.write_batch_exclusive(&self.root_key, batch, t).await
944        } else {
945            store
946                .write_batch_prefix_deletes(&self.root_key, batch.key_prefix_deletions)
947                .await?;
948            store
949                .write_simple_batch(&self.root_key, batch.simple_unordered_batch)
950                .await?;
951            Ok(())
952        }
953    }
954}
955
956impl ScyllaDbStoreInternal {
957    /// Seeds the per-store timestamp floor on first write in exclusive mode.
958    /// Reads `WRITETIME` of this chain's row in the reserved sentinel
959    /// partition (written by every prior exclusive batch). Falls back to the
960    /// current wall clock if the row does not yet exist. Idempotent — only
961    /// the first caller wins the compare-exchange.
962    async fn ensure_ts_seeded(&self) -> Result<(), ScyllaDbStoreInternalError> {
963        if self.ts_floor.load(Ordering::Relaxed) > 0 {
964            return Ok(());
965        }
966        let writetime = self
967            .store
968            .read_writetime_internal(&self.root_key, WRITETIME_SENTINEL_KEY.to_vec())
969            .await?
970            .unwrap_or(0);
971        let now_us = SystemTime::now()
972            .duration_since(UNIX_EPOCH)
973            .ok()
974            .and_then(|d| i64::try_from(d.as_micros()).ok())
975            .unwrap_or(0);
976        // `writetime` is the last batch's `T + 1`, i.e. the highest timestamp it
977        // consumed; that is exactly what `ts_floor` tracks, so seed it directly.
978        let seed = now_us.max(writetime);
979        if self
980            .ts_floor
981            .compare_exchange(0, seed, Ordering::Relaxed, Ordering::Relaxed)
982            .is_err()
983        {
984            // Another caller seeded first; their value wins.
985        }
986        Ok(())
987    }
988
989    /// Returns the base timestamp `T` for the next batch in exclusive mode.
990    /// The batch may also use `T + 1`; the generator advances by 2 per call,
991    /// preserving monotonicity across batches in this process.
992    async fn next_batch_ts(&self) -> Result<i64, ScyllaDbStoreInternalError> {
993        self.ensure_ts_seeded().await?;
994        loop {
995            let prev = self.ts_floor.load(Ordering::Relaxed);
996            let now_us = SystemTime::now()
997                .duration_since(UNIX_EPOCH)
998                .ok()
999                .and_then(|d| i64::try_from(d.as_micros()).ok())
1000                .unwrap_or(prev);
1001            let next = std::cmp::max(now_us, prev + 1);
1002            // The batch uses `next` (`T`) and `next + 1` (`T + 1`); store the latter
1003            // so the following batch starts strictly above both.
1004            if self
1005                .ts_floor
1006                .compare_exchange_weak(prev, next + 1, Ordering::Relaxed, Ordering::Relaxed)
1007                .is_ok()
1008            {
1009                return Ok(next);
1010            }
1011        }
1012    }
1013}
1014
1015// ScyllaDB requires that the keys are non-empty.
1016fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
1017    let mut big_key = vec![0];
1018    big_key.extend(root_key);
1019    big_key
1020}
1021
1022/// Reserved clustering key inside each chain's partition that holds the
1023/// timestamp sentinel used to seed the per-store client timestamp generator
1024/// in exclusive mode. The empty clustering key is unused by any caller:
1025/// views always write keys prefixed with a tag byte (>= `MIN_VIEW_TAG`),
1026/// and the journaling layer writes 6-byte keys starting with `[0, ...]`.
1027const WRITETIME_SENTINEL_KEY: &[u8] = &[];
1028
1029/// The type for building a new ScyllaDB Key Value Store
1030#[derive(Clone, Debug, Deserialize, Serialize)]
1031pub struct ScyllaDbStoreInternalConfig {
1032    /// The URL to which the requests have to be sent
1033    pub uri: String,
1034    /// Maximum number of concurrent database queries allowed for this client.
1035    pub max_concurrent_queries: Option<usize>,
1036    /// Preferred buffer size for async streams.
1037    pub max_stream_queries: usize,
1038    /// The replication factor.
1039    pub replication_factor: u32,
1040}
1041
1042impl KeyValueDatabase for ScyllaDbDatabaseInternal {
1043    type Config = ScyllaDbStoreInternalConfig;
1044    type Store = ScyllaDbStoreInternal;
1045
1046    fn get_name() -> String {
1047        "scylladb internal".to_string()
1048    }
1049
1050    async fn connect(
1051        config: &Self::Config,
1052        namespace: &str,
1053    ) -> Result<Self, ScyllaDbStoreInternalError> {
1054        Self::check_namespace(namespace)?;
1055        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1056        let store = ScyllaDbClient::new(session, namespace).await?;
1057        let store = Arc::new(store);
1058        let semaphore = config
1059            .max_concurrent_queries
1060            .map(|n| Arc::new(Semaphore::new(n)));
1061        let max_stream_queries = config.max_stream_queries;
1062        Ok(Self {
1063            store,
1064            semaphore,
1065            max_stream_queries,
1066        })
1067    }
1068
1069    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
1070        let store = self.store.clone();
1071        let semaphore = self.semaphore.clone();
1072        let max_stream_queries = self.max_stream_queries;
1073        let root_key = get_big_root_key(root_key);
1074        Ok(ScyllaDbStoreInternal {
1075            store,
1076            semaphore,
1077            max_stream_queries,
1078            root_key,
1079            is_exclusive: false,
1080            ts_floor: Arc::new(AtomicI64::new(0)),
1081        })
1082    }
1083
1084    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, ScyllaDbStoreInternalError> {
1085        let store = self.store.clone();
1086        let semaphore = self.semaphore.clone();
1087        let max_stream_queries = self.max_stream_queries;
1088        let root_key = get_big_root_key(root_key);
1089        Ok(ScyllaDbStoreInternal {
1090            store,
1091            semaphore,
1092            max_stream_queries,
1093            root_key,
1094            is_exclusive: true,
1095            ts_floor: Arc::new(AtomicI64::new(0)),
1096        })
1097    }
1098
1099    async fn list_all(config: &Self::Config) -> Result<Vec<String>, ScyllaDbStoreInternalError> {
1100        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1101        let statement = session
1102            .prepare(format!("DESCRIBE KEYSPACE {KEYSPACE}"))
1103            .await?;
1104        let result = Box::pin(session.execute_iter(statement, &[])).await;
1105        let miss_msg = format!("'{KEYSPACE}' not found in keyspaces");
1106        let result = match result {
1107            Ok(result) => result,
1108            Err(error) => {
1109                let invalid_or_keyspace_not_found = match &error {
1110                    PagerExecutionError::NextPageError(NextPageError::RequestFailure(
1111                        RequestError::LastAttemptError(RequestAttemptError::DbError(db_error, msg)),
1112                    )) => *db_error == DbError::Invalid && msg.as_str() == miss_msg,
1113                    _ => false,
1114                };
1115                if invalid_or_keyspace_not_found {
1116                    return Ok(Vec::new());
1117                } else {
1118                    return Err(ScyllaDbStoreInternalError::PagerExecutionError(error));
1119                }
1120            }
1121        };
1122        let mut namespaces = Vec::new();
1123        let mut rows_stream = result.rows_stream::<(String, String, String, String)>()?;
1124        while let Some(row) = rows_stream.next().await {
1125            let (_, object_kind, name, _) = row?;
1126            if object_kind == "table" {
1127                namespaces.push(name);
1128            }
1129        }
1130        Ok(namespaces)
1131    }
1132
1133    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, ScyllaDbStoreInternalError> {
1134        let statement = self
1135            .store
1136            .session
1137            .prepare(format!(
1138                "SELECT root_key FROM {}.\"{}\" ALLOW FILTERING",
1139                KEYSPACE, self.store.namespace
1140            ))
1141            .await?;
1142
1143        // Execute the query
1144        let rows = Box::pin(self.store.session.execute_iter(statement, &[])).await?;
1145        let mut rows = rows.rows_stream::<(Vec<u8>,)>()?;
1146        let mut root_keys = BTreeSet::new();
1147        while let Some(row) = rows.next().await {
1148            let (root_key,) = row?;
1149            let root_key = root_key[1..].to_vec();
1150            root_keys.insert(root_key);
1151        }
1152        Ok(root_keys.into_iter().collect::<Vec<_>>())
1153    }
1154
1155    async fn delete_all(store_config: &Self::Config) -> Result<(), ScyllaDbStoreInternalError> {
1156        let session = ScyllaDbClient::build_default_session(&store_config.uri).await?;
1157        let statement = session
1158            .prepare(format!("DROP KEYSPACE IF EXISTS {KEYSPACE}"))
1159            .await?;
1160
1161        session
1162            .execute_single_page(&statement, &[], PagingState::start())
1163            .await
1164            .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1165        Ok(())
1166    }
1167
1168    async fn exists(
1169        config: &Self::Config,
1170        namespace: &str,
1171    ) -> Result<bool, ScyllaDbStoreInternalError> {
1172        Self::check_namespace(namespace)?;
1173        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1174
1175        // We check the way the test can fail. It can fail in different ways.
1176        let result = session
1177            .prepare(format!(
1178                "SELECT root_key FROM {KEYSPACE}.\"{namespace}\" LIMIT 1 ALLOW FILTERING"
1179            ))
1180            .await;
1181
1182        // The missing table translates into a very specific error that we matched
1183        let miss_msg1 = format!("unconfigured table {namespace}");
1184        let miss_msg1 = miss_msg1.as_str();
1185        let miss_msg2 = "Undefined name root_key in selection clause";
1186        let miss_msg3 = format!("Keyspace {KEYSPACE} does not exist");
1187        let Err(error) = result else {
1188            // If OK, then the table exists
1189            return Ok(true);
1190        };
1191        let missing_table = match &error {
1192            PrepareError::AllAttemptsFailed {
1193                first_attempt: RequestAttemptError::DbError(db_error, msg),
1194            } => {
1195                if *db_error != DbError::Invalid {
1196                    false
1197                } else {
1198                    msg.as_str() == miss_msg1
1199                        || msg.as_str() == miss_msg2
1200                        || msg.as_str() == miss_msg3
1201                }
1202            }
1203            _ => false,
1204        };
1205        if missing_table {
1206            Ok(false)
1207        } else {
1208            Err(ScyllaDbStoreInternalError::PrepareError(error))
1209        }
1210    }
1211
1212    async fn create(
1213        config: &Self::Config,
1214        namespace: &str,
1215    ) -> Result<(), ScyllaDbStoreInternalError> {
1216        Self::check_namespace(namespace)?;
1217        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1218
1219        // Create a keyspace if it doesn't exist
1220        let statement = session
1221            .prepare(format!(
1222                "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ \
1223                    'class' : 'NetworkTopologyStrategy', \
1224                    'replication_factor' : {} \
1225                }}",
1226                KEYSPACE, config.replication_factor
1227            ))
1228            .await?;
1229        session
1230            .execute_single_page(&statement, &[], PagingState::start())
1231            .await
1232            .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1233
1234        // This explicitly sets a lot of default parameters for clarity and for making future
1235        // changes easier.
1236        let statement = session
1237            .prepare(format!(
1238                "CREATE TABLE {KEYSPACE}.\"{namespace}\" (\
1239                    root_key blob, \
1240                    k blob, \
1241                    v blob, \
1242                    PRIMARY KEY (root_key, k) \
1243                ) \
1244                WITH compaction = {{ \
1245                    'class'          : 'LeveledCompactionStrategy', \
1246                    'sstable_size_in_mb' : 160 \
1247                }} \
1248                AND compression = {{ \
1249                    'sstable_compression': 'LZ4Compressor', \
1250                    'chunk_length_in_kb':'4' \
1251                }} \
1252                AND caching = {{ \
1253                    'enabled': 'true' \
1254                }} \
1255                AND gc_grace_seconds = 0 \
1256                AND tombstone_gc = {{'mode': 'immediate'}}"
1257            ))
1258            .await?;
1259        session
1260            .execute_single_page(&statement, &[], PagingState::start())
1261            .await
1262            .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1263        Ok(())
1264    }
1265
1266    async fn delete(
1267        config: &Self::Config,
1268        namespace: &str,
1269    ) -> Result<(), ScyllaDbStoreInternalError> {
1270        Self::check_namespace(namespace)?;
1271        let session = ScyllaDbClient::build_default_session(&config.uri).await?;
1272        let statement = session
1273            .prepare(format!("DROP TABLE IF EXISTS {KEYSPACE}.\"{namespace}\";"))
1274            .await?;
1275        session
1276            .execute_single_page(&statement, &[], PagingState::start())
1277            .await
1278            .map_err(ScyllaDbStoreInternalError::ExecutionError)?;
1279        Ok(())
1280    }
1281}
1282
1283impl ScyllaDbStoreInternal {
1284    /// Obtains the semaphore lock on the database if needed.
1285    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
1286        match &self.semaphore {
1287            None => None,
1288            Some(count) => Some(count.acquire().await),
1289        }
1290    }
1291}
1292
1293impl ScyllaDbDatabaseInternal {
1294    fn check_namespace(namespace: &str) -> Result<(), ScyllaDbStoreInternalError> {
1295        if !namespace.is_empty()
1296            && namespace.len() <= 48
1297            && namespace
1298                .chars()
1299                .all(|c| c.is_ascii_alphanumeric() || c == '_')
1300        {
1301            return Ok(());
1302        }
1303        Err(ScyllaDbStoreInternalError::InvalidNamespace)
1304    }
1305}
1306
1307#[cfg(with_testing)]
1308impl TestKeyValueDatabase for JournalingKeyValueDatabase<ScyllaDbDatabaseInternal> {
1309    async fn new_test_config(
1310    ) -> Result<ScyllaDbStoreInternalConfig, JournalingError<ScyllaDbStoreInternalError>> {
1311        // TODO(#4114): Read the port from an environment variable.
1312        let uri = "localhost:9042".to_string();
1313        Ok(ScyllaDbStoreInternalConfig {
1314            uri,
1315            max_concurrent_queries: Some(10),
1316            max_stream_queries: 10,
1317            replication_factor: 1,
1318        })
1319    }
1320}
1321
1322/// The `ScyllaDbDatabase` composed type with metrics
1323#[cfg(with_metrics)]
1324pub type ScyllaDbDatabase = MeteredDatabase<
1325    LruCachingDatabase<
1326        MeteredDatabase<
1327            ValueSplittingDatabase<
1328                MeteredDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
1329            >,
1330        >,
1331    >,
1332>;
1333
1334/// The `ScyllaDbDatabase` composed type
1335#[cfg(not(with_metrics))]
1336pub type ScyllaDbDatabase = LruCachingDatabase<
1337    ValueSplittingDatabase<JournalingKeyValueDatabase<ScyllaDbDatabaseInternal>>,
1338>;
1339
1340/// The `ScyllaDbStoreConfig` input type
1341pub type ScyllaDbStoreConfig = LruCachingConfig<ScyllaDbStoreInternalConfig>;
1342
1343/// The combined error type for the `ScyllaDbDatabase`.
1344pub type ScyllaDbStoreError = ValueSplittingError<JournalingError<ScyllaDbStoreInternalError>>;