linera_views/backends/
dynamo_db.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Implements [`crate::store::KeyValueStore`] for the DynamoDB database.
5
6use std::{
7    collections::HashMap,
8    env,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17    error::SdkError,
18    operation::{
19        create_table::CreateTableError,
20        delete_table::DeleteTableError,
21        get_item::GetItemError,
22        list_tables::ListTablesError,
23        query::{QueryError, QueryOutput},
24        transact_write_items::TransactWriteItemsError,
25    },
26    primitives::Blob,
27    types::{
28        AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29        ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30    },
31    Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::ensure;
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredStore;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueStore;
43use crate::{
44    batch::SimpleUnorderedBatch,
45    common::get_uleb128_size,
46    journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
47    lru_caching::{LruCachingConfig, LruCachingStore},
48    store::{
49        AdminKeyValueStore, CommonStoreInternalConfig, KeyIterable, KeyValueIterable,
50        KeyValueStoreError, ReadableKeyValueStore, WithError,
51    },
52    value_splitting::{ValueSplittingError, ValueSplittingStore},
53    FutureSyncExt as _,
54};
55
/// Name of the environment variable holding the address of a DynamoDB local instance.
///
/// The value is read by `get_endpoint_address` and used as the endpoint URL of the
/// DynamoDB local configuration.
const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
58
59/// Gets the AWS configuration from the environment
60async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
61    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
62        .boxed_sync()
63        .await;
64    Ok((&base_config).into())
65}
66
67fn get_endpoint_address() -> Option<String> {
68    env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
69}
70
71/// Gets the DynamoDB local config
72async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
73{
74    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
75        .boxed_sync()
76        .await;
77    let endpoint_address = get_endpoint_address().unwrap();
78    let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
79        .endpoint_url(endpoint_address)
80        .build();
81    Ok(config)
82}
83
/// DynamoDB forbids iterating over the partition keys.
/// Therefore we use a special partition key, `[1]`, for storing
/// the root keys. Normal root keys are prefixed with `[0]`
/// (see `extend_root_key`), so the two can never collide.
const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];

/// The attribute name of the partition key.
const PARTITION_ATTRIBUTE: &str = "item_partition";

/// A root key used for testing the existence of tables. It is also the prefix
/// that `extend_root_key` puts in front of every root key.
const EMPTY_ROOT_KEY: &[u8] = &[0];

/// A key used for testing the existence of tables.
const DB_KEY: &[u8] = &[0];

/// The attribute name of the primary key (used as a sort key).
const KEY_ATTRIBUTE: &str = "item_key";

/// The attribute name of the table value blob.
const VALUE_ATTRIBUTE: &str = "item_value";

/// The projection expression for obtaining the primary key (used as a sort key)
/// together with the stored value.
const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
107
/// TODO(#1084): The scheme below with the `MAX_VALUE_SIZE` has to be checked.
/// This is the maximum size of a raw value in DynamoDB.
const RAW_MAX_VALUE_SIZE: usize = 409600;

/// Fundamental constant in DynamoDB: the maximum size of a value is 400 KB.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html>
/// However, the value being written can also be the serialization of a `SimpleUnorderedBatch`,
/// therefore the actual `MAX_VALUE_SIZE` might be lower.
/// The maximum key size is 1024 bytes (see below) and we pack just one entry.
/// So if the key has 1024 bytes, this gets us the inequality
/// `1 + 1 + serialized_size(1024)? + serialized_size(x)? <= 400*1024`
/// which simplifies to `1 + 1 + (2 + 1024) + (3 + x) <= 400 * 1024`.
/// Notes on this formula:
/// * We write 3 because `get_uleb128_size(400*1024) == 3`
/// * We write `1 + 1` because the `SimpleUnorderedBatch` has two entries
///
/// This gets us a maximal value of 408569.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 1
    - 1;
131
/// Fundamental constant in DynamoDB: the maximum size of a key is 1024 bytes.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html>
const MAX_KEY_SIZE: usize = 1024;

/// Fundamental constant in DynamoDB: the maximum total size of a `TransactWriteItems`
/// request is 4 MB.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
/// We're taking a conservative value because the mode of computation is unclear.
const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;

/// The DynamoDB database is potentially handling an infinite number of connections.
/// However, for testing or some other purpose we really need to decrease the number of
/// connections.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;

/// The number of entries in a stream can be controlled by this parameter in the tests.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;

