1use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13 client::StorageServiceStore,
14 common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbStore, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20 PathWithGuard, RocksDbSpawnMode, RocksDbStore, RocksDbStoreConfig, RocksDbStoreInternalConfig,
21};
22use linera_views::{
23 lru_caching::StorageCacheConfig,
24 memory::{MemoryStore, MemoryStoreConfig},
25 store::KeyValueStore,
26};
27use serde::{Deserialize, Serialize};
28use tracing::error;
29#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
30use {
31 linera_storage::ChainStatesFirstAssignment,
32 linera_views::backends::dual::{DualStore, DualStoreConfig},
33 std::path::Path,
34};
35#[cfg(feature = "scylladb")]
36use {
37 linera_views::scylla_db::{ScyllaDbStore, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
38 std::num::NonZeroU16,
39 tracing::debug,
40};
41
// Command-line options shared by all storage backends. Every flag is declared
// `global`, and the values are copied into the backend-specific store
// configurations by `StorageConfig::add_common_storage_options`.
//
// NOTE: plain `//` comments are used on purpose. On a `clap::Parser` struct,
// `///` doc comments become part of the generated `--help` text.
#[derive(Clone, Debug, clap::Parser)]
pub struct CommonStorageOptions {
    // Forwarded as `max_concurrent_queries` to the storage-service, DynamoDB and
    // ScyllaDB store configurations. No default value is declared.
    #[arg(long, global = true)]
    pub storage_max_concurrent_queries: Option<usize>,

    // Forwarded as `max_stream_queries` to every store configuration (default: 10).
    #[arg(long, default_value = "10", global = true)]
    pub storage_max_stream_queries: usize,

    // Becomes `StorageCacheConfig::max_cache_size` (default: 10000000).
    // NOTE(review): presumably a size in bytes; confirm against `StorageCacheConfig`.
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_size: usize,

    // Becomes `StorageCacheConfig::max_entry_size` (default: 1000000).
    // NOTE(review): presumably a size in bytes; confirm against `StorageCacheConfig`.
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_entry_size: usize,

    // Becomes `StorageCacheConfig::max_cache_entries` (default: 1000).
    #[arg(long, default_value = "1000", global = true)]
    pub storage_max_cache_entries: usize,

    // Forwarded as `replication_factor` to the ScyllaDB store configuration,
    // including the ScyllaDB half of the dual store (default: 1).
    #[arg(long, default_value = "1", global = true)]
    pub storage_replication_factor: u32,
}
68
69impl CommonStorageOptions {
70 pub fn storage_cache_config(&self) -> StorageCacheConfig {
71 StorageCacheConfig {
72 max_cache_size: self.storage_max_cache_size,
73 max_entry_size: self.storage_max_entry_size,
74 max_cache_entries: self.storage_max_cache_entries,
75 }
76 }
77}
78
/// A fully resolved store configuration: the backend-specific store config
/// together with the namespace to use.
///
/// Values are produced by `StorageConfig::add_common_storage_options` and
/// consumed by `run_with_storage` / `run_with_store`. Every variant other than
/// `Memory` only exists when the matching Cargo feature is enabled.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StoreConfig {
    /// In-memory store.
    Memory {
        /// Configuration of the memory store.
        config: MemoryStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
        /// Path of the JSON genesis configuration read by `run_with_storage`
        /// to initialize the freshly created storage.
        genesis_path: PathBuf,
    },
    /// Store served by the Linera storage service.
    #[cfg(feature = "storage-service")]
    StorageService {
        /// Configuration of the storage-service client.
        config: StorageServiceStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
    },
    /// RocksDB store.
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Configuration of the RocksDB store.
        config: RocksDbStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
    },
    /// DynamoDB store.
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// Configuration of the DynamoDB store.
        config: DynamoDbStoreConfig,
        /// Namespace used when connecting (parsed from the table part of the
        /// `dynamodb:` description).
        namespace: String,
    },
    /// ScyllaDB store.
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Configuration of the ScyllaDB store.
        config: ScyllaDbStoreConfig,
        /// Namespace used when connecting.
        namespace: String,
    },
    /// Dual store with RocksDB as first store and ScyllaDB as second store.
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// Combined configuration of both underlying stores.
        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
        /// Namespace used when connecting.
        namespace: String,
    },
}
118
/// The backend-specific part of a [`StorageConfig`], as parsed from its
/// textual description (everything except the namespace).
///
/// `Eq`/`PartialEq` are only derived in test builds, where the unit tests
/// compare parsed values.
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub enum InnerStorageConfig {
    /// In-memory storage (`memory:` prefix).
    Memory {
        /// Path of the genesis configuration file.
        genesis_path: PathBuf,
    },
    /// Storage service (`service:` prefix).
    #[cfg(feature = "storage-service")]
    Service {
        /// Endpoint in `host:port` form (the `tcp:` protocol marker is not stored).
        endpoint: String,
    },
    /// RocksDB storage (`rocksdb:` prefix).
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Directory of the database.
        path: PathBuf,
        /// Spawn mode forwarded to the RocksDB store configuration.
        spawn_mode: RocksDbSpawnMode,
    },
    /// DynamoDB storage (`dynamodb:` prefix).
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// `true` for the `dynamodb_local` endpoint, `false` for `env` (the default).
        use_dynamodb_local: bool,
    },
    /// ScyllaDB storage (`scylladb:` prefix).
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Address in `host:port` form.
        uri: String,
    },
    /// RocksDB + ScyllaDB dual storage (`dualrocksdbscylladb:` prefix).
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// Directory of the RocksDB database, wrapped in a `PathWithGuard`.
        path_with_guard: PathWithGuard,
        /// Spawn mode forwarded to the RocksDB store configuration.
        spawn_mode: RocksDbSpawnMode,
        /// ScyllaDB address in `host:port` form.
        uri: String,
    },
}
165
/// A storage description as given by the user: which backend to use and
/// under which namespace.
///
/// It is parsed from a string through `FromStr` and printed back through
/// `Display`. `Eq`/`PartialEq` are only derived in test builds.
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub struct StorageConfig {
    /// The backend and its backend-specific parameters.
    pub inner_storage_config: InnerStorageConfig,
    /// The namespace to use within the backend.
    pub namespace: String,
}
175
// Scheme prefixes (including the trailing ':') that select the backend when a
// `StorageConfig` is parsed from a string. Each prefix is only defined when
// the corresponding backend feature is compiled in.

