use std::{
collections::HashMap,
env,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use async_lock::{Semaphore, SemaphoreGuard};
use async_trait::async_trait;
use aws_sdk_dynamodb::{
error::SdkError,
operation::{
batch_write_item::BatchWriteItemError,
create_table::CreateTableError,
delete_table::DeleteTableError,
describe_table::DescribeTableError,
get_item::GetItemError,
list_tables::ListTablesError,
query::{QueryError, QueryOutput},
transact_write_items::TransactWriteItemsError,
},
primitives::Blob,
types::{
AttributeDefinition, AttributeValue, Delete, KeySchemaElement, KeyType,
ProvisionedThroughput, Put, ScalarAttributeType, TransactWriteItem,
},
Client,
};
use aws_smithy_types::error::operation::BuildError;
use futures::future::{join_all, FutureExt as _};
use linera_base::ensure;
use serde::{Deserialize, Serialize};
use thiserror::Error;
#[cfg(with_metrics)]
use crate::metering::MeteredStore;
#[cfg(with_testing)]
use crate::store::TestKeyValueStore;
use crate::{
batch::SimpleUnorderedBatch,
common::get_uleb128_size,
journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore},
lru_caching::{LruCachingConfig, LruCachingStore},
store::{
AdminKeyValueStore, CommonStoreInternalConfig, KeyIterable, KeyValueIterable,
KeyValueStoreError, ReadableKeyValueStore, WithError,
},
value_splitting::{ValueSplittingError, ValueSplittingStore},
};
/// Name of the environment variable holding the address of a LocalStack instance.
const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT";
async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
.boxed()
.await;
Ok((&base_config).into())
}
/// Reads the LocalStack endpoint address from the `LOCALSTACK_ENDPOINT`
/// environment variable, returning `None` when it is unset or not valid
/// Unicode.
fn get_endpoint_address() -> Option<String> {
    // `Result::ok` is exactly the original `match`: `Ok(address)` becomes
    // `Some(address)` and any `Err` becomes `None`.
    env::var(LOCALSTACK_ENDPOINT).ok()
}
/// Builds a DynamoDB client configuration pointing at a LocalStack instance.
///
/// # Panics
///
/// Panics if the `LOCALSTACK_ENDPOINT` environment variable is not set.
async fn get_localstack_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
    let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
        .boxed()
        .await;
    // The caller explicitly opted into LocalStack, so a missing endpoint is a
    // configuration bug: fail loudly with a self-explanatory message instead
    // of a bare `unwrap`.
    let endpoint_address = get_endpoint_address()
        .expect("LOCALSTACK_ENDPOINT environment variable must be set to use LocalStack");
    let config = aws_sdk_dynamodb::config::Builder::from(&base_config)
        .endpoint_url(endpoint_address)
        .build();
    Ok(config)
}
/// Fixed partition under which `write_batch` registers root keys so that
/// `list_root_keys` can enumerate them.
const PARTITION_KEY_ROOT_KEY: &[u8] = &[1];
/// Name of the DynamoDB attribute holding the partition key.
const PARTITION_ATTRIBUTE: &str = "item_partition";
/// Marker byte prepended to every root key to form its partition key.
const EMPTY_ROOT_KEY: &[u8] = &[0];
/// Key probed by `exists` to detect whether a table is present.
const DB_KEY: &[u8] = &[0];
/// Name of the DynamoDB attribute holding the item key.
const KEY_ATTRIBUTE: &str = "item_key";
/// Name of the DynamoDB attribute holding the item value.
const VALUE_ATTRIBUTE: &str = "item_value";
/// Projection expression selecting both the key and the value attributes.
const KEY_VALUE_ATTRIBUTE: &str = "item_key, item_value";
/// Maximum size of a raw stored value (400 KB).
const RAW_MAX_VALUE_SIZE: usize = 409600;
/// Maximum value size visible to callers: the raw limit minus room for the
/// key, the ULEB128-encoded lengths of value and key, and two extra bytes.
const VISIBLE_MAX_VALUE_SIZE: usize = RAW_MAX_VALUE_SIZE
    - MAX_KEY_SIZE
    - get_uleb128_size(RAW_MAX_VALUE_SIZE)
    - get_uleb128_size(MAX_KEY_SIZE)
    - 1
    - 1;
