linera_views/backends/
dynamo_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the DynamoDB database.
5
6use std::{
7    collections::HashMap,
8    env,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17    error::SdkError,
18    operation::{
19        create_table::CreateTableError,
20        delete_table::DeleteTableError,
21        get_item::GetItemError,
22        list_tables::ListTablesError,
23        query::{QueryError, QueryOutput},
24        transact_write_items::TransactWriteItemsError,
25    },
26    primitives::Blob,
27    types::{
28        AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29        ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30    },
31    Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::ensure;
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredDatabase;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueDatabase;
43use crate::{
44    batch::SimpleUnorderedBatch,
45    common::get_uleb128_size,
46    journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
47    lru_caching::{LruCachingConfig, LruCachingDatabase},
48    store::{
49        DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
50        WithError,
51    },
52    value_splitting::{ValueSplittingDatabase, ValueSplittingError},
53    FutureSyncExt as _,
54};
55
56/// Name of the environment variable with the address to a DynamoDB local instance.
57const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
58
59/// Gets the AWS configuration from the environment
60async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
61    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
62        .boxed_sync()
63        .await;
64    Ok((&base_config).into())
65}
66
67fn get_endpoint_address() -> Option<String> {
68    env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
69}
70
71/// Gets the DynamoDB local config
72async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
73{
74    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
75        .boxed_sync()
76        .await;
77    let endpoint_address = get_endpoint_address().unwrap();
78    let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
79        .endpoint_url(endpoint_address)
80        .build();
81    Ok(config)
82}
83
/// DynamoDB forbids iterating over the partition keys.
/// Therefore we use a special partition key `[1]` for storing
/// the root keys. Normal root keys are prefixed with `[0]`,
/// so no collision with that special partition is possible.
88const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
89
90/// The attribute name of the partition key.
91const PARTITION_ATTRIBUTE: &str = "item_partition";
92
93/// A root key being used for testing existence of tables
94const EMPTY_ROOT_KEY: &[u8] = &[0];
95
96/// A key being used for testing existence of tables
97const DB_KEY: &[u8] = &[0];
98
99/// The attribute name of the primary key (used as a sort key).
100const KEY_ATTRIBUTE: &str = "item_key";
101
102/// The attribute name of the table value blob.
103const VALUE_ATTRIBUTE: &str = "item_value";
104
105/// The attribute for obtaining the primary key (used as a sort key) with the stored value.
106const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
107
/// TODO(#1084): The scheme below with the `MAX_VALUE_SIZE` has to be checked.
/// This is the maximum size of a raw value in DynamoDB (400 KiB = 409600 bytes).
110const RAW_MAX_VALUE_SIZE: usize = 409600;
111
/// Fundamental constants in DynamoDB: The maximum size of a value is 400 KB.
/// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`.
/// Therefore the actual `MAX_VALUE_SIZE` might be lower.
/// The maximum key size is 1024 bytes (see below) and we pack just one entry.
/// So if the key has 1024 bytes, this gets us the inequality
/// `1 + 1 + serialized_size(1024)? + serialized_size(x)? <= 400*1024`
/// which simplifies to `1 + 1 + (2 + 1024) + (3 + x) <= 400 * 1024`.
/// Notes on this formula:
/// * We write 3 because `get_uleb128_size(400*1024) == 3`
/// * We write 2 because `get_uleb128_size(1024) == 2`
/// * We write `1 + 1` because the `SimpleUnorderedBatch` has two entries
///
/// This gets us a maximal value of 408569.
125const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
126    - MAX_KEY_SIZE
127    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
128    - get_uleb128_size(MAX_KEY_SIZE)
129    - 1
130    - 1;
131
132/// Fundamental constant in DynamoDB: The maximum size of a key is 1024 bytes
133/// See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html
134const MAX_KEY_SIZE: usize = 1024;
135
/// Fundamental constants in DynamoDB: The aggregate size of the items in a
/// `TransactWriteItems` request (a list of [`TransactWriteItem`]) is at most 4 MB.
/// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html
/// We're taking a conservative value because the mode of computation is unclear.
139const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
140
141/// The DynamoDB database is potentially handling an infinite number of connections.
142/// However, for testing or some other purpose we really need to decrease the number of
143/// connections.
144#[cfg(with_testing)]
145const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
146
147/// The number of entries in a stream of the tests can be controlled by this parameter for tests.
148#[cfg(with_testing)]
149const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
150
/// Fundamental constants in DynamoDB: The maximum number of [`TransactWriteItem`] entries
/// in a single `TransactWriteItems` request is 100.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
153const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
154
155/// Builds the key attributes for a table item.
156///
157/// The key is composed of two attributes that are both binary blobs. The first attribute is a
158/// partition key and is currently just a dummy value that ensures all items are in the same
159/// partition. This is necessary for range queries to work correctly.
160///
161/// The second attribute is the actual key value, which is generated by concatenating the
162/// context prefix. `The Vec<u8>` expression is obtained from `self.derive_key`.
163fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
164    [
165        (
166            PARTITION_ATTRIBUTE.to_owned(),
167            AttributeValue::B(Blob::new(start_key.to_vec())),
168        ),
169        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
170    ]
171    .into()
172}
173
174/// Builds the value attribute for storing a table item.
175fn build_key_value(
176    start_key: &[u8],
177    key: Vec<u8>,
178    value: Vec<u8>,
179) -> HashMap<String, AttributeValue> {
180    [
181        (
182            PARTITION_ATTRIBUTE.to_owned(),
183            AttributeValue::B(Blob::new(start_key.to_vec())),
184        ),
185        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
186        (
187            VALUE_ATTRIBUTE.to_owned(),
188            AttributeValue::B(Blob::new(value)),
189        ),
190    ]
191    .into()
192}
193
194/// Checks that a key is of the correct size
195fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
196    ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
197    ensure!(
198        key.len() <= MAX_KEY_SIZE,
199        DynamoDbStoreInternalError::KeyTooLong
200    );
201    Ok(())
202}
203
204/// Extracts the key attribute from an item.
205fn extract_key(
206    prefix_len: usize,
207    attributes: &HashMap<String, AttributeValue>,
208) -> Result<&[u8], DynamoDbStoreInternalError> {
209    let key = attributes
210        .get(KEY_ATTRIBUTE)
211        .ok_or(DynamoDbStoreInternalError::MissingKey)?;
212    match key {
213        AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
214        key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
215    }
216}
217
218/// Extracts the value attribute from an item.
219fn extract_value(
220    attributes: &HashMap<String, AttributeValue>,
221) -> Result<&[u8], DynamoDbStoreInternalError> {
222    // According to the official AWS DynamoDB documentation:
223    // "Binary must have a length greater than zero if the attribute is used as a key attribute for a table or index"
224    let value = attributes
225        .get(VALUE_ATTRIBUTE)
226        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
227    match value {
228        AttributeValue::B(blob) => Ok(blob.as_ref()),
229        value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
230    }
231}
232
233/// Extracts the value attribute from an item (returned by value).
234fn extract_value_owned(
235    attributes: &mut HashMap<String, AttributeValue>,
236) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
237    let value = attributes
238        .remove(VALUE_ATTRIBUTE)
239        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
240    match value {
241        AttributeValue::B(blob) => Ok(blob.into_inner()),
242        value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
243    }
244}
245
246/// Extracts the key and value attributes from an item.
247fn extract_key_value(
248    prefix_len: usize,
249    attributes: &HashMap<String, AttributeValue>,
250) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
251    let key = extract_key(prefix_len, attributes)?;
252    let value = extract_value(attributes)?;
253    Ok((key, value))
254}
255
256struct TransactionBuilder {
257    start_key: Vec<u8>,
258    transactions: Vec<TransactWriteItem>,
259}
260
261impl TransactionBuilder {
262    fn new(start_key: &[u8]) -> Self {
263        Self {
264            start_key: start_key.to_vec(),
265            transactions: Vec::new(),
266        }
267    }
268
269    fn insert_delete_request(
270        &mut self,
271        key: Vec<u8>,
272        store: &DynamoDbStoreInternal,
273    ) -> Result<(), DynamoDbStoreInternalError> {
274        let transaction = store.build_delete_transaction(&self.start_key, key)?;
275        self.transactions.push(transaction);
276        Ok(())
277    }
278
279    fn insert_put_request(
280        &mut self,
281        key: Vec<u8>,
282        value: Vec<u8>,
283        store: &DynamoDbStoreInternal,
284    ) -> Result<(), DynamoDbStoreInternalError> {
285        let transaction = store.build_put_transaction(&self.start_key, key, value)?;
286        self.transactions.push(transaction);
287        Ok(())
288    }
289}
290
/// A DynamoDB client.
///
/// A store addresses the items of one partition (`start_key`) of one table
/// (`namespace`).
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    /// AWS SDK client used to send the requests.
    client: Client,
    /// Used as the DynamoDB table name in requests.
    namespace: String,
    /// When present, a permit is taken through `acquire` to bound concurrent requests.
    semaphore: Option<Arc<Semaphore>>,
    /// Value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// Value of the partition attribute for the items of this store.
    start_key: Vec<u8>,
    /// Flag checked by `write_batch` to decide whether `start_key` still has to be
    /// recorded under the `PARTITION_KEY_ROOT_KEY` partition.
    root_key_written: Arc<AtomicBool>,
}
301
/// Database-level connection to DynamoDB for managing namespaces and partitions.
///
/// Stores opened from it receive clones of the client, namespace and semaphore.
#[derive(Clone)]
pub struct DynamoDbDatabaseInternal {
    /// AWS SDK client shared with the opened stores.
    client: Client,
    /// Namespace this database is connected to; used as the table name.
    namespace: String,
    /// Optional limiter built from `max_concurrent_queries` in the configuration.
    semaphore: Option<Arc<Semaphore>>,
    /// Copied from the configuration and handed to the opened stores.
    max_stream_queries: usize,
}
310
// Database-level operations report failures as `DynamoDbStoreInternalError`.
impl WithError for DynamoDbDatabaseInternal {
    type Error = DynamoDbStoreInternalError;
}
314
/// The initial configuration of the system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStoreInternalConfig {
    /// Whether to use DynamoDB local or not.
    ///
    /// When set, the client endpoint is taken from the environment variable
    /// named by `DYNAMODB_LOCAL_ENDPOINT`.
    pub use_dynamodb_local: bool,
    /// Maximum number of concurrent database queries allowed for this client.
    ///
    /// `None` means that no semaphore is created on connection.
    pub max_concurrent_queries: Option<usize>,
    /// Preferred buffer size for async streams.
    pub max_stream_queries: usize,
}
325
326impl DynamoDbStoreInternalConfig {
327    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
328        let config = if self.use_dynamodb_local {
329            get_dynamodb_local_config().await?
330        } else {
331            get_base_config().await?
332        };
333        Ok(Client::from_conf(config))
334    }
335}
336
337impl KeyValueDatabase for DynamoDbDatabaseInternal {
338    type Config = DynamoDbStoreInternalConfig;
339    type Store = DynamoDbStoreInternal;
340
341    fn get_name() -> String {
342        "dynamodb internal".to_string()
343    }
344
345    async fn connect(
346        config: &Self::Config,
347        namespace: &str,
348    ) -> Result<Self, DynamoDbStoreInternalError> {
349        Self::check_namespace(namespace)?;
350        let client = config.client().await?;
351        let semaphore = config
352            .max_concurrent_queries
353            .map(|n| Arc::new(Semaphore::new(n)));
354        let max_stream_queries = config.max_stream_queries;
355        let namespace = namespace.to_string();
356        let store = Self {
357            client,
358            namespace,
359            semaphore,
360            max_stream_queries,
361        };
362        Ok(store)
363    }
364
365    fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
366        let mut start_key = EMPTY_ROOT_KEY.to_vec();
367        start_key.extend(root_key);
368        Ok(self.open_internal(start_key))
369    }
370
    /// Opens the store for `root_key`; this simply delegates to `open_shared`.
    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
        self.open_shared(root_key)
    }