/// Prefix of an in-memory storage description.
const MEMORY: &str = "memory:";
/// Prefix of a storage-service description.
#[cfg(feature = "storage-service")]
const STORAGE_SERVICE: &str = "service:";
/// Prefix of a RocksDB description.
#[cfg(feature = "rocksdb")]
const ROCKS_DB: &str = "rocksdb:";
/// Prefix of a DynamoDB description.
#[cfg(feature = "dynamodb")]
const DYNAMO_DB: &str = "dynamodb:";
/// Prefix of a ScyllaDB description.
#[cfg(feature = "scylladb")]
const SCYLLA_DB: &str = "scylladb:";
/// Prefix of a dual RocksDB/ScyllaDB description.
#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
187
188impl FromStr for StorageConfig {
189 type Err = anyhow::Error;
190
191 fn from_str(input: &str) -> Result<Self, Self::Err> {
192 if let Some(s) = input.strip_prefix(MEMORY) {
193 let parts = s.split(':').collect::<Vec<_>>();
194 if parts.len() == 1 {
195 let genesis_path = parts[0].to_string().into();
196 let namespace = DEFAULT_NAMESPACE.to_string();
197 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
198 return Ok(StorageConfig {
199 inner_storage_config,
200 namespace,
201 });
202 }
203 if parts.len() != 2 {
204 bail!("We should have one genesis config path and one optional namespace");
205 }
206 let genesis_path = parts[0].to_string().into();
207 let namespace = parts[1].to_string();
208 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
209 return Ok(StorageConfig {
210 inner_storage_config,
211 namespace,
212 });
213 }
214 #[cfg(feature = "storage-service")]
215 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
216 if s.is_empty() {
217 bail!(
218 "For Storage service, the formatting has to be service:endpoint:namespace,\
219example service:tcp:127.0.0.1:7878:table_do_my_test"
220 );
221 }
222 let parts = s.split(':').collect::<Vec<_>>();
223 if parts.len() != 4 {
224 bail!("We should have one endpoint and one namespace");
225 }
226 let protocol = parts[0];
227 if protocol != "tcp" {
228 bail!("Only allowed protocol is tcp");
229 }
230 let endpoint = parts[1];
231 let port = parts[2];
232 let mut endpoint = endpoint.to_string();
233 endpoint.push(':');
234 endpoint.push_str(port);
235 let endpoint = endpoint.to_string();
236 let namespace = parts[3].to_string();
237 let inner_storage_config = InnerStorageConfig::Service { endpoint };
238 return Ok(StorageConfig {
239 inner_storage_config,
240 namespace,
241 });
242 }
243 #[cfg(feature = "rocksdb")]
244 if let Some(s) = input.strip_prefix(ROCKS_DB) {
245 if s.is_empty() {
246 bail!(
247 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
248 }
249 let parts = s.split(':').collect::<Vec<_>>();
250 if parts.len() == 1 {
251 let path = parts[0].to_string().into();
252 let namespace = DEFAULT_NAMESPACE.to_string();
253 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
254 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
255 return Ok(StorageConfig {
256 inner_storage_config,
257 namespace,
258 });
259 }
260 if parts.len() == 2 || parts.len() == 3 {
261 let path = parts[0].to_string().into();
262 let spawn_mode = match parts[1] {
263 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
264 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
265 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
266 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
267 }?;
268 let namespace = if parts.len() == 2 {
269 DEFAULT_NAMESPACE.to_string()
270 } else {
271 parts[2].to_string()
272 };
273 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
274 return Ok(StorageConfig {
275 inner_storage_config,
276 namespace,
277 });
278 }
279 bail!("We should have one, two or three parts");
280 }
281 #[cfg(feature = "dynamodb")]
282 if let Some(s) = input.strip_prefix(DYNAMO_DB) {
283 let mut parts = s.splitn(2, ':');
284 let namespace = parts
285 .next()
286 .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
287 .to_string();
288 let use_dynamodb_local = match parts.next() {
289 None | Some("env") => false,
290 Some("dynamodb_local") => true,
291 Some(unknown) => {
292 bail!(
293 "Invalid DynamoDB endpoint {unknown:?}. \
294 Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
295 );
296 }
297 };
298 let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
299 return Ok(StorageConfig {
300 inner_storage_config,
301 namespace,
302 });
303 }
304 #[cfg(feature = "scylladb")]
305 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
306 let mut uri: Option<String> = None;
307 let mut namespace: Option<String> = None;
308 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
309 if !s.is_empty() {
310 let mut parts = s.split(':');
311 while let Some(part) = parts.next() {
312 match part {
313 "tcp" => {
314 let address = parts.next().ok_or_else(|| {
315 anyhow!("Failed to find address for {s}. {parse_error}")
316 })?;
317 let port_str = parts.next().ok_or_else(|| {
318 anyhow!("Failed to find port for {s}. {parse_error}")
319 })?;
320 let port = NonZeroU16::from_str(port_str).map_err(|_| {
321 anyhow!(
322 "Failed to find parse port {port_str} for {s}. {parse_error}",
323 )
324 })?;
325 if uri.is_some() {
326 bail!("The uri has already been assigned");
327 }
328 uri = Some(format!("{}:{}", &address, port));
329 }
330 _ if part.starts_with("table") => {
331 if namespace.is_some() {
332 bail!("The namespace has already been assigned");
333 }
334 namespace = Some(part.to_string());
335 }
336 _ => {
337 bail!("the entry \"{part}\" is not matching");
338 }
339 }
340 }
341 }
342 let uri = uri.unwrap_or("localhost:9042".to_string());
343 let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string());
344 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
345 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
346 return Ok(StorageConfig {
347 inner_storage_config,
348 namespace,
349 });
350 }
351 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
352 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
353 let parts = s.split(':').collect::<Vec<_>>();
354 if parts.len() != 5 && parts.len() != 6 {
355 bail!(
356 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
357 );
358 }
359 let path = Path::new(parts[0]);
360 let path = path.to_path_buf();
361 let path_with_guard = PathWithGuard::new(path);
362 let spawn_mode = match parts[1] {
363 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
364 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
365 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
366 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
367 }?;
368 let protocol = parts[2];
369 if protocol != "tcp" {
370 bail!("The only allowed protocol is tcp");
371 }
372 let address = parts[3];
373 let port_str = parts[4];
374 let port = NonZeroU16::from_str(port_str)
375 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
376 let uri = format!("{}:{}", &address, port);
377 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
378 path_with_guard,
379 spawn_mode,
380 uri,
381 };
382 let namespace = if parts.len() == 5 {
383 DEFAULT_NAMESPACE.to_string()
384 } else {
385 parts[5].to_string()
386 };
387 return Ok(StorageConfig {
388 inner_storage_config,
389 namespace,
390 });
391 }
392 error!("available storage: memory");
393 #[cfg(feature = "storage-service")]
394 error!("Also available is linera-storage-service");
395 #[cfg(feature = "rocksdb")]
396 error!("Also available is RocksDB");
397 #[cfg(feature = "dynamodb")]
398 error!("Also available is DynamoDB");
399 #[cfg(feature = "scylladb")]
400 error!("Also available is ScyllaDB");
401 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
402 error!("Also available is DualRocksDbScyllaDb");
403 Err(anyhow!("The input has not matched: {input}"))
404 }
405}
406
407impl StorageConfig {
408 pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
409 match &mut self.inner_storage_config {
410 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
411 InnerStorageConfig::DualRocksDbScyllaDb {
412 path_with_guard,
413 spawn_mode: _,
414 uri: _,
415 } => {
416 let shard_str = format!("shard_{}", _shard);
417 path_with_guard.path_buf.push(shard_str);
418 std::fs::create_dir_all(&path_with_guard.path_buf)
419 }
420 _ => Ok(()),
421 }
422 }
423
424 pub async fn add_common_storage_options(
426 &self,
427 options: &CommonStorageOptions,
428 ) -> Result<StoreConfig, anyhow::Error> {
429 let namespace = self.namespace.clone();
430 match &self.inner_storage_config {
431 InnerStorageConfig::Memory { genesis_path } => {
432 let config = MemoryStoreConfig {
433 max_stream_queries: options.storage_max_stream_queries,
434 };
435 let genesis_path = genesis_path.clone();
436 Ok(StoreConfig::Memory {
437 config,
438 namespace,
439 genesis_path,
440 })
441 }
442 #[cfg(feature = "storage-service")]
443 InnerStorageConfig::Service { endpoint } => {
444 let inner_config = StorageServiceStoreInternalConfig {
445 endpoint: endpoint.clone(),
446 max_concurrent_queries: options.storage_max_concurrent_queries,
447 max_stream_queries: options.storage_max_stream_queries,
448 };
449 let config = StorageServiceStoreConfig {
450 inner_config,
451 storage_cache_config: options.storage_cache_config(),
452 };
453 Ok(StoreConfig::StorageService { config, namespace })
454 }
455 #[cfg(feature = "rocksdb")]
456 InnerStorageConfig::RocksDb { path, spawn_mode } => {
457 let path_with_guard = PathWithGuard::new(path.to_path_buf());
458 let inner_config = RocksDbStoreInternalConfig {
459 spawn_mode: *spawn_mode,
460 path_with_guard,
461 max_stream_queries: options.storage_max_stream_queries,
462 };
463 let config = RocksDbStoreConfig {
464 inner_config,
465 storage_cache_config: options.storage_cache_config(),
466 };
467 Ok(StoreConfig::RocksDb { config, namespace })
468 }
469 #[cfg(feature = "dynamodb")]
470 InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
471 let inner_config = DynamoDbStoreInternalConfig {
472 use_dynamodb_local: *use_dynamodb_local,
473 max_concurrent_queries: options.storage_max_concurrent_queries,
474 max_stream_queries: options.storage_max_stream_queries,
475 };
476 let config = DynamoDbStoreConfig {
477 inner_config,
478 storage_cache_config: options.storage_cache_config(),
479 };
480 Ok(StoreConfig::DynamoDb { config, namespace })
481 }
482 #[cfg(feature = "scylladb")]
483 InnerStorageConfig::ScyllaDb { uri } => {
484 let inner_config = ScyllaDbStoreInternalConfig {
485 uri: uri.clone(),
486 max_stream_queries: options.storage_max_stream_queries,
487 max_concurrent_queries: options.storage_max_concurrent_queries,
488 replication_factor: options.storage_replication_factor,
489 };
490 let config = ScyllaDbStoreConfig {
491 inner_config,
492 storage_cache_config: options.storage_cache_config(),
493 };
494 Ok(StoreConfig::ScyllaDb { config, namespace })
495 }
496 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
497 InnerStorageConfig::DualRocksDbScyllaDb {
498 path_with_guard,
499 spawn_mode,
500 uri,
501 } => {
502 let inner_config = RocksDbStoreInternalConfig {
503 spawn_mode: *spawn_mode,
504 path_with_guard: path_with_guard.clone(),
505 max_stream_queries: options.storage_max_stream_queries,
506 };
507 let first_config = RocksDbStoreConfig {
508 inner_config,
509 storage_cache_config: options.storage_cache_config(),
510 };
511
512 let inner_config = ScyllaDbStoreInternalConfig {
513 uri: uri.clone(),
514 max_stream_queries: options.storage_max_stream_queries,
515 max_concurrent_queries: options.storage_max_concurrent_queries,
516 replication_factor: options.storage_replication_factor,
517 };
518 let second_config = ScyllaDbStoreConfig {
519 inner_config,
520 storage_cache_config: options.storage_cache_config(),
521 };
522
523 let config = DualStoreConfig {
524 first_config,
525 second_config,
526 };
527 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
528 }
529 }
530 }
531}
532
533impl fmt::Display for StorageConfig {
534 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
535 let namespace = &self.namespace;
536 match &self.inner_storage_config {
537 #[cfg(feature = "storage-service")]
538 InnerStorageConfig::Service { endpoint } => {
539 write!(f, "service:tcp:{}:{}", endpoint, namespace)
540 }
541 InnerStorageConfig::Memory { genesis_path } => {
542 write!(f, "memory:{}:{}", genesis_path.display(), namespace)
543 }
544 #[cfg(feature = "rocksdb")]
545 InnerStorageConfig::RocksDb { path, spawn_mode } => {
546 let spawn_mode = spawn_mode.to_string();
547 write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
548 }
549 #[cfg(feature = "dynamodb")]
550 InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
551 true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
552 false => write!(f, "dynamodb:{}:env", namespace),
553 },
554 #[cfg(feature = "scylladb")]
555 InnerStorageConfig::ScyllaDb { uri } => {
556 write!(f, "scylladb:tcp:{}:{}", uri, namespace)
557 }
558 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
559 InnerStorageConfig::DualRocksDbScyllaDb {
560 path_with_guard,
561 spawn_mode,
562 uri,
563 } => {
564 write!(
565 f,
566 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
567 path_with_guard.path_buf.display(),
568 spawn_mode,
569 uri,
570 namespace
571 )
572 }
573 }
574 }
575}
576
/// A job that is executed against a connected storage.
///
/// `StoreConfig::run_with_storage` connects to the configured backend and
/// hands the resulting storage to [`Runnable::run`].
#[async_trait]
pub trait Runnable {
    /// The value produced by the job.
    type Output;

