1use std::{
7 collections::HashMap,
8 env,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use async_lock::{Semaphore, SemaphoreGuard};
16use aws_sdk_dynamodb::{
17 error::SdkError,
18 operation::{
19 create_table::CreateTableError,
20 delete_table::DeleteTableError,
21 get_item::GetItemError,
22 list_tables::ListTablesError,
23 query::{QueryError, QueryOutput},
24 transact_write_items::TransactWriteItemsError,
25 },
26 primitives::Blob,
27 types::{
28 AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
29 ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
30 },
31 Client,
32};
33use aws_smithy_types::error::operation::BuildError;
34use futures::future::join_all;
35use linera_base::ensure;
36use serde::{Deserialize, Serialize};
37use thiserror::Error;
38
39#[cfg(with_metrics)]
40use crate::metering::MeteredStore;
41#[cfg(with_testing)]
42use crate::store::TestKeyValueStore;
43use crate::{
44 batch::SimpleUnorderedBatch,
45 common::get_uleb128_size,
46 journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
47 lru_caching::{LruCachingConfig, LruCachingStore},
48 store::{
49 AdminKeyValueStore, CommonStoreInternalConfig, KeyIterable, KeyValueIterable,
50 KeyValueStoreError, ReadableKeyValueStore, WithError,
51 },
52 value_splitting::{ValueSplittingError, ValueSplittingStore},
53 FutureSyncExt as _,
54};
55
/// Name of the environment variable read to obtain the address of a local DynamoDB endpoint.
const DYNAMODB_LOCAL_ENDPOINT: &str = "DYNAMODB_LOCAL_ENDPOINT";
58
59async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
61 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
62 .boxed_sync()
63 .await;
64 Ok((&base_config).into())
65}
66
67fn get_endpoint_address() -> Option<String> {
68 env::var(DYNAMODB_LOCAL_ENDPOINT).ok()
69}
70
71async fn get_dynamodb_local_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError>
73{
74 let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
75 .boxed_sync()
76 .await;
77 let endpoint_address = get_endpoint_address().unwrap();
78 let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
79 .endpoint_url(endpoint_address)
80 .build();
81 Ok(config)
82}
83
/// Partition value under which root keys are recorded: `write_batch` stores each store's
/// `start_key` in this partition and `list_root_keys` queries it.
const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];

/// Name of the table attribute holding the partition (hash) key.
const PARTITION_ATTRIBUTE: &str = "item_partition";

/// Bytes prepended to a root key by `extend_root_key` to form a store's partition value.
const EMPTY_ROOT_KEY: &[u8] = &[0];

/// Key looked up by `exists` to probe whether a table is present.
const DB_KEY: &[u8] = &[0];

/// Name of the table attribute holding the sort (range) key.
const KEY_ATTRIBUTE: &str = "item_key";

/// Name of the table attribute holding the stored value.
const VALUE_ATTRIBUTE: &str = "item_value";

/// Projection expression selecting both the key and the value attributes.
const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";

/// Largest value length, in bytes, accepted by `build_put_transaction` (400 * 1024).
const RAW_MAX_VALUE_SIZE: usize = 409600;

/// Value size limit exposed as `DirectWritableKeyValueStore::MAX_VALUE_SIZE`.
///
/// It is `RAW_MAX_VALUE_SIZE` minus the maximum key size, the ULEB128 sizes of both
/// limits, and two further bytes.
// NOTE(review): the deductions presumably reserve room for serialization overhead added
// by a wrapping layer (e.g. journaling) -- confirm against that layer's encoding.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 1
    - 1;

/// Largest key length, in bytes, accepted by `check_key_size`.
const MAX_KEY_SIZE: usize = 1024;

/// Total batch size limit exposed as `DirectWritableKeyValueStore::MAX_BATCH_TOTAL_SIZE`.
const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;

/// Number of concurrent queries allowed by the test configuration.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;

/// Number of stream queries used by the test configuration.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;