/// Fundamental constant in DynamoDB: the maximum number of [`TransactWriteItem`] entries
/// in a single `TransactWriteItems` request is 100.
/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
154
155/// Keys of length 0 are not allowed, so we extend by having a prefix on start
156fn extend_root_key(root_key: &[u8]) -> Vec<u8> {
157    let mut start_key = EMPTY_ROOT_KEY.to_vec();
158    start_key.extend(root_key);
159    start_key
160}
161
162/// Builds the key attributes for a table item.
163///
164/// The key is composed of two attributes that are both binary blobs. The first attribute is a
165/// partition key and is currently just a dummy value that ensures all items are in the same
166/// partition. This is necessary for range queries to work correctly.
167///
168/// The second attribute is the actual key value, which is generated by concatenating the
169/// context prefix. `The Vec<u8>` expression is obtained from `self.derive_key`.
170fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
171    [
172        (
173            PARTITION_ATTRIBUTE.to_owned(),
174            AttributeValue::B(Blob::new(start_key.to_vec())),
175        ),
176        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
177    ]
178    .into()
179}
180
181/// Builds the value attribute for storing a table item.
182fn build_key_value(
183    start_key: &[u8],
184    key: Vec<u8>,
185    value: Vec<u8>,
186) -> HashMap<String, AttributeValue> {
187    [
188        (
189            PARTITION_ATTRIBUTE.to_owned(),
190            AttributeValue::B(Blob::new(start_key.to_vec())),
191        ),
192        (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
193        (
194            VALUE_ATTRIBUTE.to_owned(),
195            AttributeValue::B(Blob::new(value)),
196        ),
197    ]
198    .into()
199}
200
201/// Checks that a key is of the correct size
202fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
203    ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
204    ensure!(
205        key.len() <= MAX_KEY_SIZE,
206        DynamoDbStoreInternalError::KeyTooLong
207    );
208    Ok(())
209}
210
211/// Extracts the key attribute from an item.
212fn extract_key(
213    prefix_len: usize,
214    attributes: &HashMap<String, AttributeValue>,
215) -> Result<&[u8], DynamoDbStoreInternalError> {
216    let key = attributes
217        .get(KEY_ATTRIBUTE)
218        .ok_or(DynamoDbStoreInternalError::MissingKey)?;
219    match key {
220        AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
221        key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
222    }
223}
224
225/// Extracts the value attribute from an item.
226fn extract_value(
227    attributes: &HashMap<String, AttributeValue>,
228) -> Result<&[u8], DynamoDbStoreInternalError> {
229    // According to the official AWS DynamoDB documentation:
230    // "Binary must have a length greater than zero if the attribute is used as a key attribute for a table or index"
231    let value = attributes
232        .get(VALUE_ATTRIBUTE)
233        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
234    match value {
235        AttributeValue::B(blob) => Ok(blob.as_ref()),
236        value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
237    }
238}
239
240/// Extracts the value attribute from an item (returned by value).
241fn extract_value_owned(
242    attributes: &mut HashMap<String, AttributeValue>,
243) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
244    let value = attributes
245        .remove(VALUE_ATTRIBUTE)
246        .ok_or(DynamoDbStoreInternalError::MissingValue)?;
247    match value {
248        AttributeValue::B(blob) => Ok(blob.into_inner()),
249        value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
250    }
251}
252
253/// Extracts the key and value attributes from an item.
254fn extract_key_value(
255    prefix_len: usize,
256    attributes: &HashMap<String, AttributeValue>,
257) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
258    let key = extract_key(prefix_len, attributes)?;
259    let value = extract_value(attributes)?;
260    Ok((key, value))
261}
262
263/// Extracts the `(key, value)` pair attributes from an item (returned by value).
264fn extract_key_value_owned(
265    prefix_len: usize,
266    attributes: &mut HashMap<String, AttributeValue>,
267) -> Result<(Vec<u8>, Vec<u8>), DynamoDbStoreInternalError> {
268    let key = extract_key(prefix_len, attributes)?.to_vec();
269    let value = extract_value_owned(attributes)?;
270    Ok((key, value))
271}
272
273struct TransactionBuilder {
274    start_key: Vec<u8>,
275    transactions: Vec<TransactWriteItem>,
276}
277
278impl TransactionBuilder {
279    fn new(start_key: &[u8]) -> Self {
280        Self {
281            start_key: start_key.to_vec(),
282            transactions: Vec::new(),
283        }
284    }
285
286    fn insert_delete_request(
287        &mut self,
288        key: Vec<u8>,
289        store: &DynamoDbStoreInternal,
290    ) -> Result<(), DynamoDbStoreInternalError> {
291        let transaction = store.build_delete_transaction(&self.start_key, key)?;
292        self.transactions.push(transaction);
293        Ok(())
294    }
295
296    fn insert_put_request(
297        &mut self,
298        key: Vec<u8>,
299        value: Vec<u8>,
300        store: &DynamoDbStoreInternal,
301    ) -> Result<(), DynamoDbStoreInternalError> {
302        let transaction = store.build_put_transaction(&self.start_key, key, value)?;
303        self.transactions.push(transaction);
304        Ok(())
305    }
306}
307
/// A DynamoDB client.
///
/// Each store is bound to one namespace (used as the DynamoDB table name) and one
/// partition key (`start_key`).
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    /// The AWS SDK client used to send the requests.
    client: Client,
    /// The namespace; it is passed as the table name of the requests.
    namespace: String,
    /// Optional semaphore acquired before issuing queries (see `acquire`); it is only
    /// present when `max_concurrent_queries` is configured.
    semaphore: Option<Arc<Semaphore>>,
    /// The value reported by `max_stream_queries()`.
    max_stream_queries: usize,
    /// The partition key under which the items of this store are read and written.
    start_key: Vec<u8>,
    /// Flag set on the first `write_batch` call.
    // NOTE(review): presumably tracks whether the root key was already registered under
    // `PARTITION_KEY_ROOT_KEY` — confirm against `write_batch`.
    root_key_written: Arc<AtomicBool>,
}
318
319/// The initial configuration of the system.
320#[derive(Debug, Clone, Serialize, Deserialize)]
321pub struct DynamoDbStoreInternalConfig {
322    /// Whether to use DynamoDB local or not.
323    use_dynamodb_local: bool,
324    /// The common configuration of the key value store
325    common_config: CommonStoreInternalConfig,
326}
327
328impl DynamoDbStoreInternalConfig {
329    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
330        let config = if self.use_dynamodb_local {
331            get_dynamodb_local_config().await?
332        } else {
333            get_base_config().await?
334        };
335        Ok(Client::from_conf(config))
336    }
337}
338
339impl AdminKeyValueStore for DynamoDbStoreInternal {
340    type Config = DynamoDbStoreInternalConfig;
341
342    fn get_name() -> String {
343        "dynamodb internal".to_string()
344    }
345
346    async fn connect(
347        config: &Self::Config,
348        namespace: &str,
349    ) -> Result<Self, DynamoDbStoreInternalError> {
350        Self::check_namespace(namespace)?;
351        let client = config.client().await?;
352        let semaphore = config
353            .common_config
354            .max_concurrent_queries
355            .map(|n| Arc::new(Semaphore::new(n)));
356        let max_stream_queries = config.common_config.max_stream_queries;
357        let namespace = namespace.to_string();
358        let start_key = extend_root_key(&[]);
359        let store = Self {
360            client,
361            namespace,
362            semaphore,
363            max_stream_queries,
364            start_key,
365            root_key_written: Arc::new(AtomicBool::new(false)),
366        };
367        Ok(store)
368    }
369
370    fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, DynamoDbStoreInternalError> {
371        let client = self.client.clone();
372        let namespace = self.namespace.clone();
373        let semaphore = self.semaphore.clone();
374        let max_stream_queries = self.max_stream_queries;
375        let start_key = extend_root_key(root_key);
376        Ok(Self {
377            client,
378            namespace,
379            semaphore,
380            max_stream_queries,
381            start_key,
382            root_key_written: Arc::new(AtomicBool::new(false)),
383        })
384    }
385
386    async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
387        let client = config.client().await?;
388        let mut namespaces = Vec::new();
389        let mut start_table = None;
390        loop {
391            let response = client
392                .list_tables()
393                .set_exclusive_start_table_name(start_table)
394                .send()
395                .boxed_sync()
396                .await?;
397            if let Some(namespaces_blk) = response.table_names {
398                namespaces.extend(namespaces_blk);
399            }
400            if response.last_evaluated_table_name.is_none() {
401                break;
402            } else {
403                start_table = response.last_evaluated_table_name;
404            }
405        }
406        Ok(namespaces)
407    }
408
409    async fn list_root_keys(
410        config: &Self::Config,
411        namespace: &str,
412    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
413        let mut store = Self::connect(config, namespace).await?;
414        store.start_key = PARTITION_KEY_ROOT_KEY.to_vec();
415
416        let keys = store.find_keys_by_prefix(EMPTY_ROOT_KEY).await?;
417
418        let mut root_keys = Vec::new();
419        for key in keys.iterator() {
420            let key = key?;
421            root_keys.push(key.to_vec());
422        }
423        Ok(root_keys)
424    }
425
426    async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
427        let client = config.client().await?;
428        let tables = Self::list_all(config).await?;
429        for table in tables {
430            client
431                .delete_table()
432                .table_name(&table)
433                .send()
434                .boxed_sync()
435                .await?;
436        }
437        Ok(())
438    }
439
440    async fn exists(
441        config: &Self::Config,
442        namespace: &str,
443    ) -> Result<bool, DynamoDbStoreInternalError> {
444        Self::check_namespace(namespace)?;
445        let client = config.client().await?;
446        let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
447        let response = client
448            .get_item()
449            .table_name(namespace)
450            .set_key(Some(key_db))
451            .send()
452            .boxed_sync()
453            .await;
454        let Err(error) = response else {
455            return Ok(true);
456        };
457        let test = match &error {
458            SdkError::ServiceError(error) => match error.err() {
459                GetItemError::ResourceNotFoundException(error) => {
460                    error.message
461                        == Some("Cannot do operations on a non-existent table".to_string())
462                }
463                _ => false,
464            },
465            _ => false,
466        };
467        if test {
468            Ok(false)
469        } else {
470            Err(error.into())
471        }
472    }
473
474    async fn create(
475        config: &Self::Config,
476        namespace: &str,
477    ) -> Result<(), DynamoDbStoreInternalError> {
478        Self::check_namespace(namespace)?;
479        let client = config.client().await?;
480        client
481            .create_table()
482            .table_name(namespace)
483            .attribute_definitions(
484                AttributeDefinition::builder()
485                    .attribute_name(PARTITION_ATTRIBUTE)
486                    .attribute_type(ScalarAttributeType::B)
487                    .build()?,
488            )
489            .attribute_definitions(
490                AttributeDefinition::builder()
491                    .attribute_name(KEY_ATTRIBUTE)
492                    .attribute_type(ScalarAttributeType::B)
493                    .build()?,
494            )
495            .key_schema(
496                KeySchemaElement::builder()
497                    .attribute_name(PARTITION_ATTRIBUTE)
498                    .key_type(KeyType::Hash)
499                    .build()?,
500            )
501            .key_schema(
502                KeySchemaElement::builder()
503                    .attribute_name(KEY_ATTRIBUTE)
504                    .key_type(KeyType::Range)
505                    .build()?,
506            )
507            .provisioned_throughput(
508                ProvisionedThroughput::builder()
509                    .read_capacity_units(10)
510                    .write_capacity_units(10)
511                    .build()?,
512            )
513            .send()
514            .boxed_sync()
515            .await?;
516        Ok(())
517    }
518
519    async fn delete(
520        config: &Self::Config,
521        namespace: &str,
522    ) -> Result<(), DynamoDbStoreInternalError> {
523        Self::check_namespace(namespace)?;
524        let client = config.client().await?;
525        client
526            .delete_table()
527            .table_name(namespace)
528            .send()
529            .boxed_sync()
530            .await?;
531        Ok(())
532    }
533}
534
535impl DynamoDbStoreInternal {
536    /// Namespaces are named table names in DynamoDB [naming
537    /// rules](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules),
538    /// so we need to check correctness of the namespace
539    fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
540        if namespace.len() < 3 {
541            return Err(InvalidNamespace::TooShort);
542        }
543        if namespace.len() > 255 {
544            return Err(InvalidNamespace::TooLong);
545        }
546        if !namespace.chars().all(|character| {
547            character.is_ascii_alphanumeric()
548                || character == '.'
549                || character == '-'
550                || character == '_'
551        }) {
552            return Err(InvalidNamespace::InvalidCharacter);
553        }
554        Ok(())
555    }
556
557    fn build_delete_transaction(
558        &self,
559        start_key: &[u8],
560        key: Vec<u8>,
561    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
562        check_key_size(&key)?;
563        let request = Delete::builder()
564            .table_name(&self.namespace)
565            .set_key(Some(build_key(start_key, key)))
566            .build()?;
567        Ok(TransactWriteItem::builder().delete(request).build())
568    }
569
570    fn build_put_transaction(
571        &self,
572        start_key: &[u8],
573        key: Vec<u8>,
574        value: Vec<u8>,
575    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
576        check_key_size(&key)?;
577        ensure!(
578            value.len() <= RAW_MAX_VALUE_SIZE,
579            DynamoDbStoreInternalError::ValueLengthTooLarge
580        );
581        let request = Put::builder()
582            .table_name(&self.namespace)
583            .set_item(Some(build_key_value(start_key, key, value)))
584            .build()?;
585        Ok(TransactWriteItem::builder().put(request).build())
586    }
587
588    /// Obtains the semaphore lock on the database if needed.
589    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
590        match &self.semaphore {
591            None => None,
592            Some(count) => Some(count.acquire().await),
593        }
594    }
595
596    async fn get_query_output(
597        &self,
598        attribute_str: &str,
599        start_key: &[u8],
600        key_prefix: &[u8],
601        start_key_map: Option<HashMap<String, AttributeValue>>,
602    ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
603        let _guard = self.acquire().await;
604        let start_key = start_key.to_vec();
605        let response = self
606            .client
607            .query()
608            .table_name(&self.namespace)
609            .projection_expression(attribute_str)
610            .key_condition_expression(format!(
611                "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
612            ))
613            .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
614            .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
615            .set_exclusive_start_key(start_key_map)
616            .send()
617            .boxed_sync()
618            .await?;
619        Ok(response)
620    }
621
622    async fn read_value_bytes_general(
623        &self,
624        key_db: HashMap<String, AttributeValue>,
625    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
626        let _guard = self.acquire().await;
627        let response = self
628            .client
629            .get_item()
630            .table_name(&self.namespace)
631            .set_key(Some(key_db))
632            .send()
633            .boxed_sync()
634            .await?;
635
636        match response.item {
637            Some(mut item) => {
638                let value = extract_value_owned(&mut item)?;
639                Ok(Some(value))
640            }
641            None => Ok(None),
642        }
643    }
644
645    async fn contains_key_general(
646        &self,
647        key_db: HashMap<String, AttributeValue>,
648    ) -> Result<bool, DynamoDbStoreInternalError> {
649        let _guard = self.acquire().await;
650        let response = self
651            .client
652            .get_item()
653            .table_name(&self.namespace)
654            .set_key(Some(key_db))
655            .projection_expression(PARTITION_ATTRIBUTE)
656            .send()
657            .boxed_sync()
658            .await?;
659
660        Ok(response.item.is_some())
661    }
662
663    async fn get_list_responses(
664        &self,
665        attribute: &str,
666        start_key: &[u8],
667        key_prefix: &[u8],
668    ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
669        check_key_size(key_prefix)?;
670        let mut responses = Vec::new();
671        let mut start_key_map = None;
672        loop {
673            let response = self
674                .get_query_output(attribute, start_key, key_prefix, start_key_map)
675                .await?;
676            let last_evaluated = response.last_evaluated_key.clone();
677            responses.push(response);
678            match last_evaluated {
679                None => {
680                    break;
681                }
682                Some(value) => {
683                    start_key_map = Some(value);
684                }
685            }
686        }
687        Ok(QueryResponses {
688            prefix_len: key_prefix.len(),
689            responses,
690        })
691    }
692}
693
/// The pages returned by a paginated prefix query.
struct QueryResponses {
    /// The length of the key prefix of the query; the iterators strip that many bytes
    /// from the front of every returned key.
    prefix_len: usize,
    /// The query outputs, in the order in which they were received.
    responses: Vec<QueryOutput>,
}
698
699// Inspired by https://depth-first.com/articles/2020/06/22/returning-rust-iterators/
700#[doc(hidden)]
701#[expect(clippy::type_complexity)]
702pub struct DynamoDbKeyBlockIterator<'a> {
703    prefix_len: usize,
704    pos: usize,
705    iters: Vec<
706        std::iter::Flatten<
707            std::option::Iter<'a, Vec<HashMap<std::string::String, AttributeValue>>>,
708        >,
709    >,
710}
711
712impl<'a> Iterator for DynamoDbKeyBlockIterator<'a> {
713    type Item = Result<&'a [u8], DynamoDbStoreInternalError>;
714
715    fn next(&mut self) -> Option<Self::Item> {
716        let result = self.iters[self.pos].next();
717        match result {
718            None => {
719                if self.pos == self.iters.len() - 1 {
720                    return None;
721                }
722                self.pos += 1;
723                self.iters[self.pos]
724                    .next()
725                    .map(|x| extract_key(self.prefix_len, x))
726            }
727            Some(result) => Some(extract_key(self.prefix_len, result)),
728        }
729    }
730}
731
732/// A set of keys returned by a search query on DynamoDB.
733pub struct DynamoDbKeys {
734    result_queries: QueryResponses,
735}
736
737impl KeyIterable<DynamoDbStoreInternalError> for DynamoDbKeys {
738    type Iterator<'a>
739        = DynamoDbKeyBlockIterator<'a>
740    where
741        Self: 'a;
742
743    fn iterator(&self) -> Self::Iterator<'_> {
744        let pos = 0;
745        let mut iters = Vec::new();
746        for response in &self.result_queries.responses {
747            let iter = response.items.iter().flatten();
748            iters.push(iter);
749        }
750        DynamoDbKeyBlockIterator {
751            prefix_len: self.result_queries.prefix_len,
752            pos,
753            iters,
754        }
755    }
756}
757
/// A set of `(key, value)` returned by a search query on DynamoDB.
pub struct DynamoDbKeyValues {
    /// The pages of the query together with the length of the queried key prefix.
    result_queries: QueryResponses,
}
762
763#[doc(hidden)]
764#[expect(clippy::type_complexity)]
765pub struct DynamoDbKeyValueIterator<'a> {
766    prefix_len: usize,
767    pos: usize,
768    iters: Vec<
769        std::iter::Flatten<
770            std::option::Iter<'a, Vec<HashMap<std::string::String, AttributeValue>>>,
771        >,
772    >,
773}
774
775impl<'a> Iterator for DynamoDbKeyValueIterator<'a> {
776    type Item = Result<(&'a [u8], &'a [u8]), DynamoDbStoreInternalError>;
777
778    fn next(&mut self) -> Option<Self::Item> {
779        let result = self.iters[self.pos].next();
780        match result {
781            None => {
782                if self.pos == self.iters.len() - 1 {
783                    return None;
784                }
785                self.pos += 1;
786                self.iters[self.pos]
787                    .next()
788                    .map(|x| extract_key_value(self.prefix_len, x))
789            }
790            Some(result) => Some(extract_key_value(self.prefix_len, result)),
791        }
792    }
793}
794
795#[doc(hidden)]
796#[expect(clippy::type_complexity)]
797pub struct DynamoDbKeyValueIteratorOwned {
798    prefix_len: usize,
799    pos: usize,
800    iters: Vec<
801        std::iter::Flatten<
802            std::option::IntoIter<Vec<HashMap<std::string::String, AttributeValue>>>,
803        >,
804    >,
805}
806
807impl Iterator for DynamoDbKeyValueIteratorOwned {
808    type Item = Result<(Vec<u8>, Vec<u8>), DynamoDbStoreInternalError>;
809
810    fn next(&mut self) -> Option<Self::Item> {
811        let result = self.iters[self.pos].next();
812        match result {
813            None => {
814                if self.pos == self.iters.len() - 1 {
815                    return None;
816                }
817                self.pos += 1;
818                self.iters[self.pos]
819                    .next()
820                    .map(|mut x| extract_key_value_owned(self.prefix_len, &mut x))
821            }
822            Some(mut result) => Some(extract_key_value_owned(self.prefix_len, &mut result)),
823        }
824    }
825}
826
827impl KeyValueIterable<DynamoDbStoreInternalError> for DynamoDbKeyValues {
828    type Iterator<'a>
829        = DynamoDbKeyValueIterator<'a>
830    where
831        Self: 'a;
832    type IteratorOwned = DynamoDbKeyValueIteratorOwned;
833
834    fn iterator(&self) -> Self::Iterator<'_> {
835        let pos = 0;
836        let mut iters = Vec::new();
837        for response in &self.result_queries.responses {
838            let iter = response.items.iter().flatten();
839            iters.push(iter);
840        }
841        DynamoDbKeyValueIterator {
842            prefix_len: self.result_queries.prefix_len,
843            pos,
844            iters,
845        }
846    }
847
848    fn into_iterator_owned(self) -> Self::IteratorOwned {
849        let pos = 0;
850        let mut iters = Vec::new();
851        for response in self.result_queries.responses.into_iter() {
852            let iter = response.items.into_iter().flatten();
853            iters.push(iter);
854        }
855        DynamoDbKeyValueIteratorOwned {
856            prefix_len: self.result_queries.prefix_len,
857            pos,
858            iters,
859        }
860    }
861}
862
// Every fallible operation of the store reports a `DynamoDbStoreInternalError`.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
866
867impl ReadableKeyValueStore for DynamoDbStoreInternal {
868    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
869    type Keys = DynamoDbKeys;
870    type KeyValues = DynamoDbKeyValues;
871
872    fn max_stream_queries(&self) -> usize {
873        self.max_stream_queries
874    }
875
876    async fn read_value_bytes(
877        &self,
878        key: &[u8],
879    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
880        check_key_size(key)?;
881        let key_db = build_key(&self.start_key, key.to_vec());
882        self.read_value_bytes_general(key_db).await
883    }
884
885    async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
886        check_key_size(key)?;
887        let key_db = build_key(&self.start_key, key.to_vec());
888        self.contains_key_general(key_db).await
889    }
890
891    async fn contains_keys(
892        &self,
893        keys: Vec<Vec<u8>>,
894    ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
895        let mut handles = Vec::new();
896        for key in keys {
897            check_key_size(&key)?;
898            let key_db = build_key(&self.start_key, key);
899            let handle = self.contains_key_general(key_db);
900            handles.push(handle);
901        }
902        join_all(handles)
903            .await
904            .into_iter()
905            .collect::<Result<_, _>>()
906    }
907
908    async fn read_multi_values_bytes(
909        &self,
910        keys: Vec<Vec<u8>>,
911    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
912        let mut handles = Vec::new();
913        for key in keys {
914            check_key_size(&key)?;
915            let key_db = build_key(&self.start_key, key);
916            let handle = self.read_value_bytes_general(key_db);
917            handles.push(handle);
918        }
919        join_all(handles)
920            .await
921            .into_iter()
922            .collect::<Result<_, _>>()
923    }
924
925    async fn find_keys_by_prefix(
926        &self,
927        key_prefix: &[u8],
928    ) -> Result<DynamoDbKeys, DynamoDbStoreInternalError> {
929        let result_queries = self
930            .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
931            .await?;
932        Ok(DynamoDbKeys { result_queries })
933    }
934
935    async fn find_key_values_by_prefix(
936        &self,
937        key_prefix: &[u8],
938    ) -> Result<DynamoDbKeyValues, DynamoDbStoreInternalError> {
939        let result_queries = self
940            .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
941            .await?;
942        Ok(DynamoDbKeyValues { result_queries })
943    }
944}
945
946impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
947    const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
948    const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
949    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
950
951    // DynamoDB does not support the `DeletePrefix` operation.
952    type Batch = SimpleUnorderedBatch;
953
954    async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
955        if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
956            let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
957            builder.insert_put_request(self.start_key.clone(), vec![], self)?;
958            self.client
959                .transact_write_items()
960                .set_transact_items(Some(builder.transactions))
961                .send()
962                .boxed_sync()
963                .await?;
964        }
965        let mut builder = TransactionBuilder::new(&self.start_key);
966        for key in batch.deletions {
967            builder.insert_delete_request(key, self)?;
968        }
969        for (key, value) in batch.insertions {
970            builder.insert_put_request(key, value, self)?;
971        }
972        if !builder.transactions.is_empty() {
973            let _guard = self.acquire().await;
974            self.client
975                .transact_write_items()
976                .set_transact_items(Some(builder.transactions))
977                .send()
978                .boxed_sync()
979                .await?;
980        }
981        Ok(())
982    }
983}
984
/// Error when validating a namespace.
#[derive(Debug, Error)]
pub enum InvalidNamespace {
    /// The namespace should be at least 3 characters.
    #[error("Namespace must have at least 3 characters")]
    TooShort,