/// Maximum accepted key size (1 KB).
const MAX_KEY_SIZE: usize = 1024;
/// Maximum total size of a single `TransactWriteItems` request (4 MB).
const MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE: usize = 4000000;
/// Number of concurrent queries used by test configurations.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES: usize = 10;
/// Number of simultaneous stream queries used by test configurations.
#[cfg(with_testing)]
const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
/// Maximum number of operations in a single `TransactWriteItems` request.
const MAX_TRANSACT_WRITE_ITEM_SIZE: usize = 100;
/// Prepends the `EMPTY_ROOT_KEY` marker to `root_key`, producing the
/// partition key under which all of that root key's items are stored.
fn extend_root_key(root_key: &[u8]) -> Vec<u8> {
    [EMPTY_ROOT_KEY, root_key].concat()
}
/// Assembles the DynamoDB primary key (partition + sort attributes) for `key`
/// inside the partition identified by `start_key`.
fn build_key(start_key: &[u8], key: Vec<u8>) -> HashMap<String, AttributeValue> {
    let mut item = HashMap::new();
    item.insert(
        PARTITION_ATTRIBUTE.to_owned(),
        AttributeValue::B(Blob::new(start_key.to_vec())),
    );
    item.insert(KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key)));
    item
}
/// Assembles a full DynamoDB item (partition, sort key and value attributes)
/// for `key` -> `value` inside the partition identified by `start_key`.
fn build_key_value(
    start_key: &[u8],
    key: Vec<u8>,
    value: Vec<u8>,
) -> HashMap<String, AttributeValue> {
    let mut item = HashMap::new();
    item.insert(
        PARTITION_ATTRIBUTE.to_owned(),
        AttributeValue::B(Blob::new(start_key.to_vec())),
    );
    item.insert(KEY_ATTRIBUTE.to_owned(), AttributeValue::B(Blob::new(key)));
    item.insert(
        VALUE_ATTRIBUTE.to_owned(),
        AttributeValue::B(Blob::new(value)),
    );
    item
}
fn check_key_size(key: &[u8]) -> Result<(), DynamoDbStoreInternalError> {
ensure!(!key.is_empty(), DynamoDbStoreInternalError::ZeroLengthKey);
ensure!(
key.len() <= MAX_KEY_SIZE,
DynamoDbStoreInternalError::KeyTooLong
);
Ok(())
}
/// Reads the key attribute from a DynamoDB item and strips its first
/// `prefix_len` bytes.
fn extract_key(
    prefix_len: usize,
    attributes: &HashMap<String, AttributeValue>,
) -> Result<&[u8], DynamoDbStoreInternalError> {
    match attributes.get(KEY_ATTRIBUTE) {
        None => Err(DynamoDbStoreInternalError::MissingKey),
        Some(AttributeValue::B(blob)) => Ok(&blob.as_ref()[prefix_len..]),
        Some(other) => Err(DynamoDbStoreInternalError::wrong_key_type(other)),
    }
}
/// Reads the value attribute from a DynamoDB item as a borrowed byte slice.
fn extract_value(
    attributes: &HashMap<String, AttributeValue>,
) -> Result<&[u8], DynamoDbStoreInternalError> {
    match attributes.get(VALUE_ATTRIBUTE) {
        None => Err(DynamoDbStoreInternalError::MissingValue),
        Some(AttributeValue::B(blob)) => Ok(blob.as_ref()),
        Some(other) => Err(DynamoDbStoreInternalError::wrong_value_type(other)),
    }
}
/// Removes the value attribute from a DynamoDB item and returns it as an
/// owned byte vector.
fn extract_value_owned(
    attributes: &mut HashMap<String, AttributeValue>,
) -> Result<Vec<u8>, DynamoDbStoreInternalError> {
    match attributes.remove(VALUE_ATTRIBUTE) {
        None => Err(DynamoDbStoreInternalError::MissingValue),
        Some(AttributeValue::B(blob)) => Ok(blob.into_inner()),
        Some(other) => Err(DynamoDbStoreInternalError::wrong_value_type(&other)),
    }
}
/// Reads both the (prefix-stripped) key and the value from a DynamoDB item
/// as borrowed byte slices.
fn extract_key_value(
    prefix_len: usize,
    attributes: &HashMap<String, AttributeValue>,
) -> Result<(&[u8], &[u8]), DynamoDbStoreInternalError> {
    Ok((
        extract_key(prefix_len, attributes)?,
        extract_value(attributes)?,
    ))
}
/// Extracts the (prefix-stripped) key and the value from a DynamoDB item as
/// owned byte vectors, removing the value attribute from the item.
fn extract_key_value_owned(
    prefix_len: usize,
    attributes: &mut HashMap<String, AttributeValue>,
) -> Result<(Vec<u8>, Vec<u8>), DynamoDbStoreInternalError> {
    let key = extract_key(prefix_len, attributes)?.to_owned();
    Ok((key, extract_value_owned(attributes)?))
}
/// Accumulates `TransactWriteItem`s targeting a single partition.
struct TransactionBuilder {
    start_key: Vec<u8>,
    transactions: Vec<TransactWriteItem>,
}

impl TransactionBuilder {
    /// Creates an empty builder for the partition identified by `start_key`.
    fn new(start_key: &[u8]) -> Self {
        Self {
            start_key: start_key.to_owned(),
            transactions: Vec::new(),
        }
    }

    /// Queues a deletion of `key` in the builder's partition.
    fn insert_delete_request(
        &mut self,
        key: Vec<u8>,
        store: &DynamoDbStoreInternal,
    ) -> Result<(), DynamoDbStoreInternalError> {
        self.transactions
            .push(store.build_delete_transaction(&self.start_key, key)?);
        Ok(())
    }