/// Batch item-count limit exposed as `DirectWritableKeyValueStore::MAX_BATCH_SIZE`.
const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
154
155fn extend_root_key(root_key: &[u8]) -> Vec<u8> {
157 let mut start_key = EMPTY_ROOT_KEY.to_vec();
158 start_key.extend(root_key);
159 start_key
160}
161
162fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
171 [
172 (
173 PARTITION_ATTRIBUTE.to_owned(),
174 AttributeValue::B(Blob::new(start_key.to_vec())),
175 ),
176 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
177 ]
178 .into()
179}
180
181fn build_key_value(
183 start_key: &[u8],
184 key: Vec<u8>,
185 value: Vec<u8>,
186) -> HashMap<String, AttributeValue> {
187 [
188 (
189 PARTITION_ATTRIBUTE.to_owned(),
190 AttributeValue::B(Blob::new(start_key.to_vec())),
191 ),
192 (KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key))),
193 (
194 VALUE_ATTRIBUTE.to_owned(),
195 AttributeValue::B(Blob::new(value)),
196 ),
197 ]
198 .into()
199}
200
201fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
203 ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
204 ensure!(
205 key.len() <= MAX_KEY_SIZE,
206 DynamoDbStoreInternalError::KeyTooLong
207 );
208 Ok(())
209}
210
211fn extract_key(
213 prefix_len: usize,
214 attributes: &HashMap<String, AttributeValue>,
215) -> Result<&[u8], DynamoDbStoreInternalError> {
216 let key = attributes
217 .get(KEY_ATTRIBUTE)
218 .ok_or(DynamoDbStoreInternalError::MissingKey)?;
219 match key {
220 AttributeValue::B(blob) => Ok(&blob.as_ref()[prefix_len..]),
221 key => Err(DynamoDbStoreInternalError::wrong_key_type(key)),
222 }
223}
224
225fn extract_value(
227 attributes: &HashMap<String, AttributeValue>,
228) -> Result<&[u8], DynamoDbStoreInternalError> {
229 let value = attributes
232 .get(VALUE_ATTRIBUTE)
233 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
234 match value {
235 AttributeValue::B(blob) => Ok(blob.as_ref()),
236 value => Err(DynamoDbStoreInternalError::wrong_value_type(value)),
237 }
238}
239
240fn extract_value_owned(
242 attributes: &mut HashMap<String, AttributeValue>,
243) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
244 let value = attributes
245 .remove(VALUE_ATTRIBUTE)
246 .ok_or(DynamoDbStoreInternalError::MissingValue)?;
247 match value {
248 AttributeValue::B(blob) => Ok(blob.into_inner()),
249 value => Err(DynamoDbStoreInternalError::wrong_value_type(&value)),
250 }
251}
252
253fn extract_key_value(
255 prefix_len: usize,
256 attributes: &HashMap<String, AttributeValue>,
257) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
258 let key = extract_key(prefix_len, attributes)?;
259 let value = extract_value(attributes)?;
260 Ok((key, value))
261}
262
263fn extract_key_value_owned(
265 prefix_len: usize,
266 attributes: &mut HashMap<String, AttributeValue>,
267) -> Result<(Vec<u8>, Vec<u8>), DynamoDbStoreInternalError> {
268 let key = extract_key(prefix_len, attributes)?.to_vec();
269 let value = extract_value_owned(attributes)?;
270 Ok((key, value))
271}
272
273struct TransactionBuilder {
274 start_key: Vec<u8>,
275 transactions: Vec<TransactWriteItem>,
276}
277
278impl TransactionBuilder {
279 fn new(start_key: &[u8]) -> Self {
280 Self {
281 start_key: start_key.to_vec(),
282 transactions: Vec::new(),
283 }
284 }
285
286 fn insert_delete_request(
287 &mut self,
288 key: Vec<u8>,
289 store: &DynamoDbStoreInternal,
290 ) -> Result<(), DynamoDbStoreInternalError> {
291 let transaction = store.build_delete_transaction(&self.start_key, key)?;
292 self.transactions.push(transaction);
293 Ok(())
294 }
295
296 fn insert_put_request(
297 &mut self,
298 key: Vec<u8>,
299 value: Vec<u8>,
300 store: &DynamoDbStoreInternal,
301 ) -> Result<(), DynamoDbStoreInternalError> {
302 let transaction = store.build_put_transaction(&self.start_key, key, value)?;
303 self.transactions.push(transaction);
304 Ok(())
305 }
306}
307
/// A key-value store client backed by a DynamoDB table.
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    /// DynamoDB client used to send every request.
    client: Client,
    /// Name of the table that requests are sent to.
    namespace: String,
    /// Optional semaphore acquired around requests to cap how many run concurrently.
    semaphore: Option<Arc<Semaphore>>,
    /// Value reported by `max_stream_queries`.
    max_stream_queries: usize,
    /// Partition value under which this store's items are read and written.
    start_key: Vec<u8>,
    /// Tracks whether `write_batch` has already registered `start_key` under the root-key
    /// partition; each store created by `connect` or `open_exclusive` gets a fresh flag.
    root_key_written: Arc<AtomicBool>,
}
318
319#[derive(Debug, Clone, Serialize, Deserialize)]
321pub struct DynamoDbStoreInternalConfig {
322 use_dynamodb_local: bool,
324 common_config: CommonStoreInternalConfig,
326}
327
328impl DynamoDbStoreInternalConfig {
329 async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
330 let config = if self.use_dynamodb_local {
331 get_dynamodb_local_config().await?
332 } else {
333 get_base_config().await?
334 };
335 Ok(Client::from_conf(config))
336 }
337}
338
339impl AdminKeyValueStore for DynamoDbStoreInternal {
340 type Config = DynamoDbStoreInternalConfig;
341
342 fn get_name() -> String {
343 "dynamodb internal".to_string()
344 }
345
346 async fn connect(
347 config: &Self::Config,
348 namespace: &str,
349 ) -> Result<Self, DynamoDbStoreInternalError> {
350 Self::check_namespace(namespace)?;
351 let client = config.client().await?;
352 let semaphore = config
353 .common_config
354 .max_concurrent_queries
355 .map(|n| Arc::new(Semaphore::new(n)));
356 let max_stream_queries = config.common_config.max_stream_queries;
357 let namespace = namespace.to_string();
358 let start_key = extend_root_key(&[]);
359 let store = Self {
360 client,
361 namespace,
362 semaphore,
363 max_stream_queries,
364 start_key,
365 root_key_written: Arc::new(AtomicBool::new(false)),
366 };
367 Ok(store)
368 }
369
370 fn open_exclusive(&self, root_key: &[u8]) -> Result<Self, DynamoDbStoreInternalError> {
371 let client = self.client.clone();
372 let namespace = self.namespace.clone();
373 let semaphore = self.semaphore.clone();
374 let max_stream_queries = self.max_stream_queries;
375 let start_key = extend_root_key(root_key);
376 Ok(Self {
377 client,
378 namespace,
379 semaphore,
380 max_stream_queries,
381 start_key,
382 root_key_written: Arc::new(AtomicBool::new(false)),
383 })
384 }
385
386 async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
387 let client = config.client().await?;
388 let mut namespaces = Vec::new();
389 let mut start_table = None;
390 loop {
391 let response = client
392 .list_tables()
393 .set_exclusive_start_table_name(start_table)
394 .send()
395 .boxed_sync()
396 .await?;
397 if let Some(namespaces_blk) = response.table_names {
398 namespaces.extend(namespaces_blk);
399 }
400 if response.last_evaluated_table_name.is_none() {
401 break;
402 } else {
403 start_table = response.last_evaluated_table_name;
404 }
405 }
406 Ok(namespaces)
407 }
408
409 async fn list_root_keys(
410 config: &Self::Config,
411 namespace: &str,
412 ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
413 let mut store = Self::connect(config, namespace).await?;
414 store.start_key = PARTITION_KEY_ROOT_KEY.to_vec();
415
416 let keys = store.find_keys_by_prefix(EMPTY_ROOT_KEY).await?;
417
418 let mut root_keys = Vec::new();
419 for key in keys.iterator() {
420 let key = key?;
421 root_keys.push(key.to_vec());
422 }
423 Ok(root_keys)
424 }
425
426 async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
427 let client = config.client().await?;
428 let tables = Self::list_all(config).await?;
429 for table in tables {
430 client
431 .delete_table()
432 .table_name(&table)
433 .send()
434 .boxed_sync()
435 .await?;
436 }
437 Ok(())
438 }
439
440 async fn exists(
441 config: &Self::Config,
442 namespace: &str,
443 ) -> Result<bool, DynamoDbStoreInternalError> {
444 Self::check_namespace(namespace)?;
445 let client = config.client().await?;
446 let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
447 let response = client
448 .get_item()
449 .table_name(namespace)
450 .set_key(Some(key_db))
451 .send()
452 .boxed_sync()
453 .await;
454 let Err(error) = response else {
455 return Ok(true);
456 };
457 let test = match &error {
458 SdkError::ServiceError(error) => match error.err() {
459 GetItemError::ResourceNotFoundException(error) => {
460 error.message
461 == Some("Cannot do operations on a non-existent table".to_string())
462 }
463 _ => false,
464 },
465 _ => false,
466 };
467 if test {
468 Ok(false)
469 } else {
470 Err(error.into())
471 }
472 }
473
474 async fn create(
475 config: &Self::Config,
476 namespace: &str,
477 ) -> Result<(), DynamoDbStoreInternalError> {
478 Self::check_namespace(namespace)?;
479 let client = config.client().await?;
480 client
481 .create_table()
482 .table_name(namespace)
483 .attribute_definitions(
484 AttributeDefinition::builder()
485 .attribute_name(PARTITION_ATTRIBUTE)
486 .attribute_type(ScalarAttributeType::B)
487 .build()?,
488 )
489 .attribute_definitions(
490 AttributeDefinition::builder()
491 .attribute_name(KEY_ATTRIBUTE)
492 .attribute_type(ScalarAttributeType::B)
493 .build()?,
494 )
495 .key_schema(
496 KeySchemaElement::builder()
497 .attribute_name(PARTITION_ATTRIBUTE)
498 .key_type(KeyType::Hash)
499 .build()?,
500 )
501 .key_schema(
502 KeySchemaElement::builder()
503 .attribute_name(KEY_ATTRIBUTE)
504 .key_type(KeyType::Range)
505 .build()?,
506 )
507 .provisioned_throughput(
508 ProvisionedThroughput::builder()
509 .read_capacity_units(10)
510 .write_capacity_units(10)
511 .build()?,
512 )
513 .send()
514 .boxed_sync()
515 .await?;
516 Ok(())
517 }
518
519 async fn delete(
520 config: &Self::Config,
521 namespace: &str,
522 ) -> Result<(), DynamoDbStoreInternalError> {
523 Self::check_namespace(namespace)?;
524 let client = config.client().await?;
525 client
526 .delete_table()
527 .table_name(namespace)
528 .send()
529 .boxed_sync()
530 .await?;
531 Ok(())
532 }
533}
534
535impl DynamoDbStoreInternal {
536 fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
540 if namespace.len() < 3 {
541 return Err(InvalidNamespace::TooShort);
542 }
543 if namespace.len() > 255 {
544 return Err(InvalidNamespace::TooLong);
545 }
546 if !namespace.chars().all(|character| {
547 character.is_ascii_alphanumeric()
548 || character == '.'
549 || character == '-'
550 || character == '_'
551 }) {
552 return Err(InvalidNamespace::InvalidCharacter);
553 }
554 Ok(())
555 }
556
557 fn build_delete_transaction(
558 &self,
559 start_key: &[u8],
560 key: Vec<u8>,
561 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
562 check_key_size(&key)?;
563 let request = Delete::builder()
564 .table_name(&self.namespace)
565 .set_key(Some(build_key(start_key, key)))
566 .build()?;
567 Ok(TransactWriteItem::builder().delete(request).build())
568 }
569
570 fn build_put_transaction(
571 &self,
572 start_key: &[u8],
573 key: Vec<u8>,
574 value: Vec<u8>,
575 ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
576 check_key_size(&key)?;
577 ensure!(
578 value.len() <= RAW_MAX_VALUE_SIZE,
579 DynamoDbStoreInternalError::ValueLengthTooLarge
580 );
581 let request = Put::builder()
582 .table_name(&self.namespace)
583 .set_item(Some(build_key_value(start_key, key, value)))
584 .build()?;
585 Ok(TransactWriteItem::builder().put(request).build())
586 }
587
588 async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
590 match &self.semaphore {
591 None => None,
592 Some(count) => Some(count.acquire().await),
593 }
594 }
595
596 async fn get_query_output(
597 &self,
598 attribute_str: &str,
599 start_key: &[u8],
600 key_prefix: &[u8],
601 start_key_map: Option<HashMap<String, AttributeValue>>,
602 ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
603 let _guard = self.acquire().await;
604 let start_key = start_key.to_vec();
605 let response = self
606 .client
607 .query()
608 .table_name(&self.namespace)
609 .projection_expression(attribute_str)
610 .key_condition_expression(format!(
611 "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
612 ))
613 .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
614 .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
615 .set_exclusive_start_key(start_key_map)
616 .send()
617 .boxed_sync()
618 .await?;
619 Ok(response)
620 }
621
622 async fn read_value_bytes_general(
623 &self,
624 key_db: HashMap<String, AttributeValue>,
625 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
626 let _guard = self.acquire().await;
627 let response = self
628 .client
629 .get_item()
630 .table_name(&self.namespace)
631 .set_key(Some(key_db))
632 .send()
633 .boxed_sync()
634 .await?;
635
636 match response.item {
637 Some(mut item) => {
638 let value = extract_value_owned(&mut item)?;
639 Ok(Some(value))
640 }
641 None => Ok(None),
642 }
643 }
644
645 async fn contains_key_general(
646 &self,
647 key_db: HashMap<String, AttributeValue>,
648 ) -> Result<bool, DynamoDbStoreInternalError> {
649 let _guard = self.acquire().await;
650 let response = self
651 .client
652 .get_item()
653 .table_name(&self.namespace)
654 .set_key(Some(key_db))
655 .projection_expression(PARTITION_ATTRIBUTE)
656 .send()
657 .boxed_sync()
658 .await?;
659
660 Ok(response.item.is_some())
661 }
662
663 async fn get_list_responses(
664 &self,
665 attribute: &str,
666 start_key: &[u8],
667 key_prefix: &[u8],
668 ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
669 check_key_size(key_prefix)?;
670 let mut responses = Vec::new();
671 let mut start_key_map = None;
672 loop {
673 let response = self
674 .get_query_output(attribute, start_key, key_prefix, start_key_map)
675 .await?;
676 let last_evaluated = response.last_evaluated_key.clone();
677 responses.push(response);
678 match last_evaluated {
679 None => {
680 break;
681 }
682 Some(value) => {
683 start_key_map = Some(value);
684 }
685 }
686 }
687 Ok(QueryResponses {
688 prefix_len: key_prefix.len(),
689 responses,
690 })
691 }
692}
693
/// The pages returned by a prefix query, as gathered by `get_list_responses`.
struct QueryResponses {
    /// Length of the key prefix that was queried; the extractors skip this many leading
    /// bytes of each returned key.
    prefix_len: usize,
    /// One entry per page, in the order the pages were received.
    responses: Vec<QueryOutput>,
}
698
699#[doc(hidden)]
701#[expect(clippy::type_complexity)]
702pub struct DynamoDbKeyBlockIterator<'a> {
703 prefix_len: usize,
704 pos: usize,
705 iters: Vec<
706 std::iter::Flatten<
707 std::option::Iter<'a, Vec<HashMap<std::string::String, AttributeValue>>>,
708 >,
709 >,
710}
711
712impl<'a> Iterator for DynamoDbKeyBlockIterator<'a> {
713 type Item = Result<&'a [u8], DynamoDbStoreInternalError>;
714
715 fn next(&mut self) -> Option<Self::Item> {
716 let result = self.iters[self.pos].next();
717 match result {
718 None => {
719 if self.pos == self.iters.len() - 1 {
720 return None;
721 }
722 self.pos += 1;
723 self.iters[self.pos]
724 .next()
725 .map(|x| extract_key(self.prefix_len, x))
726 }
727 Some(result) => Some(extract_key(self.prefix_len, result)),
728 }
729 }
730}
731
732pub struct DynamoDbKeys {
734 result_queries: QueryResponses,
735}
736
737impl KeyIterable<DynamoDbStoreInternalError> for DynamoDbKeys {
738 type Iterator<'a>
739 = DynamoDbKeyBlockIterator<'a>
740 where
741 Self: 'a;
742
743 fn iterator(&self) -> Self::Iterator<'_> {
744 let pos = 0;
745 let mut iters = Vec::new();
746 for response in &self.result_queries.responses {
747 let iter = response.items.iter().flatten();
748 iters.push(iter);
749 }
750 DynamoDbKeyBlockIterator {
751 prefix_len: self.result_queries.prefix_len,
752 pos,
753 iters,
754 }
755 }
756}
757
/// The key-value pairs returned by a prefix search.
pub struct DynamoDbKeyValues {
    /// Pages of the underlying query.
    result_queries: QueryResponses,
}
762
763#[doc(hidden)]
764#[expect(clippy::type_complexity)]
765pub struct DynamoDbKeyValueIterator<'a> {
766 prefix_len: usize,
767 pos: usize,
768 iters: Vec<
769 std::iter::Flatten<
770 std::option::Iter<'a, Vec<HashMap<std::string::String, AttributeValue>>>,
771 >,
772 >,
773}
774
775impl<'a> Iterator for DynamoDbKeyValueIterator<'a> {
776 type Item = Result<(&'a [u8], &'a [u8]), DynamoDbStoreInternalError>;
777
778 fn next(&mut self) -> Option<Self::Item> {
779 let result = self.iters[self.pos].next();
780 match result {
781 None => {
782 if self.pos == self.iters.len() - 1 {
783 return None;
784 }
785 self.pos += 1;
786 self.iters[self.pos]
787 .next()
788 .map(|x| extract_key_value(self.prefix_len, x))
789 }
790 Some(result) => Some(extract_key_value(self.prefix_len, result)),
791 }
792 }
793}
794
795#[doc(hidden)]
796#[expect(clippy::type_complexity)]
797pub struct DynamoDbKeyValueIteratorOwned {
798 prefix_len: usize,
799 pos: usize,
800 iters: Vec<
801 std::iter::Flatten<
802 std::option::IntoIter<Vec<HashMap<std::string::String, AttributeValue>>>,
803 >,
804 >,
805}
806
807impl Iterator for DynamoDbKeyValueIteratorOwned {
808 type Item = Result<(Vec<u8>, Vec<u8>), DynamoDbStoreInternalError>;
809
810 fn next(&mut self) -> Option<Self::Item> {
811 let result = self.iters[self.pos].next();
812 match result {
813 None => {
814 if self.pos == self.iters.len() - 1 {
815 return None;
816 }
817 self.pos += 1;
818 self.iters[self.pos]
819 .next()
820 .map(|mut x| extract_key_value_owned(self.prefix_len, &mut x))
821 }
822 Some(mut result) => Some(extract_key_value_owned(self.prefix_len, &mut result)),
823 }
824 }
825}
826
827impl KeyValueIterable<DynamoDbStoreInternalError> for DynamoDbKeyValues {
828 type Iterator<'a>
829 = DynamoDbKeyValueIterator<'a>
830 where
831 Self: 'a;
832 type IteratorOwned = DynamoDbKeyValueIteratorOwned;
833
834 fn iterator(&self) -> Self::Iterator<'_> {
835 let pos = 0;
836 let mut iters = Vec::new();
837 for response in &self.result_queries.responses {
838 let iter = response.items.iter().flatten();
839 iters.push(iter);
840 }
841 DynamoDbKeyValueIterator {
842 prefix_len: self.result_queries.prefix_len,
843 pos,
844 iters,
845 }
846 }
847
848 fn into_iterator_owned(self) -> Self::IteratorOwned {
849 let pos = 0;
850 let mut iters = Vec::new();
851 for response in self.result_queries.responses.into_iter() {
852 let iter = response.items.into_iter().flatten();
853 iters.push(iter);
854 }
855 DynamoDbKeyValueIteratorOwned {
856 prefix_len: self.result_queries.prefix_len,
857 pos,
858 iters,
859 }
860 }
861}
862
// Declares `DynamoDbStoreInternalError` as the error type of the store's operations.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
866
867impl ReadableKeyValueStore for DynamoDbStoreInternal {
868 const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
869 type Keys = DynamoDbKeys;
870 type KeyValues = DynamoDbKeyValues;
871
872 fn max_stream_queries(&self) -> usize {
873 self.max_stream_queries
874 }
875
876 async fn read_value_bytes(
877 &self,
878 key: &[u8],
879 ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
880 check_key_size(key)?;
881 let key_db = build_key(&self.start_key, key.to_vec());
882 self.read_value_bytes_general(key_db).await
883 }
884
885 async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
886 check_key_size(key)?;
887 let key_db = build_key(&self.start_key, key.to_vec());
888 self.contains_key_general(key_db).await
889 }
890
891 async fn contains_keys(
892 &self,
893 keys: Vec<Vec<u8>>,
894 ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
895 let mut handles = Vec::new();
896 for key in keys {
897 check_key_size(&key)?;
898 let key_db = build_key(&self.start_key, key);
899 let handle = self.contains_key_general(key_db);
900 handles.push(handle);
901 }
902 join_all(handles)
903 .await
904 .into_iter()
905 .collect::<Result<_, _>>()
906 }
907
908 async fn read_multi_values_bytes(
909 &self,
910 keys: Vec<Vec<u8>>,
911 ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
912 let mut handles = Vec::new();
913 for key in keys {
914 check_key_size(&key)?;
915 let key_db = build_key(&self.start_key, key);
916 let handle = self.read_value_bytes_general(key_db);
917 handles.push(handle);
918 }
919 join_all(handles)
920 .await
921 .into_iter()
922 .collect::<Result<_, _>>()
923 }
924
925 async fn find_keys_by_prefix(
926 &self,
927 key_prefix: &[u8],
928 ) -> Result<DynamoDbKeys, DynamoDbStoreInternalError> {
929 let result_queries = self
930 .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
931 .await?;
932 Ok(DynamoDbKeys { result_queries })
933 }
934
935 async fn find_key_values_by_prefix(
936 &self,
937 key_prefix: &[u8],
938 ) -> Result<DynamoDbKeyValues, DynamoDbStoreInternalError> {
939 let result_queries = self
940 .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
941 .await?;
942 Ok(DynamoDbKeyValues { result_queries })
943 }
944}
945
946impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
947 const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
948 const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
949 const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
950
951 type Batch = SimpleUnorderedBatch;
953
954 async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
955 if !self.root_key_written.fetch_or(true, Ordering::SeqCst) {
956 let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
957 builder.insert_put_request(self.start_key.clone(), vec![], self)?;
958 self.client
959 .transact_write_items()
960 .set_transact_items(Some(builder.transactions))
961 .send()
962 .boxed_sync()
963 .await?;
964 }
965 let mut builder = TransactionBuilder::new(&self.start_key);
966 for key in batch.deletions {
967 builder.insert_delete_request(key, self)?;
968 }
969 for (key, value) in batch.insertions {
970 builder.insert_put_request(key, value, self)?;
971 }
972 if !builder.transactions.is_empty() {
973 let _guard = self.acquire().await;
974 self.client
975 .transact_write_items()
976 .set_transact_items(Some(builder.transactions))
977 .send()
978 .boxed_sync()
979 .await?;
980 }
981 Ok(())
982 }
983}
984
985#[derive(Debug, Error)]
987pub enum InvalidNamespace {
988 #[error("Namespace must have at least 3 characters")]
990 TooShort,
991
992 #[error("Namespace must be at most 63 characters")]
994 TooLong,
995
996 #[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
998 InvalidCharacter,
999}
1000
/// Errors that occur when using [`DynamoDbStoreInternal`].
#[derive(Debug, Error)]
pub enum DynamoDbStoreInternalError {
    /// An error reported by a `GetItem` request.
    #[error(transparent)]
    Get(#[from] Box<SdkError<GetItemError>>),

    /// An error reported by a `TransactWriteItems` request.
    #[error(transparent)]
    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),

