1use std::{
7 collections::HashMap,
8 env,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17 error::SdkError,
18 operation::{
19 batch_get_item::BatchGetItemError,
20 create_table::CreateTableError,
21 delete_table::DeleteTableError,
22 get_item::GetItemError,
23 list_tables::ListTablesError,
24 query::{QueryError, QueryOutput},
25 transact_write_items::TransactWriteItemsError,
26 },
27 primitives::Blob,
28 types::{
29 AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType, KeysAndAttributes,
30 ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
31 },
32 Client,
33};
34use aws_smithy_types::error::operation::BuildError;
35use futures::future::join_all;
36use linera_base::{ensure, util::future::FutureSyncExt as _};
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45 batch::SimpleUnorderedBatch,
46 common::get_uleb128_size,
47 journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48 lru_caching::{LruCachingConfig, LruCachingDatabase},
49 store::{
50 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51 WithError,
52 },
53 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54};
55
/// Name of the environment variable that holds the endpoint URL used when connecting to a
/// local DynamoDB instance. It is read by `get_endpoint_address`.
const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
58
59async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
61 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
62 .boxed_sync()
63 .await;
64 Ok((&base_config).into())
65}
66
67fn get_endpoint_address() -> Option<String> {
68 env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
69}
70
71async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
73{
74 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
75 .boxed_sync()
76 .await;
77 let endpoint_address = get_endpoint_address().unwrap();
78 let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
79 .endpoint_url(endpoint_address)
80 .build();
81 Ok(config)
82}
83
84const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
89
90const PARTITION_ATTRIBUTE: &str = "item_partition";
92
93const EMPTY_ROOT_KEY: &[u8] = &[0];
95
96const DB_KEY: &[u8] = &[0];
98
99const KEY_ATTRIBUTE: &str = "item_key";
101
102const VALUE_ATTRIBUTE: &str = "item_value";
104
105const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
107
108const RAW_MAX_VALUE_SIZE: usize = 409600;
111
112const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
126 - MAX_KEY_SIZE
127 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
128 - get_uleb128_size(MAX_KEY_SIZE)
129 - 1
130 - 1;
131
132const MAX_KEY_SIZE: usize = 1023;
136
137const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
141
142#[cfg(with_testing)]
146const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
147
148#[cfg(with_testing)]
150const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
151
152const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
155
156const MAX_BATCH_GET_ITEM_SIZE: usize = 40;
160
161fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
170 let mut prefixed_key = vec![1];
171 prefixed_key.extend(key);
172 [
173 (
174 PARTITION_ATTRIBUTE.to_owned(),
175 AttributeValue::B(Blob::new(start_key.to_vec())),
176 ),
177 (
178 KEY_ATTRIBUTE.to_owned(),
179 AttributeValue::B(Blob::new(prefixed_key)),
180 ),
181 ]
182 .into()
183}
184
185fn build_key_value(
187 start_key: &[u8],
188 key: Vec<u8>,
189 value: Vec<u8>,
190) -> HashMap<String, AttributeValue> {
191 let mut prefixed_key = vec![1];
192 prefixed_key.extend(key);
193 [
194 (
195 PARTITION_ATTRIBUTE.to_owned(),
196 AttributeValue::B(Blob::new(start_key.to_vec())),
197 ),
198 (
199 KEY_ATTRIBUTE.to_owned(),
200 AttributeValue::B(Blob::new(prefixed_key)),
201 ),
202 (
203 VALUE_ATTRIBUTE.to_owned(),
204 AttributeValue::B(Blob::new(value)),
205 ),
206 ]
207 .into()
208}
209
210fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
212 ensure!(
213 key.len() <= MAX_KEY_SIZE,
214 DynamoDbStoreInternalError::KeyTooLong
215 );
216 Ok(())
217}
218
219fn extract_key(
221 prefix_len: usize,
222 attributes: &HashMap<String, AttributeValue>,
223) -> Result<&[u8], DynamoDbStoreInternalError> {
224 let key = attributes
225 .get(KEY_ATTRIBUTE)
226 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
227 match key {
228 AttributeValue::B(blob) => Ok(&blob.as_ref()[1 + prefix_len..]),
229 key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
230 }
231}
232
233fn extract_value(
235 attributes: &HashMap<String, AttributeValue>,
236) -> Result<&[u8], DynamoDbStoreInternalError> {
237 let value = attributes
240 .get(VALUE_ATTRIBUTE)
241 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
242 match value {
243 AttributeValue::B(blob) => Ok(blob.as_ref()),
244 value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
245 }
246}
247
248fn extract_value_owned(
250 attributes: &mut HashMap<String, AttributeValue>,
251) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
252 let value = attributes
253 .remove(VALUE_ATTRIBUTE)
254 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
255 match value {
256 AttributeValue::B(blob) => Ok(blob.into_inner()),
257 value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
258 }
259}
260
261fn extract_key_value(
263 prefix_len: usize,
264 attributes: &HashMap<String, AttributeValue>,
265) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
266 let key = extract_key(prefix_len, attributes)?;
267 let value = extract_value(attributes)?;
268 Ok((key, value))
269}
270
271struct TransactionBuilder {
272 start_key: Vec<u8>,
273 transactions: Vec<TransactWriteItem>,
274}
275
276impl TransactionBuilder {
277 fn new(start_key: &[u8]) -> Self {
278 Self {
279 start_key: start_key.to_vec(),
280 transactions: Vec::new(),
281 }
282 }
283
284 fn insert_delete_request(
285 &mut self,
286 key: Vec<u8>,
287 store: &DynamoDbStoreInternal,
288 ) -> Result<(), DynamoDbStoreInternalError> {
289 let transaction = store.build_delete_transaction(&self.start_key, key)?;
290 self.transactions.push(transaction);
291 Ok(())
292 }
293
294 fn insert_put_request(
295 &mut self,
296 key: Vec<u8>,
297 value: Vec<u8>,
298 store: &DynamoDbStoreInternal,
299 ) -> Result<(), DynamoDbStoreInternalError> {
300 let transaction = store.build_put_transaction(&self.start_key, key, value)?;
301 self.transactions.push(transaction);
302 Ok(())
303 }
304}
305
/// A handle on one partition of a DynamoDB table, created by
/// `DynamoDbDatabaseInternal::open_internal`.
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    /// Client used for every request.
    client: Client,
    /// Name of the table the requests are addressed to.
    namespace: String,
    /// Limits the number of requests in flight; `None` means no limit is applied.
    semaphore: Option<Arc<Semaphore>>,
    /// Value reported by `max_stream_queries`.
    max_stream_queries: usize,
    /// Partition key of every item read or written through this store.
    start_key: Vec<u8>,
    /// Set once `write_batch` has started registering `start_key` in the root-key
    /// partition; shared between clones of this store.
    root_key_written: Arc<AtomicBool>,
}
316
/// A connection to one DynamoDB table, from which per-root-key stores are opened.
#[derive(Clone)]
pub struct DynamoDbDatabaseInternal {
    /// Client handed to every store opened from this database.
    client: Client,
    /// Name of the table.
    namespace: String,
    /// Request limiter shared by every store opened from this database; `None` means no
    /// limit is applied.
    semaphore: Option<Arc<Semaphore>>,
    /// Value handed to every store opened from this database.
    max_stream_queries: usize,
}

