linera_views/backends/
dynamo_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the DynamoDB database.
5
6use std::{
7    collections::HashMap,
8    env,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17    error::SdkError,
18    operation::{
19        create_table::CreateTableError,
20        delete_table::DeleteTableError,
21        get_item::GetItemError,
22        list_tables::ListTablesError,
23        query::{QueryError, QueryOutput},
24        transact_write_items::TransactWriteItemsError,
25    },
26    primitives::Blob,
27    types::{
28        AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29        ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30    },
31    Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::ensure;
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredStore;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueStore;
43use crate::{
44    batch::SimpleUnorderedBatch,
45    common::get_uleb128_size,
46    journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
47    lru_caching::{LruCachingConfig, LruCachingStore},
48    store::{AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError},
49    value_splitting::{ValueSplittingError, ValueSplittingStore},
50    FutureSyncExt as _,
51};
52
/// Name of the environment variable holding the address of a DynamoDB local instance.
///
/// It is read by `get_endpoint_address`.
const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
55
56/// Gets the AWS configuration from the environment
57async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
58    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
59        .boxed_sync()
60        .await;
61    Ok((&base_config).into())
62}
63
64fn get_endpoint_address() -> Option<String> {
65    env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
66}
67
68/// Gets the DynamoDB local config
69async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
70{
71    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
72        .boxed_sync()
73        .await;
74    let endpoint_address = get_endpoint_address().unwrap();
75    let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
76        .endpoint_url(endpoint_address)
77        .build();
78    Ok(config)
79}
80
/// DynamoDB forbids the iteration over the partition keys.
/// Therefore we use a special partition key named `[1]` for storing
/// the root keys. For normal root keys, we simply put a `[0]` in
/// front, therefore no intersection is possible.
const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];

/// The attribute name of the partition key.
const PARTITION_ATTRIBUTE: &str = "item_partition";

/// The prefix put in front of every root key by `extend_root_key`.
/// On its own it is also the partition key used for testing the existence of tables.
const EMPTY_ROOT_KEY: &[u8] = &[0];

/// The key used for testing the existence of tables.
const DB_KEY: &[u8] = &[0];

/// The attribute name of the primary key (used as a sort key).
const KEY_ATTRIBUTE: &str = "item_key";

/// The attribute name of the table value blob.
const VALUE_ATTRIBUTE: &str = "item_value";

/// The projection expression for obtaining the primary key (used as a sort key) together
/// with the stored value.
const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";

/// TODO(#1084): The scheme below with the `MAX_VALUE_SIZE` has to be checked.
/// This is the maximum size of a raw value in DynamoDB (400 KB).
const RAW_MAX_VALUE_SIZE: usize = 409600;

