linera_views/backends/
dynamo_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the DynamoDB database.
5
6use std::{
7    collections::HashMap,
8    env,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17    error::SdkError,
18    operation::{
19        batch_get_item::BatchGetItemError,
20        create_table::CreateTableError,
21        delete_table::DeleteTableError,
22        get_item::GetItemError,
23        list_tables::ListTablesError,
24        query::{QueryError, QueryOutput},
25        transact_write_items::TransactWriteItemsError,
26    },
27    primitives::Blob,
28    types::{
29        AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType, KeysAndAttributes,
30        ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
31    },
32    Client,
33};
34use aws_smithy_types::error::operation::BuildError;
35use futures::future::join_all;
36use linera_base::{ensure, util::future::FutureSyncExt as _};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45    batch::SimpleUnorderedBatch,
46    common::get_uleb128_size,
47    journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48    lru_caching::{LruCachingConfig, LruCachingDatabase},
49    store::{
50        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51        WithError,
52    },
53    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54};
55
/// Name of the environment variable holding the address of a DynamoDB local instance.
const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
58
59/// Gets the AWS configuration from the environment
60async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
61    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
62        .boxed_sync()
63        .await;
64    Ok((&base_config).into())
65}
66
67fn get_endpoint_address() -> Option<String> {
68    env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
69}
70
71/// Gets the DynamoDB local config
72async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
73{
74    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
75        .boxed_sync()
76        .await;
77    let endpoint_address = get_endpoint_address().unwrap();
78    let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
79        .endpoint_url(endpoint_address)
80        .build();
81    Ok(config)
82}
83
/// DynamoDB forbids iterating over the partition keys.
/// Therefore we use a special partition key `[1]` under which the root keys
/// themselves are stored. Normal root keys get a `[0]` put in front of them
/// (see `EMPTY_ROOT_KEY`), so no collision with this partition is possible.
const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];

/// The attribute name of the partition key.
const PARTITION_ATTRIBUTE: &str = "item_partition";

/// The prefix put in front of every root key to form the partition key of a store.
/// Used on its own, it is also the partition probed when testing the existence of tables.
const EMPTY_ROOT_KEY: &[u8] = &[0];

/// The key probed when testing the existence of tables.
const DB_KEY: &[u8] = &[0];

/// The attribute name of the primary key (used as a sort key).
const KEY_ATTRIBUTE: &str = "item_key";

/// The attribute name of the table value blob.
const VALUE_ATTRIBUTE: &str = "item_value";

/// The projection expression for obtaining the primary key (used as a sort key)
/// together with the stored value.
const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
107
/// TODO(#1084): The scheme below with the `MAX_VALUE_SIZE` has to be checked.
/// This is the maximum size of a raw value in DynamoDB (400 KB).
const RAW_MAX_VALUE_SIZE: usize = 409600;