    /// Queues a write of `key` -> `value` in the builder's partition.
    fn insert_put_request(
        &mut self,
        key: Vec<u8>,
        value: Vec<u8>,
        store: &DynamoDbStoreInternal,
    ) -> Result<(), DynamoDbStoreInternalError> {
        self.transactions
            .push(store.build_put_transaction(&self.start_key, key, value)?);
        Ok(())
    }
}
/// A DynamoDB-backed key-value store client bound to one table and one
/// root-key partition.
#[derive(Clone, Debug)]
pub struct DynamoDbStoreInternal {
    // Shared DynamoDB client handle.
    client: Client,
    // Name of the DynamoDB table backing this store.
    namespace: String,
    // Optional limiter on the number of in-flight requests.
    semaphore: Option<Arc<Semaphore>>,
    // Maximum number of simultaneous stream queries reported to callers.
    max_stream_queries: usize,
    // Partition key prefix derived from the root key.
    start_key: Vec<u8>,
    // Whether this store's root key has been registered (set by the first
    // `write_batch`).
    root_key_written: Arc<AtomicBool>,
}
/// Configuration needed to connect to a DynamoDB backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStoreInternalConfig {
    // Whether to target a LocalStack instance instead of the default AWS
    // environment.
    use_localstack: bool,
    // Shared store settings (concurrency and stream-query limits).
    common_config: CommonStoreInternalConfig,
}
impl DynamoDbStoreInternalConfig {
    /// Creates a DynamoDB client for this configuration, targeting either
    /// LocalStack or the default AWS environment.
    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
        let config = match self.use_localstack {
            true => get_localstack_config().await?,
            false => get_base_config().await?,
        };
        Ok(Client::from_conf(config))
    }
}
impl AdminKeyValueStore for DynamoDbStoreInternal {
    type Config = DynamoDbStoreInternalConfig;

    fn get_name() -> String {
        "dynamodb internal".to_string()
    }