/// Fundamental constant in DynamoDB: the maximum size of a value is 400 KB.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html>
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`.
/// Therefore the actual `MAX_VALUE_SIZE` might be lower.
/// As the maximum key size is 1024 bytes (see below), we consider a batch packing just one
/// entry. So if the key has 1024 bytes this gets us the inequality
/// `1 + 1 + serialized_size(1024)? + serialized_size(x)? <= 400*1024`
/// and so this simplifies to `1 + 1 + (2 + 1024) + (3 + x) <= 400 * 1024`.
/// Note on the following formula:
/// * We write 3 because `get_uleb128_size(400*1024) == 3`
/// * We write `1 + 1` because the `SimpleUnorderedBatch` has two entries
///
/// This gets us a maximal value of 408569.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 1
    - 1;

/// Fundamental constant in DynamoDB: the maximum size of a key is 1024 bytes.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html>
const MAX_KEY_SIZE: usize = 1024;

/// Fundamental constant in DynamoDB: the aggregate size of the [`TransactWriteItem`]s of
/// one transaction is at most 4 MB.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
/// We're taking a conservative value because the mode of computation is unclear.
const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;

/// The DynamoDB database is potentially handling an infinite number of connections.
/// However, for testing or some other purpose we really need to decrease the number of
/// connections.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;

/// The number of entries in a stream can be controlled by this parameter for tests.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;

/// Fundamental constant in DynamoDB: one transaction holds at most 100
/// [`TransactWriteItem`]s.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
151
152/// Keys of length 0 are not allowed, so we extend by having a prefix on start
153fn extend_root_key(root_key: &[u8]) -> Vec<u8> {
154    let mut start_key = EMPTY_ROOT_KEY.to_vec();
155    start_key.extend(root_key);
156    start_key
157}
158
159/// Builds the key attributes for a table item.
160///
161/// The key is composed of two attributes that are both binary blobs. The first attribute is a
162/// partition key and is currently just a dummy value that ensures all items are in the same
163/// partition. This is necessary for range queries to work correctly.
164///
165/// The second attribute is the actual key value, which is generated by concatenating the
166/// context prefix. `The Vec<u8>` expression is obtained from `self.derive_key`.
167fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
168    [
169        (
170            PARTITION_ATTRIBUTE.to_owned(),
171            AttributeValue::B(Blob::new(start_key.to_vec())),
172        ),
173        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
174    ]
175    .into()
176}
177
178/// Builds the value attribute for storing a table item.
179fn build_key_value(
180    start_key: &[u8],
181    key: Vec<u8>,
182    value: Vec<u8>,
183) -> HashMap<String, AttributeValue> {
184    [
185        (
186            PARTITION_ATTRIBUTE.to_owned(),
187            AttributeValue::B(Blob::new(start_key.to_vec())),
188        ),
189        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
190        (
191            VALUE_ATTRIBUTE.to_owned(),
192            AttributeValue::B(Blob::new(value)),
193        ),
194    ]
195    .into()
196}
197
198/// Checks that a key is of the correct size
199fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
200    ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
201    ensure!(
202        key.len() <= MAX_KEY_SIZE,
203        DynamoDbStoreInternalError::KeyTooLong
204    );
205    Ok(())
206}
207
208/// Extracts the key attribute from an item.
209fn extract_key(
210    prefix_len: usize,
211    attributes: &HashMap<String, AttributeValue>,
212) -> Result<&[u8], DynamoDbStoreInternalError> {
213    let key = attributes
214        .get(KEY_ATTRIBUTE)
215        .ok_or(DynamoDbStoreInternalError::MissingKey)?;
216    match key {
217        AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
218        key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
219    }
220}
221
222/// Extracts the value attribute from an item.
223fn extract_value(
224    attributes: &HashMap<String, AttributeValue>,
225) -> Result<&[u8], DynamoDbStoreInternalError> {
226    // According to the official AWS DynamoDB documentation:
227    // "Binary must have a length greater than zero if the attribute is used as a key attribute for a table or index"
228    let value = attributes
229        .get(VALUE_ATTRIBUTE)
230        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
231    match value {
232        AttributeValue::B(blob) => Ok(blob.as_ref()),
233        value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
234    }
235}
236
237/// Extracts the value attribute from an item (returned by value).
238fn extract_value_owned(
239    attributes: &mut HashMap<String, AttributeValue>,
240) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
241    let value = attributes
242        .remove(VALUE_ATTRIBUTE)
243        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
244    match value {
245        AttributeValue::B(blob) => Ok(blob.into_inner()),
246        value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
247    }
248}
249
250/// Extracts the key and value attributes from an item.
251fn extract_key_value(
252    prefix_len: usize,
253    attributes: &HashMap<String, AttributeValue>,
254) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
255    let key = extract_key(prefix_len, attributes)?;
256    let value = extract_value(attributes)?;
257    Ok((key, value))
258}
259
/// Accumulates the [`TransactWriteItem`]s of one write transaction.
struct TransactionBuilder {
    /// The partition key given to every request built through this builder.
    start_key: Vec<u8>,
    /// The requests collected so far, in insertion order.
    transactions: Vec<TransactWriteItem>,
}
264
265impl TransactionBuilder {
266    fn new(start_key: &[u8]) -> Self {
267        Self {
268            start_key: start_key.to_vec(),
269            transactions: Vec::new(),
270        }
271    }
272
273    fn insert_delete_request(
274        &mut self,
275        key: Vec<u8>,
276        store: &DynamoDbStoreInternal,
277    ) -> Result<(), DynamoDbStoreInternalError> {
278        let transaction = store.build_delete_transaction(&self.start_key, key)?;
279        self.transactions.push(transaction);
280        Ok(())
281    }
282
283    fn insert_put_request(
284        &mut self,
285        key: Vec<u8>,
286        value: Vec<u8>,
287        store: &DynamoDbStoreInternal,
288    ) -> Result<(), DynamoDbStoreInternalError> {
289        let transaction = store.build_put_transaction(&self.start_key, key, value)?;
290        self.transactions.push(transaction);
291        Ok(())
292    }
293}
294
/// A DynamoDB client.
///
/// Each instance addresses one partition (`start_key`) of the table named `namespace`.
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    /// The AWS SDK client used to send every request.
    client: Client,
    /// The namespace, used as the DynamoDB table name.
    namespace: String,
    /// The semaphore limiting the number of concurrent queries, if a limit was configured.
    semaphore: Option<Arc<Semaphore>>,
    /// The value returned by `max_stream_queries`, copied from the configuration.
    max_stream_queries: usize,
    /// The partition key used for the items of this store.
    start_key: Vec<u8>,
    /// Flag used by `write_batch` to avoid recording `start_key` under
    /// `PARTITION_KEY_ROOT_KEY` on every write. It is shared between clones of the store.
    root_key_written: Arc<AtomicBool>,
}
305
/// The initial configuration of the system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStoreInternalConfig {
    /// Whether to use DynamoDB local or not.
    pub use_dynamodb_local: bool,
    /// Maximum number of concurrent database queries allowed for this client.
    /// With `None`, no semaphore is created and queries are not limited by the store.
    pub max_concurrent_queries: Option<usize>,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
}
316
317impl DynamoDbStoreInternalConfig {
318    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
319        let config = if self.use_dynamodb_local {
320            get_dynamodb_local_config().await?
321        } else {
322            get_base_config().await?
323        };
324        Ok(Client::from_conf(config))
325    }
326}
327
328impl AdminKeyValueStore for DynamoDbStoreInternal {
329    type Config = DynamoDbStoreInternalConfig;
330
331    fn get_name() -> String {
332        "dynamodb internal".to_string()
333    }
334
335    async fn connect(
336        config: &Self::Config,
337        namespace: &str,
338    ) -> Result<Self, DynamoDbStoreInternalError> {
339        Self::check_namespace(namespace)?;
340        let client = config.client().await?;
341        let semaphore = config
342            .max_concurrent_queries
343            .map(|n| Arc::new(Semaphore::new(n)));
344        let max_stream_queries = config.max_stream_queries;
345        let namespace = namespace.to_string();
346        let start_key = extend_root_key(&[]);
347        let store = Self {
348            client,
349            namespace,
350            semaphore,
351            max_stream_queries,
352            start_key,
353            root_key_written: Arc::new(AtomicBool::new(false)),
354        };
355        Ok(store)
356    }
357
358    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, DynamoDbStoreInternalError> {
359        let client = self.client.clone();
360        let namespace = self.namespace.clone();
361        let semaphore = self.semaphore.clone();
362        let max_stream_queries = self.max_stream_queries;
363        let start_key = extend_root_key(root_key);
364        Ok(Self {
365            client,
366            namespace,
367            semaphore,
368            max_stream_queries,
369            start_key,
370            root_key_written: Arc::new(AtomicBool::new(false)),
371        })
372    }
373
374    async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
375        let client = config.client().await?;
376        let mut namespaces = Vec::new();
377        let mut start_table = None;
378        loop {
379            let response = client
380                .list_tables()
381                .set_exclusive_start_table_name(start_table)
382                .send()
383                .boxed_sync()
384                .await?;
385            if let Some(namespaces_blk) = response.table_names {
386                namespaces.extend(namespaces_blk);
387            }
388            if response.last_evaluated_table_name.is_none() {
389                break;
390            } else {
391                start_table = response.last_evaluated_table_name;
392            }
393        }
394        Ok(namespaces)
395    }
396
397    async fn list_root_keys(
398        config: &Self::Config,
399        namespace: &str,
400    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
401        let mut store = Self::connect(config, namespace).await?;
402        store.start_key = PARTITION_KEY_ROOT_KEY.to_vec();
403        store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
404    }
405
406    async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
407        let client = config.client().await?;
408        let tables = Self::list_all(config).await?;
409        for table in tables {
410            client
411                .delete_table()
412                .table_name(&table)
413                .send()
414                .boxed_sync()
415                .await?;
416        }
417        Ok(())
418    }
419
420    async fn exists(
421        config: &Self::Config,
422        namespace: &str,
423    ) -> Result<bool, DynamoDbStoreInternalError> {
424        Self::check_namespace(namespace)?;
425        let client = config.client().await?;
426        let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
427        let response = client
428            .get_item()
429            .table_name(namespace)
430            .set_key(Some(key_db))
431            .send()
432            .boxed_sync()
433            .await;
434        let Err(error) = response else {
435            return Ok(true);
436        };
437        let test = match &error {
438            SdkError::ServiceError(error) => match error.err() {
439                GetItemError::ResourceNotFoundException(error) => {
440                    error.message
441                        == Some("Cannot do operations on a non-existent table".to_string())
442                }
443                _ => false,
444            },
445            _ => false,
446        };
447        if test {
448            Ok(false)
449        } else {
450            Err(error.into())
451        }
452    }
453
454    async fn create(
455        config: &Self::Config,
456        namespace: &str,
457    ) -> Result<(), DynamoDbStoreInternalError> {
458        Self::check_namespace(namespace)?;
459        let client = config.client().await?;
460        client
461            .create_table()
462            .table_name(namespace)
463            .attribute_definitions(
464                AttributeDefinition::builder()
465                    .attribute_name(PARTITION_ATTRIBUTE)
466                    .attribute_type(ScalarAttributeType::B)
467                    .build()?,
468            )
469            .attribute_definitions(
470                AttributeDefinition::builder()
471                    .attribute_name(KEY_ATTRIBUTE)
472                    .attribute_type(ScalarAttributeType::B)
473                    .build()?,
474            )
475            .key_schema(
476                KeySchemaElement::builder()
477                    .attribute_name(PARTITION_ATTRIBUTE)
478                    .key_type(KeyType::Hash)
479                    .build()?,
480            )
481            .key_schema(
482                KeySchemaElement::builder()
483                    .attribute_name(KEY_ATTRIBUTE)
484                    .key_type(KeyType::Range)
485                    .build()?,
486            )
487            .provisioned_throughput(
488                ProvisionedThroughput::builder()
489                    .read_capacity_units(10)
490                    .write_capacity_units(10)
491                    .build()?,
492            )
493            .send()
494            .boxed_sync()
495            .await?;
496        Ok(())
497    }
498
499    async fn delete(
500        config: &Self::Config,
501        namespace: &str,
502    ) -> Result<(), DynamoDbStoreInternalError> {
503        Self::check_namespace(namespace)?;
504        let client = config.client().await?;
505        client
506            .delete_table()
507            .table_name(namespace)
508            .send()
509            .boxed_sync()
510            .await?;
511        Ok(())
512    }
513}
514
515impl DynamoDbStoreInternal {
516    /// Namespaces are named table names in DynamoDB [naming
517    /// rules](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules),
518    /// so we need to check correctness of the namespace
519    fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
520        if namespace.len() < 3 {
521            return Err(InvalidNamespace::TooShort);
522        }
523        if namespace.len() > 255 {
524            return Err(InvalidNamespace::TooLong);
525        }
526        if !namespace.chars().all(|character| {
527            character.is_ascii_alphanumeric()
528                || character == '.'
529                || character == '-'
530                || character == '_'
531        }) {
532            return Err(InvalidNamespace::InvalidCharacter);
533        }
534        Ok(())
535    }
536
537    fn build_delete_transaction(
538        &self,
539        start_key: &[u8],
540        key: Vec<u8>,
541    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
542        check_key_size(&key)?;
543        let request = Delete::builder()
544            .table_name(&self.namespace)
545            .set_key(Some(build_key(start_key, key)))
546            .build()?;
547        Ok(TransactWriteItem::builder().delete(request).build())
548    }
549
550    fn build_put_transaction(
551        &self,
552        start_key: &[u8],
553        key: Vec<u8>,
554        value: Vec<u8>,
555    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
556        check_key_size(&key)?;
557        ensure!(
558            value.len() <= RAW_MAX_VALUE_SIZE,
559            DynamoDbStoreInternalError::ValueLengthTooLarge
560        );
561        let request = Put::builder()
562            .table_name(&self.namespace)
563            .set_item(Some(build_key_value(start_key, key, value)))
564            .build()?;
565        Ok(TransactWriteItem::builder().put(request).build())
566    }
567
568    /// Obtains the semaphore lock on the database if needed.
569    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
570        match &self.semaphore {
571            None => None,
572            Some(count) => Some(count.acquire().await),
573        }
574    }
575
576    async fn get_query_output(
577        &self,
578        attribute_str: &str,
579        start_key: &[u8],
580        key_prefix: &[u8],
581        start_key_map: Option<HashMap<String, AttributeValue>>,
582    ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
583        let _guard = self.acquire().await;
584        let start_key = start_key.to_vec();
585        let response = self
586            .client
587            .query()
588            .table_name(&self.namespace)
589            .projection_expression(attribute_str)
590            .key_condition_expression(format!(
591                "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
592            ))
593            .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
594            .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
595            .set_exclusive_start_key(start_key_map)
596            .send()
597            .boxed_sync()
598            .await?;
599        Ok(response)
600    }
601
602    async fn read_value_bytes_general(
603        &self,
604        key_db: HashMap<String, AttributeValue>,
605    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
606        let _guard = self.acquire().await;
607        let response = self
608            .client
609            .get_item()
610            .table_name(&self.namespace)
611            .set_key(Some(key_db))
612            .send()
613            .boxed_sync()
614            .await?;
615
616        match response.item {
617            Some(mut item) => {
618                let value = extract_value_owned(&mut item)?;
619                Ok(Some(value))
620            }
621            None => Ok(None),
622        }
623    }
624
625    async fn contains_key_general(
626        &self,
627        key_db: HashMap<String, AttributeValue>,
628    ) -> Result<bool, DynamoDbStoreInternalError> {
629        let _guard = self.acquire().await;
630        let response = self
631            .client
632            .get_item()
633            .table_name(&self.namespace)
634            .set_key(Some(key_db))
635            .projection_expression(PARTITION_ATTRIBUTE)
636            .send()
637            .boxed_sync()
638            .await?;
639
640        Ok(response.item.is_some())
641    }
642
643    async fn get_list_responses(
644        &self,
645        attribute: &str,
646        start_key: &[u8],
647        key_prefix: &[u8],
648    ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
649        check_key_size(key_prefix)?;
650        let mut responses = Vec::new();
651        let mut start_key_map = None;
652        loop {
653            let response = self
654                .get_query_output(attribute, start_key, key_prefix, start_key_map)
655                .await?;
656            let last_evaluated = response.last_evaluated_key.clone();
657            responses.push(response);
658            match last_evaluated {
659                None => {
660                    break;
661                }
662                Some(value) => {
663                    start_key_map = Some(value);
664                }
665            }
666        }
667        Ok(QueryResponses {
668            prefix_len: key_prefix.len(),
669            responses,
670        })
671    }
672}
673
/// The pages returned by a prefix query.
struct QueryResponses {
    /// The length of the queried key prefix, dropped from every extracted key.
    prefix_len: usize,
    /// The responses, in the order in which they were received.
    responses: Vec<QueryOutput>,
}
678
679impl QueryResponses {
680    fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
681        self.responses
682            .iter()
683            .flat_map(|response| response.items.iter().flatten())
684            .map(|item| extract_key(self.prefix_len, item))
685    }
686
687    fn key_values(
688        &self,
689    ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
690        self.responses
691            .iter()
692            .flat_map(|response| response.items.iter().flatten())
693            .map(|item| extract_key_value(self.prefix_len, item))
694    }
695}
696
// Every store operation of `DynamoDbStoreInternal` reports `DynamoDbStoreInternalError`.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
700
701impl ReadableKeyValueStore for DynamoDbStoreInternal {
702    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
703
    /// Returns the `max_stream_queries` value this store was configured with.
    fn max_stream_queries(&self) -> usize {
        self.max_stream_queries
    }
707
708    async fn read_value_bytes(
709        &self,
710        key: &[u8],
711    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
712        check_key_size(key)?;
713        let key_db = build_key(&self.start_key, key.to_vec());
714        self.read_value_bytes_general(key_db).await
715    }
716
717    async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
718        check_key_size(key)?;
719        let key_db = build_key(&self.start_key, key.to_vec());
720        self.contains_key_general(key_db).await
721    }
722
723    async fn contains_keys(
724        &self,
725        keys: Vec<Vec<u8>>,
726    ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
727        let mut handles = Vec::new();
728        for key in keys {
729            check_key_size(&key)?;
730            let key_db = build_key(&self.start_key, key);
731            let handle = self.contains_key_general(key_db);
732            handles.push(handle);
733        }
734        join_all(handles)
735            .await
736            .into_iter()
737            .collect::<Result<_, _>>()
738    }
739
740    async fn read_multi_values_bytes(
741        &self,
742        keys: Vec<Vec<u8>>,
743    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
744        let mut handles = Vec::new();
745        for key in keys {
746            check_key_size(&key)?;
747            let key_db = build_key(&self.start_key, key);
748            let handle = self.read_value_bytes_general(key_db);
749            handles.push(handle);
750        }
751        join_all(handles)
752            .await
753            .into_iter()
754            .collect::<Result<_, _>>()
755    }
756
757    async fn find_keys_by_prefix(
758        &self,
759        key_prefix: &[u8],
760    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
761        let result_queries = self
762            .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
763            .await?;
764        result_queries
765            .keys()
766            .map(|key| key.map(|k| k.to_vec()))
767            .collect()
768    }
769
770    async fn find_key_values_by_prefix(
771        &self,
772        key_prefix: &[u8],
773    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
774        let result_queries = self
775            .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
776            .await?;
777        result_queries
778            .key_values()
779            .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
780            .collect()
781    }
782}
783
784impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
785    const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
786    const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
787    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
788
789    // DynamoDB does not support the `DeletePrefix` operation.
790    type Batch = SimpleUnorderedBatch;
791
792    async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
793        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
794            let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
795            builder.insert_put_request(self.start_key.clone(), vec![], self)?;
796            self.client
797                .transact_write_items()
798                .set_transact_items(Some(builder.transactions))
799                .send()
800                .boxed_sync()
801                .await?;
802        }
803        let mut builder = TransactionBuilder::new(&self.start_key);
804        for key in batch.deletions {
805            builder.insert_delete_request(key, self)?;
806        }
807        for (key, value) in batch.insertions {
808            builder.insert_put_request(key, value, self)?;
809        }
810        if !builder.transactions.is_empty() {
811            let _guard = self.acquire().await;
812            self.client
813                .transact_write_items()
814                .set_transact_items(Some(builder.transactions))
815                .send()
816                .boxed_sync()
817                .await?;
818        }
819        Ok(())
820    }
821}
822
823/// Error when validating a namespace
824#[derive(Debug, Error)]
825pub enum InvalidNamespace {
826    /// The namespace should be at least 3 characters.
827    #[error("Namespace must have at least 3 characters")]
828    TooShort,
829
830    /// The namespace should be at most 63 characters.
831    #[error("Namespace must be at most 63 characters")]
832    TooLong,
833
834    /// allowed characters are lowercase letters, numbers, periods and hyphens
835    #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
836    InvalidCharacter,
837}
838
/// Errors that occur when using [`DynamoDbStoreInternal`].
///
/// The AWS SDK errors are boxed; the `From` implementations following this enum allow
/// converting the unboxed errors with `?`.
#[derive(Debug, Error)]
pub enum DynamoDbStoreInternalError {
    /// An error occurred while getting the item.
    #[error(transparent)]
    Get(#[from] Box<SdkError<GetItemError>>),

    /// An error occurred while writing a transaction of items.
    #[error(transparent)]
    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),

    /// An error occurred while doing a Query.
    #[error(transparent)]
    Query(#[from] Box<SdkError<QueryError>>),

    /// An error occurred while deleting a table.
    #[error(transparent)]
    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),

    /// An error occurred while listing tables.
    #[error(transparent)]
    ListTables(#[from] Box<SdkError<ListTablesError>>),

    /// The transaction has more than `MAX_TRANSACT_WRITE_ITEM_SIZE` items.
    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
    TransactUpperLimitSize,

    /// Keys have to be of non-zero length.
    #[error("The key must be of strictly positive length")]
    ZeroLengthKey,

    /// The key must have at most 1024 bytes.
    #[error("The key must have at most 1024 bytes")]
    KeyTooLong,

    /// The key prefix must have at most 1024 bytes.
    #[error("The key prefix must have at most 1024 bytes")]
    KeyPrefixTooLong,

    /// Key prefixes have to be of non-zero length.
    #[error("The key_prefix must be of strictly positive length")]
    ZeroLengthKeyPrefix,

    /// The journal is not coherent.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The length of the value should be at most 400 KB (`RAW_MAX_VALUE_SIZE` bytes).
    #[error("The DynamoDB value should be less than 400 KB")]
    ValueLengthTooLarge,

    /// The stored key is missing.
    #[error("The stored key attribute is missing")]
    MissingKey,

    /// The type of the keys was not correct (it should have been a binary blob).
    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongKeyType(String),

    /// The value attribute is missing.
    #[error("The stored value attribute is missing")]
    MissingValue,

    /// The value was stored as the wrong type (it should be a binary blob).
    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongValueType(String),

    /// A BCS error occurred.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A wrong namespace error occurred.
    #[error(transparent)]
    InvalidNamespace(#[from] InvalidNamespace),

    /// An error occurred while creating the table.
    #[error(transparent)]
    CreateTable(#[from] Box<SdkError<CreateTableError>>),

    /// An error occurred while building an object.
    #[error(transparent)]
    Build(#[from] Box<BuildError>),
}
922
923impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
924where
925    DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
926{
927    fn from(error: SdkError<InnerError>) -> Self {
928        Box::new(error).into()
929    }
930}
931
932impl From<BuildError> for DynamoDbStoreInternalError {
933    fn from(error: BuildError) -> Self {
934        Box::new(error).into()
935    }
936}
937
938impl DynamoDbStoreInternalError {
939    /// Creates a [`DynamoDbStoreInternalError::WrongKeyType`] instance based on the returned value type.
940    ///
941    /// # Panics
942    ///
943    /// If the value type is in the correct type, a binary blob.
944    pub fn wrong_key_type(value: &AttributeValue) -> Self {
945        DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
946    }
947
948    /// Creates a [`DynamoDbStoreInternalError::WrongValueType`] instance based on the returned value type.
949    ///
950    /// # Panics
951    ///
952    /// If the value type is in the correct type, a binary blob.
953    pub fn wrong_value_type(value: &AttributeValue) -> Self {
954        DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
955    }
956
957    fn type_description_of(value: &AttributeValue) -> String {
958        match value {
959            AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
960            AttributeValue::Bool(_) => "a boolean",
961            AttributeValue::Bs(_) => "a list of binary blobs",
962            AttributeValue::L(_) => "a list",
963            AttributeValue::M(_) => "a map",
964            AttributeValue::N(_) => "a number",
965            AttributeValue::Ns(_) => "a list of numbers",
966            AttributeValue::Null(_) => "a null value",
967            AttributeValue::S(_) => "a string",
968            AttributeValue::Ss(_) => "a list of strings",
969            _ => "an unknown type",
970        }
971        .to_owned()
972    }
973}
974
/// Marks [`DynamoDbStoreInternalError`] as a key-value store error type and gives it the
/// backend name `"dynamo_db"`.
impl KeyValueStoreError for DynamoDbStoreInternalError {
    const BACKEND: &'static str = "dynamo_db";
}
978
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<DynamoDbStoreInternal> {
    /// Returns the configuration used by tests: `use_dynamodb_local` is enabled and the
    /// query limits come from the `TEST_DYNAMO_DB_*` constants. This never fails.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        let config = DynamoDbStoreInternalConfig {
            max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
            max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
            use_dynamodb_local: true,
        };
        Ok(config)
    }
}
989
/// A shared DB client for DynamoDB implementing LRU caching and metrics.
///
/// Used when `with_metrics` is enabled. Reading the type from the inside out, the stack is:
/// [`DynamoDbStoreInternal`], then [`JournalingKeyValueStore`], then [`ValueSplittingStore`],
/// then [`LruCachingStore`], with a [`MeteredStore`] wrapped around the journaling, the
/// value-splitting and the caching layers.
#[cfg(with_metrics)]
pub type DynamoDbStore = MeteredStore<
    LruCachingStore<
        MeteredStore<
            ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
        >,
    >,
>;
999
/// A shared DB client for DynamoDB implementing LRU caching.
///
/// Used when `with_metrics` is not enabled. Reading the type from the inside out, the stack
/// is: [`DynamoDbStoreInternal`], then [`JournalingKeyValueStore`], then
/// [`ValueSplittingStore`], then [`LruCachingStore`].
#[cfg(not(with_metrics))]
pub type DynamoDbStore =
    LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>;
1004
/// The combined error type for [`DynamoDbStore`]: a [`ValueSplittingError`] parameterized
/// by [`DynamoDbStoreInternalError`].
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1007
/// The config type for [`DynamoDbStore`]: an [`LruCachingConfig`] parameterized by
/// [`DynamoDbStoreInternalConfig`].
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1010
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that `get_uleb128_size(len) + len` matches the BCS serialized size of a byte
    /// vector of length `len`.
    ///
    /// The lengths straddle 2^7 and 2^14, where a ULEB128 length prefix grows by one byte.
    #[test]
    fn test_serialization_len() {
        const LENGTHS: [usize; 8] = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for len in LENGTHS {
            let zeros = vec![0u8; len];
            let expected = get_uleb128_size(len) + len;
            let actual = serialized_size(&zeros).unwrap();
            assert_eq!(expected, actual);
        }
    }
}