374
375    async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
376        let client = config.client().await?;
377        let mut namespaces = Vec::new();
378        let mut start_table = None;
379        loop {
380            let response = client
381                .list_tables()
382                .set_exclusive_start_table_name(start_table)
383                .send()
384                .boxed_sync()
385                .await?;
386            if let Some(namespaces_blk) = response.table_names {
387                namespaces.extend(namespaces_blk);
388            }
389            if response.last_evaluated_table_name.is_none() {
390                break;
391            } else {
392                start_table = response.last_evaluated_table_name;
393            }
394        }
395        Ok(namespaces)
396    }
397
398    async fn list_root_keys(
399        config: &Self::Config,
400        namespace: &str,
401    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
402        let database = Self::connect(config, namespace).await?;
403        let store = database.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
404        store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
405    }
406
407    async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
408        let client = config.client().await?;
409        let tables = Self::list_all(config).await?;
410        for table in tables {
411            client
412                .delete_table()
413                .table_name(&table)
414                .send()
415                .boxed_sync()
416                .await?;
417        }
418        Ok(())
419    }
420
421    async fn exists(
422        config: &Self::Config,
423        namespace: &str,
424    ) -> Result<bool, DynamoDbStoreInternalError> {
425        Self::check_namespace(namespace)?;
426        let client = config.client().await?;
427        let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
428        let response = client
429            .get_item()
430            .table_name(namespace)
431            .set_key(Some(key_db))
432            .send()
433            .boxed_sync()
434            .await;
435        let Err(error) = response else {
436            return Ok(true);
437        };
438        let test = match &error {
439            SdkError::ServiceError(error) => match error.err() {
440                GetItemError::ResourceNotFoundException(error) => {
441                    error.message
442                        == Some("Cannot do operations on a non-existent table".to_string())
443                }
444                _ => false,
445            },
446            _ => false,
447        };
448        if test {
449            Ok(false)
450        } else {
451            Err(error.into())
452        }
453    }
454
455    async fn create(
456        config: &Self::Config,
457        namespace: &str,
458    ) -> Result<(), DynamoDbStoreInternalError> {
459        Self::check_namespace(namespace)?;
460        let client = config.client().await?;
461        client
462            .create_table()
463            .table_name(namespace)
464            .attribute_definitions(
465                AttributeDefinition::builder()
466                    .attribute_name(PARTITION_ATTRIBUTE)
467                    .attribute_type(ScalarAttributeType::B)
468                    .build()?,
469            )
470            .attribute_definitions(
471                AttributeDefinition::builder()
472                    .attribute_name(KEY_ATTRIBUTE)
473                    .attribute_type(ScalarAttributeType::B)
474                    .build()?,
475            )
476            .key_schema(
477                KeySchemaElement::builder()
478                    .attribute_name(PARTITION_ATTRIBUTE)
479                    .key_type(KeyType::Hash)
480                    .build()?,
481            )
482            .key_schema(
483                KeySchemaElement::builder()
484                    .attribute_name(KEY_ATTRIBUTE)
485                    .key_type(KeyType::Range)
486                    .build()?,
487            )
488            .provisioned_throughput(
489                ProvisionedThroughput::builder()
490                    .read_capacity_units(10)
491                    .write_capacity_units(10)
492                    .build()?,
493            )
494            .send()
495            .boxed_sync()
496            .await?;
497        Ok(())
498    }
499
500    async fn delete(
501        config: &Self::Config,
502        namespace: &str,
503    ) -> Result<(), DynamoDbStoreInternalError> {
504        Self::check_namespace(namespace)?;
505        let client = config.client().await?;
506        client
507            .delete_table()
508            .table_name(namespace)
509            .send()
510            .boxed_sync()
511            .await?;
512        Ok(())
513    }
514}
515
516impl DynamoDbDatabaseInternal {
517    /// Namespaces are named table names in DynamoDB [naming
518    /// rules](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules),
519    /// so we need to check correctness of the namespace
520    fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
521        if namespace.len() < 3 {
522            return Err(InvalidNamespace::TooShort);
523        }
524        if namespace.len() > 255 {
525            return Err(InvalidNamespace::TooLong);
526        }
527        if !namespace.chars().all(|character| {
528            character.is_ascii_alphanumeric()
529                || character == '.'
530                || character == '-'
531                || character == '_'
532        }) {
533            return Err(InvalidNamespace::InvalidCharacter);
534        }
535        Ok(())
536    }
537
538    fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
539        let client = self.client.clone();
540        let namespace = self.namespace.clone();
541        let semaphore = self.semaphore.clone();
542        let max_stream_queries = self.max_stream_queries;
543        DynamoDbStoreInternal {
544            client,
545            namespace,
546            semaphore,
547            max_stream_queries,
548            start_key,
549            root_key_written: Arc::new(AtomicBool::new(false)),
550        }
551    }
552}
553
554impl DynamoDbStoreInternal {
555    fn build_delete_transaction(
556        &self,
557        start_key: &[u8],
558        key: Vec<u8>,
559    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
560        check_key_size(&key)?;
561        let request = Delete::builder()
562            .table_name(&self.namespace)
563            .set_key(Some(build_key(start_key, key)))
564            .build()?;
565        Ok(TransactWriteItem::builder().delete(request).build())
566    }
567
568    fn build_put_transaction(
569        &self,
570        start_key: &[u8],
571        key: Vec<u8>,
572        value: Vec<u8>,
573    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
574        check_key_size(&key)?;
575        ensure!(
576            value.len() <= RAW_MAX_VALUE_SIZE,
577            DynamoDbStoreInternalError::ValueLengthTooLarge
578        );
579        let request = Put::builder()
580            .table_name(&self.namespace)
581            .set_item(Some(build_key_value(start_key, key, value)))
582            .build()?;
583        Ok(TransactWriteItem::builder().put(request).build())
584    }
585
586    /// Obtains the semaphore lock on the database if needed.
587    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
588        match &self.semaphore {
589            None => None,
590            Some(count) => Some(count.acquire().await),
591        }
592    }
593
594    async fn get_query_output(
595        &self,
596        attribute_str: &str,
597        start_key: &[u8],
598        key_prefix: &[u8],
599        start_key_map: Option<HashMap<String, AttributeValue>>,
600    ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
601        let _guard = self.acquire().await;
602        let start_key = start_key.to_vec();
603        let response = self
604            .client
605            .query()
606            .table_name(&self.namespace)
607            .projection_expression(attribute_str)
608            .key_condition_expression(format!(
609                "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
610            ))
611            .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
612            .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
613            .set_exclusive_start_key(start_key_map)
614            .send()
615            .boxed_sync()
616            .await?;
617        Ok(response)
618    }
619
620    async fn read_value_bytes_general(
621        &self,
622        key_db: HashMap<String, AttributeValue>,
623    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
624        let _guard = self.acquire().await;
625        let response = self
626            .client
627            .get_item()
628            .table_name(&self.namespace)
629            .set_key(Some(key_db))
630            .send()
631            .boxed_sync()
632            .await?;
633
634        match response.item {
635            Some(mut item) => {
636                let value = extract_value_owned(&mut item)?;
637                Ok(Some(value))
638            }
639            None => Ok(None),
640        }
641    }
642
643    async fn contains_key_general(
644        &self,
645        key_db: HashMap<String, AttributeValue>,
646    ) -> Result<bool, DynamoDbStoreInternalError> {
647        let _guard = self.acquire().await;
648        let response = self
649            .client
650            .get_item()
651            .table_name(&self.namespace)
652            .set_key(Some(key_db))
653            .projection_expression(PARTITION_ATTRIBUTE)
654            .send()
655            .boxed_sync()
656            .await?;
657
658        Ok(response.item.is_some())
659    }
660
661    async fn get_list_responses(
662        &self,
663        attribute: &str,
664        start_key: &[u8],
665        key_prefix: &[u8],
666    ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
667        check_key_size(key_prefix)?;
668        let mut responses = Vec::new();
669        let mut start_key_map = None;
670        loop {
671            let response = self
672                .get_query_output(attribute, start_key, key_prefix, start_key_map)
673                .await?;
674            let last_evaluated = response.last_evaluated_key.clone();
675            responses.push(response);
676            match last_evaluated {
677                None => {
678                    break;
679                }
680                Some(value) => {
681                    start_key_map = Some(value);
682                }
683            }
684        }
685        Ok(QueryResponses {
686            prefix_len: key_prefix.len(),
687            responses,
688        })
689    }
690}
691
692struct QueryResponses {
693    prefix_len: usize,
694    responses: Vec<QueryOutput>,
695}
696
697impl QueryResponses {
698    fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
699        self.responses
700            .iter()
701            .flat_map(|response| response.items.iter().flatten())
702            .map(|item| extract_key(self.prefix_len, item))
703    }
704
705    fn key_values(
706        &self,
707    ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
708        self.responses
709            .iter()
710            .flat_map(|response| response.items.iter().flatten())
711            .map(|item| extract_key_value(self.prefix_len, item))
712    }
713}
714
// Store-level operations report failures as `DynamoDbStoreInternalError`.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
718
719impl ReadableKeyValueStore for DynamoDbStoreInternal {
720    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
721
    /// Returns the `max_stream_queries` value this store was opened with.
    fn max_stream_queries(&self) -> usize {
        self.max_stream_queries
    }
725
726    async fn read_value_bytes(
727        &self,
728        key: &[u8],
729    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
730        check_key_size(key)?;
731        let key_db = build_key(&self.start_key, key.to_vec());
732        self.read_value_bytes_general(key_db).await
733    }
734
735    async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
736        check_key_size(key)?;
737        let key_db = build_key(&self.start_key, key.to_vec());
738        self.contains_key_general(key_db).await
739    }
740
741    async fn contains_keys(
742        &self,
743        keys: Vec<Vec<u8>>,
744    ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
745        let mut handles = Vec::new();
746        for key in keys {
747            check_key_size(&key)?;
748            let key_db = build_key(&self.start_key, key);
749            let handle = self.contains_key_general(key_db);
750            handles.push(handle);
751        }
752        join_all(handles)
753            .await
754            .into_iter()
755            .collect::<Result<_, _>>()
756    }
757
758    async fn read_multi_values_bytes(
759        &self,
760        keys: Vec<Vec<u8>>,
761    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
762        let mut handles = Vec::new();
763        for key in keys {
764            check_key_size(&key)?;
765            let key_db = build_key(&self.start_key, key);
766            let handle = self.read_value_bytes_general(key_db);
767            handles.push(handle);
768        }
769        join_all(handles)
770            .await
771            .into_iter()
772            .collect::<Result<_, _>>()
773    }
774
775    async fn find_keys_by_prefix(
776        &self,
777        key_prefix: &[u8],
778    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
779        let result_queries = self
780            .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
781            .await?;
782        result_queries
783            .keys()
784            .map(|key| key.map(|k| k.to_vec()))
785            .collect()
786    }
787
788    async fn find_key_values_by_prefix(
789        &self,
790        key_prefix: &[u8],
791    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
792        let result_queries = self
793            .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
794            .await?;
795        result_queries
796            .key_values()
797            .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
798            .collect()
799    }
800}
801
802impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
803    const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
804    const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
805    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
806
807    // DynamoDB does not support the `DeletePrefix` operation.
808    type Batch = SimpleUnorderedBatch;
809
810    async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
811        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
812            let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
813            builder.insert_put_request(self.start_key.clone(), vec![], self)?;
814            self.client
815                .transact_write_items()
816                .set_transact_items(Some(builder.transactions))
817                .send()
818                .boxed_sync()
819                .await?;
820        }
821        let mut builder = TransactionBuilder::new(&self.start_key);
822        for key in batch.deletions {
823            builder.insert_delete_request(key, self)?;
824        }
825        for (key, value) in batch.insertions {
826            builder.insert_put_request(key, value, self)?;
827        }
828        if !builder.transactions.is_empty() {
829            let _guard = self.acquire().await;
830            self.client
831                .transact_write_items()
832                .set_transact_items(Some(builder.transactions))
833                .send()
834                .boxed_sync()
835                .await?;
836        }
837        Ok(())
838    }
839}
840
841/// Error when validating a namespace
842#[derive(Debug, Error)]
843pub enum InvalidNamespace {
844    /// The namespace should be at least 3 characters.
845    #[error("Namespace must have at least 3 characters")]
846    TooShort,
847
848    /// The namespace should be at most 63 characters.
849    #[error("Namespace must be at most 63 characters")]
850    TooLong,
851
852    /// allowed characters are lowercase letters, numbers, periods and hyphens
853    #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
854    InvalidCharacter,
855}
856
/// Errors that occur when using [`DynamoDbStoreInternal`].
///
/// Variants that wrap an AWS SDK error or a [`BuildError`] store it in a `Box`.
/// NOTE(review): presumably this keeps the enum (and every `Result` carrying it) small —
/// confirm.
#[derive(Debug, Error)]
pub enum DynamoDbStoreInternalError {
    /// An error occurred while getting the item.
    #[error(transparent)]
    Get(#[from] Box<SdkError<GetItemError>>),

    /// An error occurred while writing a transaction of items.
    #[error(transparent)]
    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),