/// Database operations report failures as `DynamoDbStoreInternalError`.
impl WithError for DynamoDbDatabaseInternal {
    type Error = DynamoDbStoreInternalError;
}
329
330#[derive(Debug, Clone, Serialize, Deserialize)]
332pub struct DynamoDbStoreInternalConfig {
333 pub use_dynamodb_local: bool,
335 pub max_concurrent_queries: Option<usize>,
337 pub max_stream_queries: usize,
339}
340
341impl DynamoDbStoreInternalConfig {
342 async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
343 let config = if self.use_dynamodb_local {
344 get_dynamodb_local_config().await?
345 } else {
346 get_base_config().await?
347 };
348 Ok(Client::from_conf(config))
349 }
350}
351
352impl KeyValueDatabase for DynamoDbDatabaseInternal {
353 type Config = DynamoDbStoreInternalConfig;
354 type Store = DynamoDbStoreInternal;
355
356 fn get_name() -> String {
357 "dynamodb internal".to_string()
358 }
359
360 async fn connect(
361 config: &Self::Config,
362 namespace: &str,
363 ) -> Result<Self, DynamoDbStoreInternalError> {
364 Self::check_namespace(namespace)?;
365 let client = config.client().await?;
366 let semaphore = config
367 .max_concurrent_queries
368 .map(|n| Arc::new(Semaphore::new(n)));
369 let max_stream_queries = config.max_stream_queries;
370 let namespace = namespace.to_string();
371 let store = Self {
372 client,
373 namespace,
374 semaphore,
375 max_stream_queries,
376 };
377 Ok(store)
378 }
379
380 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
381 let mut start_key = EMPTY_ROOT_KEY.to_vec();
382 start_key.extend(root_key);
383 Ok(self.open_internal(start_key))
384 }
385
386 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
387 self.open_shared(root_key)
388 }
389
390 async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
391 let client = config.client().await?;
392 let mut namespaces = Vec::new();
393 let mut start_table = None;
394 loop {
395 let response = client
396 .list_tables()
397 .set_exclusive_start_table_name(start_table)
398 .send()
399 .boxed_sync()
400 .await?;
401 if let Some(namespaces_blk) = response.table_names {
402 namespaces.extend(namespaces_blk);
403 }
404 if response.last_evaluated_table_name.is_none() {
405 break;
406 } else {
407 start_table = response.last_evaluated_table_name;
408 }
409 }
410 Ok(namespaces)
411 }
412
413 async fn list_root_keys(&self) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
414 let store = self.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
415 store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
416 }
417
418 async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
419 let client = config.client().await?;
420 let tables = Self::list_all(config).await?;
421 for table in tables {
422 client
423 .delete_table()
424 .table_name(&table)
425 .send()
426 .boxed_sync()
427 .await?;
428 }
429 Ok(())
430 }
431
432 async fn exists(
433 config: &Self::Config,
434 namespace: &str,
435 ) -> Result<bool, DynamoDbStoreInternalError> {
436 Self::check_namespace(namespace)?;
437 let client = config.client().await?;
438 let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
439 let response = client
440 .get_item()
441 .table_name(namespace)
442 .set_key(Some(key_db))
443 .send()
444 .boxed_sync()
445 .await;
446 let Err(error) = response else {
447 return Ok(true);
448 };
449 let test = match &error {
450 SdkError::ServiceError(error) => match error.err() {
451 GetItemError::ResourceNotFoundException(error) => {
452 error.message
453 == Some("Cannot do operations on a non-existent table".to_string())
454 }
455 _ => false,
456 },
457 _ => false,
458 };
459 if test {
460 Ok(false)
461 } else {
462 Err(error.into())
463 }
464 }
465
466 async fn create(
467 config: &Self::Config,
468 namespace: &str,
469 ) -> Result<(), DynamoDbStoreInternalError> {
470 Self::check_namespace(namespace)?;
471 let client = config.client().await?;
472 client
473 .create_table()
474 .table_name(namespace)
475 .attribute_definitions(
476 AttributeDefinition::builder()
477 .attribute_name(PARTITION_ATTRIBUTE)
478 .attribute_type(ScalarAttributeType::B)
479 .build()?,
480 )
481 .attribute_definitions(
482 AttributeDefinition::builder()
483 .attribute_name(KEY_ATTRIBUTE)
484 .attribute_type(ScalarAttributeType::B)
485 .build()?,
486 )
487 .key_schema(
488 KeySchemaElement::builder()
489 .attribute_name(PARTITION_ATTRIBUTE)
490 .key_type(KeyType::Hash)
491 .build()?,
492 )
493 .key_schema(
494 KeySchemaElement::builder()
495 .attribute_name(KEY_ATTRIBUTE)
496 .key_type(KeyType::Range)
497 .build()?,
498 )
499 .provisioned_throughput(
500 ProvisionedThroughput::builder()
501 .read_capacity_units(10)
502 .write_capacity_units(10)
503 .build()?,
504 )
505 .send()
506 .boxed_sync()
507 .await?;
508 Ok(())
509 }
510
511 async fn delete(
512 config: &Self::Config,
513 namespace: &str,
514 ) -> Result<(), DynamoDbStoreInternalError> {
515 Self::check_namespace(namespace)?;
516 let client = config.client().await?;
517 client
518 .delete_table()
519 .table_name(namespace)
520 .send()
521 .boxed_sync()
522 .await?;
523 Ok(())
524 }
525}
526
527impl DynamoDbDatabaseInternal {
528 fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
532 if namespace.len() < 3 {
533 return Err(InvalidNamespace::TooShort);
534 }
535 if namespace.len() > 255 {
536 return Err(InvalidNamespace::TooLong);
537 }
538 if !namespace.chars().all(|character| {
539 character.is_ascii_alphanumeric()
540 || character == '.'
541 || character == '-'
542 || character == '_'
543 }) {
544 return Err(InvalidNamespace::InvalidCharacter);
545 }
546 Ok(())
547 }
548
549 fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
550 let client = self.client.clone();
551 let namespace = self.namespace.clone();
552 let semaphore = self.semaphore.clone();
553 let max_stream_queries = self.max_stream_queries;
554 DynamoDbStoreInternal {
555 client,
556 namespace,
557 semaphore,
558 max_stream_queries,
559 start_key,
560 root_key_written: Arc::new(AtomicBool::new(false)),
561 }
562 }
563}
564
565impl DynamoDbStoreInternal {
566 fn build_delete_transaction(
567 &self,
568 start_key: &[u8],
569 key: Vec<u8>,
570 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
571 check_key_size(&key)?;
572 let request = Delete::builder()
573 .table_name(&self.namespace)
574 .set_key(Some(build_key(start_key, key)))
575 .build()?;
576 Ok(TransactWriteItem::builder().delete(request).build())
577 }
578
579 fn build_put_transaction(
580 &self,
581 start_key: &[u8],
582 key: Vec<u8>,
583 value: Vec<u8>,
584 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
585 check_key_size(&key)?;
586 ensure!(
587 value.len() <= RAW_MAX_VALUE_SIZE,
588 DynamoDbStoreInternalError::ValueLengthTooLarge
589 );
590 let request = Put::builder()
591 .table_name(&self.namespace)
592 .set_item(Some(build_key_value(start_key, key, value)))
593 .build()?;
594 Ok(TransactWriteItem::builder().put(request).build())
595 }
596
597 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
599 match &self.semaphore {
600 None => None,
601 Some(count) => Some(count.acquire().await),
602 }
603 }
604
605 async fn get_query_output(
606 &self,
607 attribute_str: &str,
608 start_key: &[u8],
609 key_prefix: &[u8],
610 start_key_map: Option<HashMap<String, AttributeValue>>,
611 ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
612 let _guard = self.acquire().await;
613 let start_key = start_key.to_vec();
614 let mut prefixed_key_prefix = vec![1];
615 prefixed_key_prefix.extend(key_prefix);
616 let response = self
617 .client
618 .query()
619 .table_name(&self.namespace)
620 .projection_expression(attribute_str)
621 .key_condition_expression(format!(
622 "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
623 ))
624 .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
625 .expression_attribute_values(
626 ":prefix",
627 AttributeValue::B(Blob::new(prefixed_key_prefix)),
628 )
629 .set_exclusive_start_key(start_key_map)
630 .send()
631 .boxed_sync()
632 .await?;
633 Ok(response)
634 }
635
636 async fn read_value_bytes_general(
637 &self,
638 key_db: HashMap<String, AttributeValue>,
639 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
640 let _guard = self.acquire().await;
641 let response = self
642 .client
643 .get_item()
644 .table_name(&self.namespace)
645 .set_key(Some(key_db))
646 .send()
647 .boxed_sync()
648 .await?;
649
650 match response.item {
651 Some(mut item) => {
652 let value = extract_value_owned(&mut item)?;
653 Ok(Some(value))
654 }
655 None => Ok(None),
656 }
657 }
658
659 async fn contains_key_general(
660 &self,
661 key_db: HashMap<String, AttributeValue>,
662 ) -> Result<bool, DynamoDbStoreInternalError> {
663 let _guard = self.acquire().await;
664 let response = self
665 .client
666 .get_item()
667 .table_name(&self.namespace)
668 .set_key(Some(key_db))
669 .projection_expression(PARTITION_ATTRIBUTE)
670 .send()
671 .boxed_sync()
672 .await?;
673
674 Ok(response.item.is_some())
675 }
676
677 async fn get_list_responses(
678 &self,
679 attribute: &str,
680 start_key: &[u8],
681 key_prefix: &[u8],
682 ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
683 check_key_size(key_prefix)?;
684 let mut responses = Vec::new();
685 let mut start_key_map = None;
686 loop {
687 let response = self
688 .get_query_output(attribute, start_key, key_prefix, start_key_map)
689 .await?;
690 let last_evaluated = response.last_evaluated_key.clone();
691 responses.push(response);
692 match last_evaluated {
693 None => {
694 break;
695 }
696 Some(value) => {
697 start_key_map = Some(value);
698 }
699 }
700 }
701 Ok(QueryResponses {
702 prefix_len: key_prefix.len(),
703 responses,
704 })
705 }
706
707 async fn read_batch_values_bytes(
708 &self,
709 keys: &[Vec<u8>],
710 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
711 if keys.is_empty() {
713 return Ok(Vec::new());
714 }
715 let mut results = vec![None; keys.len()];
716
717 let mut request_keys = Vec::new();
719 let mut key_to_index = HashMap::<Vec<u8>, Vec<usize>>::new();
720
721 for (i, key) in keys.iter().enumerate() {
722 check_key_size(key)?;
723 let key_attrs = build_key(&self.start_key, key.clone());
724 key_to_index.entry(key.clone()).or_default().push(i);
725 request_keys.push(key_attrs);
726 }
727
728 let keys_and_attributes = KeysAndAttributes::builder()
729 .set_keys(Some(request_keys))
730 .build()?;
731
732 let mut request_items = HashMap::new();
733 request_items.insert(self.namespace.clone(), keys_and_attributes);
734
735 let mut remaining_request_items = Some(request_items);
737
738 while let Some(request_items) = remaining_request_items {
739 if request_items.is_empty() {
741 break;
742 }
743
744 let _guard = self.acquire().await;
745 let response = self
746 .client
747 .batch_get_item()
748 .set_request_items(Some(request_items))
749 .send()
750 .boxed_sync()
751 .await?;
752
753 if let Some(mut responses) = response.responses {
755 if let Some(items) = responses.remove(&self.namespace) {
756 for mut item in items {
757 let key_attr = item
759 .get(KEY_ATTRIBUTE)
760 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
761
762 if let AttributeValue::B(blob) = key_attr {
763 let prefixed_key = blob.as_ref();
764 let key = &prefixed_key[1..]; if let Some(indices) = key_to_index.get(key) {
766 if let Some((&last, rest)) = indices.split_last() {
767 let value = extract_value_owned(&mut item)?;
768 for index in rest {
769 results[*index] = Some(value.clone());
770 }
771 results[last] = Some(value);
772 }
773 }
774 }
775 }
776 }
777 }
778
779 remaining_request_items = response.unprocessed_keys;
781 }
782
783 Ok(results)
784 }
785}
786
787struct QueryResponses {
788 prefix_len: usize,
789 responses: Vec<QueryOutput>,
790}
791
792impl QueryResponses {
793 fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
794 self.responses
795 .iter()
796 .flat_map(|response| response.items.iter().flatten())
797 .map(|item| extract_key(self.prefix_len, item))
798 }
799
800 fn key_values(
801 &self,
802 ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
803 self.responses
804 .iter()
805 .flat_map(|response| response.items.iter().flatten())
806 .map(|item| extract_key_value(self.prefix_len, item))
807 }
808}
809
/// Store operations report failures as `DynamoDbStoreInternalError`.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
813
814impl ReadableKeyValueStore for DynamoDbStoreInternal {
815 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
816
817 fn max_stream_queries(&self) -> usize {
818 self.max_stream_queries
819 }
820
821 fn root_key(&self) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
822 assert!(self.start_key.starts_with(EMPTY_ROOT_KEY));
823 Ok(self.start_key[EMPTY_ROOT_KEY.len()..].to_vec())
824 }
825
826 async fn read_value_bytes(
827 &self,
828 key: &[u8],
829 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
830 check_key_size(key)?;
831 let key_db = build_key(&self.start_key, key.to_vec());
832 self.read_value_bytes_general(key_db).await
833 }
834
835 async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
836 check_key_size(key)?;
837 let key_db = build_key(&self.start_key, key.to_vec());
838 self.contains_key_general(key_db).await
839 }
840
841 async fn contains_keys(
842 &self,
843 keys: &[Vec<u8>],
844 ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
845 let mut handles = Vec::new();
846 for key in keys {
847 check_key_size(key)?;
848 let key_db = build_key(&self.start_key, key.clone());
849 let handle = self.contains_key_general(key_db);
850 handles.push(handle);
851 }
852 join_all(handles)
853 .await
854 .into_iter()
855 .collect::<Result<_, _>>()
856 }
857
858 async fn read_multi_values_bytes(
859 &self,
860 keys: &[Vec<u8>],
861 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
862 if keys.is_empty() {
863 return Ok(Vec::new());
864 }
865
866 let handles = keys
867 .chunks(MAX_BATCH_GET_ITEM_SIZE)
868 .map(|key_batch| self.read_batch_values_bytes(key_batch));
869 let results: Vec<_> = join_all(handles)
870 .await
871 .into_iter()
872 .collect::<Result<_, _>>()?;
873 Ok(results.into_iter().flatten().collect())
874 }
875
876 async fn find_keys_by_prefix(
877 &self,
878 key_prefix: &[u8],
879 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
880 let result_queries = self
881 .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
882 .await?;
883 result_queries
884 .keys()
885 .map(|key| key.map(|k| k.to_vec()))
886 .collect()
887 }
888
889 async fn find_key_values_by_prefix(
890 &self,
891 key_prefix: &[u8],
892 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
893 let result_queries = self
894 .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
895 .await?;
896 result_queries
897 .key_values()
898 .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
899 .collect()
900 }
901}
902
903impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
904 const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
905 const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
906 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
907
908 type Batch = SimpleUnorderedBatch;
910
911 async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
912 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
913 let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
914 builder.insert_put_request(self.start_key.clone(), vec![], self)?;
915 self.client
916 .transact_write_items()
917 .set_transact_items(Some(builder.transactions))
918 .send()
919 .boxed_sync()
920 .await?;
921 }
922 let mut builder = TransactionBuilder::new(&self.start_key);
923 for key in batch.deletions {
924 builder.insert_delete_request(key, self)?;
925 }
926 for (key, value) in batch.insertions {
927 builder.insert_put_request(key, value, self)?;
928 }
929 if !builder.transactions.is_empty() {
930 let _guard = self.acquire().await;
931 self.client
932 .transact_write_items()
933 .set_transact_items(Some(builder.transactions))
934 .send()
935 .boxed_sync()
936 .await?;
937 }
938 Ok(())
939 }
940}
941
942#[derive(Debug, Error)]
944pub enum InvalidNamespace {
945 #[error("Namespace must have at least 3 characters")]
947 TooShort,
948
949 #[error("Namespace must be at most 63 characters")]
951 TooLong,
952
953 #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
955 InvalidCharacter,
956}
957
958#[derive(Debug, Error)]
960pub enum DynamoDbStoreInternalError {
961 #[error(transparent)]
963 Get(#[from] Box<SdkError<GetItemError>>),
964
965 #[error(transparent)]
967 BatchGet(#[from] Box<SdkError<BatchGetItemError>>),
968
969 #[error(transparent)]
971 TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),
972
973 #[error(transparent)]
975 Query(#[from] Box<SdkError<QueryError>>),
976
977 #[error(transparent)]
979 DeleteTable(#[from] Box<SdkError<DeleteTableError>>),
980
981 #[error(transparent)]
983 ListTables(#[from] Box<SdkError<ListTablesError>>),
984
985 #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
987 TransactUpperLimitSize,
988
989 #[error("The key must have at most 1024 bytes")]
991 KeyTooLong,
992
993 #[error("The key prefix must have at most 1024 bytes")]
995 KeyPrefixTooLong,
996
997 #[error(transparent)]
999 JournalConsistencyError(#[from] JournalConsistencyError),
1000
1001 #[error("The DynamoDB value should be less than 400 KB")]
1003 ValueLengthTooLarge,
1004
1005 #[error("The stored key attribute is missing")]
1007 MissingKey,
1008
1009 #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
1011 WrongKeyType(String),
1012
1013 #[error("The stored value attribute is missing")]
1015 MissingValue,
1016
1017 #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
1019 WrongValueType(String),
1020
1021 #[error(transparent)]
1023 BcsError(#[from] bcs::Error),
1024
1025 #[error(transparent)]
1027 InvalidNamespace(#[from] InvalidNamespace),
1028
1029 #[error(transparent)]
1031 CreateTable(#[from] Box<SdkError<CreateTableError>>),
1032
1033 #[error(transparent)]
1035 Build(#[from] Box<BuildError>),
1036}
1037
1038impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
1039where
1040 DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
1041{
1042 fn from(error: SdkError<InnerError>) -> Self {
1043 Box::new(error).into()
1044 }
1045}
1046
1047impl From<BuildError> for DynamoDbStoreInternalError {
1048 fn from(error: BuildError) -> Self {
1049 Box::new(error).into()
1050 }
1051}
1052
1053impl DynamoDbStoreInternalError {
1054 pub fn wrong_key_type(value: &AttributeValue) -> Self {
1060 DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
1061 }
1062
1063 pub fn wrong_value_type(value: &AttributeValue) -> Self {
1069 DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
1070 }
1071
1072 fn type_description_of(value: &AttributeValue) -> String {
1073 match value {
1074 AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
1075 AttributeValue::Bool(_) => "a boolean",
1076 AttributeValue::Bs(_) => "a list of binary blobs",
1077 AttributeValue::L(_) => "a list",
1078 AttributeValue::M(_) => "a map",
1079 AttributeValue::N(_) => "a number",
1080 AttributeValue::Ns(_) => "a list of numbers",
1081 AttributeValue::Null(_) => "a null value",
1082 AttributeValue::S(_) => "a string",
1083 AttributeValue::Ss(_) => "a list of strings",
1084 _ => "an unknown type",
1085 }
1086 .to_owned()
1087 }
1088}
1089
/// Identifies the backend that produced the error.
impl KeyValueStoreError for DynamoDbStoreInternalError {
    const BACKEND: &'static str = "dynamo_db";
}
1093
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
    /// Returns the configuration used by tests: a local DynamoDB endpoint with the
    /// test limits on concurrent and stream queries.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        let config = DynamoDbStoreInternalConfig {
            use_dynamodb_local: true,
            max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
            max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
        };
        Ok(config)
    }
}
1104
/// Error type of the composed DynamoDB store: the internal error wrapped by the
/// value-splitting layer.
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;

/// Configuration of the composed DynamoDB store: the internal configuration wrapped by the
/// LRU-caching layer's configuration.
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;

/// The composed DynamoDB database: journaling, value splitting and LRU caching stacked on
/// the internal database, with a metering wrapper around each layer.
#[cfg(with_metrics)]
pub type DynamoDbDatabase = MeteredDatabase<
    LruCachingDatabase<
        MeteredDatabase<
            ValueSplittingDatabase<
                MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
            >,
        >,
    >,
>;
/// The composed DynamoDB database: journaling, value splitting and LRU caching stacked on
/// the internal database.
#[cfg(not(with_metrics))]
pub type DynamoDbDatabase = LruCachingDatabase<
    ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
>;
1127
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// The BCS encoding of a byte vector must take exactly its length plus the size of the
    /// ULEB128 length prefix, including at the boundaries where the prefix grows.
    #[test]
    fn test_serialization_len() {
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for length in lengths {
            let bytes = vec![0u8; length];
            let est_size = get_uleb128_size(length) + length;
            let serial_size = serialized_size(&bytes).unwrap();
            assert_eq!(est_size, serial_size);
        }
    }
}