    /// The namespace should be at most 63 characters.
    #[error("Namespace must be at most 63 characters")]
    TooLong,

    /// The namespace may only contain lowercase letters, numbers, periods and hyphens.
    #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
    InvalidCharacter,
}
1000
/// Errors that occur when using [`DynamoDbStoreInternal`].
///
/// The AWS SDK errors and the build error are stored boxed inside their variants.
#[derive(Debug, Error)]
pub enum DynamoDbStoreInternalError {
    /// An error occurred while getting the item.
    #[error(transparent)]
    Get(#[from] Box<SdkError<GetItemError>>),

    /// An error occurred while writing a transaction of items.
    #[error(transparent)]
    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),

    /// An error occurred while doing a Query.
    #[error(transparent)]
    Query(#[from] Box<SdkError<QueryError>>),

    /// An error occurred while deleting a table.
    #[error(transparent)]
    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),

    /// An error occurred while listing tables.
    #[error(transparent)]
    ListTables(#[from] Box<SdkError<ListTablesError>>),

    /// The transact maximum size is `MAX_TRANSACT_WRITE_ITEM_SIZE`.
    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
    TransactUpperLimitSize,

    /// Keys have to be of non-zero length.
    #[error("The key must be of strictly positive length")]
    ZeroLengthKey,

    /// The key must have at most 1024 bytes.
    #[error("The key must have at most 1024 bytes")]
    KeyTooLong,

    /// The key prefix must have at most 1024 bytes.
    #[error("The key prefix must have at most 1024 bytes")]
    KeyPrefixTooLong,

    /// Key prefixes have to be of non-zero length.
    #[error("The key_prefix must be of strictly positive length")]
    ZeroLengthKeyPrefix,

    /// The journal is not coherent.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The length of the value should be at most 400 KB.
    #[error("The DynamoDB value should be less than 400 KB")]
    ValueLengthTooLarge,

    /// The stored key is missing.
    #[error("The stored key attribute is missing")]
    MissingKey,

    /// The type of the keys was not correct (it should have been a binary blob).
    /// The payload is a human-readable description of the type actually found.
    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongKeyType(String),

    /// The value attribute is missing.
    #[error("The stored value attribute is missing")]
    MissingValue,

    /// The value was stored as the wrong type (it should be a binary blob).
    /// The payload is a human-readable description of the type actually found.
    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongValueType(String),

    /// A BCS error occurred.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A wrong namespace error occurred.
    #[error(transparent)]
    InvalidNamespace(#[from] InvalidNamespace),

    /// An error occurred while creating the table.
    #[error(transparent)]
    CreateTable(#[from] Box<SdkError<CreateTableError>>),

    /// An error occurred while building an object.
    #[error(transparent)]
    Build(#[from] Box<BuildError>),
}
1084
1085impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
1086where
1087    DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
1088{
1089    fn from(error: SdkError<InnerError>) -> Self {
1090        Box::new(error).into()
1091    }
1092}
1093
1094impl From<BuildError> for DynamoDbStoreInternalError {
1095    fn from(error: BuildError) -> Self {
1096        Box::new(error).into()
1097    }
1098}
1099
1100impl DynamoDbStoreInternalError {
1101    /// Creates a [`DynamoDbStoreInternalError::WrongKeyType`] instance based on the returned value type.
1102    ///
1103    /// # Panics
1104    ///
1105    /// If the value type is in the correct type, a binary blob.
1106    pub fn wrong_key_type(value: &AttributeValue) -> Self {
1107        DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
1108    }
1109
1110    /// Creates a [`DynamoDbStoreInternalError::WrongValueType`] instance based on the returned value type.
1111    ///
1112    /// # Panics
1113    ///
1114    /// If the value type is in the correct type, a binary blob.
1115    pub fn wrong_value_type(value: &AttributeValue) -> Self {
1116        DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
1117    }
1118
1119    fn type_description_of(value: &AttributeValue) -> String {
1120        match value {
1121            AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
1122            AttributeValue::Bool(_) => "a boolean",
1123            AttributeValue::Bs(_) => "a list of binary blobs",
1124            AttributeValue::L(_) => "a list",
1125            AttributeValue::M(_) => "a map",
1126            AttributeValue::N(_) => "a number",
1127            AttributeValue::Ns(_) => "a list of numbers",
1128            AttributeValue::Null(_) => "a null value",
1129            AttributeValue::S(_) => "a string",
1130            AttributeValue::Ss(_) => "a list of strings",
1131            _ => "an unknown type",
1132        }
1133        .to_owned()
1134    }
1135}
1136
// Identifies this backend by the name "dynamo_db" in the `KeyValueStoreError` trait.
impl KeyValueStoreError for DynamoDbStoreInternalError {
    const BACKEND: &'static str = "dynamo_db";
}
1140
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<DynamoDbStoreInternal> {
    /// Builds the configuration used in tests: DynamoDB local is enabled, the
    /// query limits come from the `TEST_DYNAMO_DB_*` constants and the
    /// replication factor is 1.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        Ok(DynamoDbStoreInternalConfig {
            use_dynamodb_local: true,
            common_config: CommonStoreInternalConfig {
                max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
                max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
                replication_factor: 1,
            },
        })
    }
}
1155
/// A shared DB client for DynamoDB implementing LRU caching and metrics.
///
/// The layers, from innermost to outermost, are the journaling store, the value
/// splitting store and the LRU caching store, each wrapped in a `MeteredStore`.
#[cfg(with_metrics)]
pub type DynamoDbStore = MeteredStore<
    LruCachingStore<
        MeteredStore<
            ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
        >,
    >,
>;
1165
/// A shared DB client for DynamoDB implementing LRU caching.
///
/// The layers, from innermost to outermost, are the journaling store, the value
/// splitting store and the LRU caching store.
#[cfg(not(with_metrics))]
pub type DynamoDbStore =
    LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>;
1170
/// The combined error type for [`DynamoDbStore`]: a [`ValueSplittingError`] wrapping
/// [`DynamoDbStoreInternalError`].
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1173
/// The config type for [`DynamoDbStore`]: an [`LruCachingConfig`] wrapping
/// [`DynamoDbStoreInternalConfig`].
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1176
1177impl DynamoDbStoreConfig {
1178    /// Creates a `DynamoDbStoreConfig` from the input.
1179    pub fn new(
1180        use_dynamodb_local: bool,
1181        common_config: crate::store::CommonStoreConfig,
1182    ) -> DynamoDbStoreConfig {
1183        let inner_config = DynamoDbStoreInternalConfig {
1184            use_dynamodb_local,
1185            common_config: common_config.reduced(),
1186        };
1187        DynamoDbStoreConfig {
1188            inner_config,
1189            storage_cache_config: common_config.storage_cache_config,
1190        }
1191    }
1192}
1193
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that the BCS serialized size of a byte vector of length `len`
    /// equals `get_uleb128_size(len) + len`, for lengths around the points
    /// where the length prefix grows.
    #[test]
    fn test_serialization_len() {
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for len in lengths {
            let buffer = vec![0u8; len];
            let expected_size = get_uleb128_size(len) + len;
            let actual_size = serialized_size(&buffer).unwrap();
            assert_eq!(expected_size, actual_size);
        }
    }
}