    /// Runs the job, consuming it, with the given `storage`.
    async fn run<S>(self, storage: S) -> Self::Output
    where
        S: Storage + Clone + Send + Sync + 'static;
}
585
/// A job that is generic over the key-value store type and receives the
/// store configuration instead of an already connected storage.
///
/// `StoreConfig::run_with_store` selects the store type `S` that matches the
/// configured backend and calls [`RunnableWithStore::run`] with it.
#[async_trait]
pub trait RunnableWithStore {
    /// The value produced by the job on success.
    type Output;

    /// Runs the job, consuming it, for store type `S` with the given store
    /// `config` and `namespace`.
    async fn run<S>(
        self,
        config: S::Config,
        namespace: String,
    ) -> Result<Self::Output, anyhow::Error>
    where
        S: KeyValueStore + Clone + Send + Sync + 'static,
        S::Error: Send + Sync;
}
599
600impl StoreConfig {
601 pub async fn run_with_storage<Job>(
602 self,
603 wasm_runtime: Option<WasmRuntime>,
604 job: Job,
605 ) -> Result<Job::Output, anyhow::Error>
606 where
607 Job: Runnable,
608 {
609 match self {
610 StoreConfig::Memory {
611 config,
612 namespace,
613 genesis_path,
614 } => {
615 let mut storage = DbStorage::<MemoryStore, _>::maybe_create_and_connect(
616 &config,
617 &namespace,
618 wasm_runtime,
619 )
620 .await?;
621 let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
622 genesis_config.initialize_storage(&mut storage).await?;
624 Ok(job.run(storage).await)
625 }
626 #[cfg(feature = "storage-service")]
627 StoreConfig::StorageService { config, namespace } => {
628 let storage =
629 DbStorage::<StorageServiceStore, _>::connect(&config, &namespace, wasm_runtime)
630 .await?;
631 Ok(job.run(storage).await)
632 }
633 #[cfg(feature = "rocksdb")]
634 StoreConfig::RocksDb { config, namespace } => {
635 let storage =
636 DbStorage::<RocksDbStore, _>::connect(&config, &namespace, wasm_runtime)
637 .await?;
638 Ok(job.run(storage).await)
639 }
640 #[cfg(feature = "dynamodb")]
641 StoreConfig::DynamoDb { config, namespace } => {
642 let storage =
643 DbStorage::<DynamoDbStore, _>::connect(&config, &namespace, wasm_runtime)
644 .await?;
645 Ok(job.run(storage).await)
646 }
647 #[cfg(feature = "scylladb")]
648 StoreConfig::ScyllaDb { config, namespace } => {
649 let storage =
650 DbStorage::<ScyllaDbStore, _>::connect(&config, &namespace, wasm_runtime)
651 .await?;
652 Ok(job.run(storage).await)
653 }
654 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
655 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
656 let storage = DbStorage::<
657 DualStore<RocksDbStore, ScyllaDbStore, ChainStatesFirstAssignment>,
658 _,
659 >::connect(&config, &namespace, wasm_runtime)
660 .await?;
661 Ok(job.run(storage).await)
662 }
663 }
664 }
665
666 #[allow(unused_variables)]
667 pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
668 where
669 Job: RunnableWithStore,
670 {
671 match self {
672 StoreConfig::Memory { .. } => {
673 Err(anyhow!("Cannot run admin operations on the memory store"))
674 }
675 #[cfg(feature = "storage-service")]
676 StoreConfig::StorageService { config, namespace } => {
677 Ok(job.run::<StorageServiceStore>(config, namespace).await?)
678 }
679 #[cfg(feature = "rocksdb")]
680 StoreConfig::RocksDb { config, namespace } => {
681 Ok(job.run::<RocksDbStore>(config, namespace).await?)
682 }
683 #[cfg(feature = "dynamodb")]
684 StoreConfig::DynamoDb { config, namespace } => {
685 Ok(job.run::<DynamoDbStore>(config, namespace).await?)
686 }
687 #[cfg(feature = "scylladb")]
688 StoreConfig::ScyllaDb { config, namespace } => {
689 Ok(job.run::<ScyllaDbStore>(config, namespace).await?)
690 }
691 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
692 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
693 .run::<DualStore<RocksDbStore, ScyllaDbStore, ChainStatesFirstAssignment>>(
694 config, namespace,
695 )
696 .await?),
697 }
698 }
699
700 pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
701 self.run_with_store(InitializeStorageJob(config)).await
702 }
703}
704
705struct InitializeStorageJob<'a>(&'a GenesisConfig);
706
707#[async_trait]
708impl RunnableWithStore for InitializeStorageJob<'_> {
709 type Output = ();
710
711 async fn run<S>(
712 self,
713 config: S::Config,
714 namespace: String,
715 ) -> Result<Self::Output, anyhow::Error>
716 where
717 S: KeyValueStore + Clone + Send + Sync + 'static,
718 S::Error: Send + Sync,
719 {
720 let mut storage =
721 DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None).await?;
722 self.0.initialize_storage(&mut storage).await?;
723 Ok(())
724 }
725}
726
/// The memory description takes a genesis path and an optional namespace.
#[test]
fn test_memory_storage_config_from_str() {
    let expected = |namespace: &str| StorageConfig {
        inner_storage_config: InnerStorageConfig::Memory {
            genesis_path: PathBuf::from("path/to/genesis.json"),
        },
        namespace: namespace.to_string(),
    };
    let cases = [
        ("memory:path/to/genesis.json", DEFAULT_NAMESPACE),
        ("memory:path/to/genesis.json:namespace", "namespace"),
    ];
    for (input, namespace) in cases {
        assert_eq!(StorageConfig::from_str(input).unwrap(), expected(namespace));
    }
    // Without the ':' the prefix does not match at all.
    assert!(StorageConfig::from_str("memory").is_err());
}
749
/// The storage-service description needs protocol, host, port and namespace.
#[cfg(feature = "storage-service")]
#[test]
fn test_shared_store_config_from_str() {
    let parsed = StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap();
    let expected = StorageConfig {
        inner_storage_config: InnerStorageConfig::Service {
            endpoint: String::from("127.0.0.1:8942"),
        },
        namespace: String::from("linera"),
    };
    assert_eq!(parsed, expected);

    // Dropping either the namespace or the port leaves too few components.
    for invalid in ["service:tcp:127.0.0.1:8942", "service:tcp:127.0.0.1:linera"] {
        assert!(StorageConfig::from_str(invalid).is_err());
    }
}
765
/// The RocksDB description takes a directory, then optionally a spawn mode
/// and a namespace.
#[cfg(feature = "rocksdb")]
#[test]
fn test_rocks_db_storage_config_from_str() {
    // An underscore instead of ':' is not a known prefix.
    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());

    let expected = |spawn_mode: RocksDbSpawnMode, namespace: &str| StorageConfig {
        inner_storage_config: InnerStorageConfig::RocksDb {
            path: PathBuf::from("foo.db"),
            spawn_mode,
        },
        namespace: namespace.to_string(),
    };
    let cases = [
        (
            "rocksdb:foo.db",
            RocksDbSpawnMode::SpawnBlocking,
            DEFAULT_NAMESPACE,
        ),
        (
            "rocksdb:foo.db:block_in_place",
            RocksDbSpawnMode::BlockInPlace,
            DEFAULT_NAMESPACE,
        ),
        (
            "rocksdb:foo.db:block_in_place:chosen_namespace",
            RocksDbSpawnMode::BlockInPlace,
            "chosen_namespace",
        ),
    ];
    for (input, spawn_mode, namespace) in cases {
        assert_eq!(
            StorageConfig::from_str(input).unwrap(),
            expected(spawn_mode, namespace)
        );
    }
}
801
/// The DynamoDB description takes a table name and an optional endpoint kind.
#[cfg(feature = "dynamodb")]
#[test]
fn test_aws_storage_config_from_str() {
    let expected = |use_dynamodb_local: bool| StorageConfig {
        inner_storage_config: InnerStorageConfig::DynamoDb { use_dynamodb_local },
        namespace: String::from("table"),
    };
    let valid = [
        ("dynamodb:table", false),
        ("dynamodb:table:env", false),
        ("dynamodb:table:dynamodb_local", true),
    ];
    for (input, use_dynamodb_local) in valid {
        assert_eq!(
            StorageConfig::from_str(input).unwrap(),
            expected(use_dynamodb_local)
        );
    }

    let invalid = [
        "dynamodb",
        "dynamodb:",
        "dynamodb:1",
        "dynamodb:wrong:endpoint",
    ];
    for input in invalid {
        assert!(StorageConfig::from_str(input).is_err());
    }
}
837
/// The ScyllaDB description has an optional `tcp:host:port` part and an
/// optional `table...` namespace part.
#[cfg(feature = "scylladb")]
#[test]
fn test_scylla_db_storage_config_from_str() {
    let expected = |uri: &str, namespace: &str| StorageConfig {
        inner_storage_config: InnerStorageConfig::ScyllaDb {
            uri: uri.to_string(),
        },
        namespace: namespace.to_string(),
    };
    let valid = [
        ("scylladb:", "localhost:9042", DEFAULT_NAMESPACE),
        (
            "scylladb:tcp:db_hostname:230:table_other_storage",
            "db_hostname:230",
            "table_other_storage",
        ),
        (
            "scylladb:tcp:db_hostname:230",
            "db_hostname:230",
            DEFAULT_NAMESPACE,
        ),
    ];
    for (input, uri, namespace) in valid {
        assert_eq!(
            StorageConfig::from_str(input).unwrap(),
            expected(uri, namespace)
        );
    }

    let invalid = [
        "scylladb:-10",
        "scylladb:70000",
        "scylladb:230:234",
        "scylladb:tcp:address1",
        "scylladb:tcp:address1:tcp:/address2",
        "scylladb:wrong",
    ];
    for input in invalid {
        assert!(StorageConfig::from_str(input).is_err());
    }
}