    /// An error occurred while doing a Query.
    #[error(transparent)]
    Query(#[from] Box<SdkError<QueryError>>),

    /// An error occurred while deleting a table.
    #[error(transparent)]
    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),

    /// An error occurred while listing tables.
    #[error(transparent)]
    ListTables(#[from] Box<SdkError<ListTablesError>>),

    /// The transaction exceeds the `MAX_TRANSACT_WRITE_ITEM_SIZE` limit.
    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
    TransactUpperLimitSize,

    /// Keys have to be of non-zero length.
    #[error("The key must be of strictly positive length")]
    ZeroLengthKey,

    /// The key must have at most 1024 bytes.
    #[error("The key must have at most 1024 bytes")]
    KeyTooLong,

    /// The key prefix must have at most 1024 bytes.
    #[error("The key prefix must have at most 1024 bytes")]
    KeyPrefixTooLong,

    /// Key prefixes have to be of non-zero length.
    #[error("The key_prefix must be of strictly positive length")]
    ZeroLengthKeyPrefix,

    /// The journal is not coherent.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The length of the value should be at most 400 KB.
    // NOTE(review): the message below says "less than" while the doc says "at most";
    // confirm which bound is actually enforced.
    #[error("The DynamoDB value should be less than 400 KB")]
    ValueLengthTooLarge,

    /// The stored key attribute is missing.
    #[error("The stored key attribute is missing")]
    MissingKey,

    /// The key was stored as the wrong type (it should be a binary blob).
    ///
    /// The payload is a description of the type that was found instead.
    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongKeyType(String),

    /// The stored value attribute is missing.
    #[error("The stored value attribute is missing")]
    MissingValue,

    /// The value was stored as the wrong type (it should be a binary blob).
    ///
    /// The payload is a description of the type that was found instead.
    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongValueType(String),

    /// A BCS error occurred.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The namespace failed validation.
    #[error(transparent)]
    InvalidNamespace(#[from] InvalidNamespace),

    /// An error occurred while creating the table.
    #[error(transparent)]
    CreateTable(#[from] Box<SdkError<CreateTableError>>),

    /// An error occurred while building an object.
    #[error(transparent)]
    Build(#[from] Box<BuildError>),
}
940
941impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
942where
943    DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
944{
945    fn from(error: SdkError<InnerError>) -> Self {
946        Box::new(error).into()
947    }
948}
949
950impl From<BuildError> for DynamoDbStoreInternalError {
951    fn from(error: BuildError) -> Self {
952        Box::new(error).into()
953    }
954}
955
956impl DynamoDbStoreInternalError {
957    /// Creates a [`DynamoDbStoreInternalError::WrongKeyType`] instance based on the returned value type.
958    ///
959    /// # Panics
960    ///
961    /// If the value type is in the correct type, a binary blob.
962    pub fn wrong_key_type(value: &AttributeValue) -> Self {
963        DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
964    }
965
966    /// Creates a [`DynamoDbStoreInternalError::WrongValueType`] instance based on the returned value type.
967    ///
968    /// # Panics
969    ///
970    /// If the value type is in the correct type, a binary blob.
971    pub fn wrong_value_type(value: &AttributeValue) -> Self {
972        DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
973    }
974
975    fn type_description_of(value: &AttributeValue) -> String {
976        match value {
977            AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
978            AttributeValue::Bool(_) => "a boolean",
979            AttributeValue::Bs(_) => "a list of binary blobs",
980            AttributeValue::L(_) => "a list",
981            AttributeValue::M(_) => "a map",
982            AttributeValue::N(_) => "a number",
983            AttributeValue::Ns(_) => "a list of numbers",
984            AttributeValue::Null(_) => "a null value",
985            AttributeValue::S(_) => "a string",
986            AttributeValue::Ss(_) => "a list of strings",
987            _ => "an unknown type",
988        }
989        .to_owned()
990    }
991}
992
impl KeyValueStoreError for DynamoDbStoreInternalError {
    /// Name identifying the DynamoDB backend.
    const BACKEND: &'static str = "dynamo_db";
}
996
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
    /// Returns the configuration used by tests: a DynamoDB local instance with the
    /// test limits on concurrent and stream queries.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        let max_concurrent_queries = Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES);
        let max_stream_queries = TEST_DYNAMO_DB_MAX_STREAM_QUERIES;
        let config = DynamoDbStoreInternalConfig {
            use_dynamodb_local: true,
            max_concurrent_queries,
            max_stream_queries,
        };
        Ok(config)
    }
}
1007
/// The combined error type for [`DynamoDbDatabase`]: a [`DynamoDbStoreInternalError`]
/// wrapped in [`ValueSplittingError`].
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1010
/// The config type for [`DynamoDbDatabase`]: a [`DynamoDbStoreInternalConfig`] wrapped in
/// [`LruCachingConfig`].
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1013
/// A shared DB client for DynamoDB with metrics.
///
/// From the innermost layer outwards, `DynamoDbDatabaseInternal` is wrapped in journaling,
/// value splitting and LRU caching, and each of those three layers is itself wrapped in a
/// `MeteredDatabase`.
#[cfg(with_metrics)]
pub type DynamoDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
            >,
        >,
    >,
>;
/// A shared DB client for DynamoDB.
///
/// From the innermost layer outwards, `DynamoDbDatabaseInternal` is wrapped in journaling,
/// value splitting and LRU caching.
#[cfg(not(with_metrics))]
pub type DynamoDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
>;
1030
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that `get_uleb128_size(len) + len` equals the BCS-serialized size of a byte
    /// vector of length `len`.
    #[test]
    fn test_serialization_len() {
        // The lengths include values on both sides of 2^7 and 2^14.
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for len in lengths {
            let bytes = vec![0u8; len];
            let estimated = get_uleb128_size(len) + len;
            let serialized = serialized_size(&bytes).unwrap();
            assert_eq!(estimated, serialized);
        }
    }
}