    /// Connects to the table `namespace`, scoping all subsequent operations
    /// under the partition derived from `root_key`.
    async fn connect(
        config: &Self::Config,
        namespace: &str,
        root_key: &[u8],
    ) -> Result<Self, DynamoDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let client = config.client().await?;
        // Only limit concurrency when a maximum is configured.
        let semaphore = config
            .common_config
            .max_concurrent_queries
            .map(|n| Arc::new(Semaphore::new(n)));
        let max_stream_queries = config.common_config.max_stream_queries;
        let namespace = namespace.to_string();
        let start_key = extend_root_key(root_key);
        let store = Self {
            client,
            namespace,
            semaphore,
            max_stream_queries,
            start_key,
            root_key_written: Arc::new(AtomicBool::new(false)),
        };
        Ok(store)
    }

    /// Reuses the existing client and settings, but switches to the
    /// partition derived from `root_key`.
    fn clone_with_root_key(&self, root_key: &[u8]) -> Result<Self, DynamoDbStoreInternalError> {
        let client = self.client.clone();
        let namespace = self.namespace.clone();
        let semaphore = self.semaphore.clone();
        let max_stream_queries = self.max_stream_queries;
        let start_key = extend_root_key(root_key);
        Ok(Self {
            client,
            namespace,
            semaphore,
            max_stream_queries,
            start_key,
            // The clone tracks its own root-key registration status.
            root_key_written: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Lists all table names, following `ListTables` pagination.
    async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
        let client = config.client().await?;
        let mut namespaces = Vec::new();
        let mut start_table = None;
        loop {
            let response = client
                .list_tables()
                .set_exclusive_start_table_name(start_table)
                .send()
                .boxed()
                .await?;
            if let Some(namespaces_blk) = response.table_names {
                namespaces.extend(namespaces_blk);
            }
            // `last_evaluated_table_name` is the pagination cursor: absent
            // means this was the last page.
            if response.last_evaluated_table_name.is_none() {
                break;
            } else {
                start_table = response.last_evaluated_table_name;
            }
        }
        Ok(namespaces)
    }

    /// Lists the root keys recorded in `namespace` by querying the dedicated
    /// root-key partition populated by `write_batch`.
    async fn list_root_keys(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<Vec<Vec<u8>>, DynamoDbStoreInternalError> {
        let mut store = Self::connect(config, namespace, &[]).await?;
        // Redirect the store to the special root-key partition.
        store.start_key = PARTITION_KEY_ROOT_KEY.to_vec();
        let keys = store.find_keys_by_prefix(EMPTY_ROOT_KEY).await?;
        let mut root_keys = Vec::new();
        for key in keys.iterator() {
            let key = key?;
            root_keys.push(key.to_vec());
        }
        Ok(root_keys)
    }

    /// Deletes every table visible to this configuration.
    async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
        let client = config.client().await?;
        let tables = Self::list_all(config).await?;
        for table in tables {
            client
                .delete_table()
                .table_name(&table)
                .send()
                .boxed()
                .await?;
        }
        Ok(())
    }

    /// Tests whether the table `namespace` exists by probing a fixed key and
    /// inspecting the error returned for missing tables.
    async fn exists(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<bool, DynamoDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let client = config.client().await?;
        let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
        let response = client
            .get_item()
            .table_name(namespace)
            .set_key(Some(key_db))
            .send()
            .boxed()
            .await;
        let Err(error) = response else {
            // The probe succeeded, so the table exists (whether or not the
            // probed item does).
            return Ok(true);
        };
        // A `ResourceNotFoundException` with this exact message indicates a
        // missing table; any other error is genuine and is propagated.
        let test = match &error {
            SdkError::ServiceError(error) => match error.err() {
                GetItemError::ResourceNotFoundException(error) => {
                    error.message
                        == Some("Cannot do operations on a non-existent table".to_string())
                }
                _ => false,
            },
            _ => false,
        };
        if test {
            Ok(false)
        } else {
            Err(error.into())
        }
    }

    /// Creates the table `namespace` with the binary partition/sort key
    /// schema used by this store.
    async fn create(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<(), DynamoDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let client = config.client().await?;
        client
            .create_table()
            .table_name(namespace)
            .attribute_definitions(
                AttributeDefinition::builder()
                    .attribute_name(PARTITION_ATTRIBUTE)
                    .attribute_type(ScalarAttributeType::B)
                    .build()?,
            )
            .attribute_definitions(
                AttributeDefinition::builder()
                    .attribute_name(KEY_ATTRIBUTE)
                    .attribute_type(ScalarAttributeType::B)
                    .build()?,
            )
            .key_schema(
                KeySchemaElement::builder()
                    .attribute_name(PARTITION_ATTRIBUTE)
                    .key_type(KeyType::Hash)
                    .build()?,
            )
            .key_schema(
                KeySchemaElement::builder()
                    .attribute_name(KEY_ATTRIBUTE)
                    .key_type(KeyType::Range)
                    .build()?,
            )
            .provisioned_throughput(
                ProvisionedThroughput::builder()
                    .read_capacity_units(10)
                    .write_capacity_units(10)
                    .build()?,
            )
            .send()
            .boxed()
            .await?;
        Ok(())
    }

    /// Drops the table `namespace` and all of its contents.
    async fn delete(
        config: &Self::Config,
        namespace: &str,
    ) -> Result<(), DynamoDbStoreInternalError> {
        Self::check_namespace(namespace)?;
        let client = config.client().await?;
        client
            .delete_table()
            .table_name(namespace)
            .send()
            .boxed()
            .await?;
        Ok(())
    }
}
impl DynamoDbStoreInternal {
    /// Checks that `namespace` is a valid table name: between 3 and 255
    /// characters, containing only ASCII alphanumerics, '.', '-' and '_'.
    fn check_namespace(namespace: &str) -> Result<(), InvalidNamespace> {
        if namespace.len() < 3 {
            return Err(InvalidNamespace::TooShort);
        }
        if namespace.len() > 255 {
            return Err(InvalidNamespace::TooLong);
        }
        if !namespace.chars().all(|character| {
            character.is_ascii_alphanumeric()
                || character == '.'
                || character == '-'
                || character == '_'
        }) {
            return Err(InvalidNamespace::InvalidCharacter);
        }
        Ok(())
    }

    /// Builds a transaction item deleting `key` in the partition
    /// `start_key`, after validating the key size.
    fn build_delete_transaction(
        &self,
        start_key: &[u8],
        key: Vec<u8>,
    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
        check_key_size(&key)?;
        let request = Delete::builder()
            .table_name(&self.namespace)
            .set_key(Some(build_key(start_key, key)))
            .build()?;
        Ok(TransactWriteItem::builder().delete(request).build())
    }

    /// Builds a transaction item writing `key` -> `value` in the partition
    /// `start_key`, after validating the key and value sizes.
    fn build_put_transaction(
        &self,
        start_key: &[u8],
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<TransactWriteItem, DynamoDbStoreInternalError> {
        check_key_size(&key)?;
        ensure!(
            value.len() <= RAW_MAX_VALUE_SIZE,
            DynamoDbStoreInternalError::ValueLengthTooLarge
        );
        let request = Put::builder()
            .table_name(&self.namespace)
            .set_item(Some(build_key_value(start_key, key, value)))
            .build()?;
        Ok(TransactWriteItem::builder().put(request).build())
    }

    /// Acquires a permit from the concurrency-limiting semaphore, if one is
    /// configured; the permit is released when the returned guard is dropped.
    async fn acquire(&self) -> Option<SemaphoreGuard<'_>> {
        match &self.semaphore {
            None => None,
            Some(count) => Some(count.acquire().await),
        }
    }

    /// Runs one (possibly partial) query for keys beginning with
    /// `key_prefix` in the partition `start_key`, projecting the attributes
    /// in `attribute_str` and resuming from `start_key_map` when provided.
    async fn get_query_output(
        &self,
        attribute_str: &str,
        start_key: &[u8],
        key_prefix: &[u8],
        start_key_map: Option<HashMap<String, AttributeValue>>,
    ) -> Result<QueryOutput, DynamoDbStoreInternalError> {
        let _guard = self.acquire().await;
        let start_key = start_key.to_vec();
        let response = self
            .client
            .query()
            .table_name(&self.namespace)
            .projection_expression(attribute_str)
            .key_condition_expression(format!(
                "{PARTITION_ATTRIBUTE} = :partition and begins_with({KEY_ATTRIBUTE}, :prefix)"
            ))
            .expression_attribute_values(":partition", AttributeValue::B(Blob::new(start_key)))
            .expression_attribute_values(":prefix", AttributeValue::B(Blob::new(key_prefix)))
            .set_exclusive_start_key(start_key_map)
            .send()
            .boxed()
            .await?;
        Ok(response)
    }

    /// Fetches the value stored under the full primary key `key_db`, if any.
    async fn read_value_bytes_general(
        &self,
        key_db: HashMap<String, AttributeValue>,
    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
        let _guard = self.acquire().await;
        let response = self
            .client
            .get_item()
            .table_name(&self.namespace)
            .set_key(Some(key_db))
            .send()
            .boxed()
            .await?;
        match response.item {
            Some(mut item) => {
                let value = extract_value_owned(&mut item)?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    /// Tests whether an item exists under the full primary key `key_db`,
    /// projecting only the partition attribute to avoid transferring the
    /// value.
    async fn contains_key_general(
        &self,
        key_db: HashMap<String, AttributeValue>,
    ) -> Result<bool, DynamoDbStoreInternalError> {
        let _guard = self.acquire().await;
        let response = self
            .client
            .get_item()
            .table_name(&self.namespace)
            .set_key(Some(key_db))
            .projection_expression(PARTITION_ATTRIBUTE)
            .send()
            .boxed()
            .await?;
        Ok(response.item.is_some())
    }

    /// Runs the prefix query to completion, following pagination, and
    /// collects every response page.
    async fn get_list_responses(
        &self,
        attribute: &str,
        start_key: &[u8],
        key_prefix: &[u8],
    ) -> Result<QueryResponses, DynamoDbStoreInternalError> {
        check_key_size(key_prefix)?;
        let mut responses = Vec::new();
        let mut start_key_map = None;
        loop {
            let response = self
                .get_query_output(attribute, start_key, key_prefix, start_key_map)
                .await?;
            // `last_evaluated_key` is the pagination cursor: absent means
            // this was the last page.
            let last_evaluated = response.last_evaluated_key.clone();
            responses.push(response);
            match last_evaluated {
                None => {
                    break;
                }
                Some(value) => {
                    start_key_map = Some(value);
                }
            }
        }
        Ok(QueryResponses {
            prefix_len: key_prefix.len(),
            responses,
        })
    }
}
/// The accumulated pages of a paginated DynamoDB query, together with the
/// length of the queried key prefix (stripped from the keys yielded later).
struct QueryResponses {
    prefix_len: usize,
    responses: Vec<QueryOutput>,
}
/// Iterator over the keys contained in a sequence of query response pages.
#[doc(hidden)]
#[expect(clippy::type_complexity)]
pub struct DynamoDbKeyBlockIterator<'a> {
    // Number of leading bytes stripped from each yielded key.
    prefix_len: usize,
    // Index of the page iterator currently being drained.
    pos: usize,
    // One flattened item iterator per query response page.
    iters: Vec<
        std::iter::Flatten<
            std::option::Iter<'a, Vec<HashMap<std::string::String, AttributeValue>>>,
        >,
    >,
}
impl<'a> Iterator for DynamoDbKeyBlockIterator<'a> {
    type Item = Result<&'a [u8], DynamoDbStoreInternalError>;

    /// Yields the next key, skipping over empty response pages.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop across pages: the previous implementation advanced at most
        // one page per call and returned `None` if the page right after an
        // exhausted one was empty, ending iteration early even though later
        // pages could still hold items. (A paginated query can legitimately
        // return a page with no items while more pages remain.)
        loop {
            match self.iters[self.pos].next() {
                Some(result) => return Some(extract_key(self.prefix_len, result)),
                None => {
                    if self.pos == self.iters.len() - 1 {
                        return None;
                    }
                    self.pos += 1;
                }
            }
        }
    }
}
/// The collection of keys returned by `find_keys_by_prefix`.
pub struct DynamoDbKeys {
    // All response pages gathered for the prefix query.
    result_queries: QueryResponses,
}
impl KeyIterable<DynamoDbStoreInternalError> for DynamoDbKeys {
    type Iterator<'a>
        = DynamoDbKeyBlockIterator<'a>
    where
        Self: 'a;

    /// Builds an iterator over all keys in the collected query responses.
    fn iterator(&self) -> Self::Iterator<'_> {
        let iters = self
            .result_queries
            .responses
            .iter()
            .map(|response| response.items.iter().flatten())
            .collect();
        DynamoDbKeyBlockIterator {
            prefix_len: self.result_queries.prefix_len,
            pos: 0,
            iters,
        }
    }
}
/// The collection of key-value pairs returned by `find_key_values_by_prefix`.
pub struct DynamoDbKeyValues {
    // All response pages gathered for the prefix query.
    result_queries: QueryResponses,
}
/// Borrowing iterator over the key-value pairs in a sequence of query
/// response pages.
#[doc(hidden)]
#[expect(clippy::type_complexity)]
pub struct DynamoDbKeyValueIterator<'a> {
    // Number of leading bytes stripped from each yielded key.
    prefix_len: usize,
    // Index of the page iterator currently being drained.
    pos: usize,
    // One flattened item iterator per query response page.
    iters: Vec<
        std::iter::Flatten<
            std::option::Iter<'a, Vec<HashMap<std::string::String, AttributeValue>>>,
        >,
    >,
}
impl<'a> Iterator for DynamoDbKeyValueIterator<'a> {
    type Item = Result<(&'a [u8], &'a [u8]), DynamoDbStoreInternalError>;

