1use std::{
7 collections::HashMap,
8 env,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17 error::SdkError,
18 operation::{
19 batch_get_item::BatchGetItemError,
20 create_table::CreateTableError,
21 delete_table::DeleteTableError,
22 get_item::GetItemError,
23 list_tables::ListTablesError,
24 query::{QueryError, QueryOutput},
25 transact_write_items::TransactWriteItemsError,
26 },
27 primitives::Blob,
28 types::{
29 AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType, KeysAndAttributes,
30 ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
31 },
32 Client,
33};
34use aws_smithy_types::error::operation::BuildError;
35use futures::future::join_all;
36use linera_base::ensure;
37use serde::{Deserialize, Serialize};
38use thiserror::Error;
39
40#[cfg(with_metrics)]
41use crate::metering::MeteredDatabase;
42#[cfg(with_testing)]
43use crate::store::TestKeyValueDatabase;
44use crate::{
45 batch::SimpleUnorderedBatch,
46 common::get_uleb128_size,
47 journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
48 lru_caching::{LruCachingConfig, LruCachingDatabase},
49 store::{
50 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
51 WithError,
52 },
53 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
54 FutureSyncExt as _,
55};
56
57const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
59
60async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
62 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
63 .boxed_sync()
64 .await;
65 Ok((&base_config).into())
66}
67
68fn get_endpoint_address() -> Option<String> {
69 env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
70}
71
72async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
74{
75 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
76 .boxed_sync()
77 .await;
78 let endpoint_address = get_endpoint_address().unwrap();
79 let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
80 .endpoint_url(endpoint_address)
81 .build();
82 Ok(config)
83}
84
85const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
90
91const PARTITION_ATTRIBUTE: &str = "item_partition";
93
94const EMPTY_ROOT_KEY: &[u8] = &[0];
96
97const DB_KEY: &[u8] = &[0];
99
100const KEY_ATTRIBUTE: &str = "item_key";
102
103const VALUE_ATTRIBUTE: &str = "item_value";
105
106const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
108
109const RAW_MAX_VALUE_SIZE: usize = 409600;
112
113const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
127 - MAX_KEY_SIZE
128 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
129 - get_uleb128_size(MAX_KEY_SIZE)
130 - 1
131 - 1;
132
133const MAX_KEY_SIZE: usize = 1024;
136
137const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
141
142#[cfg(with_testing)]
146const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
147
148#[cfg(with_testing)]
150const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
151
152const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
155
156const MAX_BATCH_GET_ITEM_SIZE: usize = 40;
160
161fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
170 [
171 (
172 PARTITION_ATTRIBUTE.to_owned(),
173 AttributeValue::B(Blob::new(start_key.to_vec())),
174 ),
175 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
176 ]
177 .into()
178}
179
180fn build_key_value(
182 start_key: &[u8],
183 key: Vec<u8>,
184 value: Vec<u8>,
185) -> HashMap<String, AttributeValue> {
186 [
187 (
188 PARTITION_ATTRIBUTE.to_owned(),
189 AttributeValue::B(Blob::new(start_key.to_vec())),
190 ),
191 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
192 (
193 VALUE_ATTRIBUTE.to_owned(),
194 AttributeValue::B(Blob::new(value)),
195 ),
196 ]
197 .into()
198}
199
200fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
202 ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
203 ensure!(
204 key.len() <= MAX_KEY_SIZE,
205 DynamoDbStoreInternalError::KeyTooLong
206 );
207 Ok(())
208}
209
210fn extract_key(
212 prefix_len: usize,
213 attributes: &HashMap<String, AttributeValue>,
214) -> Result<&[u8], DynamoDbStoreInternalError> {
215 let key = attributes
216 .get(KEY_ATTRIBUTE)
217 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
218 match key {
219 AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
220 key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
221 }
222}
223
224fn extract_value(
226 attributes: &HashMap<String, AttributeValue>,
227) -> Result<&[u8], DynamoDbStoreInternalError> {
228 let value = attributes
231 .get(VALUE_ATTRIBUTE)
232 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
233 match value {
234 AttributeValue::B(blob) => Ok(blob.as_ref()),
235 value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
236 }
237}
238
239fn extract_value_owned(
241 attributes: &mut HashMap<String, AttributeValue>,
242) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
243 let value = attributes
244 .remove(VALUE_ATTRIBUTE)
245 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
246 match value {
247 AttributeValue::B(blob) => Ok(blob.into_inner()),
248 value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
249 }
250}
251
252fn extract_key_value(
254 prefix_len: usize,
255 attributes: &HashMap<String, AttributeValue>,
256) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
257 let key = extract_key(prefix_len, attributes)?;
258 let value = extract_value(attributes)?;
259 Ok((key, value))
260}
261
262struct TransactionBuilder {
263 start_key: Vec<u8>,
264 transactions: Vec<TransactWriteItem>,
265}
266
267impl TransactionBuilder {
268 fn new(start_key: &[u8]) -> Self {
269 Self {
270 start_key: start_key.to_vec(),
271 transactions: Vec::new(),
272 }
273 }
274
275 fn insert_delete_request(
276 &mut self,
277 key: Vec<u8>,
278 store: &DynamoDbStoreInternal,
279 ) -> Result<(), DynamoDbStoreInternalError> {
280 let transaction = store.build_delete_transaction(&self.start_key, key)?;
281 self.transactions.push(transaction);
282 Ok(())
283 }
284
285 fn insert_put_request(
286 &mut self,
287 key: Vec<u8>,
288 value: Vec<u8>,
289 store: &DynamoDbStoreInternal,
290 ) -> Result<(), DynamoDbStoreInternalError> {
291 let transaction = store.build_put_transaction(&self.start_key, key, value)?;
292 self.transactions.push(transaction);
293 Ok(())
294 }
295}
296
297#[derive(Clone, Debug)]
299pub struct DynamoDbStoreInternal {
300 client: Client,
301 namespace: String,
302 semaphore: Option<Arc<Semaphore>>,
303 max_stream_queries: usize,
304 start_key: Vec<u8>,
305 root_key_written: Arc<AtomicBool>,
306}
307
308#[derive(Clone)]
310pub struct DynamoDbDatabaseInternal {
311 client: Client,
312 namespace: String,
313 semaphore: Option<Arc<Semaphore>>,
314 max_stream_queries: usize,
315}
316
317impl WithError for DynamoDbDatabaseInternal {
318 type Error = DynamoDbStoreInternalError;
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize)]
323pub struct DynamoDbStoreInternalConfig {
324 pub use_dynamodb_local: bool,
326 pub max_concurrent_queries: Option<usize>,
328 pub max_stream_queries: usize,
330}
331
332impl DynamoDbStoreInternalConfig {
333 async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
334 let config = if self.use_dynamodb_local {
335 get_dynamodb_local_config().await?
336 } else {
337 get_base_config().await?
338 };
339 Ok(Client::from_conf(config))
340 }
341}
342
343impl KeyValueDatabase for DynamoDbDatabaseInternal {
344 type Config = DynamoDbStoreInternalConfig;
345 type Store = DynamoDbStoreInternal;
346
347 fn get_name() -> String {
348 "dynamodb internal".to_string()
349 }
350
351 async fn connect(
352 config: &Self::Config,
353 namespace: &str,
354 ) -> Result<Self, DynamoDbStoreInternalError> {
355 Self::check_namespace(namespace)?;
356 let client = config.client().await?;
357 let semaphore = config
358 .max_concurrent_queries
359 .map(|n| Arc::new(Semaphore::new(n)));
360 let max_stream_queries = config.max_stream_queries;
361 let namespace = namespace.to_string();
362 let store = Self {
363 client,
364 namespace,
365 semaphore,
366 max_stream_queries,
367 };
368 Ok(store)
369 }
370
371 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
372 let mut start_key = EMPTY_ROOT_KEY.to_vec();
373 start_key.extend(root_key);
374 Ok(self.open_internal(start_key))
375 }
376
377 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
378 self.open_shared(root_key)
379 }
380
381 async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
382 let client = config.client().await?;
383 let mut namespaces = Vec::new();
384 let mut start_table = None;
385 loop {
386 let response = client
387 .list_tables()
388 .set_exclusive_start_table_name(start_table)
389 .send()
390 .boxed_sync()
391 .await?;
392 if let Some(namespaces_blk) = response.table_names {
393 namespaces.extend(namespaces_blk);
394 }
395 if response.last_evaluated_table_name.is_none() {
396 break;
397 } else {
398 start_table = response.last_evaluated_table_name;
399 }
400 }
401 Ok(namespaces)
402 }
403
404 async fn list_root_keys(
405 config: &Self::Config,
406 namespace: &str,
407 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
408 let database = Self::connect(config, namespace).await?;
409 let store = database.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
410 store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
411 }
412
413 async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
414 let client = config.client().await?;
415 let tables = Self::list_all(config).await?;
416 for table in tables {
417 client
418 .delete_table()
419 .table_name(&table)
420 .send()
421 .boxed_sync()
422 .await?;
423 }
424 Ok(())
425 }
426
427 async fn exists(
428 config: &Self::Config,
429 namespace: &str,
430 ) -> Result<bool, DynamoDbStoreInternalError> {
431 Self::check_namespace(namespace)?;
432 let client = config.client().await?;
433 let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
434 let response = client
435 .get_item()
436 .table_name(namespace)
437 .set_key(Some(key_db))
438 .send()
439 .boxed_sync()
440 .await;
441 let Err(error) = response else {
442 return Ok(true);
443 };
444 let test = match &error {
445 SdkError::ServiceError(error) => match error.err() {
446 GetItemError::ResourceNotFoundException(error) => {
447 error.message
448 == Some("Cannot do operations on a non-existent table".to_string())
449 }
450 _ => false,
451 },
452 _ => false,
453 };
454 if test {
455 Ok(false)
456 } else {
457 Err(error.into())
458 }
459 }
460
461 async fn create(
462 config: &Self::Config,
463 namespace: &str,
464 ) -> Result<(), DynamoDbStoreInternalError> {
465 Self::check_namespace(namespace)?;
466 let client = config.client().await?;
467 client
468 .create_table()
469 .table_name(namespace)
470 .attribute_definitions(
471 AttributeDefinition::builder()
472 .attribute_name(PARTITION_ATTRIBUTE)
473 .attribute_type(ScalarAttributeType::B)
474 .build()?,
475 )
476 .attribute_definitions(
477 AttributeDefinition::builder()
478 .attribute_name(KEY_ATTRIBUTE)
479 .attribute_type(ScalarAttributeType::B)
480 .build()?,
481 )
482 .key_schema(
483 KeySchemaElement::builder()
484 .attribute_name(PARTITION_ATTRIBUTE)
485 .key_type(KeyType::Hash)
486 .build()?,
487 )
488 .key_schema(
489 KeySchemaElement::builder()
490 .attribute_name(KEY_ATTRIBUTE)
491 .key_type(KeyType::Range)
492 .build()?,
493 )
494 .provisioned_throughput(
495 ProvisionedThroughput::builder()
496 .read_capacity_units(10)
497 .write_capacity_units(10)
498 .build()?,
499 )
500 .send()
501 .boxed_sync()
502 .await?;
503 Ok(())
504 }
505
506 async fn delete(
507 config: &Self::Config,
508 namespace: &str,
509 ) -> Result<(), DynamoDbStoreInternalError> {
510 Self::check_namespace(namespace)?;
511 let client = config.client().await?;
512 client
513 .delete_table()
514 .table_name(namespace)
515 .send()
516 .boxed_sync()
517 .await?;
518 Ok(())
519 }
520}
521
522impl DynamoDbDatabaseInternal {
523 fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
527 if namespace.len() < 3 {
528 return Err(InvalidNamespace::TooShort);
529 }
530 if namespace.len() > 255 {
531 return Err(InvalidNamespace::TooLong);
532 }
533 if !namespace.chars().all(|character| {
534 character.is_ascii_alphanumeric()
535 || character == '.'
536 || character == '-'
537 || character == '_'
538 }) {
539 return Err(InvalidNamespace::InvalidCharacter);
540 }
541 Ok(())
542 }
543
544 fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
545 let client = self.client.clone();
546 let namespace = self.namespace.clone();
547 let semaphore = self.semaphore.clone();
548 let max_stream_queries = self.max_stream_queries;
549 DynamoDbStoreInternal {
550 client,
551 namespace,
552 semaphore,
553 max_stream_queries,
554 start_key,
555 root_key_written: Arc::new(AtomicBool::new(false)),
556 }
557 }
558}
559
560impl DynamoDbStoreInternal {
561 fn build_delete_transaction(
562 &self,
563 start_key: &[u8],
564 key: Vec<u8>,
565 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
566 check_key_size(&key)?;
567 let request = Delete::builder()
568 .table_name(&self.namespace)
569 .set_key(Some(build_key(start_key, key)))
570 .build()?;
571 Ok(TransactWriteItem::builder().delete(request).build())
572 }
573
574 fn build_put_transaction(
575 &self,
576 start_key: &[u8],
577 key: Vec<u8>,
578 value: Vec<u8>,
579 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
580 check_key_size(&key)?;
581 ensure!(
582 value.len() <= RAW_MAX_VALUE_SIZE,
583 DynamoDbStoreInternalError::ValueLengthTooLarge
584 );
585 let request = Put::builder()
586 .table_name(&self.namespace)
587 .set_item(Some(build_key_value(start_key, key, value)))
588 .build()?;
589 Ok(TransactWriteItem::builder().put(request).build())
590 }
591
592 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
594 match &self.semaphore {
595 None => None,
596 Some(count) => Some(count.acquire().await),
597 }
598 }
599
600 async fn get_query_output(
601 &self,
602 attribute_str: &str,
603 start_key: &[u8],
604 key_prefix: &[u8],
605 start_key_map: Option<HashMap<String, AttributeValue>>,
606 ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
607 let _guard = self.acquire().await;
608 let start_key = start_key.to_vec();
609 let response = self
610 .client
611 .query()
612 .table_name(&self.namespace)
613 .projection_expression(attribute_str)
614 .key_condition_expression(format!(
615 "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
616 ))
617 .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
618 .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
619 .set_exclusive_start_key(start_key_map)
620 .send()
621 .boxed_sync()
622 .await?;
623 Ok(response)
624 }
625
626 async fn read_value_bytes_general(
627 &self,
628 key_db: HashMap<String, AttributeValue>,
629 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
630 let _guard = self.acquire().await;
631 let response = self
632 .client
633 .get_item()
634 .table_name(&self.namespace)
635 .set_key(Some(key_db))
636 .send()
637 .boxed_sync()
638 .await?;
639
640 match response.item {
641 Some(mut item) => {
642 let value = extract_value_owned(&mut item)?;
643 Ok(Some(value))
644 }
645 None => Ok(None),
646 }
647 }
648
649 async fn contains_key_general(
650 &self,
651 key_db: HashMap<String, AttributeValue>,
652 ) -> Result<bool, DynamoDbStoreInternalError> {
653 let _guard = self.acquire().await;
654 let response = self
655 .client
656 .get_item()
657 .table_name(&self.namespace)
658 .set_key(Some(key_db))
659 .projection_expression(PARTITION_ATTRIBUTE)
660 .send()
661 .boxed_sync()
662 .await?;
663
664 Ok(response.item.is_some())
665 }
666
667 async fn get_list_responses(
668 &self,
669 attribute: &str,
670 start_key: &[u8],
671 key_prefix: &[u8],
672 ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
673 check_key_size(key_prefix)?;
674 let mut responses = Vec::new();
675 let mut start_key_map = None;
676 loop {
677 let response = self
678 .get_query_output(attribute, start_key, key_prefix, start_key_map)
679 .await?;
680 let last_evaluated = response.last_evaluated_key.clone();
681 responses.push(response);
682 match last_evaluated {
683 None => {
684 break;
685 }
686 Some(value) => {
687 start_key_map = Some(value);
688 }
689 }
690 }
691 Ok(QueryResponses {
692 prefix_len: key_prefix.len(),
693 responses,
694 })
695 }
696
697 async fn read_batch_values_bytes(
698 &self,
699 keys: &[Vec<u8>],
700 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
701 if keys.is_empty() {
703 return Ok(Vec::new());
704 }
705 let mut results = vec![None; keys.len()];
706
707 let mut request_keys = Vec::new();
709 let mut key_to_index = HashMap::<Vec<u8>, Vec<usize>>::new();
710
711 for (i, key) in keys.iter().enumerate() {
712 check_key_size(key)?;
713 let key_attrs = build_key(&self.start_key, key.clone());
714 key_to_index.entry(key.clone()).or_default().push(i);
715 request_keys.push(key_attrs);
716 }
717
718 let keys_and_attributes = KeysAndAttributes::builder()
719 .set_keys(Some(request_keys))
720 .build()?;
721
722 let mut request_items = HashMap::new();
723 request_items.insert(self.namespace.clone(), keys_and_attributes);
724
725 let mut remaining_request_items = Some(request_items);
727
728 while let Some(request_items) = remaining_request_items {
729 if request_items.is_empty() {
731 break;
732 }
733
734 let _guard = self.acquire().await;
735 let response = self
736 .client
737 .batch_get_item()
738 .set_request_items(Some(request_items))
739 .send()
740 .boxed_sync()
741 .await?;
742
743 if let Some(mut responses) = response.responses {
745 if let Some(items) = responses.remove(&self.namespace) {
746 for mut item in items {
747 let key_attr = item
749 .get(KEY_ATTRIBUTE)
750 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
751
752 if let AttributeValue::B(blob) = key_attr {
753 let key = blob.as_ref();
754 if let Some(indices) = key_to_index.get(key) {
755 if let Some((&last, rest)) = indices.split_last() {
756 let value = extract_value_owned(&mut item)?;
757 for index in rest {
758 results[*index] = Some(value.clone());
759 }
760 results[last] = Some(value);
761 }
762 }
763 }
764 }
765 }
766 }
767
768 remaining_request_items = response.unprocessed_keys;
770 }
771
772 Ok(results)
773 }
774}
775
776struct QueryResponses {
777 prefix_len: usize,
778 responses: Vec<QueryOutput>,
779}
780
781impl QueryResponses {
782 fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
783 self.responses
784 .iter()
785 .flat_map(|response| response.items.iter().flatten())
786 .map(|item| extract_key(self.prefix_len, item))
787 }
788
789 fn key_values(
790 &self,
791 ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
792 self.responses
793 .iter()
794 .flat_map(|response| response.items.iter().flatten())
795 .map(|item| extract_key_value(self.prefix_len, item))
796 }
797}
798
799impl WithError for DynamoDbStoreInternal {
800 type Error = DynamoDbStoreInternalError;
801}
802
803impl ReadableKeyValueStore for DynamoDbStoreInternal {
804 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
805
806 fn max_stream_queries(&self) -> usize {
807 self.max_stream_queries
808 }
809
810 async fn read_value_bytes(
811 &self,
812 key: &[u8],
813 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
814 check_key_size(key)?;
815 let key_db = build_key(&self.start_key, key.to_vec());
816 self.read_value_bytes_general(key_db).await
817 }
818
819 async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
820 check_key_size(key)?;
821 let key_db = build_key(&self.start_key, key.to_vec());
822 self.contains_key_general(key_db).await
823 }
824
825 async fn contains_keys(
826 &self,
827 keys: Vec<Vec<u8>>,
828 ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
829 let mut handles = Vec::new();
830 for key in keys {
831 check_key_size(&key)?;
832 let key_db = build_key(&self.start_key, key);
833 let handle = self.contains_key_general(key_db);
834 handles.push(handle);
835 }
836 join_all(handles)
837 .await
838 .into_iter()
839 .collect::<Result<_, _>>()
840 }
841
842 async fn read_multi_values_bytes(
843 &self,
844 keys: Vec<Vec<u8>>,
845 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
846 if keys.is_empty() {
847 return Ok(Vec::new());
848 }
849
850 let handles = keys
851 .chunks(MAX_BATCH_GET_ITEM_SIZE)
852 .map(|key_batch| self.read_batch_values_bytes(key_batch));
853 let results: Vec<_> = join_all(handles)
854 .await
855 .into_iter()
856 .collect::<Result<_, _>>()?;
857 Ok(results.into_iter().flatten().collect())
858 }
859
860 async fn find_keys_by_prefix(
861 &self,
862 key_prefix: &[u8],
863 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
864 let result_queries = self
865 .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
866 .await?;
867 result_queries
868 .keys()
869 .map(|key| key.map(|k| k.to_vec()))
870 .collect()
871 }
872
873 async fn find_key_values_by_prefix(
874 &self,
875 key_prefix: &[u8],
876 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
877 let result_queries = self
878 .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
879 .await?;
880 result_queries
881 .key_values()
882 .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
883 .collect()
884 }
885}
886
887impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
888 const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
889 const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
890 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
891
892 type Batch = SimpleUnorderedBatch;
894
895 async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
896 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
897 let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
898 builder.insert_put_request(self.start_key.clone(), vec![], self)?;
899 self.client
900 .transact_write_items()
901 .set_transact_items(Some(builder.transactions))
902 .send()
903 .boxed_sync()
904 .await?;
905 }
906 let mut builder = TransactionBuilder::new(&self.start_key);
907 for key in batch.deletions {
908 builder.insert_delete_request(key, self)?;
909 }
910 for (key, value) in batch.insertions {
911 builder.insert_put_request(key, value, self)?;
912 }
913 if !builder.transactions.is_empty() {
914 let _guard = self.acquire().await;
915 self.client
916 .transact_write_items()
917 .set_transact_items(Some(builder.transactions))
918 .send()
919 .boxed_sync()
920 .await?;
921 }
922 Ok(())
923 }
924}
925
926#[derive(Debug, Error)]
928pub enum InvalidNamespace {
929 #[error("Namespace must have at least 3 characters")]
931 TooShort,
932
933 #[error("Namespace must be at most 63 characters")]
935 TooLong,
936
937 #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
939 InvalidCharacter,
940}
941
942#[derive(Debug, Error)]
944pub enum DynamoDbStoreInternalError {
945 #[error(transparent)]
947 Get(#[from] Box<SdkError<GetItemError>>),
948
949 #[error(transparent)]
951 BatchGet(#[from] Box<SdkError<BatchGetItemError>>),
952
953 #[error(transparent)]
955 TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),
956
957 #[error(transparent)]
959 Query(#[from] Box<SdkError<QueryError>>),
960
961 #[error(transparent)]
963 DeleteTable(#[from] Box<SdkError<DeleteTableError>>),
964
965 #[error(transparent)]
967 ListTables(#[from] Box<SdkError<ListTablesError>>),
968
969 #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
971 TransactUpperLimitSize,
972
973 #[error("The key must be of strictly positive length")]
975 ZeroLengthKey,
976
977 #[error("The key must have at most 1024 bytes")]
979 KeyTooLong,
980
981 #[error("The key prefix must have at most 1024 bytes")]
983 KeyPrefixTooLong,
984
985 #[error("The key_prefix must be of strictly positive length")]
987 ZeroLengthKeyPrefix,
988
989 #[error(transparent)]
991 JournalConsistencyError(#[from] JournalConsistencyError),
992
993 #[error("The DynamoDB value should be less than 400 KB")]
995 ValueLengthTooLarge,
996
997 #[error("The stored key attribute is missing")]
999 MissingKey,
1000
1001 #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
1003 WrongKeyType(String),
1004
1005 #[error("The stored value attribute is missing")]
1007 MissingValue,
1008
1009 #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
1011 WrongValueType(String),
1012
1013 #[error(transparent)]
1015 BcsError(#[from] bcs::Error),
1016
1017 #[error(transparent)]
1019 InvalidNamespace(#[from] InvalidNamespace),
1020
1021 #[error(transparent)]
1023 CreateTable(#[from] Box<SdkError<CreateTableError>>),
1024
1025 #[error(transparent)]
1027 Build(#[from] Box<BuildError>),
1028}
1029
1030impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
1031where
1032 DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
1033{
1034 fn from(error: SdkError<InnerError>) -> Self {
1035 Box::new(error).into()
1036 }
1037}
1038
1039impl From<BuildError> for DynamoDbStoreInternalError {
1040 fn from(error: BuildError) -> Self {
1041 Box::new(error).into()
1042 }
1043}
1044
1045impl DynamoDbStoreInternalError {
1046 pub fn wrong_key_type(value: &AttributeValue) -> Self {
1052 DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
1053 }
1054
1055 pub fn wrong_value_type(value: &AttributeValue) -> Self {
1061 DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
1062 }
1063
1064 fn type_description_of(value: &AttributeValue) -> String {
1065 match value {
1066 AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
1067 AttributeValue::Bool(_) => "a boolean",
1068 AttributeValue::Bs(_) => "a list of binary blobs",
1069 AttributeValue::L(_) => "a list",
1070 AttributeValue::M(_) => "a map",
1071 AttributeValue::N(_) => "a number",
1072 AttributeValue::Ns(_) => "a list of numbers",
1073 AttributeValue::Null(_) => "a null value",
1074 AttributeValue::S(_) => "a string",
1075 AttributeValue::Ss(_) => "a list of strings",
1076 _ => "an unknown type",
1077 }
1078 .to_owned()
1079 }
1080}
1081
1082impl KeyValueStoreError for DynamoDbStoreInternalError {
1083 const BACKEND: &'static str = "dynamo_db";
1084}
1085
1086#[cfg(with_testing)]
1087impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
1088 async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
1089 Ok(DynamoDbStoreInternalConfig {
1090 use_dynamodb_local: true,
1091 max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
1092 max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
1093 })
1094 }
1095}
1096
1097pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1099
1100pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1102
1103#[cfg(with_metrics)]
1105pub type DynamoDbDatabase = MeteredDatabase<
1106 LruCachingDatabase<
1107 MeteredDatabase<
1108 ValueSplittingDatabase<
1109 MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1110 >,
1111 >,
1112 >,
1113>;
1114#[cfg(not(with_metrics))]
1116pub type DynamoDbDatabase = LruCachingDatabase<
1117 ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1118>;
1119
1120#[cfg(test)]
1121mod tests {
1122 use bcs::serialized_size;
1123
1124 use crate::common::get_uleb128_size;
1125
1126 #[test]
1127 fn test_serialization_len() {
1128 for n in [0, 10, 127, 128, 129, 16383, 16384, 20000] {
1129 let vec = vec![0u8; n];
1130 let est_size = get_uleb128_size(n) + n;
1131 let serial_size = serialized_size(&vec).unwrap();
1132 assert_eq!(est_size, serial_size);
1133 }
1134 }
1135}