1use std::{
7 collections::HashMap,
8 env,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17 error::SdkError,
18 operation::{
19 create_table::CreateTableError,
20 delete_table::DeleteTableError,
21 get_item::GetItemError,
22 list_tables::ListTablesError,
23 query::{QueryError, QueryOutput},
24 transact_write_items::TransactWriteItemsError,
25 },
26 primitives::Blob,
27 types::{
28 AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29 ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30 },
31 Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::ensure;
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredDatabase;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueDatabase;
43use crate::{
44 batch::SimpleUnorderedBatch,
45 common::get_uleb128_size,
46 journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
47 lru_caching::{LruCachingConfig, LruCachingDatabase},
48 store::{
49 DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
50 WithError,
51 },
52 value_splitting::{ValueSplittingDatabase, ValueSplittingError},
53 FutureSyncExt as _,
54};
55
/// Name of the environment variable that `get_endpoint_address` reads to find the
/// endpoint used when the configuration asks for DynamoDB local.
56const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
58
59async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
61 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
62 .boxed_sync()
63 .await;
64 Ok((&base_config).into())
65}
66
67fn get_endpoint_address() -> Option<String> {
68 env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
69}
70
71async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
73{
74 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
75 .boxed_sync()
76 .await;
77 let endpoint_address = get_endpoint_address().unwrap();
78 let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
79 .endpoint_url(endpoint_address)
80 .build();
81 Ok(config)
82}
83
/// Partition under which the root keys themselves are recorded: `write_batch` stores the
/// store's `start_key` there, and `list_root_keys` reads them back from it.
84const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
89
/// Name of the table attribute used as the hash (partition) key in `create`.
90const PARTITION_ATTRIBUTE: &str = "item_partition";
92
/// Byte prepended to a root key by `open_shared` to form the store's partition key.
/// Also used as the partition of the probe item in `exists` and as the prefix queried
/// by `list_root_keys`.
93const EMPTY_ROOT_KEY: &[u8] = &[0];
95
/// Sort key of the item that `exists` looks up to probe whether the table exists.
96const DB_KEY: &[u8] = &[0];
98
/// Name of the table attribute used as the range (sort) key in `create`.
99const KEY_ATTRIBUTE: &str = "item_key";
101
/// Name of the table attribute holding the stored value.
102const VALUE_ATTRIBUTE: &str = "item_value";
104
/// Projection expression selecting both the key and the value attributes.
105const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
107
/// Largest value length (in bytes) accepted by `build_put_transaction`; 409600 = 400 * 1024.
108const RAW_MAX_VALUE_SIZE: usize = 409600;
111
/// Value size advertised as `MAX_VALUE_SIZE` by the `DirectWritableKeyValueStore` impl.
///
/// It is the raw limit minus the maximal key size, the ULEB128 length prefixes of both
/// sizes, and two extra bytes.
/// NOTE(review): presumably this reserves room for the serialization overhead added by
/// the wrapping layers — confirm against the journaling code.
112const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
126 - MAX_KEY_SIZE
127 - get_uleb128_size(RAW_MAX_VALUE_SIZE)
128 - get_uleb128_size(MAX_KEY_SIZE)
129 - 1
130 - 1;
131
/// Largest key length (in bytes) accepted by `check_key_size`.
132const MAX_KEY_SIZE: usize = 1024;
135
/// Value advertised as `MAX_BATCH_TOTAL_SIZE` by the `DirectWritableKeyValueStore` impl.
136const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
140
/// Concurrent-query limit used by `new_test_config`.
141#[cfg(with_testing)]
145const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
146
/// Stream-query limit used by `new_test_config`.
147#[cfg(with_testing)]
149const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
150
/// Value advertised as `MAX_BATCH_SIZE` by the `DirectWritableKeyValueStore` impl.
151const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
154
155fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
164 [
165 (
166 PARTITION_ATTRIBUTE.to_owned(),
167 AttributeValue::B(Blob::new(start_key.to_vec())),
168 ),
169 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
170 ]
171 .into()
172}
173
174fn build_key_value(
176 start_key: &[u8],
177 key: Vec<u8>,
178 value: Vec<u8>,
179) -> HashMap<String, AttributeValue> {
180 [
181 (
182 PARTITION_ATTRIBUTE.to_owned(),
183 AttributeValue::B(Blob::new(start_key.to_vec())),
184 ),
185 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
186 (
187 VALUE_ATTRIBUTE.to_owned(),
188 AttributeValue::B(Blob::new(value)),
189 ),
190 ]
191 .into()
192}
193
194fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
196 ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
197 ensure!(
198 key.len() <= MAX_KEY_SIZE,
199 DynamoDbStoreInternalError::KeyTooLong
200 );
201 Ok(())
202}
203
204fn extract_key(
206 prefix_len: usize,
207 attributes: &HashMap<String, AttributeValue>,
208) -> Result<&[u8], DynamoDbStoreInternalError> {
209 let key = attributes
210 .get(KEY_ATTRIBUTE)
211 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
212 match key {
213 AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
214 key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
215 }
216}
217
218fn extract_value(
220 attributes: &HashMap<String, AttributeValue>,
221) -> Result<&[u8], DynamoDbStoreInternalError> {
222 let value = attributes
225 .get(VALUE_ATTRIBUTE)
226 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
227 match value {
228 AttributeValue::B(blob) => Ok(blob.as_ref()),
229 value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
230 }
231}
232
233fn extract_value_owned(
235 attributes: &mut HashMap<String, AttributeValue>,
236) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
237 let value = attributes
238 .remove(VALUE_ATTRIBUTE)
239 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
240 match value {
241 AttributeValue::B(blob) => Ok(blob.into_inner()),
242 value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
243 }
244}
245
246fn extract_key_value(
248 prefix_len: usize,
249 attributes: &HashMap<String, AttributeValue>,
250) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
251 let key = extract_key(prefix_len, attributes)?;
252 let value = extract_value(attributes)?;
253 Ok((key, value))
254}
255
256struct TransactionBuilder {
257 start_key: Vec<u8>,
258 transactions: Vec<TransactWriteItem>,
259}
260
261impl TransactionBuilder {
262 fn new(start_key: &[u8]) -> Self {
263 Self {
264 start_key: start_key.to_vec(),
265 transactions: Vec::new(),
266 }
267 }
268
269 fn insert_delete_request(
270 &mut self,
271 key: Vec<u8>,
272 store: &DynamoDbStoreInternal,
273 ) -> Result<(), DynamoDbStoreInternalError> {
274 let transaction = store.build_delete_transaction(&self.start_key, key)?;
275 self.transactions.push(transaction);
276 Ok(())
277 }
278
279 fn insert_put_request(
280 &mut self,
281 key: Vec<u8>,
282 value: Vec<u8>,
283 store: &DynamoDbStoreInternal,
284 ) -> Result<(), DynamoDbStoreInternalError> {
285 let transaction = store.build_put_transaction(&self.start_key, key, value)?;
286 self.transactions.push(transaction);
287 Ok(())
288 }
289}
290
/// A key-value store backed by one partition of a DynamoDB table.
291#[derive(Clone, Debug)]
293pub struct DynamoDbStoreInternal {
    /// The DynamoDB client used for every request.
294 client: Client,
    /// The namespace, used as the table name in requests.
295 namespace: String,
    /// When present, bounds the number of requests in flight (see `acquire`).
296 semaphore: Option<Arc<Semaphore>>,
    /// Value returned by `max_stream_queries`.
297 max_stream_queries: usize,
    /// Partition key under which this store's items live.
298 start_key: Vec<u8>,
    /// Set by the first `write_batch`, which also records `start_key` in the root-key partition.
299 root_key_written: Arc<AtomicBool>,
300}
301
/// A connection to one DynamoDB table (namespace) from which per-root-key stores are opened.
302#[derive(Clone)]
304pub struct DynamoDbDatabaseInternal {
    /// The DynamoDB client, cloned into every opened store.
305 client: Client,
    /// The namespace, used as the table name.
306 namespace: String,
    /// Optional limit on concurrent requests, shared with every opened store.
307 semaphore: Option<Arc<Semaphore>>,
    /// Stream-query limit copied from the configuration.
308 max_stream_queries: usize,
309}
310
/// Operations on the database report `DynamoDbStoreInternalError`.
311impl WithError for DynamoDbDatabaseInternal {
312 type Error = DynamoDbStoreInternalError;
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize)]
317pub struct DynamoDbStoreInternalConfig {
318 pub use_dynamodb_local: bool,
320 pub max_concurrent_queries: Option<usize>,
322 pub max_stream_queries: usize,
324}
325
326impl DynamoDbStoreInternalConfig {
327 async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
328 let config = if self.use_dynamodb_local {
329 get_dynamodb_local_config().await?
330 } else {
331 get_base_config().await?
332 };
333 Ok(Client::from_conf(config))
334 }
335}
336
337impl KeyValueDatabase for DynamoDbDatabaseInternal {
338 type Config = DynamoDbStoreInternalConfig;
339 type Store = DynamoDbStoreInternal;
340
341 fn get_name() -> String {
342 "dynamodb internal".to_string()
343 }
344
345 async fn connect(
346 config: &Self::Config,
347 namespace: &str,
348 ) -> Result<Self, DynamoDbStoreInternalError> {
349 Self::check_namespace(namespace)?;
350 let client = config.client().await?;
351 let semaphore = config
352 .max_concurrent_queries
353 .map(|n| Arc::new(Semaphore::new(n)));
354 let max_stream_queries = config.max_stream_queries;
355 let namespace = namespace.to_string();
356 let store = Self {
357 client,
358 namespace,
359 semaphore,
360 max_stream_queries,
361 };
362 Ok(store)
363 }
364
365 fn open_shared(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
366 let mut start_key = EMPTY_ROOT_KEY.to_vec();
367 start_key.extend(root_key);
368 Ok(self.open_internal(start_key))
369 }
370
371 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self::Store, DynamoDbStoreInternalError> {
372 self.open_shared(root_key)
373 }
374
375 async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
376 let client = config.client().await?;
377 let mut namespaces = Vec::new();
378 let mut start_table = None;
379 loop {
380 let response = client
381 .list_tables()
382 .set_exclusive_start_table_name(start_table)
383 .send()
384 .boxed_sync()
385 .await?;
386 if let Some(namespaces_blk) = response.table_names {
387 namespaces.extend(namespaces_blk);
388 }
389 if response.last_evaluated_table_name.is_none() {
390 break;
391 } else {
392 start_table = response.last_evaluated_table_name;
393 }
394 }
395 Ok(namespaces)
396 }
397
398 async fn list_root_keys(
399 config: &Self::Config,
400 namespace: &str,
401 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
402 let database = Self::connect(config, namespace).await?;
403 let store = database.open_internal(PARTITION_KEY_ROOT_KEY.to_vec());
404 store.find_keys_by_prefix(EMPTY_ROOT_KEY).await
405 }
406
407 async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
408 let client = config.client().await?;
409 let tables = Self::list_all(config).await?;
410 for table in tables {
411 client
412 .delete_table()
413 .table_name(&table)
414 .send()
415 .boxed_sync()
416 .await?;
417 }
418 Ok(())
419 }
420
421 async fn exists(
422 config: &Self::Config,
423 namespace: &str,
424 ) -> Result<bool, DynamoDbStoreInternalError> {
425 Self::check_namespace(namespace)?;
426 let client = config.client().await?;
427 let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
428 let response = client
429 .get_item()
430 .table_name(namespace)
431 .set_key(Some(key_db))
432 .send()
433 .boxed_sync()
434 .await;
435 let Err(error) = response else {
436 return Ok(true);
437 };
438 let test = match &error {
439 SdkError::ServiceError(error) => match error.err() {
440 GetItemError::ResourceNotFoundException(error) => {
441 error.message
442 == Some("Cannot do operations on a non-existent table".to_string())
443 }
444 _ => false,
445 },
446 _ => false,
447 };
448 if test {
449 Ok(false)
450 } else {
451 Err(error.into())
452 }
453 }
454
455 async fn create(
456 config: &Self::Config,
457 namespace: &str,
458 ) -> Result<(), DynamoDbStoreInternalError> {
459 Self::check_namespace(namespace)?;
460 let client = config.client().await?;
461 client
462 .create_table()
463 .table_name(namespace)
464 .attribute_definitions(
465 AttributeDefinition::builder()
466 .attribute_name(PARTITION_ATTRIBUTE)
467 .attribute_type(ScalarAttributeType::B)
468 .build()?,
469 )
470 .attribute_definitions(
471 AttributeDefinition::builder()
472 .attribute_name(KEY_ATTRIBUTE)
473 .attribute_type(ScalarAttributeType::B)
474 .build()?,
475 )
476 .key_schema(
477 KeySchemaElement::builder()
478 .attribute_name(PARTITION_ATTRIBUTE)
479 .key_type(KeyType::Hash)
480 .build()?,
481 )
482 .key_schema(
483 KeySchemaElement::builder()
484 .attribute_name(KEY_ATTRIBUTE)
485 .key_type(KeyType::Range)
486 .build()?,
487 )
488 .provisioned_throughput(
489 ProvisionedThroughput::builder()
490 .read_capacity_units(10)
491 .write_capacity_units(10)
492 .build()?,
493 )
494 .send()
495 .boxed_sync()
496 .await?;
497 Ok(())
498 }
499
500 async fn delete(
501 config: &Self::Config,
502 namespace: &str,
503 ) -> Result<(), DynamoDbStoreInternalError> {
504 Self::check_namespace(namespace)?;
505 let client = config.client().await?;
506 client
507 .delete_table()
508 .table_name(namespace)
509 .send()
510 .boxed_sync()
511 .await?;
512 Ok(())
513 }
514}
515
516impl DynamoDbDatabaseInternal {
517 fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
521 if namespace.len() < 3 {
522 return Err(InvalidNamespace::TooShort);
523 }
524 if namespace.len() > 255 {
525 return Err(InvalidNamespace::TooLong);
526 }
527 if !namespace.chars().all(|character| {
528 character.is_ascii_alphanumeric()
529 || character == '.'
530 || character == '-'
531 || character == '_'
532 }) {
533 return Err(InvalidNamespace::InvalidCharacter);
534 }
535 Ok(())
536 }
537
538 fn open_internal(&self, start_key: Vec<u8>) -> DynamoDbStoreInternal {
539 let client = self.client.clone();
540 let namespace = self.namespace.clone();
541 let semaphore = self.semaphore.clone();
542 let max_stream_queries = self.max_stream_queries;
543 DynamoDbStoreInternal {
544 client,
545 namespace,
546 semaphore,
547 max_stream_queries,
548 start_key,
549 root_key_written: Arc::new(AtomicBool::new(false)),
550 }
551 }
552}
553
554impl DynamoDbStoreInternal {
555 fn build_delete_transaction(
556 &self,
557 start_key: &[u8],
558 key: Vec<u8>,
559 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
560 check_key_size(&key)?;
561 let request = Delete::builder()
562 .table_name(&self.namespace)
563 .set_key(Some(build_key(start_key, key)))
564 .build()?;
565 Ok(TransactWriteItem::builder().delete(request).build())
566 }
567
568 fn build_put_transaction(
569 &self,
570 start_key: &[u8],
571 key: Vec<u8>,
572 value: Vec<u8>,
573 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
574 check_key_size(&key)?;
575 ensure!(
576 value.len() <= RAW_MAX_VALUE_SIZE,
577 DynamoDbStoreInternalError::ValueLengthTooLarge
578 );
579 let request = Put::builder()
580 .table_name(&self.namespace)
581 .set_item(Some(build_key_value(start_key, key, value)))
582 .build()?;
583 Ok(TransactWriteItem::builder().put(request).build())
584 }
585
586 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
588 match &self.semaphore {
589 None => None,
590 Some(count) => Some(count.acquire().await),
591 }
592 }
593
594 async fn get_query_output(
595 &self,
596 attribute_str: &str,
597 start_key: &[u8],
598 key_prefix: &[u8],
599 start_key_map: Option<HashMap<String, AttributeValue>>,
600 ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
601 let _guard = self.acquire().await;
602 let start_key = start_key.to_vec();
603 let response = self
604 .client
605 .query()
606 .table_name(&self.namespace)
607 .projection_expression(attribute_str)
608 .key_condition_expression(format!(
609 "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
610 ))
611 .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
612 .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
613 .set_exclusive_start_key(start_key_map)
614 .send()
615 .boxed_sync()
616 .await?;
617 Ok(response)
618 }
619
620 async fn read_value_bytes_general(
621 &self,
622 key_db: HashMap<String, AttributeValue>,
623 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
624 let _guard = self.acquire().await;
625 let response = self
626 .client
627 .get_item()
628 .table_name(&self.namespace)
629 .set_key(Some(key_db))
630 .send()
631 .boxed_sync()
632 .await?;
633
634 match response.item {
635 Some(mut item) => {
636 let value = extract_value_owned(&mut item)?;
637 Ok(Some(value))
638 }
639 None => Ok(None),
640 }
641 }
642
643 async fn contains_key_general(
644 &self,
645 key_db: HashMap<String, AttributeValue>,
646 ) -> Result<bool, DynamoDbStoreInternalError> {
647 let _guard = self.acquire().await;
648 let response = self
649 .client
650 .get_item()
651 .table_name(&self.namespace)
652 .set_key(Some(key_db))
653 .projection_expression(PARTITION_ATTRIBUTE)
654 .send()
655 .boxed_sync()
656 .await?;
657
658 Ok(response.item.is_some())
659 }
660
661 async fn get_list_responses(
662 &self,
663 attribute: &str,
664 start_key: &[u8],
665 key_prefix: &[u8],
666 ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
667 check_key_size(key_prefix)?;
668 let mut responses = Vec::new();
669 let mut start_key_map = None;
670 loop {
671 let response = self
672 .get_query_output(attribute, start_key, key_prefix, start_key_map)
673 .await?;
674 let last_evaluated = response.last_evaluated_key.clone();
675 responses.push(response);
676 match last_evaluated {
677 None => {
678 break;
679 }
680 Some(value) => {
681 start_key_map = Some(value);
682 }
683 }
684 }
685 Ok(QueryResponses {
686 prefix_len: key_prefix.len(),
687 responses,
688 })
689 }
690}
691
692struct QueryResponses {
693 prefix_len: usize,
694 responses: Vec<QueryOutput>,
695}
696
697impl QueryResponses {
698 fn keys(&self) -> impl Iterator<Item = Result<&[u8], DynamoDbStoreInternalError>> {
699 self.responses
700 .iter()
701 .flat_map(|response| response.items.iter().flatten())
702 .map(|item| extract_key(self.prefix_len, item))
703 }
704
705 fn key_values(
706 &self,
707 ) -> impl Iterator<Item = Result<(&[u8], &[u8]), DynamoDbStoreInternalError>> {
708 self.responses
709 .iter()
710 .flat_map(|response| response.items.iter().flatten())
711 .map(|item| extract_key_value(self.prefix_len, item))
712 }
713}
714
/// Operations on the store report `DynamoDbStoreInternalError`.
715impl WithError for DynamoDbStoreInternal {
716 type Error = DynamoDbStoreInternalError;
717}
718
719impl ReadableKeyValueStore for DynamoDbStoreInternal {
720 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
721
722 fn max_stream_queries(&self) -> usize {
723 self.max_stream_queries
724 }
725
726 async fn read_value_bytes(
727 &self,
728 key: &[u8],
729 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
730 check_key_size(key)?;
731 let key_db = build_key(&self.start_key, key.to_vec());
732 self.read_value_bytes_general(key_db).await
733 }
734
735 async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
736 check_key_size(key)?;
737 let key_db = build_key(&self.start_key, key.to_vec());
738 self.contains_key_general(key_db).await
739 }
740
741 async fn contains_keys(
742 &self,
743 keys: Vec<Vec<u8>>,
744 ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
745 let mut handles = Vec::new();
746 for key in keys {
747 check_key_size(&key)?;
748 let key_db = build_key(&self.start_key, key);
749 let handle = self.contains_key_general(key_db);
750 handles.push(handle);
751 }
752 join_all(handles)
753 .await
754 .into_iter()
755 .collect::<Result<_, _>>()
756 }
757
758 async fn read_multi_values_bytes(
759 &self,
760 keys: Vec<Vec<u8>>,
761 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
762 let mut handles = Vec::new();
763 for key in keys {
764 check_key_size(&key)?;
765 let key_db = build_key(&self.start_key, key);
766 let handle = self.read_value_bytes_general(key_db);
767 handles.push(handle);
768 }
769 join_all(handles)
770 .await
771 .into_iter()
772 .collect::<Result<_, _>>()
773 }
774
775 async fn find_keys_by_prefix(
776 &self,
777 key_prefix: &[u8],
778 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
779 let result_queries = self
780 .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
781 .await?;
782 result_queries
783 .keys()
784 .map(|key| key.map(|k| k.to_vec()))
785 .collect()
786 }
787
788 async fn find_key_values_by_prefix(
789 &self,
790 key_prefix: &[u8],
791 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, DynamoDbStoreInternalError> {
792 let result_queries = self
793 .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
794 .await?;
795 result_queries
796 .key_values()
797 .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec())))
798 .collect()
799 }
800}
801
802impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
803 const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
804 const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
805 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
806
807 type Batch = SimpleUnorderedBatch;
809
810 async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
811 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
812 let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
813 builder.insert_put_request(self.start_key.clone(), vec![], self)?;
814 self.client
815 .transact_write_items()
816 .set_transact_items(Some(builder.transactions))
817 .send()
818 .boxed_sync()
819 .await?;
820 }
821 let mut builder = TransactionBuilder::new(&self.start_key);
822 for key in batch.deletions {
823 builder.insert_delete_request(key, self)?;
824 }
825 for (key, value) in batch.insertions {
826 builder.insert_put_request(key, value, self)?;
827 }
828 if !builder.transactions.is_empty() {
829 let _guard = self.acquire().await;
830 self.client
831 .transact_write_items()
832 .set_transact_items(Some(builder.transactions))
833 .send()
834 .boxed_sync()
835 .await?;
836 }
837 Ok(())
838 }
839}
840
841#[derive(Debug, Error)]
843pub enum InvalidNamespace {
844 #[error("Namespace must have at least 3 characters")]
846 TooShort,
847
848 #[error("Namespace must be at most 63 characters")]
850 TooLong,
851
852 #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
854 InvalidCharacter,
855}
856
/// Errors reported by the internal DynamoDB database and store.
857#[derive(Debug, Error)]
859pub enum DynamoDbStoreInternalError {
    /// A `GetItem` request failed.
860 #[error(transparent)]
862 Get(#[from] Box<SdkError<GetItemError>>),
863
    /// A `TransactWriteItems` request failed.
864 #[error(transparent)]
866 TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),
867
    /// A `Query` request failed.
868 #[error(transparent)]
870 Query(#[from] Box<SdkError<QueryError>>),
871
    /// A `DeleteTable` request failed.
872 #[error(transparent)]
874 DeleteTable(#[from] Box<SdkError<DeleteTableError>>),
875
    /// A `ListTables` request failed.
876 #[error(transparent)]
878 ListTables(#[from] Box<SdkError<ListTablesError>>),
879
    /// A transaction holds more items than allowed.
880 #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
882 TransactUpperLimitSize,
883
    /// An empty key was supplied (see `check_key_size`).
884 #[error("The key must be of strictly positive length")]
886 ZeroLengthKey,
887
    /// A key longer than `MAX_KEY_SIZE` was supplied (see `check_key_size`).
888 #[error("The key must have at most 1024 bytes")]
890 KeyTooLong,
891
    /// A key prefix is too long.
892 #[error("The key prefix must have at most 1024 bytes")]
894 KeyPrefixTooLong,
895
    /// A key prefix is empty.
896 #[error("The key_prefix must be of strictly positive length")]
898 ZeroLengthKeyPrefix,
899
    /// An inconsistency reported by the journaling layer.
900 #[error(transparent)]
902 JournalConsistencyError(#[from] JournalConsistencyError),
903
    /// A value longer than `RAW_MAX_VALUE_SIZE` was supplied (see `build_put_transaction`).
904 #[error("The DynamoDB value should be less than 400 KB")]
906 ValueLengthTooLarge,
907
    /// A returned item has no key attribute.
908 #[error("The stored key attribute is missing")]
910 MissingKey,
911
    /// A returned item's key attribute is not a binary blob; holds a description of
    /// the type that was found.
912 #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
914 WrongKeyType(String),
915
    /// A returned item has no value attribute.
916 #[error("The stored value attribute is missing")]
918 MissingValue,
919
    /// A returned item's value attribute is not a binary blob; holds a description of
    /// the type that was found.
920 #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
922 WrongValueType(String),
923
    /// A BCS (de)serialization error.
924 #[error(transparent)]
926 BcsError(#[from] bcs::Error),
927
    /// The namespace was rejected by `check_namespace`.
928 #[error(transparent)]
930 InvalidNamespace(#[from] InvalidNamespace),
931
    /// A `CreateTable` request failed.
932 #[error(transparent)]
934 CreateTable(#[from] Box<SdkError<CreateTableError>>),
935
    /// A request object could not be built.
936 #[error(transparent)]
938 Build(#[from] Box<BuildError>),
939}
940
941impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
942where
943 DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
944{
945 fn from(error: SdkError<InnerError>) -> Self {
946 Box::new(error).into()
947 }
948}
949
950impl From<BuildError> for DynamoDbStoreInternalError {
951 fn from(error: BuildError) -> Self {
952 Box::new(error).into()
953 }
954}
955
956impl DynamoDbStoreInternalError {
957 pub fn wrong_key_type(value: &AttributeValue) -> Self {
963 DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
964 }
965
966 pub fn wrong_value_type(value: &AttributeValue) -> Self {
972 DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
973 }
974
975 fn type_description_of(value: &AttributeValue) -> String {
976 match value {
977 AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
978 AttributeValue::Bool(_) => "a boolean",
979 AttributeValue::Bs(_) => "a list of binary blobs",
980 AttributeValue::L(_) => "a list",
981 AttributeValue::M(_) => "a map",
982 AttributeValue::N(_) => "a number",
983 AttributeValue::Ns(_) => "a list of numbers",
984 AttributeValue::Null(_) => "a null value",
985 AttributeValue::S(_) => "a string",
986 AttributeValue::Ss(_) => "a list of strings",
987 _ => "an unknown type",
988 }
989 .to_owned()
990 }
991}
992
/// Identifies this error type's backend as `"dynamo_db"`.
993impl KeyValueStoreError for DynamoDbStoreInternalError {
994 const BACKEND: &'static str = "dynamo_db";
995}
996
#[cfg(with_testing)]
impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
    /// Creates the configuration used by tests: DynamoDB local, with the test limits
    /// on concurrent and stream queries.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        let max_concurrent_queries = Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES);
        let max_stream_queries = TEST_DYNAMO_DB_MAX_STREAM_QUERIES;
        Ok(DynamoDbStoreInternalConfig {
            use_dynamodb_local: true,
            max_concurrent_queries,
            max_stream_queries,
        })
    }
}
1007
/// Error type of the composed DynamoDB database: the internal error wrapped by the
/// value-splitting layer.
1008pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1010
/// Configuration of the composed DynamoDB database: the internal configuration wrapped
/// by the LRU-caching layer's configuration.
1011pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1013
/// The composed DynamoDB database when metrics are enabled: journaling, value splitting
/// and LRU caching stacked over the internal database, with a metering wrapper around
/// each layer.
1014#[cfg(with_metrics)]
1016pub type DynamoDbDatabase = MeteredDatabase<
1017 LruCachingDatabase<
1018 MeteredDatabase<
1019 ValueSplittingDatabase<
1020 MeteredDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1021 >,
1022 >,
1023 >,
1024>;
/// The composed DynamoDB database when metrics are disabled: journaling, value splitting
/// and LRU caching stacked over the internal database.
1025#[cfg(not(with_metrics))]
1027pub type DynamoDbDatabase = LruCachingDatabase<
1028 ValueSplittingDatabase<JournalingKeyValueDatabase<DynamoDbDatabaseInternal>>,
1029>;
1030
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that the BCS size of a byte vector is its length plus the ULEB128
    /// encoding of that length, including around the 1-byte/2-byte/3-byte boundaries.
    #[test]
    fn test_serialization_len() {
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for length in lengths {
            let bytes = vec![0u8; length];
            let estimated = get_uleb128_size(length) + length;
            let measured = serialized_size(&bytes).unwrap();
            assert_eq!(estimated, measured);
        }
    }
}