    /// Yields the next key-value pair, skipping over empty response pages.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop across pages: the previous implementation advanced at most
        // one page per call, so an empty page directly after an exhausted
        // one ended the iteration early even when later pages held items.
        loop {
            match self.iters[self.pos].next() {
                Some(result) => return Some(extract_key_value(self.prefix_len, result)),
                None => {
                    if self.pos == self.iters.len() - 1 {
                        return None;
                    }
                    self.pos += 1;
                }
            }
        }
    }
}
/// Owning iterator over the key-value pairs in a sequence of query response
/// pages.
#[doc(hidden)]
#[expect(clippy::type_complexity)]
pub struct DynamoDbKeyValueIteratorOwned {
    // Number of leading bytes stripped from each yielded key.
    prefix_len: usize,
    // Index of the page iterator currently being drained.
    pos: usize,
    // One flattened, owning item iterator per query response page.
    iters: Vec<
        std::iter::Flatten<
            std::option::IntoIter<Vec<HashMap<std::string::String, AttributeValue>>>,
        >,
    >,
}
impl Iterator for DynamoDbKeyValueIteratorOwned {
    type Item = Result<(Vec<u8>, Vec<u8>), DynamoDbStoreInternalError>;

    /// Yields the next owned key-value pair, skipping empty response pages.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop across pages: the previous implementation advanced at most
        // one page per call, so an empty page directly after an exhausted
        // one ended the iteration early even when later pages held items.
        loop {
            match self.iters[self.pos].next() {
                Some(mut result) => {
                    return Some(extract_key_value_owned(self.prefix_len, &mut result));
                }
                None => {
                    if self.pos == self.iters.len() - 1 {
                        return None;
                    }
                    self.pos += 1;
                }
            }
        }
    }
}
impl KeyValueIterable<DynamoDbStoreInternalError> for DynamoDbKeyValues {
    type Iterator<'a>
        = DynamoDbKeyValueIterator<'a>
    where
        Self: 'a;
    type IteratorOwned = DynamoDbKeyValueIteratorOwned;

