linera_views/backends/
dynamo_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the DynamoDB database.
5
6use std::{
7    collections::HashMap,
8    env,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17    error::SdkError,
18    operation::{
19        batch_get_item::BatchGetItemError,
20        create_table::CreateTableError,
21        delete_table::DeleteTableError,
22        get_item::GetItemError,
23        list_tables::ListTablesError,
24        query::{QueryError, QueryOutput},
25        transact_write_items::TransactWriteItemsError,
26    },
27    primitives::Blob,
28    types::{
29        AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType, KeysAndAttributes,
30        ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
31    },
32    Client,
33};
34use aws_smithy_types::error::operation::BuildError;
35use futures::future::join_all;
36use linera_base::ensure;
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45    batch::SimpleUnorderedBatch,
46    common::get_uleb128_size,
47    journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48    lru_caching::{LruCachingConfig, LruCachingDatabase},
49    store::{
50        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51        WithError,
52    },
53    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54    FutureSyncExt as _,
55};
56
/// Environment variable holding the endpoint URL of the DynamoDB local instance to use.
const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
59
60/// Gets the AWS configuration from the environment
61async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
62    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
63        .boxed_sync()
64        .await;
65    Ok((&base_config).into())
66}
67
68fn get_endpoint_address() -> Option<String> {
69    env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
70}
71
72/// Gets the DynamoDB local config
73async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
74{
75    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
76        .boxed_sync()
77        .await;
78    let endpoint_address = get_endpoint_address().unwrap();
79    let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
80        .endpoint_url(endpoint_address)
81        .build();
82    Ok(config)
83}
84
/// Partition key of the partition that records every root key in use.
///
/// DynamoDB does not allow iterating over partition keys, so root keys are stored
/// explicitly as sort keys of this dedicated partition `[1]`. The partition key of a regular
/// root key is that root key prefixed with [`EMPTY_ROOT_KEY`] (`[0]`). The two kinds of
/// partitions therefore never collide.
const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];

/// The attribute name of the partition key.
const PARTITION_ATTRIBUTE: &str = "item_partition";

/// Prefix (`[0]`) prepended to every user root key to form its partition key.
///
/// On its own, it is the partition key of the empty root key. It is also the partition probed
/// (together with [`DB_KEY`]) when testing whether a table exists.
const EMPTY_ROOT_KEY: &[u8] = &[0];

/// Sort key read in the [`EMPTY_ROOT_KEY`] partition when testing whether a table exists.
const DB_KEY: &[u8] = &[0];

/// The attribute name of the sort key, which holds the user key.
const KEY_ATTRIBUTE: &str = "item_key";

/// The attribute name of the binary value stored with each item.
const VALUE_ATTRIBUTE: &str = "item_value";