    /// An error reported by a `Query` request.
    #[error(transparent)]
    Query(#[from] Box<SdkError<QueryError>>),

    /// An error reported by a `DeleteTable` request.
    #[error(transparent)]
    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),

    /// An error reported by a `ListTables` request.
    #[error(transparent)]
    ListTables(#[from] Box<SdkError<ListTablesError>>),

    /// The transaction contains more items than `MAX_TRANSACT_WRITE_ITEM_SIZE`.
    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
    TransactUpperLimitSize,

    /// The key is empty.
    #[error("The key must be of strictly positive length")]
    ZeroLengthKey,

    /// The key is longer than the maximum key size.
    #[error("The key must have at most 1024 bytes")]
    KeyTooLong,

    /// The key prefix is longer than the maximum key size.
    #[error("The key prefix must have at most 1024 bytes")]
    KeyPrefixTooLong,

    /// The key prefix is empty.
    #[error("The key_prefix must be of strictly positive length")]
    ZeroLengthKeyPrefix,

    /// An error reported by the journaling layer.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),

    /// The value is larger than what can be stored in one item.
    #[error("The DynamoDB value should be less than 400 KB")]
    ValueLengthTooLarge,

    /// A returned item has no key attribute.
    #[error("The stored key attribute is missing")]
    MissingKey,

    /// The key attribute of a returned item is not a binary blob; the payload describes
    /// the type that was found.
    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongKeyType(String),

    /// A returned item has no value attribute.
    #[error("The stored value attribute is missing")]
    MissingValue,

    /// The value attribute of a returned item is not a binary blob; the payload describes
    /// the type that was found.
    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongValueType(String),

    /// A BCS (de)serialization error.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// The namespace is not a valid table name.
    #[error(transparent)]
    InvalidNamespace(#[from] InvalidNamespace),

    /// An error reported by a `CreateTable` request.
    #[error(transparent)]
    CreateTable(#[from] Box<SdkError<CreateTableError>>),

    /// An error raised while building a request object.
    #[error(transparent)]
    Build(#[from] Box<BuildError>),
}
1084
1085impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
1086where
1087 DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
1088{
1089 fn from(error: SdkError<InnerError>) -> Self {
1090 Box::new(error).into()
1091 }
1092}
1093
1094impl From<BuildError> for DynamoDbStoreInternalError {
1095 fn from(error: BuildError) -> Self {
1096 Box::new(error).into()
1097 }
1098}
1099
1100impl DynamoDbStoreInternalError {
1101 pub fn wrong_key_type(value: &AttributeValue) -> Self {
1107 DynamoDbStoreInternalError::WrongKeyType(Self::type_description_of(value))
1108 }
1109
1110 pub fn wrong_value_type(value: &AttributeValue) -> Self {
1116 DynamoDbStoreInternalError::WrongValueType(Self::type_description_of(value))
1117 }
1118
1119 fn type_description_of(value: &AttributeValue) -> String {
1120 match value {
1121 AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
1122 AttributeValue::Bool(_) => "a boolean",
1123 AttributeValue::Bs(_) => "a list of binary blobs",
1124 AttributeValue::L(_) => "a list",
1125 AttributeValue::M(_) => "a map",
1126 AttributeValue::N(_) => "a number",
1127 AttributeValue::Ns(_) => "a list of numbers",
1128 AttributeValue::Null(_) => "a null value",
1129 AttributeValue::S(_) => "a string",
1130 AttributeValue::Ss(_) => "a list of strings",
1131 _ => "an unknown type",
1132 }
1133 .to_owned()
1134 }
1135}
1136
// Identifies this backend as "dynamo_db" through the trait's `BACKEND` constant.
impl KeyValueStoreError for DynamoDbStoreInternalError {
    const BACKEND: &'static str = "dynamo_db";
}
1140
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<DynamoDbStoreInternal> {
    /// Returns a configuration for tests: it targets the local DynamoDB endpoint and uses
    /// the test limits for concurrent and stream queries.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        Ok(DynamoDbStoreInternalConfig {
            use_dynamodb_local: true,
            common_config: CommonStoreInternalConfig {
                max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
                max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
                replication_factor: 1,
            },
        })
    }
}
1155
/// The composed DynamoDB store: LRU caching over value splitting over journaling over
/// [`DynamoDbStoreInternal`], with a metering layer around each stage.
#[cfg(with_metrics)]
pub type DynamoDbStore = MeteredStore<
    LruCachingStore<
        MeteredStore<
            ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
        >,
    >,
>;

/// The composed DynamoDB store: LRU caching over value splitting over journaling over
/// [`DynamoDbStoreInternal`].
#[cfg(not(with_metrics))]
pub type DynamoDbStore =
    LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>;

/// The error type of [`DynamoDbStore`]: the internal error wrapped by the value-splitting
/// layer.
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;

/// The configuration type of [`DynamoDbStore`]: the internal configuration wrapped by the
/// LRU caching configuration.
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
1176
1177impl DynamoDbStoreConfig {
1178 pub fn new(
1180 use_dynamodb_local: bool,
1181 common_config: crate::store::CommonStoreConfig,
1182 ) -> DynamoDbStoreConfig {
1183 let inner_config = DynamoDbStoreInternalConfig {
1184 use_dynamodb_local,
1185 common_config: common_config.reduced(),
1186 };
1187 DynamoDbStoreConfig {
1188 inner_config,
1189 storage_cache_config: common_config.storage_cache_config,
1190 }
1191 }
1192}
1193
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that the BCS size of a byte vector equals its length plus the ULEB128 size
    /// of that length, for lengths around the one- and two-byte prefix boundaries.
    #[test]
    fn test_serialization_len() {
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for length in lengths {
            let bytes = vec![0u8; length];
            let expected = get_uleb128_size(length) + length;
            let actual = serialized_size(&bytes).unwrap();
            assert_eq!(expected, actual);
        }
    }
}