    /// Builds a borrowing iterator over all collected key-value pairs.
    fn iterator(&self) -> Self::Iterator<'_> {
        let iters = self
            .result_queries
            .responses
            .iter()
            .map(|response| response.items.iter().flatten())
            .collect();
        DynamoDbKeyValueIterator {
            prefix_len: self.result_queries.prefix_len,
            pos: 0,
            iters,
        }
    }

    /// Builds an owning iterator over all collected key-value pairs.
    fn into_iterator_owned(self) -> Self::IteratorOwned {
        let iters = self
            .result_queries
            .responses
            .into_iter()
            .map(|response| response.items.into_iter().flatten())
            .collect();
        DynamoDbKeyValueIteratorOwned {
            prefix_len: self.result_queries.prefix_len,
            pos: 0,
            iters,
        }
    }
}
/// Associates the store with its error type.
impl WithError for DynamoDbStoreInternal {
    type Error = DynamoDbStoreInternalError;
}
impl ReadableKeyValueStore for DynamoDbStoreInternal {
    const MAX_KEY_SIZE: usize = MAX_KEY_SIZE;
    type Keys = DynamoDbKeys;
    type KeyValues = DynamoDbKeyValues;

    /// Returns the configured maximum number of simultaneous stream queries.
    fn max_stream_queries(&self) -> usize {
        self.max_stream_queries
    }

    /// Reads the value stored under `key` in this store's partition, if any.
    async fn read_value_bytes(
        &self,
        key: &[u8],
    ) -> Result<Option<Vec<u8>>, DynamoDbStoreInternalError> {
        check_key_size(key)?;
        self.read_value_bytes_general(build_key(&self.start_key, key.to_vec()))
            .await
    }

    /// Tests whether an entry exists under `key` in this store's partition.
    async fn contains_key(&self, key: &[u8]) -> Result<bool, DynamoDbStoreInternalError> {
        check_key_size(key)?;
        self.contains_key_general(build_key(&self.start_key, key.to_vec()))
            .await
    }

