1use std::{
7 collections::HashMap,
8 env,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17 error::SdkError,
18 operation::{
19 create_table::CreateTableError,
20 delete_table::DeleteTableError,
21 get_item::GetItemError,
22 list_tables::ListTablesError,
23 query::{QueryError, QueryOutput},
24 transact_write_items::TransactWriteItemsError,
25 },
26 primitives::Blob,
27 types::{
28 AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29 ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30 },
31 Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::ensure;
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredStore;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueStore;
43use crate::{
44 batch::SimpleUnorderedBatch,
45 common::get_uleb128_size,
46 journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
47 lru_caching::{LruCachingConfig, LruCachingStore},
48 store::{AdminKeyValueStore, KeyValueStoreError, ReadableKeyValueStore, WithError},
49 value_splitting::{ValueSplittingError, ValueSplittingStore},
50 FutureSyncExt as _,
51};
52
53const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
55
56async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
58 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
59 .boxed_sync()
60 .await;
61 Ok((&base_config).into())
62}
63
64fn get_endpoint_address() -> Option<String> {
65 env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
66}
67
68async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
70{
71 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
72 .boxed_sync()
73 .await;
74 let endpoint_address = get_endpoint_address().unwrap();
75 let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
76 .endpoint_url(endpoint_address)
77 .build();
78 Ok(config)
79}
80
81const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
86
87const PARTITION_ATTRIBUTE: &str = "item_partition";
89
90const EMPTY_ROOT_KEY: &[u8] = &[0];
92
93const DB_KEY: &[u8] = &[0];
95
96const KEY_ATTRIBUTE: &str = "item_key";
98
99const VALUE_ATTRIBUTE: &str = "item_value";
101
102const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
104
105const RAW_MAX_VALUE_SIZE: usize = 409600;
108
109const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
123 - MAX_KEY_SIZE
124 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
125 - get_uleb128_size(MAX_KEY_SIZE)
126 - 1
127 - 1;
128
129const MAX_KEY_SIZE: usize = 1024;
132
133const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
137
138#[cfg(with_testing)]
142const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
143
144#[cfg(with_testing)]
146const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
147
148const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
151
152fn extend_root_key(root_key: &[u8]) -> Vec<u8> {
154 let mut start_key = EMPTY_ROOT_KEY.to_vec();
155 start_key.extend(root_key);
156 start_key
157}
158
159fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
168 [
169 (
170 PARTITION_ATTRIBUTE.to_owned(),
171 AttributeValue::B(Blob::new(start_key.to_vec())),
172 ),
173 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
174 ]
175 .into()
176}
177
178fn build_key_value(
180 start_key: &[u8],
181 key: Vec<u8>,
182 value: Vec<u8>,
183) -> HashMap<String, AttributeValue> {
184 [
185 (
186 PARTITION_ATTRIBUTE.to_owned(),
187 AttributeValue::B(Blob::new(start_key.to_vec())),
188 ),
189 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
190 (
191 VALUE_ATTRIBUTE.to_owned(),
192 AttributeValue::B(Blob::new(value)),
193 ),
194 ]
195 .into()
196}
197
198fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
200 ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
201 ensure!(
202 key.len() <= MAX_KEY_SIZE,
203 DynamoDbStoreInternalError::KeyTooLong
204 );
205 Ok(())
206}
207
208fn extract_key(
210 prefix_len: usize,
211 attributes: &HashMap<String, AttributeValue>,
212) -> Result<&[u8], DynamoDbStoreInternalError> {
213 let key = attributes
214 .get(KEY_ATTRIBUTE)
215 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
216 match key {
217 AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
218 key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
219 }
220}
221
222fn extract_value(
224 attributes: &HashMap<String, AttributeValue>,
225) -> Result<&[u8], DynamoDbStoreInternalError> {
226 let value = attributes
229 .get(VALUE_ATTRIBUTE)
230 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
231 match value {
232 AttributeValue::B(blob) => Ok(blob.as_ref()),
233 value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
234 }
235}
236
237fn extract_value_owned(
239 attributes: &mut HashMap<String, AttributeValue>,
240) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
241 let value = attributes
242 .remove(VALUE_ATTRIBUTE)
243 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
244 match value {
245 AttributeValue::B(blob) => Ok(blob.into_inner()),
246 value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
247 }
248}
249
250fn extract_key_value(
252 prefix_len: usize,
253 attributes: &HashMap<String, AttributeValue>,
254) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
255 let key = extract_key(prefix_len, attributes)?;
256 let value = extract_value(attributes)?;
257 Ok((key, value))
258}
259
260struct TransactionBuilder {
261 start_key: Vec<u8>,
262 transactions: Vec<TransactWriteItem>,
263}
264
265impl TransactionBuilder {
266 fn new(start_key: &[u8]) -> Self {
267 Self {
268 start_key: start_key.to_vec(),
269 transactions: Vec::new(),
270 }
271 }
272
273 fn insert_delete_request(
274 &mut self,
275 key: Vec<u8>,
276 store: &DynamoDbStoreInternal,
277 ) -> Result<(), DynamoDbStoreInternalError> {
278 let transaction = store.build_delete_transaction(&self.start_key, key)?;
279 self.transactions.push(transaction);
280 Ok(())
281 }
282
283 fn insert_put_request(
284 &mut self,
285 key: Vec<u8>,
286 value: Vec<u8>,
287 store: &DynamoDbStoreInternal,
288 ) -> Result<(), DynamoDbStoreInternalError> {
289 let transaction = store.build_put_transaction(&self.start_key, key, value)?;
290 self.transactions.push(transaction);
291 Ok(())
292 }
293}
294
295#[derive(Clone, Debug)]
297pub struct DynamoDbStoreInternal {
298 client: Client,
299 namespace: String,
300 semaphore: Option<Arc<Semaphore>>,
301 max_stream_queries: usize,
302 start_key: Vec<u8>,
303 root_key_written: Arc<AtomicBool>,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct DynamoDbStoreInternalConfig {
309 pub use_dynamodb_local: bool,
311 pub max_concurrent_queries: Option<usize>,
313 pub max_stream_queries: usize,
315}
316
317impl DynamoDbStoreInternalConfig {
318 async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
319 let config = if self.use_dynamodb_local {
320 get_dynamodb_local_config().await?
321 } else {
322 get_base_config().await?
323 };
324 Ok(Client::from_conf(config))
325 }
326}
327
328impl AdminKeyValueStore for DynamoDbStoreInternal {
329 type Config = DynamoDbStoreInternalConfig;
330
331 fn get_name() -> String {
332 "dynamodb internal".to_string()
333 }
334
335 async fn connect(
336 config: &Self::Config,
337 namespace: &str,
338 ) -> Result<Self, DynamoDbStoreInternalError> {
339 Self::check_namespace(namespace)?;
340 let client = config.client().await?;
341 let semaphore = config
342 .max_concurrent_queries
343 .map(|n| Arc::new(Semaphore::new(n)));
344 let max_stream_queries = config.max_stream_queries;
345 let namespace = namespace.to_string();
346 let start_key = extend_root_key(&[]);
347 let store = Self {
348 client,
349 namespace,
350 semaphore,
351 max_stream_queries,
352 start_key,
353 root_key_written: Arc::new(AtomicBool::new(false)),
354 };
355 Ok(store)
356 }
357
358 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, DynamoDbStoreInternalError> {
359 let client = self.client.clone();
360 let namespace = self.namespace.clone();
361 let semaphore = self.semaphore.clone();
362 let max_stream_queries = self.max_stream_queries;
363 let start_key = extend_root_key(root_key);
364 Ok(Self {
365 client,
366 namespace,
367 semaphore,
368 max_stream_queries,
369 start_key,
370 root_key_written: Arc::new(AtomicBool::new(false)),
371 })
372 }
373
374 async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
375 let client = config.client().await?;
376 let mut namespaces = Vec::new();
377 let mut start_table = None;
378 loop {
379 let response = client
380 .list_tables()
381 .set_exclusive_start_table_name(start_table)
382 .send()
383 .boxed_sync()
384 .await?;
385 if let Some(namespaces_blk) = response.table_names {
386 namespaces.extend(namespaces_blk);
387 }
388 if response.last_evaluated_table_name.is_none() {
389 break;
390 } else {
391 start_table = response.last_evaluated_table_name;
392 }
393 }
394 Ok(namespaces)
395 }
396
397 async fn list_root_keys(
398 config: &Self::Config,
399 namespace: &str,
400 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
401 let mut store = Self::connect(config, namespace).await?;
402 store.start_key = PARTITION_KEY_ROOT_KEY.to_vec();
403 store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
404 }
405
406 async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
407 let client = config.client().await?;
408 let tables = Self::list_all(config).await?;
409 for table in tables {
410 client
411 .delete_table()
412 .table_name(&table)
413 .send()
414 .boxed_sync()
415 .await?;
416 }
417 Ok(())
418 }
419
420 async fn exists(
421 config: &Self::Config,
422 namespace: &str,
423 ) -> Result<bool, DynamoDbStoreInternalError> {
424 Self::check_namespace(namespace)?;
425 let client = config.client().await?;
426 let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
427 let response = client
428 .get_item()
429 .table_name(namespace)
430 .set_key(Some(key_db))
431 .send()
432 .boxed_sync()
433 .await;
434 let Err(error) = response else {
435 return Ok(true);
436 };
437 let test = match &error {
438 SdkError::ServiceError(error) => match error.err() {
439 GetItemError::ResourceNotFoundException(error) => {
440 error.message
441 == Some("Cannot do operations on a non-existent table".to_string())
442 }
443 _ => false,
444 },
445 _ => false,
446 };
447 if test {
448 Ok(false)
449 } else {
450 Err(error.into())
451 }
452 }
453
454 async fn create(
455 config: &Self::Config,
456 namespace: &str,
457 ) -> Result<(), DynamoDbStoreInternalError> {
458 Self::check_namespace(namespace)?;
459 let client = config.client().await?;
460 client
461 .create_table()
462 .table_name(namespace)
463 .attribute_definitions(
464 AttributeDefinition::builder()
465 .attribute_name(PARTITION_ATTRIBUTE)
466 .attribute_type(ScalarAttributeType::B)
467 .build()?,
468 )
469 .attribute_definitions(
470 AttributeDefinition::builder()
471 .attribute_name(KEY_ATTRIBUTE)
472 .attribute_type(ScalarAttributeType::B)
473 .build()?,
474 )
475 .key_schema(
476 KeySchemaElement::builder()
477 .attribute_name(PARTITION_ATTRIBUTE)
478 .key_type(KeyType::Hash)
479 .build()?,
480 )
481 .key_schema(
482 KeySchemaElement::builder()
483 .attribute_name(KEY_ATTRIBUTE)
484 .key_type(KeyType::Range)
485 .build()?,
486 )
487 .provisioned_throughput(
488 ProvisionedThroughput::builder()
489 .read_capacity_units(10)
490 .write_capacity_units(10)
491 .build()?,
492 )
493 .send()
494 .boxed_sync()
495 .await?;
496 Ok(())
497 }
498
499 async fn delete(
500 config: &Self::Config,
501 namespace: &str,
502 ) -> Result<(), DynamoDbStoreInternalError> {
503 Self::check_namespace(namespace)?;
504 let client = config.client().await?;
505 client
506 .delete_table()
507 .table_name(namespace)
508 .send()
509 .boxed_sync()
510 .await?;
511 Ok(())
512 }
513}
514
515impl DynamoDbStoreInternal {
516 fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
520 if namespace.len() < 3 {
521 return Err(InvalidNamespace::TooShort);
522 }
523 if namespace.len() > 255 {
524 return Err(InvalidNamespace::TooLong);
525 }
526 if !namespace.chars().all(|character| {
527 character.is_ascii_alphanumeric()
528 || character == '.'
529 || character == '-'
530 || character == '_'
531 }) {
532 return Err(InvalidNamespace::InvalidCharacter);
533 }
534 Ok(())
535 }
536
537 fn build_delete_transaction(
538 &self,
539 start_key: &[u8],
540 key: Vec<u8>,
541 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
542 check_key_size(&key)?;
543 let request = Delete::builder()
544 .table_name(&self.namespace)
545 .set_key(Some(build_key(start_key, key)))
546 .build()?;
547 Ok(TransactWriteItem::builder().delete(request).build())
548 }
549
550 fn build_put_transaction(
551 &self,
552 start_key: &[u8],
553 key: Vec<u8>,
554 value: Vec<u8>,
555 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
556 check_key_size(&key)?;
557 ensure!(
558 value.len() <= RAW_MAX_VALUE_SIZE,
559 DynamoDbStoreInternalError::ValueLengthTooLarge
560 );
561 let request = Put::builder()
562 .table_name(&self.namespace)
563 .set_item(Some(build_key_value(start_key, key, value)))
564 .build()?;
565 Ok(TransactWriteItem::builder().put(request).build())
566 }
567
568 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
570 match &self.semaphore {
571 None => None,
572 Some(count) => Some(count.acquire().await),
573 }
574 }
575
576 async fn get_query_output(
577 &self,
578 attribute_str: &str,
579 start_key: &[u8],
580 key_prefix: &[u8],
581 start_key_map: Option<HashMap<String, AttributeValue>>,
582 ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
583 let _guard = self.acquire().await;
584 let start_key = start_key.to_vec();
585 let response = self
586 .client
587 .query()
588 .table_name(&self.namespace)
589 .projection_expression(attribute_str)
590 .key_condition_expression(format!(
591 "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
592 ))
593 .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
594 .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
595 .set_exclusive_start_key(start_key_map)
596 .send()
597 .boxed_sync()
598 .await?;
599 Ok(response)
600 }
601
602 async fn read_value_bytes_general(
603 &self,
604 key_db: HashMap<String, AttributeValue>,
605 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
606 let _guard = self.acquire().await;
607 let response = self
608 .client
609 .get_item()
610 .table_name(&self.namespace)
611 .set_key(Some(key_db))
612 .send()
613 .boxed_sync()
614 .await?;
615
616 match response.item {
617 Some(mut item) => {
618 let value = extract_value_owned(&mut item)?;
619 Ok(Some(value))
620 }
621 None => Ok(None),
622 }
623 }
624
625 async fn contains_key_general(
626 &self,
627 key_db: HashMap<String, AttributeValue>,
628 ) -> Result<bool, DynamoDbStoreInternalError> {
629 let _guard = self.acquire().await;
630 let response = self
631 .client
632 .get_item()
633 .table_name(&self.namespace)
634 .set_key(Some(key_db))
635 .projection_expression(PARTITION_ATTRIBUTE)
636 .send()
637 .boxed_sync()
638 .await?;
639
640 Ok(response.item.is_some())
641 }
642
643 async fn get_list_responses(
644 &self,
645 attribute: &str,
646 start_key: &[u8],
647 key_prefix: &[u8],
648 ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
649 check_key_size(key_prefix)?;
650 let mut responses = Vec::new();
651 let mut start_key_map = None;
652 loop {
653 let response = self
654 .get_query_output(attribute, start_key, key_prefix, start_key_map)
655 .await?;
656 let last_evaluated = response.last_evaluated_key.clone();
657 responses.push(response);
658 match last_evaluated {
659 None => {
660 break;
661 }
662 Some(value) => {
663 start_key_map = Some(value);
664 }
665 }
666 }
667 Ok(QueryResponses {
668 prefix_len: key_prefix.len(),
669 responses,
670 })
671 }
672}
673
674struct QueryResponses {
675 prefix_len: usize,
676 responses: Vec<QueryOutput>,
677}
678
679impl QueryResponses {
680 fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
681 self.responses
682 .iter()
683 .flat_map(|response| response.items.iter().flatten())
684 .map(|item| extract_key(self.prefix_len, item))
685 }
686
687 fn key_values(
688 &self,
689 ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
690 self.responses
691 .iter()
692 .flat_map(|response| response.items.iter().flatten())
693 .map(|item| extract_key_value(self.prefix_len, item))
694 }
695}
696
697impl WithError for DynamoDbStoreInternal {
698 type Error = DynamoDbStoreInternalError;
699}
700
701impl ReadableKeyValueStore for DynamoDbStoreInternal {
702 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
703
704 fn max_stream_queries(&self) -> usize {
705 self.max_stream_queries
706 }
707
708 async fn read_value_bytes(
709 &self,
710 key: &[u8],
711 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
712 check_key_size(key)?;
713 let key_db = build_key(&self.start_key, key.to_vec());
714 self.read_value_bytes_general(key_db).await
715 }
716
717 async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
718 check_key_size(key)?;
719 let key_db = build_key(&self.start_key, key.to_vec());
720 self.contains_key_general(key_db).await
721 }
722
723 async fn contains_keys(
724 &self,
725 keys: Vec<Vec<u8>>,
726 ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
727 let mut handles = Vec::new();
728 for key in keys {
729 check_key_size(&key)?;
730 let key_db = build_key(&self.start_key, key);
731 let handle = self.contains_key_general(key_db);
732 handles.push(handle);
733 }
734 join_all(handles)
735 .await
736 .into_iter()
737 .collect::<Result<_, _>>()
738 }
739
740 async fn read_multi_values_bytes(
741 &self,
742 keys: Vec<Vec<u8>>,
743 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
744 let mut handles = Vec::new();
745 for key in keys {
746 check_key_size(&key)?;
747 let key_db = build_key(&self.start_key, key);
748 let handle = self.read_value_bytes_general(key_db);
749 handles.push(handle);
750 }
751 join_all(handles)
752 .await
753 .into_iter()
754 .collect::<Result<_, _>>()
755 }
756
757 async fn find_keys_by_prefix(
758 &self,
759 key_prefix: &[u8],
760 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
761 let result_queries = self
762 .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
763 .await?;
764 result_queries
765 .keys()
766 .map(|key| key.map(|k| k.to_vec()))
767 .collect()
768 }
769
770 async fn find_key_values_by_prefix(
771 &self,
772 key_prefix: &[u8],
773 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
774 let result_queries = self
775 .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
776 .await?;
777 result_queries
778 .key_values()
779 .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
780 .collect()
781 }
782}
783
784impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
785 const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
786 const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
787 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
788
789 type Batch = SimpleUnorderedBatch;
791
792 async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
793 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
794 let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
795 builder.insert_put_request(self.start_key.clone(), vec![], self)?;
796 self.client
797 .transact_write_items()
798 .set_transact_items(Some(builder.transactions))
799 .send()
800 .boxed_sync()
801 .await?;
802 }
803 let mut builder = TransactionBuilder::new(&self.start_key);
804 for key in batch.deletions {
805 builder.insert_delete_request(key, self)?;
806 }
807 for (key, value) in batch.insertions {
808 builder.insert_put_request(key, value, self)?;
809 }
810 if !builder.transactions.is_empty() {
811 let _guard = self.acquire().await;
812 self.client
813 .transact_write_items()
814 .set_transact_items(Some(builder.transactions))
815 .send()
816 .boxed_sync()
817 .await?;
818 }
819 Ok(())
820 }
821}
822
823#[derive(Debug, Error)]
825pub enum InvalidNamespace {
826 #[error("Namespace must have at least 3 characters")]
828 TooShort,
829
830 #[error("Namespace must be at most 63 characters")]
832 TooLong,
833
834 #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
836 InvalidCharacter,
837}
838
839#[derive(Debug, Error)]
841pub enum DynamoDbStoreInternalError {
842 #[error(transparent)]
844 Get(#[from] Box<SdkError<GetItemError>>),
845
846 #[error(transparent)]
848 TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),
849
850 #[error(transparent)]
852 Query(#[from] Box<SdkError<QueryError>>),
853
854 #[error(transparent)]
856 DeleteTable(#[from] Box<SdkError<DeleteTableError>>),
857
858 #[error(transparent)]
860 ListTables(#[from] Box<SdkError<ListTablesError>>),
861
862 #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
864 TransactUpperLimitSize,
865
866 #[error("The key must be of strictly positive length")]
868 ZeroLengthKey,
869
870 #[error("The key must have at most 1024 bytes")]
872 KeyTooLong,
873
874 #[error("The key prefix must have at most 1024 bytes")]
876 KeyPrefixTooLong,
877
878 #[error("The key_prefix must be of strictly positive length")]
880 ZeroLengthKeyPrefix,
881
882 #[error(transparent)]
884 JournalConsistencyError(#[from] JournalConsistencyError),
885
886 #[error("The DynamoDB value should be less than 400 KB")]
888 ValueLengthTooLarge,
889
890 #[error("The stored key attribute is missing")]
892 MissingKey,
893
894 #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
896 WrongKeyType(String),
897
898 #[error("The stored value attribute is missing")]
900 MissingValue,
901
902 #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
904 WrongValueType(String),
905
906 #[error(transparent)]
908 BcsError(#[from] bcs::Error),
909
910 #[error(transparent)]
912 InvalidNamespace(#[from] InvalidNamespace),
913
914 #[error(transparent)]
916 CreateTable(#[from] Box<SdkError<CreateTableError>>),
917
918 #[error(transparent)]
920 Build(#[from] Box<BuildError>),
921}
922
923impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
924where
925 DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
926{
927 fn from(error: SdkError<InnerError>) -> Self {
928 Box::new(error).into()
929 }
930}
931
932impl From<BuildError> for DynamoDbStoreInternalError {
933 fn from(error: BuildError) -> Self {
934 Box::new(error).into()
935 }
936}
937
938impl DynamoDbStoreInternalError {
939 pub fn wrong_key_type(value: &AttributeValue) -> Self {
945 DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
946 }
947
948 pub fn wrong_value_type(value: &AttributeValue) -> Self {
954 DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
955 }
956
957 fn type_description_of(value: &AttributeValue) -> String {
958 match value {
959 AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
960 AttributeValue::Bool(_) => "a boolean",
961 AttributeValue::Bs(_) => "a list of binary blobs",
962 AttributeValue::L(_) => "a list",
963 AttributeValue::M(_) => "a map",
964 AttributeValue::N(_) => "a number",
965 AttributeValue::Ns(_) => "a list of numbers",
966 AttributeValue::Null(_) => "a null value",
967 AttributeValue::S(_) => "a string",
968 AttributeValue::Ss(_) => "a list of strings",
969 _ => "an unknown type",
970 }
971 .to_owned()
972 }
973}
974
975impl KeyValueStoreError for DynamoDbStoreInternalError {
976 const BACKEND: &'static str = "dynamo_db";
977}
978
979#[cfg(with_testing)]
980impl TestKeyValueStore for JournalingKeyValueStore<DynamoDbStoreInternal> {
981 async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
982 Ok(DynamoDbStoreInternalConfig {
983 use_dynamodb_local: true,
984 max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
985 max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
986 })
987 }
988}
989
990#[cfg(with_metrics)]
992pub type DynamoDbStore = MeteredStore<
993 LruCachingStore<
994 MeteredStore<
995 ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
996 >,
997 >,
998>;
999
1000#[cfg(not(with_metrics))]
1002pub type DynamoDbStore =
1003 LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>;
1004
1005pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1007
1008pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1010
1011#[cfg(test)]
1012mod tests {
1013 use bcs::serialized_size;
1014
1015 use crate::common::get_uleb128_size;
1016
1017 #[test]
1018 fn test_serialization_len() {
1019 for n in [0, 10, 127, 128, 129, 16383, 16384, 20000] {
1020 let vec = vec![0u8; n];
1021 let est_size = get_uleb128_size(n) + n;
1022 let serial_size = serialized_size(&vec).unwrap();
1023 assert_eq!(est_size, serial_size);
1024 }
1025 }
1026}