1use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13 client::StorageServiceDatabase,
14 common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20 PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
21 RocksDbStoreInternalConfig,
22};
23use linera_views::{
24 lru_caching::StorageCacheConfig,
25 memory::{MemoryDatabase, MemoryStoreConfig},
26 store::{KeyValueDatabase, KeyValueStore},
27};
28use serde::{Deserialize, Serialize};
29use tracing::error;
30#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
31use {
32 linera_storage::ChainStatesFirstAssignment,
33 linera_views::backends::dual::{DualDatabase, DualStoreConfig},
34 std::path::Path,
35};
36#[cfg(feature = "scylladb")]
37use {
38 linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
39 std::num::NonZeroU16,
40 tracing::debug,
41};
42
43#[derive(Clone, Debug, clap::Parser)]
44pub struct CommonStorageOptions {
45 #[arg(long, global = true)]
47 pub storage_max_concurrent_queries: Option<usize>,
48
49 #[arg(long, default_value = "10", global = true)]
51 pub storage_max_stream_queries: usize,
52
53 #[arg(long, default_value = "10000000", global = true)]
55 pub storage_max_cache_size: usize,
56
57 #[arg(long, default_value = "1000000", global = true)]
59 pub storage_max_entry_size: usize,
60
61 #[arg(long, default_value = "1000", global = true)]
63 pub storage_max_cache_entries: usize,
64
65 #[arg(long, default_value = "1", global = true)]
67 pub storage_replication_factor: u32,
68}
69
70impl CommonStorageOptions {
71 pub fn storage_cache_config(&self) -> StorageCacheConfig {
72 StorageCacheConfig {
73 max_cache_size: self.storage_max_cache_size,
74 max_entry_size: self.storage_max_entry_size,
75 max_cache_entries: self.storage_max_cache_entries,
76 }
77 }
78}
79
80#[derive(Clone, Debug, Deserialize, Serialize)]
82pub enum StoreConfig {
83 Memory {
85 config: MemoryStoreConfig,
86 namespace: String,
87 genesis_path: PathBuf,
88 },
89 #[cfg(feature = "storage-service")]
91 StorageService {
92 config: StorageServiceStoreConfig,
93 namespace: String,
94 },
95 #[cfg(feature = "rocksdb")]
97 RocksDb {
98 config: RocksDbStoreConfig,
99 namespace: String,
100 },
101 #[cfg(feature = "dynamodb")]
103 DynamoDb {
104 config: DynamoDbStoreConfig,
105 namespace: String,
106 },
107 #[cfg(feature = "scylladb")]
109 ScyllaDb {
110 config: ScyllaDbStoreConfig,
111 namespace: String,
112 },
113 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
114 DualRocksDbScyllaDb {
115 config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
116 namespace: String,
117 },
118}
119
120#[derive(Clone, Debug)]
122#[cfg_attr(any(test), derive(Eq, PartialEq))]
123pub enum InnerStorageConfig {
124 Memory {
126 genesis_path: PathBuf,
129 },
130 #[cfg(feature = "storage-service")]
132 Service {
133 endpoint: String,
135 },
136 #[cfg(feature = "rocksdb")]
138 RocksDb {
139 path: PathBuf,
141 spawn_mode: RocksDbSpawnMode,
143 },
144 #[cfg(feature = "dynamodb")]
146 DynamoDb {
147 use_dynamodb_local: bool,
149 },
150 #[cfg(feature = "scylladb")]
152 ScyllaDb {
153 uri: String,
155 },
156 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
157 DualRocksDbScyllaDb {
158 path_with_guard: PathWithGuard,
160 spawn_mode: RocksDbSpawnMode,
162 uri: String,
164 },
165}
166
167#[derive(Clone, Debug)]
169#[cfg_attr(any(test), derive(Eq, PartialEq))]
170pub struct StorageConfig {
171 pub inner_storage_config: InnerStorageConfig,
173 pub namespace: String,
175}
176
177const MEMORY: &str = "memory:";
178#[cfg(feature = "storage-service")]
179const STORAGE_SERVICE: &str = "service:";
180#[cfg(feature = "rocksdb")]
181const ROCKS_DB: &str = "rocksdb:";
182#[cfg(feature = "dynamodb")]
183const DYNAMO_DB: &str = "dynamodb:";
184#[cfg(feature = "scylladb")]
185const SCYLLA_DB: &str = "scylladb:";
186#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
187const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
188
189impl FromStr for StorageConfig {
190 type Err = anyhow::Error;
191
192 fn from_str(input: &str) -> Result<Self, Self::Err> {
193 if let Some(s) = input.strip_prefix(MEMORY) {
194 let parts = s.split(':').collect::<Vec<_>>();
195 if parts.len() == 1 {
196 let genesis_path = parts[0].to_string().into();
197 let namespace = DEFAULT_NAMESPACE.to_string();
198 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
199 return Ok(StorageConfig {
200 inner_storage_config,
201 namespace,
202 });
203 }
204 if parts.len() != 2 {
205 bail!("We should have one genesis config path and one optional namespace");
206 }
207 let genesis_path = parts[0].to_string().into();
208 let namespace = parts[1].to_string();
209 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
210 return Ok(StorageConfig {
211 inner_storage_config,
212 namespace,
213 });
214 }
215 #[cfg(feature = "storage-service")]
216 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
217 if s.is_empty() {
218 bail!(
219 "For Storage service, the formatting has to be service:endpoint:namespace,\
220example service:tcp:127.0.0.1:7878:table_do_my_test"
221 );
222 }
223 let parts = s.split(':').collect::<Vec<_>>();
224 if parts.len() != 4 {
225 bail!("We should have one endpoint and one namespace");
226 }
227 let protocol = parts[0];
228 if protocol != "tcp" {
229 bail!("Only allowed protocol is tcp");
230 }
231 let endpoint = parts[1];
232 let port = parts[2];
233 let mut endpoint = endpoint.to_string();
234 endpoint.push(':');
235 endpoint.push_str(port);
236 let endpoint = endpoint.to_string();
237 let namespace = parts[3].to_string();
238 let inner_storage_config = InnerStorageConfig::Service { endpoint };
239 return Ok(StorageConfig {
240 inner_storage_config,
241 namespace,
242 });
243 }
244 #[cfg(feature = "rocksdb")]
245 if let Some(s) = input.strip_prefix(ROCKS_DB) {
246 if s.is_empty() {
247 bail!(
248 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
249 }
250 let parts = s.split(':').collect::<Vec<_>>();
251 if parts.len() == 1 {
252 let path = parts[0].to_string().into();
253 let namespace = DEFAULT_NAMESPACE.to_string();
254 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
255 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
256 return Ok(StorageConfig {
257 inner_storage_config,
258 namespace,
259 });
260 }
261 if parts.len() == 2 || parts.len() == 3 {
262 let path = parts[0].to_string().into();
263 let spawn_mode = match parts[1] {
264 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
265 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
266 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
267 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
268 }?;
269 let namespace = if parts.len() == 2 {
270 DEFAULT_NAMESPACE.to_string()
271 } else {
272 parts[2].to_string()
273 };
274 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
275 return Ok(StorageConfig {
276 inner_storage_config,
277 namespace,
278 });
279 }
280 bail!("We should have one, two or three parts");
281 }
282 #[cfg(feature = "dynamodb")]
283 if let Some(s) = input.strip_prefix(DYNAMO_DB) {
284 let mut parts = s.splitn(2, ':');
285 let namespace = parts
286 .next()
287 .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
288 .to_string();
289 let use_dynamodb_local = match parts.next() {
290 None | Some("env") => false,
291 Some("dynamodb_local") => true,
292 Some(unknown) => {
293 bail!(
294 "Invalid DynamoDB endpoint {unknown:?}. \
295 Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
296 );
297 }
298 };
299 let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
300 return Ok(StorageConfig {
301 inner_storage_config,
302 namespace,
303 });
304 }
305 #[cfg(feature = "scylladb")]
306 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
307 let mut uri: Option<String> = None;
308 let mut namespace: Option<String> = None;
309 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
310 if !s.is_empty() {
311 let mut parts = s.split(':');
312 while let Some(part) = parts.next() {
313 match part {
314 "tcp" => {
315 let address = parts.next().ok_or_else(|| {
316 anyhow!("Failed to find address for {s}. {parse_error}")
317 })?;
318 let port_str = parts.next().ok_or_else(|| {
319 anyhow!("Failed to find port for {s}. {parse_error}")
320 })?;
321 let port = NonZeroU16::from_str(port_str).map_err(|_| {
322 anyhow!(
323 "Failed to find parse port {port_str} for {s}. {parse_error}",
324 )
325 })?;
326 if uri.is_some() {
327 bail!("The uri has already been assigned");
328 }
329 uri = Some(format!("{}:{}", &address, port));
330 }
331 _ if part.starts_with("table") => {
332 if namespace.is_some() {
333 bail!("The namespace has already been assigned");
334 }
335 namespace = Some(part.to_string());
336 }
337 _ => {
338 bail!("the entry \"{part}\" is not matching");
339 }
340 }
341 }
342 }
343 let uri = uri.unwrap_or("localhost:9042".to_string());
344 let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string());
345 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
346 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
347 return Ok(StorageConfig {
348 inner_storage_config,
349 namespace,
350 });
351 }
352 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
353 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
354 let parts = s.split(':').collect::<Vec<_>>();
355 if parts.len() != 5 && parts.len() != 6 {
356 bail!(
357 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
358 );
359 }
360 let path = Path::new(parts[0]);
361 let path = path.to_path_buf();
362 let path_with_guard = PathWithGuard::new(path);
363 let spawn_mode = match parts[1] {
364 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
365 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
366 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
367 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
368 }?;
369 let protocol = parts[2];
370 if protocol != "tcp" {
371 bail!("The only allowed protocol is tcp");
372 }
373 let address = parts[3];
374 let port_str = parts[4];
375 let port = NonZeroU16::from_str(port_str)
376 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
377 let uri = format!("{}:{}", &address, port);
378 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
379 path_with_guard,
380 spawn_mode,
381 uri,
382 };
383 let namespace = if parts.len() == 5 {
384 DEFAULT_NAMESPACE.to_string()
385 } else {
386 parts[5].to_string()
387 };
388 return Ok(StorageConfig {
389 inner_storage_config,
390 namespace,
391 });
392 }
393 error!("available storage: memory");
394 #[cfg(feature = "storage-service")]
395 error!("Also available is linera-storage-service");
396 #[cfg(feature = "rocksdb")]
397 error!("Also available is RocksDB");
398 #[cfg(feature = "dynamodb")]
399 error!("Also available is DynamoDB");
400 #[cfg(feature = "scylladb")]
401 error!("Also available is ScyllaDB");
402 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
403 error!("Also available is DualRocksDbScyllaDb");
404 Err(anyhow!("The input has not matched: {input}"))
405 }
406}
407
408impl StorageConfig {
409 pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
410 match &mut self.inner_storage_config {
411 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
412 InnerStorageConfig::DualRocksDbScyllaDb {
413 path_with_guard,
414 spawn_mode: _,
415 uri: _,
416 } => {
417 let shard_str = format!("shard_{}", _shard);
418 path_with_guard.path_buf.push(shard_str);
419 std::fs::create_dir_all(&path_with_guard.path_buf)
420 }
421 _ => Ok(()),
422 }
423 }
424
425 pub async fn add_common_storage_options(
427 &self,
428 options: &CommonStorageOptions,
429 ) -> Result<StoreConfig, anyhow::Error> {
430 let namespace = self.namespace.clone();
431 match &self.inner_storage_config {
432 InnerStorageConfig::Memory { genesis_path } => {
433 let config = MemoryStoreConfig {
434 max_stream_queries: options.storage_max_stream_queries,
435 kill_on_drop: false,
436 };
437 let genesis_path = genesis_path.clone();
438 Ok(StoreConfig::Memory {
439 config,
440 namespace,
441 genesis_path,
442 })
443 }
444 #[cfg(feature = "storage-service")]
445 InnerStorageConfig::Service { endpoint } => {
446 let inner_config = StorageServiceStoreInternalConfig {
447 endpoint: endpoint.clone(),
448 max_concurrent_queries: options.storage_max_concurrent_queries,
449 max_stream_queries: options.storage_max_stream_queries,
450 };
451 let config = StorageServiceStoreConfig {
452 inner_config,
453 storage_cache_config: options.storage_cache_config(),
454 };
455 Ok(StoreConfig::StorageService { config, namespace })
456 }
457 #[cfg(feature = "rocksdb")]
458 InnerStorageConfig::RocksDb { path, spawn_mode } => {
459 let path_with_guard = PathWithGuard::new(path.to_path_buf());
460 let inner_config = RocksDbStoreInternalConfig {
461 spawn_mode: *spawn_mode,
462 path_with_guard,
463 max_stream_queries: options.storage_max_stream_queries,
464 };
465 let config = RocksDbStoreConfig {
466 inner_config,
467 storage_cache_config: options.storage_cache_config(),
468 };
469 Ok(StoreConfig::RocksDb { config, namespace })
470 }
471 #[cfg(feature = "dynamodb")]
472 InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
473 let inner_config = DynamoDbStoreInternalConfig {
474 use_dynamodb_local: *use_dynamodb_local,
475 max_concurrent_queries: options.storage_max_concurrent_queries,
476 max_stream_queries: options.storage_max_stream_queries,
477 };
478 let config = DynamoDbStoreConfig {
479 inner_config,
480 storage_cache_config: options.storage_cache_config(),
481 };
482 Ok(StoreConfig::DynamoDb { config, namespace })
483 }
484 #[cfg(feature = "scylladb")]
485 InnerStorageConfig::ScyllaDb { uri } => {
486 let inner_config = ScyllaDbStoreInternalConfig {
487 uri: uri.clone(),
488 max_stream_queries: options.storage_max_stream_queries,
489 max_concurrent_queries: options.storage_max_concurrent_queries,
490 replication_factor: options.storage_replication_factor,
491 };
492 let config = ScyllaDbStoreConfig {
493 inner_config,
494 storage_cache_config: options.storage_cache_config(),
495 };
496 Ok(StoreConfig::ScyllaDb { config, namespace })
497 }
498 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
499 InnerStorageConfig::DualRocksDbScyllaDb {
500 path_with_guard,
501 spawn_mode,
502 uri,
503 } => {
504 let inner_config = RocksDbStoreInternalConfig {
505 spawn_mode: *spawn_mode,
506 path_with_guard: path_with_guard.clone(),
507 max_stream_queries: options.storage_max_stream_queries,
508 };
509 let first_config = RocksDbStoreConfig {
510 inner_config,
511 storage_cache_config: options.storage_cache_config(),
512 };
513
514 let inner_config = ScyllaDbStoreInternalConfig {
515 uri: uri.clone(),
516 max_stream_queries: options.storage_max_stream_queries,
517 max_concurrent_queries: options.storage_max_concurrent_queries,
518 replication_factor: options.storage_replication_factor,
519 };
520 let second_config = ScyllaDbStoreConfig {
521 inner_config,
522 storage_cache_config: options.storage_cache_config(),
523 };
524
525 let config = DualStoreConfig {
526 first_config,
527 second_config,
528 };
529 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
530 }
531 }
532 }
533}
534
535impl fmt::Display for StorageConfig {
536 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
537 let namespace = &self.namespace;
538 match &self.inner_storage_config {
539 #[cfg(feature = "storage-service")]
540 InnerStorageConfig::Service { endpoint } => {
541 write!(f, "service:tcp:{}:{}", endpoint, namespace)
542 }
543 InnerStorageConfig::Memory { genesis_path } => {
544 write!(f, "memory:{}:{}", genesis_path.display(), namespace)
545 }
546 #[cfg(feature = "rocksdb")]
547 InnerStorageConfig::RocksDb { path, spawn_mode } => {
548 let spawn_mode = spawn_mode.to_string();
549 write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
550 }
551 #[cfg(feature = "dynamodb")]
552 InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
553 true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
554 false => write!(f, "dynamodb:{}:env", namespace),
555 },
556 #[cfg(feature = "scylladb")]
557 InnerStorageConfig::ScyllaDb { uri } => {
558 write!(f, "scylladb:tcp:{}:{}", uri, namespace)
559 }
560 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
561 InnerStorageConfig::DualRocksDbScyllaDb {
562 path_with_guard,
563 spawn_mode,
564 uri,
565 } => {
566 write!(
567 f,
568 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
569 path_with_guard.path_buf.display(),
570 spawn_mode,
571 uri,
572 namespace
573 )
574 }
575 }
576 }
577}
578
579#[async_trait]
580pub trait Runnable {
581 type Output;
582
583 async fn run<S>(self, storage: S) -> Self::Output
584 where
585 S: Storage + Clone + Send + Sync + 'static;
586}
587
588#[async_trait]
589pub trait RunnableWithStore {
590 type Output;
591
592 async fn run<D>(
593 self,
594 config: D::Config,
595 namespace: String,
596 ) -> Result<Self::Output, anyhow::Error>
597 where
598 D: KeyValueDatabase + Clone + Send + Sync + 'static,
599 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
600 D::Error: Send + Sync;
601}
602
603impl StoreConfig {
604 pub async fn run_with_storage<Job>(
605 self,
606 wasm_runtime: Option<WasmRuntime>,
607 job: Job,
608 ) -> Result<Job::Output, anyhow::Error>
609 where
610 Job: Runnable,
611 {
612 match self {
613 StoreConfig::Memory {
614 config,
615 namespace,
616 genesis_path,
617 } => {
618 let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
619 &config,
620 &namespace,
621 wasm_runtime,
622 )
623 .await?;
624 let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
625 genesis_config.initialize_storage(&mut storage).await?;
627 Ok(job.run(storage).await)
628 }
629 #[cfg(feature = "storage-service")]
630 StoreConfig::StorageService { config, namespace } => {
631 let storage = DbStorage::<StorageServiceDatabase, _>::connect(
632 &config,
633 &namespace,
634 wasm_runtime,
635 )
636 .await?;
637 Ok(job.run(storage).await)
638 }
639 #[cfg(feature = "rocksdb")]
640 StoreConfig::RocksDb { config, namespace } => {
641 let storage =
642 DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
643 .await?;
644 Ok(job.run(storage).await)
645 }
646 #[cfg(feature = "dynamodb")]
647 StoreConfig::DynamoDb { config, namespace } => {
648 let storage =
649 DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
650 .await?;
651 Ok(job.run(storage).await)
652 }
653 #[cfg(feature = "scylladb")]
654 StoreConfig::ScyllaDb { config, namespace } => {
655 let storage =
656 DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
657 .await?;
658 Ok(job.run(storage).await)
659 }
660 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
661 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
662 let storage = DbStorage::<
663 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
664 _,
665 >::connect(&config, &namespace, wasm_runtime)
666 .await?;
667 Ok(job.run(storage).await)
668 }
669 }
670 }
671
672 #[allow(unused_variables)]
673 pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
674 where
675 Job: RunnableWithStore,
676 {
677 match self {
678 StoreConfig::Memory { .. } => {
679 Err(anyhow!("Cannot run admin operations on the memory store"))
680 }
681 #[cfg(feature = "storage-service")]
682 StoreConfig::StorageService { config, namespace } => {
683 Ok(job.run::<StorageServiceDatabase>(config, namespace).await?)
684 }
685 #[cfg(feature = "rocksdb")]
686 StoreConfig::RocksDb { config, namespace } => {
687 Ok(job.run::<RocksDbDatabase>(config, namespace).await?)
688 }
689 #[cfg(feature = "dynamodb")]
690 StoreConfig::DynamoDb { config, namespace } => {
691 Ok(job.run::<DynamoDbDatabase>(config, namespace).await?)
692 }
693 #[cfg(feature = "scylladb")]
694 StoreConfig::ScyllaDb { config, namespace } => {
695 Ok(job.run::<ScyllaDbDatabase>(config, namespace).await?)
696 }
697 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
698 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
699 .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
700 config, namespace,
701 )
702 .await?),
703 }
704 }
705
706 pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
707 self.run_with_store(InitializeStorageJob(config)).await
708 }
709}
710
711struct InitializeStorageJob<'a>(&'a GenesisConfig);
712
713#[async_trait]
714impl RunnableWithStore for InitializeStorageJob<'_> {
715 type Output = ();
716
717 async fn run<D>(
718 self,
719 config: D::Config,
720 namespace: String,
721 ) -> Result<Self::Output, anyhow::Error>
722 where
723 D: KeyValueDatabase + Clone + Send + Sync + 'static,
724 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
725 D::Error: Send + Sync,
726 {
727 let mut storage =
728 DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
729 self.0.initialize_storage(&mut storage).await?;
730 Ok(())
731 }
732}
733
734#[test]
735fn test_memory_storage_config_from_str() {
736 assert_eq!(
737 StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
738 StorageConfig {
739 inner_storage_config: InnerStorageConfig::Memory {
740 genesis_path: PathBuf::from("path/to/genesis.json")
741 },
742 namespace: DEFAULT_NAMESPACE.into()
743 }
744 );
745 assert_eq!(
746 StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
747 StorageConfig {
748 inner_storage_config: InnerStorageConfig::Memory {
749 genesis_path: PathBuf::from("path/to/genesis.json")
750 },
751 namespace: "namespace".into()
752 }
753 );
754 assert!(StorageConfig::from_str("memory").is_err(),);
755}
756
757#[cfg(feature = "storage-service")]
758#[test]
759fn test_shared_store_config_from_str() {
760 assert_eq!(
761 StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
762 StorageConfig {
763 inner_storage_config: InnerStorageConfig::Service {
764 endpoint: "127.0.0.1:8942".to_string()
765 },
766 namespace: "linera".into()
767 }
768 );
769 assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
770 assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
771}
772
773#[cfg(feature = "rocksdb")]
774#[test]
775fn test_rocks_db_storage_config_from_str() {
776 assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
777 assert_eq!(
778 StorageConfig::from_str("rocksdb:foo.db").unwrap(),
779 StorageConfig {
780 inner_storage_config: InnerStorageConfig::RocksDb {
781 path: "foo.db".into(),
782 spawn_mode: RocksDbSpawnMode::SpawnBlocking,
783 },
784 namespace: DEFAULT_NAMESPACE.to_string()
785 }
786 );
787 assert_eq!(
788 StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
789 StorageConfig {
790 inner_storage_config: InnerStorageConfig::RocksDb {
791 path: "foo.db".into(),
792 spawn_mode: RocksDbSpawnMode::BlockInPlace,
793 },
794 namespace: DEFAULT_NAMESPACE.to_string()
795 }
796 );
797 assert_eq!(
798 StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
799 StorageConfig {
800 inner_storage_config: InnerStorageConfig::RocksDb {
801 path: "foo.db".into(),
802 spawn_mode: RocksDbSpawnMode::BlockInPlace,
803 },
804 namespace: "chosen_namespace".into()
805 }
806 );
807}
808
809#[cfg(feature = "dynamodb")]
810#[test]
811fn test_aws_storage_config_from_str() {
812 assert_eq!(
813 StorageConfig::from_str("dynamodb:table").unwrap(),
814 StorageConfig {
815 inner_storage_config: InnerStorageConfig::DynamoDb {
816 use_dynamodb_local: false
817 },
818 namespace: "table".to_string()
819 }
820 );
821 assert_eq!(
822 StorageConfig::from_str("dynamodb:table:env").unwrap(),
823 StorageConfig {
824 inner_storage_config: InnerStorageConfig::DynamoDb {
825 use_dynamodb_local: false
826 },
827 namespace: "table".to_string()
828 }
829 );
830 assert_eq!(
831 StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap(),
832 StorageConfig {
833 inner_storage_config: InnerStorageConfig::DynamoDb {
834 use_dynamodb_local: true
835 },
836 namespace: "table".to_string()
837 }
838 );
839 assert!(StorageConfig::from_str("dynamodb").is_err());
840 assert!(StorageConfig::from_str("dynamodb:").is_err());
841 assert!(StorageConfig::from_str("dynamodb:1").is_err());
842 assert!(StorageConfig::from_str("dynamodb:wrong:endpoint").is_err());
843}
844
845#[cfg(feature = "scylladb")]
846#[test]
847fn test_scylla_db_storage_config_from_str() {
848 assert_eq!(
849 StorageConfig::from_str("scylladb:").unwrap(),
850 StorageConfig {
851 inner_storage_config: InnerStorageConfig::ScyllaDb {
852 uri: "localhost:9042".to_string()
853 },
854 namespace: DEFAULT_NAMESPACE.to_string()
855 }
856 );
857 assert_eq!(
858 StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
859 StorageConfig {
860 inner_storage_config: InnerStorageConfig::ScyllaDb {
861 uri: "db_hostname:230".to_string()
862 },
863 namespace: "table_other_storage".to_string()
864 }
865 );
866 assert_eq!(
867 StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
868 StorageConfig {
869 inner_storage_config: InnerStorageConfig::ScyllaDb {
870 uri: "db_hostname:230".to_string()
871 },
872 namespace: DEFAULT_NAMESPACE.to_string()
873 }
874 );
875 assert!(StorageConfig::from_str("scylladb:-10").is_err());
876 assert!(StorageConfig::from_str("scylladb:70000").is_err());
877 assert!(StorageConfig::from_str("scylladb:230:234").is_err());
878 assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
879 assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
880 assert!(StorageConfig::from_str("scylladb:wrong").is_err());
881}