    /// Tests the existence of several keys concurrently; all key sizes are
    /// validated before any request is issued.
    async fn contains_keys(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<bool>, DynamoDbStoreInternalError> {
        let handles = keys
            .into_iter()
            .map(|key| {
                check_key_size(&key)?;
                Ok(self.contains_key_general(build_key(&self.start_key, key)))
            })
            .collect::<Result<Vec<_>, DynamoDbStoreInternalError>>()?;
        join_all(handles).await.into_iter().collect()
    }

    /// Reads the values of several keys concurrently; all key sizes are
    /// validated before any request is issued.
    async fn read_multi_values_bytes(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> Result<Vec<Option<Vec<u8>>>, DynamoDbStoreInternalError> {
        let handles = keys
            .into_iter()
            .map(|key| {
                check_key_size(&key)?;
                Ok(self.read_value_bytes_general(build_key(&self.start_key, key)))
            })
            .collect::<Result<Vec<_>, DynamoDbStoreInternalError>>()?;
        join_all(handles).await.into_iter().collect()
    }

    /// Finds all keys starting with `key_prefix`; the prefix is stripped
    /// from the yielded keys.
    async fn find_keys_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<DynamoDbKeys, DynamoDbStoreInternalError> {
        let result_queries = self
            .get_list_responses(KEY_ATTRIBUTE, &self.start_key, key_prefix)
            .await?;
        Ok(DynamoDbKeys { result_queries })
    }

    /// Finds all key-value pairs whose key starts with `key_prefix`; the
    /// prefix is stripped from the yielded keys.
    async fn find_key_values_by_prefix(
        &self,
        key_prefix: &[u8],
    ) -> Result<DynamoDbKeyValues, DynamoDbStoreInternalError> {
        let result_queries = self
            .get_list_responses(KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix)
            .await?;
        Ok(DynamoDbKeyValues { result_queries })
    }
}
#[async_trait]
impl DirectWritableKeyValueStore for DynamoDbStoreInternal {
    const MAX_BATCH_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_SIZE;
    const MAX_BATCH_TOTAL_SIZE: usize = MAX_TRANSACT_WRITE_ITEM_TOTAL_SIZE;
    const MAX_VALUE_SIZE: usize = VISIBLE_MAX_VALUE_SIZE;
    type Batch = SimpleUnorderedBatch;

    /// Applies the deletions and insertions of `batch` in a single
    /// `TransactWriteItems` call. Before the first successful write, the
    /// store's root key is registered in the root-key partition so that
    /// `list_root_keys` can find it.
    async fn write_batch(&self, batch: Self::Batch) -> Result<(), DynamoDbStoreInternalError> {
        if !self.root_key_written.load(Ordering::SeqCst) {
            let mut builder = TransactionBuilder::new(PARTITION_KEY_ROOT_KEY);
            builder.insert_put_request(self.start_key.clone(), vec![], self)?;
            // Respect the configured concurrency limit here too; the
            // original registration write bypassed the semaphore.
            let _guard = self.acquire().await;
            self.client
                .transact_write_items()
                .set_transact_items(Some(builder.transactions))
                .send()
                .boxed()
                .await?;
            // Mark the root key as written only after the write succeeded.
            // The original set the flag up front with `fetch_or`, so a
            // failed registration was never retried. Two concurrent callers
            // may now both register the root key, but the put is idempotent.
            self.root_key_written.store(true, Ordering::SeqCst);
        }
        let mut builder = TransactionBuilder::new(&self.start_key);
        for key in batch.deletions {
            builder.insert_delete_request(key, self)?;
        }
        for (key, value) in batch.insertions {
            builder.insert_put_request(key, value, self)?;
        }
        if !builder.transactions.is_empty() {
            let _guard = self.acquire().await;
            self.client
                .transact_write_items()
                .set_transact_items(Some(builder.transactions))
                .send()
                .boxed()
                .await?;
        }
        Ok(())
    }
}
#[derive(Debug, Error)]
pub enum InvalidNamespace {
#[error("Namespace must have at least 3 characters")]
TooShort,
#[error("Namespace must be at most 63 characters")]
TooLong,
#[error("Namespace must only contain lowercase letters, numbers, periods and hyphens")]
InvalidCharacter,
}
/// Errors arising in the low-level DynamoDB store.
///
/// The AWS SDK error payloads are boxed, which keeps this enum (and every
/// `Result` carrying it) small.
#[derive(Debug, Error)]
pub enum DynamoDbStoreInternalError {
    /// Failure of a `GetItem` request.
    #[error(transparent)]
    Get(#[from] Box<SdkError<GetItemError>>),
    /// Failure of a `BatchWriteItem` request.
    #[error(transparent)]
    BatchWriteItem(#[from] Box<SdkError<BatchWriteItemError>>),
    /// Failure of a `TransactWriteItems` request.
    #[error(transparent)]
    TransactWriteItem(#[from] Box<SdkError<TransactWriteItemsError>>),
    /// Failure of a `Query` request.
    #[error(transparent)]
    Query(#[from] Box<SdkError<QueryError>>),
    /// Failure of a `DeleteTable` request.
    #[error(transparent)]
    DeleteTable(#[from] Box<SdkError<DeleteTableError>>),
    /// Failure of a `ListTables` request.
    #[error(transparent)]
    ListTables(#[from] Box<SdkError<ListTablesError>>),
    /// Failure of a `DescribeTable` request.
    #[error(transparent)]
    DescribeTables(#[from] Box<SdkError<DescribeTableError>>),
    #[error("The transact must have length at most MAX_TRANSACT_WRITE_ITEM_SIZE")]
    TransactUpperLimitSize,
    #[error("The key must be of strictly positive length")]
    ZeroLengthKey,
    #[error("The key must have at most 1024 bytes")]
    KeyTooLong,
    #[error("The key prefix must have at most 1024 bytes")]
    KeyPrefixTooLong,
    #[error("The key_prefix must be of strictly positive length")]
    ZeroLengthKeyPrefix,
    #[error("The DynamoDB database recovery failed")]
    DatabaseRecoveryFailed,
    /// Inconsistency detected by the journaling layer.
    #[error(transparent)]
    JournalConsistencyError(#[from] JournalConsistencyError),
    #[error("The DynamoDB value should be less than 400 KB")]
    ValueLengthTooLarge,
    #[error("The stored key attribute is missing")]
    MissingKey,
    /// The key attribute had an unexpected type (see `wrong_key_type`).
    #[error("Key was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongKeyType(String),
    #[error("The stored value attribute is missing")]
    MissingValue,
    /// The value attribute had an unexpected type (see `wrong_value_type`).
    #[error("Value was stored as {0}, but it was expected to be stored as a binary blob")]
    WrongValueType(String),
    /// A BCS (de)serialization failure.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),
    /// The namespace failed validation.
    #[error(transparent)]
    InvalidNamespace(#[from] InvalidNamespace),
    /// Failure of a `CreateTable` request.
    #[error(transparent)]
    CreateTable(#[from] SdkError<CreateTableError>),
    /// A request builder rejected its inputs.
    #[error(transparent)]
    Build(#[from] Box<BuildError>),
}
/// Boxes any convertible SDK error before wrapping it, so large SDK error
/// payloads do not inflate the size of `DynamoDbStoreInternalError`.
impl<InnerError> From<SdkError<InnerError>> for DynamoDbStoreInternalError
where
    DynamoDbStoreInternalError: From<Box<SdkError<InnerError>>>,
{
    fn from(error: SdkError<InnerError>) -> Self {
        Self::from(Box::new(error))
    }
}
/// Boxes builder errors before wrapping them in the `Build` variant.
impl From<BuildError> for DynamoDbStoreInternalError {
    fn from(error: BuildError) -> Self {
        Self::from(Box::new(error))
    }
}
impl DynamoDbStoreInternalError {
    /// Builds a `WrongKeyType` error describing the actual type of `value`.
    pub fn wrong_key_type(value: &AttributeValue) -> Self {
        Self::WrongKeyType(Self::type_description_of(value))
    }

    /// Builds a `WrongValueType` error describing the actual type of `value`.
    pub fn wrong_value_type(value: &AttributeValue) -> Self {
        Self::WrongValueType(Self::type_description_of(value))
    }

    /// Returns a human-readable description of the attribute's type; only
    /// called for non-binary attributes.
    fn type_description_of(value: &AttributeValue) -> String {
        let description = match value {
            AttributeValue::B(_) => unreachable!("creating an error type for the correct type"),
            AttributeValue::Bool(_) => "a boolean",
            AttributeValue::Bs(_) => "a list of binary blobs",
            AttributeValue::L(_) => "a list",
            AttributeValue::M(_) => "a map",
            AttributeValue::N(_) => "a number",
            AttributeValue::Ns(_) => "a list of numbers",
            AttributeValue::Null(_) => "a null value",
            AttributeValue::S(_) => "a string",
            AttributeValue::Ss(_) => "a list of strings",
            _ => "an unknown type",
        };
        description.to_owned()
    }
}
/// Identifies this error type as belonging to the `dynamo_db` backend.
impl KeyValueStoreError for DynamoDbStoreInternalError {
    const BACKEND: &'static str = "dynamo_db";
}
#[cfg(with_testing)]
impl TestKeyValueStore for JournalingKeyValueStore<DynamoDbStoreInternal> {
    /// Builds a test configuration targeting LocalStack with test-sized
    /// concurrency limits.
    async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
        let common_config = CommonStoreInternalConfig {
            max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
            max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
        };
        Ok(DynamoDbStoreInternalConfig {
            use_localstack: true,
            common_config,
        })
    }
}
/// The full DynamoDB store stack: journaling, value splitting and LRU
/// caching, with metering around each layer.
#[cfg(with_metrics)]
pub type DynamoDbStore = MeteredStore<
    LruCachingStore<
        MeteredStore<
            ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
        >,
    >,
>;
/// The full DynamoDB store stack: journaling, value splitting and LRU
/// caching (without metering).
#[cfg(not(with_metrics))]
pub type DynamoDbStore =
    LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>;
/// Error type of the full `DynamoDbStore` stack.
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
/// Configuration of the full `DynamoDbStore` stack.
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;
impl DynamoDbStoreConfig {
    /// Creates a `DynamoDbStoreConfig` from a full common configuration,
    /// splitting it into the inner store settings and the cache size.
    pub fn new(
        use_localstack: bool,
        common_config: crate::store::CommonStoreConfig,
    ) -> DynamoDbStoreConfig {
        let cache_size = common_config.cache_size;
        DynamoDbStoreConfig {
            inner_config: DynamoDbStoreInternalConfig {
                use_localstack,
                common_config: common_config.reduced(),
            },
            cache_size,
        }
    }
}
#[cfg(test)]
mod tests {
    use bcs::serialized_size;

    use crate::common::get_uleb128_size;

    /// Checks that `get_uleb128_size` correctly predicts the BCS-serialized
    /// size of byte vectors around the ULEB128 length-encoding boundaries.
    #[test]
    fn test_serialization_len() {
        let lengths = [0, 10, 127, 128, 129, 16383, 16384, 20000];
        for n in lengths {
            let data = vec![0u8; n];
            let predicted_size = get_uleb128_size(n) + n;
            assert_eq!(predicted_size, serialized_size(&data).unwrap());
        }
    }
}