/// Projection expression selecting both the sort key and the stored value.
const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
108
109/// TODO(#1084): The scheme below with the `MAX_VALUE_SIZE` has to be checked
110/// This is the maximum size of a raw value in DynamoDB.
111const RAW_MAX_VALUE_SIZE: usize = 409600;
112
113/// Fundamental constants in DynamoDB: The maximum size of a value is 400 KB
114/// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html
115/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`
116/// Therefore the actual `MAX_VALUE_SIZE` might be lower.
117/// At the maximum key size is 1024 bytes (see below) and we pack just one entry.
118/// So if the key has 1024 bytes this gets us the inequality
119/// `1 + 1 + serialized_size(1024)? + serialized_size(x)? <= 400*1024`
120/// and so this simplifies to `1 + 1 + (2 + 1024) + (3 + x) <= 400 * 1024`
121/// Note on the following formula:
122/// * We write 3 because `get_uleb128_size(400*1024) == 3`
123/// * We write `1 + 1` because the `SimpleUnorderedBatch` has two entries
124///
125/// This gets us a maximal value of 408569;
126const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
127    - MAX_KEY_SIZE
128    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
129    - get_uleb128_size(MAX_KEY_SIZE)
130    - 1
131    - 1;
132
133/// Fundamental constant in DynamoDB: The maximum size of a key is 1024 bytes
134/// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html
135const MAX_KEY_SIZE: usize = 1024;
136
/// Maximum aggregate size, in bytes, of the items written by one `TransactWriteItems`
/// request. DynamoDB documents this limit as 4 MB.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
/// We take a conservative value because the exact way DynamoDB computes this size is unclear.
const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;

/// The DynamoDB database can potentially handle an unbounded number of connections.
/// However, tests (and possibly other uses) need to limit the number of concurrent
/// connections.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;

/// The number of entries in a stream used by the tests.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;

/// Maximum number of actions ([`TransactWriteItem`]s) in one `TransactWriteItems` request.
/// DynamoDB documents this limit as 100.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;

/// Maximum number of keys requested in one `BatchGetItem` operation.
///
/// DynamoDB allows at most 100 keys and at most 16 MB of returned data per request.
/// Since a value can be up to 400 KB, requesting at most 40 keys keeps every response under
/// the 16 MB limit.
const MAX_BATCH_GET_ITEM_SIZE: usize = 40;
160
161/// Builds the key attributes for a table item.
162///
163/// The key is composed of two attributes that are both binary blobs. The first attribute is a
164/// partition key and is currently just a dummy value that ensures all items are in the same
165/// partition. This is necessary for range queries to work correctly.
166///
167/// The second attribute is the actual key value, which is generated by concatenating the
168/// context prefix. `The Vec<u8>` expression is obtained from `self.derive_key`.
169fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
170    [
171        (
172            PARTITION_ATTRIBUTE.to_owned(),
173            AttributeValue::B(Blob::new(start_key.to_vec())),
174        ),
175        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
176    ]
177    .into()
178}
179
180/// Builds the value attribute for storing a table item.
181fn build_key_value(
182    start_key: &[u8],
183    key: Vec<u8>,
184    value: Vec<u8>,
185) -> HashMap<String, AttributeValue> {
186    [
187        (
188            PARTITION_ATTRIBUTE.to_owned(),
189            AttributeValue::B(Blob::new(start_key.to_vec())),
190        ),
191        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
192        (
193            VALUE_ATTRIBUTE.to_owned(),
194            AttributeValue::B(Blob::new(value)),
195        ),
196    ]
197    .into()
198}
199
200/// Checks that a key is of the correct size
201fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
202    ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
203    ensure!(
204        key.len() <= MAX_KEY_SIZE,
205        DynamoDbStoreInternalError::KeyTooLong
206    );
207    Ok(())
208}
209
210/// Extracts the key attribute from an item.
211fn extract_key(
212    prefix_len: usize,
213    attributes: &HashMap<String, AttributeValue>,
214) -> Result<&[u8], DynamoDbStoreInternalError> {
215    let key = attributes
216        .get(KEY_ATTRIBUTE)
217        .ok_or(DynamoDbStoreInternalError::MissingKey)?;
218    match key {
219        AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
220        key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
221    }
222}
223
224/// Extracts the value attribute from an item.
225fn extract_value(
226    attributes: &HashMap<String, AttributeValue>,
227) -> Result<&[u8], DynamoDbStoreInternalError> {
228    // According to the official AWS DynamoDB documentation:
229    // "Binary must have a length greater than zero if the attribute is used as a key attribute for a table or index"
230    let value = attributes
231        .get(VALUE_ATTRIBUTE)
232        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
233    match value {
234        AttributeValue::B(blob) => Ok(blob.as_ref()),
235        value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
236    }
237}
238
239/// Extracts the value attribute from an item (returned by value).
240fn extract_value_owned(
241    attributes: &mut HashMap<String, AttributeValue>,
242) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
243    let value = attributes
244        .remove(VALUE_ATTRIBUTE)
245        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
246    match value {
247        AttributeValue::B(blob) => Ok(blob.into_inner()),
248        value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
249    }
250}
251
252/// Extracts the key and value attributes from an item.
253fn extract_key_value(
254    prefix_len: usize,
255    attributes: &HashMap<String, AttributeValue>,
256) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
257    let key = extract_key(prefix_len, attributes)?;
258    let value = extract_value(attributes)?;
259    Ok((key, value))
260}
261
262struct TransactionBuilder {
263    start_key: Vec<u8>,
264    transactions: Vec<TransactWriteItem>,
265}
266
267impl TransactionBuilder {
268    fn new(start_key: &[u8]) -> Self {
269        Self {
270            start_key: start_key.to_vec(),
271            transactions: Vec::new(),
272        }
273    }
274
275    fn insert_delete_request(
276        &mut self,
277        key: Vec<u8>,
278        store: &DynamoDbStoreInternal,
279    ) -> Result<(), DynamoDbStoreInternalError> {
280        let transaction = store.build_delete_transaction(&self.start_key, key)?;
281        self.transactions.push(transaction);
282        Ok(())
283    }
284
285    fn insert_put_request(
286        &mut self,
287        key: Vec<u8>,
288        value: Vec<u8>,
289        store: &DynamoDbStoreInternal,
290    ) -> Result<(), DynamoDbStoreInternalError> {
291        let transaction = store.build_put_transaction(&self.start_key, key, value)?;
292        self.transactions.push(transaction);
293        Ok(())
294    }
295}
296
/// A DynamoDB client.
///
/// A store handle scoped to one partition (`start_key`) of the DynamoDB table named
/// `namespace`. The semaphore and the root-key flag are behind `Arc`s, so clones of a store
/// share them.
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    /// Client used to send requests to DynamoDB.
    client: Client,
    /// Name of the DynamoDB table holding the data.
    namespace: String,
    /// Optional limit on the number of concurrent queries; `None` means no limit.
    semaphore: Option<Arc<Semaphore>>,
    /// Preferred buffer size for async streams.
    max_stream_queries: usize,
    /// Partition key under which this store reads and writes its items.
    start_key: Vec<u8>,
    /// Whether `start_key` has already been recorded in the root-key partition.
    root_key_written: Arc<AtomicBool>,
}
307
/// Database-level connection to DynamoDB for managing namespaces and partitions.
///
/// Holds what every store opened from it shares: the client, the table name, the optional
/// concurrency limit and the stream buffer size.
#[derive(Clone)]
pub struct DynamoDbDatabaseInternal {
    /// Client used to send requests to DynamoDB.
    client: Client,
    /// Name of the DynamoDB table backing this database.
    namespace: String,
    /// Optional limit on concurrent queries, shared (via `Arc`) with every store opened here.
    semaphore: Option<Arc<Semaphore>>,
    /// Preferred buffer size for async streams, copied into every opened store.
    max_stream_queries: usize,
}

// Database-level operations report the same error type as the stores.
impl WithError for DynamoDbDatabaseInternal {
    type Error = DynamoDbStoreInternalError;
}
320
/// The initial configuration of the system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStoreInternalConfig {
    /// Whether to use DynamoDB local or not.
    ///
    /// When set, the endpoint is read from the `DYNAMODB_LOCAL_ENDPOINT` environment variable.
    pub use_dynamodb_local: bool,
    /// Maximum number of concurrent database queries allowed for this client.
    ///
    /// `None` disables the limit.
    pub max_concurrent_queries: Option<usize>,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
}
331
332impl DynamoDbStoreInternalConfig {
333    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
334        let config = if self.use_dynamodb_local {
335            get_dynamodb_local_config().await?
336        } else {
337            get_base_config().await?
338        };
339        Ok(Client::from_conf(config))
340    }
341}
342
343impl KeyValueDatabase for DynamoDbDatabaseInternal {
344    type Config = DynamoDbStoreInternalConfig;
345    type Store = DynamoDbStoreInternal;
346
347    fn get_name() -> String {
348        "dynamodb internal".to_string()
349    }
350
351    async fn connect(
352        config: &Self::Config,
353        namespace: &str,
354    ) -> Result<Self, DynamoDbStoreInternalError> {
355        Self::check_namespace(namespace)?;
356        let client = config.client().await?;
357        let semaphore = config
358            .max_concurrent_queries
359            .map(|n| Arc::new(Semaphore::new(n)));
360        let max_stream_queries = config.max_stream_queries;
361        let namespace = namespace.to_string();
362        let store = Self {
363            client,
364            namespace,
365            semaphore,
366            max_stream_queries,
367        };
368        Ok(store)
369    }
370
371    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
372        let mut start_key = EMPTY_ROOT_KEY.to_vec();
373        start_key.extend(root_key);
374        Ok(self.open_internal(start_key))
375    }
376
377    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
378        self.open_shared(root_key)
379    }
380
381    async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
382        let client = config.client().await?;
383        let mut namespaces = Vec::new();
384        let mut start_table = None;
385        loop {
386            let response = client
387                .list_tables()
388                .set_exclusive_start_table_name(start_table)
389                .send()
390                .boxed_sync()
391                .await?;
392            if let Some(namespaces_blk) = response.table_names {
393                namespaces.extend(namespaces_blk);
394            }
395            if response.last_evaluated_table_name.is_none() {
396                break;
397            } else {
398                start_table = response.last_evaluated_table_name;
399            }
400        }
401        Ok(namespaces)
402    }
403
404    async fn list_root_keys(
405        config: &Self::Config,
406        namespace: &str,
407    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
408        let database = Self::connect(config, namespace).await?;
409        let store = database.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
410        store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
411    }
412
413    async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
414        let client = config.client().await?;
415        let tables = Self::list_all(config).await?;
416        for table in tables {
417            client
418                .delete_table()
419                .table_name(&table)
420                .send()
421                .boxed_sync()
422                .await?;
423        }
424        Ok(())
425    }
426
427    async fn exists(
428        config: &Self::Config,
429        namespace: &str,
430    ) -> Result<bool, DynamoDbStoreInternalError> {
431        Self::check_namespace(namespace)?;
432        let client = config.client().await?;
433        let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
434        let response = client
435            .get_item()
436            .table_name(namespace)
437            .set_key(Some(key_db))
438            .send()
439            .boxed_sync()
440            .await;
441        let Err(error) = response else {
442            return Ok(true);
443        };
444        let test = match &error {
445            SdkError::ServiceError(error) => match error.err() {
446                GetItemError::ResourceNotFoundException(error) => {
447                    error.message
448                        == Some("Cannot do operations on a non-existent table".to_string())
449                }
450                _ => false,
451            },
452            _ => false,
453        };
454        if test {
455            Ok(false)
456        } else {
457            Err(error.into())
458        }
459    }
460
461    async fn create(
462        config: &Self::Config,
463        namespace: &str,
464    ) -> Result<(), DynamoDbStoreInternalError> {
465        Self::check_namespace(namespace)?;
466        let client = config.client().await?;
467        client
468            .create_table()
469            .table_name(namespace)
470            .attribute_definitions(
471                AttributeDefinition::builder()
472                    .attribute_name(PARTITION_ATTRIBUTE)
473                    .attribute_type(ScalarAttributeType::B)
474                    .build()?,
475            )
476            .attribute_definitions(
477                AttributeDefinition::builder()
478                    .attribute_name(KEY_ATTRIBUTE)
479                    .attribute_type(ScalarAttributeType::B)
480                    .build()?,
481            )
482            .key_schema(
483                KeySchemaElement::builder()
484                    .attribute_name(PARTITION_ATTRIBUTE)
485                    .key_type(KeyType::Hash)
486                    .build()?,
487            )
488            .key_schema(
489                KeySchemaElement::builder()
490                    .attribute_name(KEY_ATTRIBUTE)
491                    .key_type(KeyType::Range)
492                    .build()?,
493            )
494            .provisioned_throughput(
495                ProvisionedThroughput::builder()
496                    .read_capacity_units(10)
497                    .write_capacity_units(10)
498                    .build()?,
499            )
500            .send()
501            .boxed_sync()
502            .await?;
503        Ok(())
504    }
505
506    async fn delete(
507        config: &Self::Config,
508        namespace: &str,
509    ) -> Result<(), DynamoDbStoreInternalError> {
510        Self::check_namespace(namespace)?;
511        let client = config.client().await?;
512        client
513            .delete_table()
514            .table_name(namespace)
515            .send()
516            .boxed_sync()
517            .await?;
518        Ok(())
519    }
520}
521
522impl DynamoDbDatabaseInternal {
523    /// Namespaces are named table names in DynamoDB [naming
524    /// rules](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules),
525    /// so we need to check correctness of the namespace
526    fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
527        if namespace.len() < 3 {
528            return Err(InvalidNamespace::TooShort);
529        }
530        if namespace.len() > 255 {
531            return Err(InvalidNamespace::TooLong);
532        }
533        if !namespace.chars().all(|character| {
534            character.is_ascii_alphanumeric()
535                || character == '.'
536                || character == '-'
537                || character == '_'
538        }) {
539            return Err(InvalidNamespace::InvalidCharacter);
540        }
541        Ok(())
542    }
543
544    fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
545        let client = self.client.clone();
546        let namespace = self.namespace.clone();
547        let semaphore = self.semaphore.clone();
548        let max_stream_queries = self.max_stream_queries;
549        DynamoDbStoreInternal {
550            client,
551            namespace,
552            semaphore,
553            max_stream_queries,
554            start_key,
555            root_key_written: Arc::new(AtomicBool::new(false)),
556        }
557    }
558}
559
560impl DynamoDbStoreInternal {
561    fn build_delete_transaction(
562        &self,
563        start_key: &[u8],
564        key: Vec<u8>,
565    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
566        check_key_size(&key)?;
567        let request = Delete::builder()
568            .table_name(&self.namespace)
569            .set_key(Some(build_key(start_key, key)))
570            .build()?;
571        Ok(TransactWriteItem::builder().delete(request).build())
572    }
573
574    fn build_put_transaction(
575        &self,
576        start_key: &[u8],
577        key: Vec<u8>,
578        value: Vec<u8>,
579    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
580        check_key_size(&key)?;
581        ensure!(
582            value.len() <= RAW_MAX_VALUE_SIZE,
583            DynamoDbStoreInternalError::ValueLengthTooLarge
584        );
585        let request = Put::builder()
586            .table_name(&self.namespace)
587            .set_item(Some(build_key_value(start_key, key, value)))
588            .build()?;
589        Ok(TransactWriteItem::builder().put(request).build())
590    }
591
592    /// Obtains the semaphore lock on the database if needed.
593    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
594        match &self.semaphore {
595            None => None,
596            Some(count) => Some(count.acquire().await),
597        }
598    }
599
600    async fn get_query_output(
601        &self,
602        attribute_str: &str,
603        start_key: &[u8],
604        key_prefix: &[u8],
605        start_key_map: Option<HashMap<String, AttributeValue>>,
606    ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
607        let _guard = self.acquire().await;
608        let start_key = start_key.to_vec();
609        let response = self
610            .client
611            .query()
612            .table_name(&self.namespace)
613            .projection_expression(attribute_str)
614            .key_condition_expression(format!(
615                "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
616            ))
617            .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
618            .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
619            .set_exclusive_start_key(start_key_map)
620            .send()
621            .boxed_sync()
622            .await?;
623        Ok(response)
624    }
625
626    async fn read_value_bytes_general(
627        &self,
628        key_db: HashMap<String, AttributeValue>,
629    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
630        let _guard = self.acquire().await;
631        let response = self
632            .client
633            .get_item()
634            .table_name(&self.namespace)
635            .set_key(Some(key_db))
636            .send()
637            .boxed_sync()
638            .await?;
639
640        match response.item {
641            Some(mut item) => {
642                let value = extract_value_owned(&mut item)?;
643                Ok(Some(value))
644            }
645            None => Ok(None),
646        }
647    }
648
649    async fn contains_key_general(
650        &self,
651        key_db: HashMap<String, AttributeValue>,
652    ) -> Result<bool, DynamoDbStoreInternalError> {
653        let _guard = self.acquire().await;
654        let response = self
655            .client
656            .get_item()
657            .table_name(&self.namespace)
658            .set_key(Some(key_db))
659            .projection_expression(PARTITION_ATTRIBUTE)
660            .send()
661            .boxed_sync()
662            .await?;
663
664        Ok(response.item.is_some())
665    }
666
667    async fn get_list_responses(
668        &self,
669        attribute: &str,
670        start_key: &[u8],
671        key_prefix: &[u8],
672    ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
673        check_key_size(key_prefix)?;
674        let mut responses = Vec::new();
675        let mut start_key_map = None;
676        loop {
677            let response = self
678                .get_query_output(attribute, start_key, key_prefix, start_key_map)
679                .await?;
680            let last_evaluated = response.last_evaluated_key.clone();
681            responses.push(response);
682            match last_evaluated {
683                None => {
684                    break;
685                }
686                Some(value) => {
687                    start_key_map = Some(value);
688                }
689            }
690        }
691        Ok(QueryResponses {
692            prefix_len: key_prefix.len(),
693            responses,
694        })
695    }
696
697    async fn read_batch_values_bytes(
698        &self,
699        keys: &[Vec<u8>],
700    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
701        // Early return for empty keys
702        if keys.is_empty() {
703            return Ok(Vec::new());
704        }
705        let mut results = vec![None; keys.len()];
706
707        // Build the request keys
708        let mut request_keys = Vec::new();
709        let mut key_to_index = HashMap::<Vec<u8>, Vec<usize>>::new();
710
711        for (i, key) in keys.iter().enumerate() {
712            check_key_size(key)?;
713            let key_attrs = build_key(&self.start_key, key.clone());
714            key_to_index.entry(key.clone()).or_default().push(i);
715            request_keys.push(key_attrs);
716        }
717
718        let keys_and_attributes = KeysAndAttributes::builder()
719            .set_keys(Some(request_keys))
720            .build()?;
721
722        let mut request_items = HashMap::new();
723        request_items.insert(self.namespace.clone(), keys_and_attributes);
724
725        // Execute batch get item request with retry for unprocessed keys
726        let mut remaining_request_items = Some(request_items);
727
728        while let Some(request_items) = remaining_request_items {
729            // Skip if the request items are empty
730            if request_items.is_empty() {
731                break;
732            }
733
734            let _guard = self.acquire().await;
735            let response = self
736                .client
737                .batch_get_item()
738                .set_request_items(Some(request_items))
739                .send()
740                .boxed_sync()
741                .await?;
742
743            // Process returned items
744            if let Some(mut responses) = response.responses {
745                if let Some(items) = responses.remove(&self.namespace) {
746                    for mut item in items {
747                        // Extract key to find the original index
748                        let key_attr = item
749                            .get(KEY_ATTRIBUTE)
750                            .ok_or(DynamoDbStoreInternalError::MissingKey)?;
751
752                        if let AttributeValue::B(blob) = key_attr {
753                            let key = blob.as_ref();
754                            if let Some(indices) = key_to_index.get(key) {
755                                if let Some((&last, rest)) = indices.split_last() {
756                                    let value = extract_value_owned(&mut item)?;
757                                    for index in rest {
758                                        results[*index] = Some(value.clone());
759                                    }
760                                    results[last] = Some(value);
761                                }
762                            }
763                        }
764                    }
765                }
766            }
767
768            // Handle unprocessed keys
769            remaining_request_items = response.unprocessed_keys;
770        }
771
772        Ok(results)
773    }
774}
775
776struct QueryResponses {
777    prefix_len: usize,
778    responses: Vec<QueryOutput>,
779}
780
781impl QueryResponses {
782    fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
783        self.responses
784            .iter()
785            .flat_map(|response| response.items.iter().flatten())
786            .map(|item| extract_key(self.prefix_len, item))
787    }
788
789    fn key_values(
790        &self,
791    ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
792        self.responses
793            .iter()
794            .flat_map(|response| response.items.iter().flatten())
795            .map(|item| extract_key_value(self.prefix_len, item))
796    }
797}
798
// Every store operation reports failures as `DynamoDbStoreInternalError`.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
802
803impl ReadableKeyValueStore for DynamoDbStoreInternal {
804    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
805
806    fn max_stream_queries(&self) -> usize {
807        self.max_stream_queries
808    }
809
810    async fn read_value_bytes(
811        &self,
812        key: &[u8],
813    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
814        check_key_size(key)?;
815        let key_db = build_key(&self.start_key, key.to_vec());
816        self.read_value_bytes_general(key_db).await
817    }
818
819    async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
820        check_key_size(key)?;
821        let key_db = build_key(&self.start_key, key.to_vec());
822        self.contains_key_general(key_db).await
823    }
824
825    async fn contains_keys(
826        &self,
827        keys: Vec<Vec<u8>>,
828    ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
829        let mut handles = Vec::new();
830        for key in keys {
831            check_key_size(&key)?;
832            let key_db = build_key(&self.start_key, key);
833            let handle = self.contains_key_general(key_db);
834            handles.push(handle);
835        }
836        join_all(handles)
837            .await
838            .into_iter()
839            .collect::<Result<_, _>>()
840    }
841
842    async fn read_multi_values_bytes(
843        &self,
844        keys: Vec<Vec<u8>>,
845    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
846        if keys.is_empty() {
847            return Ok(Vec::new());
848        }
849
850        let handles = keys
851            .chunks(MAX_BATCH_GET_ITEM_SIZE)
852            .map(|key_batch| self.read_batch_values_bytes(key_batch));
853        let results: Vec<_> = join_all(handles)
854            .await
855            .into_iter()
856            .collect::<Result<_, _>>()?;
857        Ok(results.into_iter().flatten().collect())
858    }
859
860    async fn find_keys_by_prefix(
861        &self,
862        key_prefix: &[u8],
863    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
864        let result_queries = self
865            .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
866            .await?;
867        result_queries
868            .keys()
869            .map(|key| key.map(|k| k.to_vec()))
870            .collect()
871    }
872
873    async fn find_key_values_by_prefix(
874        &self,
875        key_prefix: &[u8],
876    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
877        let result_queries = self
878            .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
879            .await?;
880        result_queries
881            .key_values()
882            .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
883            .collect()
884    }
885}
886
887impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
888    const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
889    const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
890    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
891
892    // DynamoDB does not support the `DeletePrefix` operation.
893    type Batch = SimpleUnorderedBatch;
894
895    async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
896        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
897            let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
898            builder.insert_put_request(self.start_key.clone(), vec![], self)?;
899            self.client
900                .transact_write_items()
901                .set_transact_items(Some(builder.transactions))
902                .send()
903                .boxed_sync()
904                .await?;
905        }
906        let mut builder = TransactionBuilder::new(&self.start_key);
907        for key in batch.deletions {
908            builder.insert_delete_request(key, self)?;
909        }
910        for (key, value) in batch.insertions {
911            builder.insert_put_request(key, value, self)?;
912        }
913        if !builder.transactions.is_empty() {
914            let _guard = self.acquire().await;
915            self.client
916                .transact_write_items()
917                .set_transact_items(Some(builder.transactions))
918                .send()
919                .boxed_sync()
920                .await?;
921        }
922        Ok(())
923    }
924}
925
/// Reasons a namespace can fail validation.
#[derive(Debug, Error)]
pub enum InvalidNamespace {
    /// The namespace has fewer than 3 characters.
    #[error("Namespace must have at least 3 characters")]
    TooShort,

    /// The namespace has more than 63 characters.
    #[error("Namespace must be at most 63 characters")]
    TooLong,

    /// The namespace contains a character other than a lowercase letter, a digit,
    /// a period or a hyphen.
    #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
    InvalidCharacter,
}
941
/// Errors that occur when using [`DynamoDbStoreInternal`].
///
/// AWS SDK errors are stored boxed (presumably to keep this enum small); the generic
/// `From<SdkError<_>>` impl in this file performs the boxing, so `?` works on SDK results.
#[derive(Debug, Error)]
pub enum DynamoDbStoreInternalError {
    /// A `GetItem` request failed.
    #[error(transparent)]
    Get(#[from] Box<SdkError<GetItemError>>),

    /// A `BatchGetItem` request failed.
    #[error(transparent)]
    BatchGet(#[from] Box<SdkError<BatchGetItemError>>),

    /// A `TransactWriteItems` request failed.
    #[error(transparent)]
    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),

    /// A `Query` request failed.
    #[error(transparent)]
    Query(#[from] Box<SdkError<QueryError>>),

    /// A `DeleteTable` request failed.
    #[error(transparent)]
    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),

    /// A `ListTables` request failed.
    #[error(transparent)]
    ListTables(#[from] Box<SdkError<ListTablesError>>),

    /// A transaction exceeded the maximum length `MAX_TRANSACT_WRITE_ITEM_SIZE`.
    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
    TransactUpperLimitSize,

    /// A key was empty; keys must have non-zero length.
    #[error("The key must be of strictly positive length")]
    ZeroLengthKey,

    /// A key was longer than 1024 bytes.
    #[error("The key must have at most 1024 bytes")]
    KeyTooLong,

    /// A key prefix was longer than 1024 bytes.
    #[error("The key prefix must have at most 1024 bytes")]
    KeyPrefixTooLong,

    /// A key prefix was empty; key prefixes must have non-zero length.
    #[error("The key_prefix must be of strictly positive length")]
    ZeroLengthKeyPrefix,

    /// The journal is not consistent.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// A value was too large; DynamoDB values must be under 400 KB.
    #[error("The DynamoDB value should be less than 400 KB")]
    ValueLengthTooLarge,

    /// A stored item has no key attribute.
    #[error("The stored key attribute is missing")]
    MissingKey,

    /// A stored key has the wrong type (it should be a binary blob); the payload
    /// describes the actual type.
    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongKeyType(String),

    /// A stored item has no value attribute.
    #[error("The stored value attribute is missing")]
    MissingValue,

    /// A stored value has the wrong type (it should be a binary blob); the payload
    /// describes the actual type.
    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongValueType(String),

    /// BCS (de)serialization failed.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The namespace failed validation.
    #[error(transparent)]
    InvalidNamespace(#[from] InvalidNamespace),

    /// A `CreateTable` request failed.
    #[error(transparent)]
    CreateTable(#[from] Box<SdkError<CreateTableError>>),

    /// Building an SDK request object failed.
    #[error(transparent)]
    Build(#[from] Box<BuildError>),
}
1029
1030impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
1031where
1032    DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
1033{
1034    fn from(error: SdkError<InnerError>) -> Self {
1035        Box::new(error).into()
1036    }
1037}
1038
1039impl From<BuildError> for DynamoDbStoreInternalError {
1040    fn from(error: BuildError) -> Self {
1041        Box::new(error).into()
1042    }
1043}
1044
1045impl DynamoDbStoreInternalError {
1046    /// Creates a [`DynamoDbStoreInternalError::WrongKeyType`] instance based on the returned value type.
1047    ///
1048    /// # Panics
1049    ///
1050    /// If the value type is in the correct type, a binary blob.
1051    pub fn wrong_key_type(value: &AttributeValue) -> Self {
1052        DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
1053    }
1054
1055    /// Creates a [`DynamoDbStoreInternalError::WrongValueType`] instance based on the returned value type.
1056    ///
1057    /// # Panics
1058    ///
1059    /// If the value type is in the correct type, a binary blob.
1060    pub fn wrong_value_type(value: &AttributeValue) -> Self {
1061        DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
1062    }
1063
1064    fn type_description_of(value: &AttributeValue) -> String {
1065        match value {
1066            AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
1067            AttributeValue::Bool(_) => "a boolean",
1068            AttributeValue::Bs(_) => "a list of binary blobs",
1069            AttributeValue::L(_) => "a list",
1070            AttributeValue::M(_) => "a map",
1071            AttributeValue::N(_) => "a number",
1072            AttributeValue::Ns(_) => "a list of numbers",
1073            AttributeValue::Null(_) => "a null value",
1074            AttributeValue::S(_) => "a string",
1075            AttributeValue::Ss(_) => "a list of strings",
1076            _ => "an unknown type",
1077        }
1078        .to_owned()
1079    }
1080}
1081
/// Tags this error type with the backend name `"dynamo_db"`.
impl KeyValueStoreError for DynamoDbStoreInternalError {
    const BACKEND: &'static str = "dynamo_db";
}
1085
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
    /// Returns a test configuration that targets DynamoDB local and uses the test-specific
    /// query concurrency limits.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        let config = DynamoDbStoreInternalConfig {
            use_dynamodb_local: true,
            max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
            max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
        };
        Ok(config)
    }
}
1096
/// The combined error type for [`DynamoDbDatabase`]: [`DynamoDbStoreInternalError`] wrapped
/// in [`ValueSplittingError`].
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1099
/// The config type for [`DynamoDbDatabase`]: [`DynamoDbStoreInternalConfig`] wrapped in
/// [`LruCachingConfig`].
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1102
/// A shared DB client for DynamoDB with metrics.
///
/// Layers, from outermost to innermost: metering, LRU caching, metering, value splitting,
/// metering, journaling, and finally [`DynamoDbDatabaseInternal`].
#[cfg(with_metrics)]
pub type DynamoDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
            >,
        >,
    >,
>;
/// A shared DB client for DynamoDB, without metrics.
///
/// Layers, from outermost to innermost: LRU caching, value splitting, journaling, and
/// finally [`DynamoDbDatabaseInternal`].
#[cfg(not(with_metrics))]
pub type DynamoDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
>;
1119
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that `get_uleb128_size(len) + len` equals the BCS-serialized size of a byte
    /// vector of length `len`. The chosen lengths straddle the 1→2 byte (128) and
    /// 2→3 byte (16384) ULEB128 length-prefix boundaries.
    #[test]
    fn test_serialization_len() {
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for len in lengths {
            let bytes = vec![0u8; len];
            let estimated = get_uleb128_size(len) + len;
            let actual = serialized_size(&bytes).unwrap();
            assert_eq!(estimated, actual);
        }
    }
}