/// Fundamental constants in DynamoDB: The maximum size of a value is 400 KB.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html>
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`.
/// Therefore the actual `MAX_VALUE_SIZE` might be lower.
/// In the worst case the key has the maximum size `MAX_KEY_SIZE` (see below) and we pack
/// just one entry. This gets us the inequality
/// `1 + 1 + serialized_size(MAX_KEY_SIZE)? + serialized_size(x)? <= 400 * 1024`
/// and so this simplifies to `1 + 1 + (2 + MAX_KEY_SIZE) + (3 + x) <= 400 * 1024`.
/// Note on the formula:
/// * We write 3 because `get_uleb128_size(400 * 1024) == 3`.
/// * We write 2 for the length prefix of the key, i.e. `get_uleb128_size(MAX_KEY_SIZE)`.
/// * We write `1 + 1` because the `SimpleUnorderedBatch` has two entries.
///
/// With `MAX_KEY_SIZE = 1023` this gets us a maximal value of 408570 (assuming
/// `get_uleb128_size` returns 3 and 2 for the two sizes, as stated above).
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 1
    - 1;

/// Fundamental constant in DynamoDB: The maximum size of a key is 1024 bytes.
/// We decrease by 1 because we prepend a `[1]` to every key.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html>
const MAX_KEY_SIZE: usize = 1023;

/// Fundamental constants in DynamoDB: The maximum total size of a `TransactWriteItems`
/// request is 4 MB.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
/// We're taking a conservative value because the mode of computation is unclear.
const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
141
/// The DynamoDB database is potentially handling an infinite number of connections.
/// However, for testing or some other purpose we really need to decrease the number of
/// connections.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;

/// The number of entries in a stream can be controlled by this parameter for tests.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;

/// Fundamental constants in DynamoDB: The maximum number of items in a
/// `TransactWriteItems` request is 100.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;

/// Maximum number of entries that can be obtained in a `BatchGetItem` operation.
/// The two constraints are at most 100 items and at most 16 MB in total.
/// Since the maximum size of a value is 400 KB, this gets us 40 as upper limit.
const MAX_BATCH_GET_ITEM_SIZE: usize = 40;
160
161/// Builds the key attributes for a table item.
162///
163/// The key is composed of two attributes that are both binary blobs. The first attribute is a
164/// partition key and is currently just a dummy value that ensures all items are in the same
165/// partition. This is necessary for range queries to work correctly.
166///
167/// The second attribute is the actual key value, which is generated by concatenating the
168/// context prefix. `The Vec<u8>` expression is obtained from `self.derive_key`.
169fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
170    let mut prefixed_key = vec![1];
171    prefixed_key.extend(key);
172    [
173        (
174            PARTITION_ATTRIBUTE.to_owned(),
175            AttributeValue::B(Blob::new(start_key.to_vec())),
176        ),
177        (
178            KEY_ATTRIBUTE.to_owned(),
179            AttributeValue::B(Blob::new(prefixed_key)),
180        ),
181    ]
182    .into()
183}
184
185/// Builds the value attribute for storing a table item.
186fn build_key_value(
187    start_key: &[u8],
188    key: Vec<u8>,
189    value: Vec<u8>,
190) -> HashMap<String, AttributeValue> {
191    let mut prefixed_key = vec![1];
192    prefixed_key.extend(key);
193    [
194        (
195            PARTITION_ATTRIBUTE.to_owned(),
196            AttributeValue::B(Blob::new(start_key.to_vec())),
197        ),
198        (
199            KEY_ATTRIBUTE.to_owned(),
200            AttributeValue::B(Blob::new(prefixed_key)),
201        ),
202        (
203            VALUE_ATTRIBUTE.to_owned(),
204            AttributeValue::B(Blob::new(value)),
205        ),
206    ]
207    .into()
208}
209
210/// Checks that a key is of the correct size
211fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
212    ensure!(
213        key.len() <= MAX_KEY_SIZE,
214        DynamoDbStoreInternalError::KeyTooLong
215    );
216    Ok(())
217}
218
219/// Extracts the key attribute from an item.
220fn extract_key(
221    prefix_len: usize,
222    attributes: &HashMap<String, AttributeValue>,
223) -> Result<&[u8], DynamoDbStoreInternalError> {
224    let key = attributes
225        .get(KEY_ATTRIBUTE)
226        .ok_or(DynamoDbStoreInternalError::MissingKey)?;
227    match key {
228        AttributeValue::B(blob) => Ok(&blob.as_ref()[1 + prefix_len..]),
229        key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
230    }
231}
232
233/// Extracts the value attribute from an item.
234fn extract_value(
235    attributes: &HashMap<String, AttributeValue>,
236) -> Result<&[u8], DynamoDbStoreInternalError> {
237    // According to the official AWS DynamoDB documentation:
238    // "Binary must have a length greater than zero if the attribute is used as a key attribute for a table or index"
239    let value = attributes
240        .get(VALUE_ATTRIBUTE)
241        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
242    match value {
243        AttributeValue::B(blob) => Ok(blob.as_ref()),
244        value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
245    }
246}
247
248/// Extracts the value attribute from an item (returned by value).
249fn extract_value_owned(
250    attributes: &mut HashMap<String, AttributeValue>,
251) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
252    let value = attributes
253        .remove(VALUE_ATTRIBUTE)
254        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
255    match value {
256        AttributeValue::B(blob) => Ok(blob.into_inner()),
257        value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
258    }
259}
260
261/// Extracts the key and value attributes from an item.
262fn extract_key_value(
263    prefix_len: usize,
264    attributes: &HashMap<String, AttributeValue>,
265) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
266    let key = extract_key(prefix_len, attributes)?;
267    let value = extract_value(attributes)?;
268    Ok((key, value))
269}
270
271struct TransactionBuilder {
272    start_key: Vec<u8>,
273    transactions: Vec<TransactWriteItem>,
274}
275
276impl TransactionBuilder {
277    fn new(start_key: &[u8]) -> Self {
278        Self {
279            start_key: start_key.to_vec(),
280            transactions: Vec::new(),
281        }
282    }
283
284    fn insert_delete_request(
285        &mut self,
286        key: Vec<u8>,
287        store: &DynamoDbStoreInternal,
288    ) -> Result<(), DynamoDbStoreInternalError> {
289        let transaction = store.build_delete_transaction(&self.start_key, key)?;
290        self.transactions.push(transaction);
291        Ok(())
292    }
293
294    fn insert_put_request(
295        &mut self,
296        key: Vec<u8>,
297        value: Vec<u8>,
298        store: &DynamoDbStoreInternal,
299    ) -> Result<(), DynamoDbStoreInternalError> {
300        let transaction = store.build_put_transaction(&self.start_key, key, value)?;
301        self.transactions.push(transaction);
302        Ok(())
303    }
304}
305
/// A DynamoDB client.
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    /// The AWS SDK client used to send the requests.
    client: Client,
    /// The namespace, which is used as the DynamoDB table name.
    namespace: String,
    /// Optional semaphore acquired before a query is sent, bounding the concurrency.
    semaphore: Option<Arc<Semaphore>>,
    /// The value reported by `max_stream_queries`.
    max_stream_queries: usize,
    /// The partition key under which the items of this store are kept.
    start_key: Vec<u8>,
    /// Set on the first `write_batch`, when the root key gets recorded in the
    /// `PARTITION_KEY_ROOT_KEY` partition, so that this is done only once per store object.
    root_key_written: Arc<AtomicBool>,
}
316
/// Database-level connection to DynamoDB for managing namespaces and partitions.
#[derive(Clone)]
pub struct DynamoDbDatabaseInternal {
    /// The AWS SDK client, cloned into every store opened from this database.
    client: Client,
    /// The namespace, which is used as the DynamoDB table name.
    namespace: String,
    /// Optional semaphore shared with every opened store to bound the concurrent queries.
    semaphore: Option<Arc<Semaphore>>,
    /// The stream buffer size handed over to every opened store.
    max_stream_queries: usize,
}

// The database reports the same error type as the stores it opens.
impl WithError for DynamoDbDatabaseInternal {
    type Error = DynamoDbStoreInternalError;
}
329
330/// The initial configuration of the system.
331#[derive(Debug, Clone, Serialize, Deserialize)]
332pub struct DynamoDbStoreInternalConfig {
333    /// Whether to use DynamoDB local or not.
334    pub use_dynamodb_local: bool,
335    /// Maximum number of concurrent database queries allowed for this client.
336    pub max_concurrent_queries: Option<usize>,
337    /// Preferred buffer size for async streams.
338    pub max_stream_queries: usize,
339}
340
341impl DynamoDbStoreInternalConfig {
342    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
343        let config = if self.use_dynamodb_local {
344            get_dynamodb_local_config().await?
345        } else {
346            get_base_config().await?
347        };
348        Ok(Client::from_conf(config))
349    }
350}
351
352impl KeyValueDatabase for DynamoDbDatabaseInternal {
353    type Config = DynamoDbStoreInternalConfig;
354    type Store = DynamoDbStoreInternal;
355
356    fn get_name() -> String {
357        "dynamodb internal".to_string()
358    }
359
360    async fn connect(
361        config: &Self::Config,
362        namespace: &str,
363    ) -> Result<Self, DynamoDbStoreInternalError> {
364        Self::check_namespace(namespace)?;
365        let client = config.client().await?;
366        let semaphore = config
367            .max_concurrent_queries
368            .map(|n| Arc::new(Semaphore::new(n)));
369        let max_stream_queries = config.max_stream_queries;
370        let namespace = namespace.to_string();
371        let store = Self {
372            client,
373            namespace,
374            semaphore,
375            max_stream_queries,
376        };
377        Ok(store)
378    }
379
380    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
381        let mut start_key = EMPTY_ROOT_KEY.to_vec();
382        start_key.extend(root_key);
383        Ok(self.open_internal(start_key))
384    }
385
386    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
387        self.open_shared(root_key)
388    }
389
390    async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
391        let client = config.client().await?;
392        let mut namespaces = Vec::new();
393        let mut start_table = None;
394        loop {
395            let response = client
396                .list_tables()
397                .set_exclusive_start_table_name(start_table)
398                .send()
399                .boxed_sync()
400                .await?;
401            if let Some(namespaces_blk) = response.table_names {
402                namespaces.extend(namespaces_blk);
403            }
404            if response.last_evaluated_table_name.is_none() {
405                break;
406            } else {
407                start_table = response.last_evaluated_table_name;
408            }
409        }
410        Ok(namespaces)
411    }
412
413    async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
414        let store = self.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
415        store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
416    }
417
418    async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
419        let client = config.client().await?;
420        let tables = Self::list_all(config).await?;
421        for table in tables {
422            client
423                .delete_table()
424                .table_name(&table)
425                .send()
426                .boxed_sync()
427                .await?;
428        }
429        Ok(())
430    }
431
432    async fn exists(
433        config: &Self::Config,
434        namespace: &str,
435    ) -> Result<bool, DynamoDbStoreInternalError> {
436        Self::check_namespace(namespace)?;
437        let client = config.client().await?;
438        let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
439        let response = client
440            .get_item()
441            .table_name(namespace)
442            .set_key(Some(key_db))
443            .send()
444            .boxed_sync()
445            .await;
446        let Err(error) = response else {
447            return Ok(true);
448        };
449        let test = match &error {
450            SdkError::ServiceError(error) => match error.err() {
451                GetItemError::ResourceNotFoundException(error) => {
452                    error.message
453                        == Some("Cannot do operations on a non-existent table".to_string())
454                }
455                _ => false,
456            },
457            _ => false,
458        };
459        if test {
460            Ok(false)
461        } else {
462            Err(error.into())
463        }
464    }
465
466    async fn create(
467        config: &Self::Config,
468        namespace: &str,
469    ) -> Result<(), DynamoDbStoreInternalError> {
470        Self::check_namespace(namespace)?;
471        let client = config.client().await?;
472        client
473            .create_table()
474            .table_name(namespace)
475            .attribute_definitions(
476                AttributeDefinition::builder()
477                    .attribute_name(PARTITION_ATTRIBUTE)
478                    .attribute_type(ScalarAttributeType::B)
479                    .build()?,
480            )
481            .attribute_definitions(
482                AttributeDefinition::builder()
483                    .attribute_name(KEY_ATTRIBUTE)
484                    .attribute_type(ScalarAttributeType::B)
485                    .build()?,
486            )
487            .key_schema(
488                KeySchemaElement::builder()
489                    .attribute_name(PARTITION_ATTRIBUTE)
490                    .key_type(KeyType::Hash)
491                    .build()?,
492            )
493            .key_schema(
494                KeySchemaElement::builder()
495                    .attribute_name(KEY_ATTRIBUTE)
496                    .key_type(KeyType::Range)
497                    .build()?,
498            )
499            .provisioned_throughput(
500                ProvisionedThroughput::builder()
501                    .read_capacity_units(10)
502                    .write_capacity_units(10)
503                    .build()?,
504            )
505            .send()
506            .boxed_sync()
507            .await?;
508        Ok(())
509    }
510
511    async fn delete(
512        config: &Self::Config,
513        namespace: &str,
514    ) -> Result<(), DynamoDbStoreInternalError> {
515        Self::check_namespace(namespace)?;
516        let client = config.client().await?;
517        client
518            .delete_table()
519            .table_name(namespace)
520            .send()
521            .boxed_sync()
522            .await?;
523        Ok(())
524    }
525}
526
527impl DynamoDbDatabaseInternal {
528    /// Namespaces are named table names in DynamoDB [naming
529    /// rules](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules),
530    /// so we need to check correctness of the namespace
531    fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
532        if namespace.len() < 3 {
533            return Err(InvalidNamespace::TooShort);
534        }
535        if namespace.len() > 255 {
536            return Err(InvalidNamespace::TooLong);
537        }
538        if !namespace.chars().all(|character| {
539            character.is_ascii_alphanumeric()
540                || character == '.'
541                || character == '-'
542                || character == '_'
543        }) {
544            return Err(InvalidNamespace::InvalidCharacter);
545        }
546        Ok(())
547    }
548
549    fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
550        let client = self.client.clone();
551        let namespace = self.namespace.clone();
552        let semaphore = self.semaphore.clone();
553        let max_stream_queries = self.max_stream_queries;
554        DynamoDbStoreInternal {
555            client,
556            namespace,
557            semaphore,
558            max_stream_queries,
559            start_key,
560            root_key_written: Arc::new(AtomicBool::new(false)),
561        }
562    }
563}
564
565impl DynamoDbStoreInternal {
566    fn build_delete_transaction(
567        &self,
568        start_key: &[u8],
569        key: Vec<u8>,
570    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
571        check_key_size(&key)?;
572        let request = Delete::builder()
573            .table_name(&self.namespace)
574            .set_key(Some(build_key(start_key, key)))
575            .build()?;
576        Ok(TransactWriteItem::builder().delete(request).build())
577    }
578
579    fn build_put_transaction(
580        &self,
581        start_key: &[u8],
582        key: Vec<u8>,
583        value: Vec<u8>,
584    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
585        check_key_size(&key)?;
586        ensure!(
587            value.len() <= RAW_MAX_VALUE_SIZE,
588            DynamoDbStoreInternalError::ValueLengthTooLarge
589        );
590        let request = Put::builder()
591            .table_name(&self.namespace)
592            .set_item(Some(build_key_value(start_key, key, value)))
593            .build()?;
594        Ok(TransactWriteItem::builder().put(request).build())
595    }
596
597    /// Obtains the semaphore lock on the database if needed.
598    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
599        match &self.semaphore {
600            None => None,
601            Some(count) => Some(count.acquire().await),
602        }
603    }
604
605    async fn get_query_output(
606        &self,
607        attribute_str: &str,
608        start_key: &[u8],
609        key_prefix: &[u8],
610        start_key_map: Option<HashMap<String, AttributeValue>>,
611    ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
612        let _guard = self.acquire().await;
613        let start_key = start_key.to_vec();
614        let mut prefixed_key_prefix = vec![1];
615        prefixed_key_prefix.extend(key_prefix);
616        let response = self
617            .client
618            .query()
619            .table_name(&self.namespace)
620            .projection_expression(attribute_str)
621            .key_condition_expression(format!(
622                "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
623            ))
624            .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
625            .expression_attribute_values(
626                ":prefix",
627                AttributeValue::B(Blob::new(prefixed_key_prefix)),
628            )
629            .set_exclusive_start_key(start_key_map)
630            .send()
631            .boxed_sync()
632            .await?;
633        Ok(response)
634    }
635
636    async fn read_value_bytes_general(
637        &self,
638        key_db: HashMap<String, AttributeValue>,
639    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
640        let _guard = self.acquire().await;
641        let response = self
642            .client
643            .get_item()
644            .table_name(&self.namespace)
645            .set_key(Some(key_db))
646            .send()
647            .boxed_sync()
648            .await?;
649
650        match response.item {
651            Some(mut item) => {
652                let value = extract_value_owned(&mut item)?;
653                Ok(Some(value))
654            }
655            None => Ok(None),
656        }
657    }
658
659    async fn contains_key_general(
660        &self,
661        key_db: HashMap<String, AttributeValue>,
662    ) -> Result<bool, DynamoDbStoreInternalError> {
663        let _guard = self.acquire().await;
664        let response = self
665            .client
666            .get_item()
667            .table_name(&self.namespace)
668            .set_key(Some(key_db))
669            .projection_expression(PARTITION_ATTRIBUTE)
670            .send()
671            .boxed_sync()
672            .await?;
673
674        Ok(response.item.is_some())
675    }
676
677    async fn get_list_responses(
678        &self,
679        attribute: &str,
680        start_key: &[u8],
681        key_prefix: &[u8],
682    ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
683        check_key_size(key_prefix)?;
684        let mut responses = Vec::new();
685        let mut start_key_map = None;
686        loop {
687            let response = self
688                .get_query_output(attribute, start_key, key_prefix, start_key_map)
689                .await?;
690            let last_evaluated = response.last_evaluated_key.clone();
691            responses.push(response);
692            match last_evaluated {
693                None => {
694                    break;
695                }
696                Some(value) => {
697                    start_key_map = Some(value);
698                }
699            }
700        }
701        Ok(QueryResponses {
702            prefix_len: key_prefix.len(),
703            responses,
704        })
705    }
706
707    async fn read_batch_values_bytes(
708        &self,
709        keys: &[Vec<u8>],
710    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
711        // Early return for empty keys
712        if keys.is_empty() {
713            return Ok(Vec::new());
714        }
715        let mut results = vec![None; keys.len()];
716
717        // Build the request keys
718        let mut request_keys = Vec::new();
719        let mut key_to_index = HashMap::<Vec<u8>, Vec<usize>>::new();
720
721        for (i, key) in keys.iter().enumerate() {
722            check_key_size(key)?;
723            let key_attrs = build_key(&self.start_key, key.clone());
724            key_to_index.entry(key.clone()).or_default().push(i);
725            request_keys.push(key_attrs);
726        }
727
728        let keys_and_attributes = KeysAndAttributes::builder()
729            .set_keys(Some(request_keys))
730            .build()?;
731
732        let mut request_items = HashMap::new();
733        request_items.insert(self.namespace.clone(), keys_and_attributes);
734
735        // Execute batch get item request with retry for unprocessed keys
736        let mut remaining_request_items = Some(request_items);
737
738        while let Some(request_items) = remaining_request_items {
739            // Skip if the request items are empty
740            if request_items.is_empty() {
741                break;
742            }
743
744            let _guard = self.acquire().await;
745            let response = self
746                .client
747                .batch_get_item()
748                .set_request_items(Some(request_items))
749                .send()
750                .boxed_sync()
751                .await?;
752
753            // Process returned items
754            if let Some(mut responses) = response.responses {
755                if let Some(items) = responses.remove(&self.namespace) {
756                    for mut item in items {
757                        // Extract key to find the original index
758                        let key_attr = item
759                            .get(KEY_ATTRIBUTE)
760                            .ok_or(DynamoDbStoreInternalError::MissingKey)?;
761
762                        if let AttributeValue::B(blob) = key_attr {
763                            let prefixed_key = blob.as_ref();
764                            let key = &prefixed_key[1..]; // Remove the [1] prefix
765                            if let Some(indices) = key_to_index.get(key) {
766                                if let Some((&last, rest)) = indices.split_last() {
767                                    let value = extract_value_owned(&mut item)?;
768                                    for index in rest {
769                                        results[*index] = Some(value.clone());
770                                    }
771                                    results[last] = Some(value);
772                                }
773                            }
774                        }
775                    }
776                }
777            }
778
779            // Handle unprocessed keys
780            remaining_request_items = response.unprocessed_keys;
781        }
782
783        Ok(results)
784    }
785}
786
787struct QueryResponses {
788    prefix_len: usize,
789    responses: Vec<QueryOutput>,
790}
791
792impl QueryResponses {
793    fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
794        self.responses
795            .iter()
796            .flat_map(|response| response.items.iter().flatten())
797            .map(|item| extract_key(self.prefix_len, item))
798    }
799
800    fn key_values(
801        &self,
802    ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
803        self.responses
804            .iter()
805            .flat_map(|response| response.items.iter().flatten())
806            .map(|item| extract_key_value(self.prefix_len, item))
807    }
808}
809
// All the operations of the store report a `DynamoDbStoreInternalError`.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
813
814impl ReadableKeyValueStore for DynamoDbStoreInternal {
815    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
816
    /// Returns the `max_stream_queries` value this store was opened with.
    fn max_stream_queries(&self) -> usize {
        self.max_stream_queries
    }
820
821    fn root_key(&self) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
822        assert!(self.start_key.starts_with(EMPTY_ROOT_KEY));
823        Ok(self.start_key[EMPTY_ROOT_KEY.len()..].to_vec())
824    }
825
826    async fn read_value_bytes(
827        &self,
828        key: &[u8],
829    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
830        check_key_size(key)?;
831        let key_db = build_key(&self.start_key, key.to_vec());
832        self.read_value_bytes_general(key_db).await
833    }
834
835    async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
836        check_key_size(key)?;
837        let key_db = build_key(&self.start_key, key.to_vec());
838        self.contains_key_general(key_db).await
839    }
840
841    async fn contains_keys(
842        &self,
843        keys: &[Vec<u8>],
844    ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
845        let mut handles = Vec::new();
846        for key in keys {
847            check_key_size(key)?;
848            let key_db = build_key(&self.start_key, key.clone());
849            let handle = self.contains_key_general(key_db);
850            handles.push(handle);
851        }
852        join_all(handles)
853            .await
854            .into_iter()
855            .collect::<Result<_, _>>()
856    }
857
858    async fn read_multi_values_bytes(
859        &self,
860        keys: &[Vec<u8>],
861    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
862        if keys.is_empty() {
863            return Ok(Vec::new());
864        }
865
866        let handles = keys
867            .chunks(MAX_BATCH_GET_ITEM_SIZE)
868            .map(|key_batch| self.read_batch_values_bytes(key_batch));
869        let results: Vec<_> = join_all(handles)
870            .await
871            .into_iter()
872            .collect::<Result<_, _>>()?;
873        Ok(results.into_iter().flatten().collect())
874    }
875
876    async fn find_keys_by_prefix(
877        &self,
878        key_prefix: &[u8],
879    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
880        let result_queries = self
881            .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
882            .await?;
883        result_queries
884            .keys()
885            .map(|key| key.map(|k| k.to_vec()))
886            .collect()
887    }
888
889    async fn find_key_values_by_prefix(
890        &self,
891        key_prefix: &[u8],
892    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
893        let result_queries = self
894            .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
895            .await?;
896        result_queries
897            .key_values()
898            .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
899            .collect()
900    }
901}
902
903impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
904    const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
905    const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
906    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
907
908    // DynamoDB does not support the `DeletePrefix` operation.
909    type Batch = SimpleUnorderedBatch;
910
911    async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
912        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
913            let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
914            builder.insert_put_request(self.start_key.clone(), vec![], self)?;
915            self.client
916                .transact_write_items()
917                .set_transact_items(Some(builder.transactions))
918                .send()
919                .boxed_sync()
920                .await?;
921        }
922        let mut builder = TransactionBuilder::new(&self.start_key);
923        for key in batch.deletions {
924            builder.insert_delete_request(key, self)?;
925        }
926        for (key, value) in batch.insertions {
927            builder.insert_put_request(key, value, self)?;
928        }
929        if !builder.transactions.is_empty() {
930            let _guard = self.acquire().await;
931            self.client
932                .transact_write_items()
933                .set_transact_items(Some(builder.transactions))
934                .send()
935                .boxed_sync()
936                .await?;
937        }
938        Ok(())
939    }
940}
941
/// Error describing why a namespace failed validation.
#[derive(Debug, Error)]
pub enum InvalidNamespace {
    /// The namespace has fewer than 3 characters.
    #[error("Namespace must have at least 3 characters")]
    TooShort,

    /// The namespace has more than 63 characters.
    #[error("Namespace must be at most 63 characters")]
    TooLong,

    /// The namespace contains a character other than lowercase letters,
    /// numbers, periods and hyphens.
    #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
    InvalidCharacter,
}
957
/// Errors that occur when using [`DynamoDbStoreInternal`].
///
/// Errors coming from the AWS SDK are stored boxed; the `#[from]` attributes
/// provide the conversions from the boxed values.
#[derive(Debug, Error)]
pub enum DynamoDbStoreInternalError {
    /// An error occurred while getting an item (`GetItem`).
    #[error(transparent)]
    Get(#[from] Box<SdkError<GetItemError>>),

    /// An error occurred while getting a batch of items (`BatchGetItem`).
    #[error(transparent)]
    BatchGet(#[from] Box<SdkError<BatchGetItemError>>),

    /// An error occurred while writing a transaction of items
    /// (`TransactWriteItems`).
    #[error(transparent)]
    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),

    /// An error occurred while running a query (`Query`).
    #[error(transparent)]
    Query(#[from] Box<SdkError<QueryError>>),

    /// An error occurred while deleting a table.
    #[error(transparent)]
    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),

    /// An error occurred while listing tables.
    #[error(transparent)]
    ListTables(#[from] Box<SdkError<ListTablesError>>),

    /// The transaction holds more than `MAX_TRANSACT_WRITE_ITEM_SIZE` items.
    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
    TransactUpperLimitSize,

    /// The key is longer than 1024 bytes.
    #[error("The key must have at most 1024 bytes")]
    KeyTooLong,

    /// The key prefix is longer than 1024 bytes.
    #[error("The key prefix must have at most 1024 bytes")]
    KeyPrefixTooLong,

    /// The journal is not coherent.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The value exceeds the DynamoDB size limit (400 KB).
    #[error("The DynamoDB value should be less than 400 KB")]
    ValueLengthTooLarge,

    /// The key attribute is missing from a stored item.
    #[error("The stored key attribute is missing")]
    MissingKey,

    /// The key was stored with the wrong type (it should be a binary blob).
    /// The payload describes the type that was found.
    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongKeyType(String),

    /// The value attribute is missing from a stored item.
    #[error("The stored value attribute is missing")]
    MissingValue,

    /// The value was stored with the wrong type (it should be a binary blob).
    /// The payload describes the type that was found.
    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongValueType(String),

    /// A BCS (de)serialization error occurred.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The namespace is invalid.
    #[error(transparent)]
    InvalidNamespace(#[from] InvalidNamespace),

    /// An error occurred while creating the table.
    #[error(transparent)]
    CreateTable(#[from] Box<SdkError<CreateTableError>>),

    /// An error occurred while building an SDK object.
    #[error(transparent)]
    Build(#[from] Box<BuildError>),
}
1037
1038impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
1039where
1040    DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
1041{
1042    fn from(error: SdkError<InnerError>) -> Self {
1043        Box::new(error).into()
1044    }
1045}
1046
1047impl From<BuildError> for DynamoDbStoreInternalError {
1048    fn from(error: BuildError) -> Self {
1049        Box::new(error).into()
1050    }
1051}
1052
1053impl DynamoDbStoreInternalError {
1054    /// Creates a [`DynamoDbStoreInternalError::WrongKeyType`] instance based on the returned value type.
1055    ///
1056    /// # Panics
1057    ///
1058    /// If the value type is in the correct type, a binary blob.
1059    pub fn wrong_key_type(value: &AttributeValue) -> Self {
1060        DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
1061    }
1062
1063    /// Creates a [`DynamoDbStoreInternalError::WrongValueType`] instance based on the returned value type.
1064    ///
1065    /// # Panics
1066    ///
1067    /// If the value type is in the correct type, a binary blob.
1068    pub fn wrong_value_type(value: &AttributeValue) -> Self {
1069        DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
1070    }
1071
1072    fn type_description_of(value: &AttributeValue) -> String {
1073        match value {
1074            AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
1075            AttributeValue::Bool(_) => "a boolean",
1076            AttributeValue::Bs(_) => "a list of binary blobs",
1077            AttributeValue::L(_) => "a list",
1078            AttributeValue::M(_) => "a map",
1079            AttributeValue::N(_) => "a number",
1080            AttributeValue::Ns(_) => "a list of numbers",
1081            AttributeValue::Null(_) => "a null value",
1082            AttributeValue::S(_) => "a string",
1083            AttributeValue::Ss(_) => "a list of strings",
1084            _ => "an unknown type",
1085        }
1086        .to_owned()
1087    }
1088}
1089
/// Marks [`DynamoDbStoreInternalError`] as the error type of a key-value store
/// backend.
impl KeyValueStoreError for DynamoDbStoreInternalError {
    /// Name identifying this backend.
    const BACKEND: &'static str = "dynamo_db";
}
1093
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
    /// Builds the configuration used by tests: it targets a DynamoDB local
    /// instance and uses the test-specific query limits.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        let config = DynamoDbStoreInternalConfig {
            use_dynamodb_local: true,
            max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
            max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
        };
        Ok(config)
    }
}
1104
/// The combined error type for [`DynamoDbDatabase`]: a
/// [`DynamoDbStoreInternalError`] wrapped in a [`ValueSplittingError`].
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1107
/// The config type for [`DynamoDbDatabase`]: a [`DynamoDbStoreInternalConfig`]
/// wrapped in an [`LruCachingConfig`].
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1110
/// A shared DB client for DynamoDB with metrics.
///
/// From the inside out, the layers are: the internal DynamoDB database,
/// journaling, value splitting and LRU caching; the journaling, value
/// splitting and LRU caching layers are each wrapped in a [`MeteredDatabase`].
#[cfg(with_metrics)]
pub type DynamoDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
            >,
        >,
    >,
>;
/// A shared DB client for DynamoDB.
///
/// From the inside out, the layers are: the internal DynamoDB database,
/// journaling, value splitting and LRU caching.
#[cfg(not(with_metrics))]
pub type DynamoDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
>;
1127
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that the BCS-serialized size of a byte vector equals its length
    /// plus the size of the ULEB128-encoded length, including around the
    /// lengths where the ULEB128 encoding grows by one byte.
    #[test]
    fn test_serialization_len() {
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for len in lengths {
            let bytes = vec![0u8; len];
            let expected = get_uleb128_size(len) + len;
            let actual = serialized_size(&bytes).unwrap();
            assert_eq!(expected, actual);
        }
    }
}