1use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13 client::StorageServiceDatabase,
14 common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20 PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
21 RocksDbStoreInternalConfig,
22};
23use linera_views::{
24 lru_prefix_cache::StorageCacheConfig,
25 memory::{MemoryDatabase, MemoryStoreConfig},
26 store::{KeyValueDatabase, KeyValueStore},
27};
28use serde::{Deserialize, Serialize};
29use tracing::error;
30#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
31use {
32 linera_storage::ChainStatesFirstAssignment,
33 linera_views::backends::dual::{DualDatabase, DualStoreConfig},
34 std::path::Path,
35};
36#[cfg(feature = "scylladb")]
37use {
38 linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
39 std::num::NonZeroU16,
40 tracing::debug,
41};
42
43#[derive(Clone, Debug, clap::Parser)]
44pub struct CommonStorageOptions {
45 #[arg(long, global = true)]
47 pub storage_max_concurrent_queries: Option<usize>,
48
49 #[arg(long, default_value = "10", global = true)]
51 pub storage_max_stream_queries: usize,
52
53 #[arg(long, default_value = "10000000", global = true)]
55 pub storage_max_cache_size: usize,
56
57 #[arg(long, default_value = "1000000", global = true)]
59 pub storage_max_value_entry_size: usize,
60
61 #[arg(long, default_value = "1000000", global = true)]
63 pub storage_max_find_keys_entry_size: usize,
64
65 #[arg(long, default_value = "1000000", global = true)]
67 pub storage_max_find_key_values_entry_size: usize,
68
69 #[arg(long, default_value = "1000", global = true)]
71 pub storage_max_cache_entries: usize,
72
73 #[arg(long, default_value = "10000000", global = true)]
75 pub storage_max_cache_value_size: usize,
76
77 #[arg(long, default_value = "10000000", global = true)]
79 pub storage_max_cache_find_keys_size: usize,
80
81 #[arg(long, default_value = "10000000", global = true)]
83 pub storage_max_cache_find_key_values_size: usize,
84
85 #[arg(long, default_value = "1", global = true)]
87 pub storage_replication_factor: u32,
88}
89
90impl CommonStorageOptions {
91 pub fn storage_cache_config(&self) -> StorageCacheConfig {
92 StorageCacheConfig {
93 max_cache_size: self.storage_max_cache_size,
94 max_value_entry_size: self.storage_max_value_entry_size,
95 max_find_keys_entry_size: self.storage_max_find_keys_entry_size,
96 max_find_key_values_entry_size: self.storage_max_find_key_values_entry_size,
97 max_cache_entries: self.storage_max_cache_entries,
98 max_cache_value_size: self.storage_max_cache_value_size,
99 max_cache_find_keys_size: self.storage_max_cache_find_keys_size,
100 max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
101 }
102 }
103}
104
/// A fully resolved store configuration: the backend-specific configuration
/// plus the namespace to use.
///
/// Values are built by `StorageConfig::add_common_storage_options` and consumed
/// by `StoreConfig::run_with_storage` and `StoreConfig::run_with_store`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StoreConfig {
    /// The in-memory store.
    Memory {
        /// Configuration of the in-memory store.
        config: MemoryStoreConfig,
        /// Namespace used when connecting to the store.
        namespace: String,
        /// Path of the genesis configuration; it is read as JSON and used to
        /// initialize the storage in `run_with_storage`.
        genesis_path: PathBuf,
    },
    /// The storage service (only with the `storage-service` feature).
    #[cfg(feature = "storage-service")]
    StorageService {
        /// Configuration of the storage-service client.
        config: StorageServiceStoreConfig,
        /// Namespace used when connecting to the store.
        namespace: String,
    },
    /// The RocksDB store (only with the `rocksdb` feature).
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Configuration of the RocksDB store.
        config: RocksDbStoreConfig,
        /// Namespace used when connecting to the store.
        namespace: String,
    },
    /// The DynamoDB store (only with the `dynamodb` feature).
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// Configuration of the DynamoDB store.
        config: DynamoDbStoreConfig,
        /// Namespace used when connecting to the store.
        namespace: String,
    },
    /// The ScyllaDB store (only with the `scylladb` feature).
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Configuration of the ScyllaDB store.
        config: ScyllaDbStoreConfig,
        /// Namespace used when connecting to the store.
        namespace: String,
    },
    /// The dual store combining RocksDB (first) and ScyllaDB (second); requires
    /// both the `rocksdb` and `scylladb` features.
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// Configurations of the two underlying stores.
        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
        /// Namespace used when connecting to the store.
        namespace: String,
    },
}
144
/// The backend-specific part of a storage description, as parsed from a
/// storage string by `StorageConfig::from_str`.
///
/// Equality is only derived in test builds, where the parsing tests compare
/// parsed values against expected ones.
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub enum InnerStorageConfig {
    /// The in-memory storage (`memory:` prefix).
    Memory {
        /// Path of the genesis configuration file.
        genesis_path: PathBuf,
    },
    /// The storage service (`service:` prefix).
    #[cfg(feature = "storage-service")]
    Service {
        /// Endpoint of the service, stored as `address:port`.
        endpoint: String,
    },
    /// The RocksDB storage (`rocksdb:` prefix).
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Directory of the database.
        path: PathBuf,
        /// Spawn mode; parsing defaults to `SpawnBlocking` when none is given.
        spawn_mode: RocksDbSpawnMode,
    },
    /// The DynamoDB storage (`dynamodb:` prefix).
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// `true` when the `dynamodb_local` endpoint was requested, `false`
        /// for `env` or when no endpoint was given.
        use_dynamodb_local: bool,
    },
    /// The ScyllaDB storage (`scylladb:` prefix).
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Address of the database as `hostname:port`; parsing defaults to
        /// `localhost:9042`.
        uri: String,
    },
    /// The dual RocksDB/ScyllaDB storage (`dualrocksdbscylladb:` prefix).
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// Directory of the RocksDB database, wrapped in a `PathWithGuard`.
        path_with_guard: PathWithGuard,
        /// Spawn mode of the RocksDB database.
        spawn_mode: RocksDbSpawnMode,
        /// Address of the ScyllaDB database as `hostname:port`.
        uri: String,
    },
}
191
/// A storage description as given by the user: which backend to use and under
/// which namespace.
///
/// It is parsed from a string via `FromStr`, printed back via `Display`, and
/// turned into a `StoreConfig` by `add_common_storage_options`.
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub struct StorageConfig {
    /// The backend-specific part of the description.
    pub inner_storage_config: InnerStorageConfig,
    /// The namespace; parsing falls back to `DEFAULT_NAMESPACE` for the
    /// backends where the namespace segment is optional.
    pub namespace: String,
}
201
// Prefixes that select the storage backend in a storage string. Each one
// includes the trailing ':' so that `strip_prefix` leaves only the
// backend-specific remainder.

/// Prefix of the in-memory storage.
const MEMORY: &str = "memory:";
/// Prefix of the storage service.
#[cfg(feature = "storage-service")]
const STORAGE_SERVICE: &str = "service:";
/// Prefix of the RocksDB storage.
#[cfg(feature = "rocksdb")]
const ROCKS_DB: &str = "rocksdb:";
/// Prefix of the DynamoDB storage.
#[cfg(feature = "dynamodb")]
const DYNAMO_DB: &str = "dynamodb:";
/// Prefix of the ScyllaDB storage.
#[cfg(feature = "scylladb")]
const SCYLLA_DB: &str = "scylladb:";
/// Prefix of the dual RocksDB/ScyllaDB storage.
#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
213
214impl FromStr for StorageConfig {
215 type Err = anyhow::Error;
216
217 fn from_str(input: &str) -> Result<Self, Self::Err> {
218 if let Some(s) = input.strip_prefix(MEMORY) {
219 let parts = s.split(':').collect::<Vec<_>>();
220 if parts.len() == 1 {
221 let genesis_path = parts[0].to_string().into();
222 let namespace = DEFAULT_NAMESPACE.to_string();
223 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
224 return Ok(StorageConfig {
225 inner_storage_config,
226 namespace,
227 });
228 }
229 if parts.len() != 2 {
230 bail!("We should have one genesis config path and one optional namespace");
231 }
232 let genesis_path = parts[0].to_string().into();
233 let namespace = parts[1].to_string();
234 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
235 return Ok(StorageConfig {
236 inner_storage_config,
237 namespace,
238 });
239 }
240 #[cfg(feature = "storage-service")]
241 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
242 if s.is_empty() {
243 bail!(
244 "For Storage service, the formatting has to be service:endpoint:namespace,\
245example service:tcp:127.0.0.1:7878:table_do_my_test"
246 );
247 }
248 let parts = s.split(':').collect::<Vec<_>>();
249 if parts.len() != 4 {
250 bail!("We should have one endpoint and one namespace");
251 }
252 let protocol = parts[0];
253 if protocol != "tcp" {
254 bail!("Only allowed protocol is tcp");
255 }
256 let endpoint = parts[1];
257 let port = parts[2];
258 let mut endpoint = endpoint.to_string();
259 endpoint.push(':');
260 endpoint.push_str(port);
261 let endpoint = endpoint.to_string();
262 let namespace = parts[3].to_string();
263 let inner_storage_config = InnerStorageConfig::Service { endpoint };
264 return Ok(StorageConfig {
265 inner_storage_config,
266 namespace,
267 });
268 }
269 #[cfg(feature = "rocksdb")]
270 if let Some(s) = input.strip_prefix(ROCKS_DB) {
271 if s.is_empty() {
272 bail!(
273 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
274 }
275 let parts = s.split(':').collect::<Vec<_>>();
276 if parts.len() == 1 {
277 let path = parts[0].to_string().into();
278 let namespace = DEFAULT_NAMESPACE.to_string();
279 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
280 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
281 return Ok(StorageConfig {
282 inner_storage_config,
283 namespace,
284 });
285 }
286 if parts.len() == 2 || parts.len() == 3 {
287 let path = parts[0].to_string().into();
288 let spawn_mode = match parts[1] {
289 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
290 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
291 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
292 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
293 }?;
294 let namespace = if parts.len() == 2 {
295 DEFAULT_NAMESPACE.to_string()
296 } else {
297 parts[2].to_string()
298 };
299 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
300 return Ok(StorageConfig {
301 inner_storage_config,
302 namespace,
303 });
304 }
305 bail!("We should have one, two or three parts");
306 }
307 #[cfg(feature = "dynamodb")]
308 if let Some(s) = input.strip_prefix(DYNAMO_DB) {
309 let mut parts = s.splitn(2, ':');
310 let namespace = parts
311 .next()
312 .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
313 .to_string();
314 let use_dynamodb_local = match parts.next() {
315 None | Some("env") => false,
316 Some("dynamodb_local") => true,
317 Some(unknown) => {
318 bail!(
319 "Invalid DynamoDB endpoint {unknown:?}. \
320 Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
321 );
322 }
323 };
324 let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
325 return Ok(StorageConfig {
326 inner_storage_config,
327 namespace,
328 });
329 }
330 #[cfg(feature = "scylladb")]
331 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
332 let mut uri: Option<String> = None;
333 let mut namespace: Option<String> = None;
334 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
335 if !s.is_empty() {
336 let mut parts = s.split(':');
337 while let Some(part) = parts.next() {
338 match part {
339 "tcp" => {
340 let address = parts.next().ok_or_else(|| {
341 anyhow!("Failed to find address for {s}. {parse_error}")
342 })?;
343 let port_str = parts.next().ok_or_else(|| {
344 anyhow!("Failed to find port for {s}. {parse_error}")
345 })?;
346 let port = NonZeroU16::from_str(port_str).map_err(|_| {
347 anyhow!(
348 "Failed to find parse port {port_str} for {s}. {parse_error}",
349 )
350 })?;
351 if uri.is_some() {
352 bail!("The uri has already been assigned");
353 }
354 uri = Some(format!("{}:{}", &address, port));
355 }
356 _ if part.starts_with("table") => {
357 if namespace.is_some() {
358 bail!("The namespace has already been assigned");
359 }
360 namespace = Some(part.to_string());
361 }
362 _ => {
363 bail!("the entry \"{part}\" is not matching");
364 }
365 }
366 }
367 }
368 let uri = uri.unwrap_or("localhost:9042".to_string());
369 let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string());
370 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
371 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
372 return Ok(StorageConfig {
373 inner_storage_config,
374 namespace,
375 });
376 }
377 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
378 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
379 let parts = s.split(':').collect::<Vec<_>>();
380 if parts.len() != 5 && parts.len() != 6 {
381 bail!(
382 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
383 );
384 }
385 let path = Path::new(parts[0]);
386 let path = path.to_path_buf();
387 let path_with_guard = PathWithGuard::new(path);
388 let spawn_mode = match parts[1] {
389 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
390 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
391 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
392 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
393 }?;
394 let protocol = parts[2];
395 if protocol != "tcp" {
396 bail!("The only allowed protocol is tcp");
397 }
398 let address = parts[3];
399 let port_str = parts[4];
400 let port = NonZeroU16::from_str(port_str)
401 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
402 let uri = format!("{}:{}", &address, port);
403 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
404 path_with_guard,
405 spawn_mode,
406 uri,
407 };
408 let namespace = if parts.len() == 5 {
409 DEFAULT_NAMESPACE.to_string()
410 } else {
411 parts[5].to_string()
412 };
413 return Ok(StorageConfig {
414 inner_storage_config,
415 namespace,
416 });
417 }
418 error!("available storage: memory");
419 #[cfg(feature = "storage-service")]
420 error!("Also available is linera-storage-service");
421 #[cfg(feature = "rocksdb")]
422 error!("Also available is RocksDB");
423 #[cfg(feature = "dynamodb")]
424 error!("Also available is DynamoDB");
425 #[cfg(feature = "scylladb")]
426 error!("Also available is ScyllaDB");
427 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
428 error!("Also available is DualRocksDbScyllaDb");
429 Err(anyhow!("The input has not matched: {input}"))
430 }
431}
432
433impl StorageConfig {
434 pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
435 match &mut self.inner_storage_config {
436 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
437 InnerStorageConfig::DualRocksDbScyllaDb {
438 path_with_guard,
439 spawn_mode: _,
440 uri: _,
441 } => {
442 let shard_str = format!("shard_{}", _shard);
443 path_with_guard.path_buf.push(shard_str);
444 std::fs::create_dir_all(&path_with_guard.path_buf)
445 }
446 _ => Ok(()),
447 }
448 }
449
450 pub fn add_common_storage_options(
452 &self,
453 options: &CommonStorageOptions,
454 ) -> Result<StoreConfig, anyhow::Error> {
455 let namespace = self.namespace.clone();
456 match &self.inner_storage_config {
457 InnerStorageConfig::Memory { genesis_path } => {
458 let config = MemoryStoreConfig {
459 max_stream_queries: options.storage_max_stream_queries,
460 kill_on_drop: false,
461 };
462 let genesis_path = genesis_path.clone();
463 Ok(StoreConfig::Memory {
464 config,
465 namespace,
466 genesis_path,
467 })
468 }
469 #[cfg(feature = "storage-service")]
470 InnerStorageConfig::Service { endpoint } => {
471 let inner_config = StorageServiceStoreInternalConfig {
472 endpoint: endpoint.clone(),
473 max_concurrent_queries: options.storage_max_concurrent_queries,
474 max_stream_queries: options.storage_max_stream_queries,
475 };
476 let config = StorageServiceStoreConfig {
477 inner_config,
478 storage_cache_config: options.storage_cache_config(),
479 };
480 Ok(StoreConfig::StorageService { config, namespace })
481 }
482 #[cfg(feature = "rocksdb")]
483 InnerStorageConfig::RocksDb { path, spawn_mode } => {
484 let path_with_guard = PathWithGuard::new(path.to_path_buf());
485 let inner_config = RocksDbStoreInternalConfig {
486 spawn_mode: *spawn_mode,
487 path_with_guard,
488 max_stream_queries: options.storage_max_stream_queries,
489 };
490 let config = RocksDbStoreConfig {
491 inner_config,
492 storage_cache_config: options.storage_cache_config(),
493 };
494 Ok(StoreConfig::RocksDb { config, namespace })
495 }
496 #[cfg(feature = "dynamodb")]
497 InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
498 let inner_config = DynamoDbStoreInternalConfig {
499 use_dynamodb_local: *use_dynamodb_local,
500 max_concurrent_queries: options.storage_max_concurrent_queries,
501 max_stream_queries: options.storage_max_stream_queries,
502 };
503 let config = DynamoDbStoreConfig {
504 inner_config,
505 storage_cache_config: options.storage_cache_config(),
506 };
507 Ok(StoreConfig::DynamoDb { config, namespace })
508 }
509 #[cfg(feature = "scylladb")]
510 InnerStorageConfig::ScyllaDb { uri } => {
511 let inner_config = ScyllaDbStoreInternalConfig {
512 uri: uri.clone(),
513 max_stream_queries: options.storage_max_stream_queries,
514 max_concurrent_queries: options.storage_max_concurrent_queries,
515 replication_factor: options.storage_replication_factor,
516 };
517 let config = ScyllaDbStoreConfig {
518 inner_config,
519 storage_cache_config: options.storage_cache_config(),
520 };
521 Ok(StoreConfig::ScyllaDb { config, namespace })
522 }
523 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
524 InnerStorageConfig::DualRocksDbScyllaDb {
525 path_with_guard,
526 spawn_mode,
527 uri,
528 } => {
529 let inner_config = RocksDbStoreInternalConfig {
530 spawn_mode: *spawn_mode,
531 path_with_guard: path_with_guard.clone(),
532 max_stream_queries: options.storage_max_stream_queries,
533 };
534 let first_config = RocksDbStoreConfig {
535 inner_config,
536 storage_cache_config: options.storage_cache_config(),
537 };
538
539 let inner_config = ScyllaDbStoreInternalConfig {
540 uri: uri.clone(),
541 max_stream_queries: options.storage_max_stream_queries,
542 max_concurrent_queries: options.storage_max_concurrent_queries,
543 replication_factor: options.storage_replication_factor,
544 };
545 let second_config = ScyllaDbStoreConfig {
546 inner_config,
547 storage_cache_config: options.storage_cache_config(),
548 };
549
550 let config = DualStoreConfig {
551 first_config,
552 second_config,
553 };
554 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
555 }
556 }
557 }
558}
559
560impl fmt::Display for StorageConfig {
561 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
562 let namespace = &self.namespace;
563 match &self.inner_storage_config {
564 #[cfg(feature = "storage-service")]
565 InnerStorageConfig::Service { endpoint } => {
566 write!(f, "service:tcp:{}:{}", endpoint, namespace)
567 }
568 InnerStorageConfig::Memory { genesis_path } => {
569 write!(f, "memory:{}:{}", genesis_path.display(), namespace)
570 }
571 #[cfg(feature = "rocksdb")]
572 InnerStorageConfig::RocksDb { path, spawn_mode } => {
573 let spawn_mode = spawn_mode.to_string();
574 write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
575 }
576 #[cfg(feature = "dynamodb")]
577 InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
578 true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
579 false => write!(f, "dynamodb:{}:env", namespace),
580 },
581 #[cfg(feature = "scylladb")]
582 InnerStorageConfig::ScyllaDb { uri } => {
583 write!(f, "scylladb:tcp:{}:{}", uri, namespace)
584 }
585 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
586 InnerStorageConfig::DualRocksDbScyllaDb {
587 path_with_guard,
588 spawn_mode,
589 uri,
590 } => {
591 write!(
592 f,
593 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
594 path_with_guard.path_buf.display(),
595 spawn_mode,
596 uri,
597 namespace
598 )
599 }
600 }
601 }
602}
603
/// A job that runs against a connected storage.
///
/// `StoreConfig::run_with_storage` connects to the storage selected by the
/// configuration and hands it to `run`.
#[async_trait]
pub trait Runnable {
    /// The value produced by the job.
    type Output;

    /// Runs the job, consuming both the job and the given storage.
    async fn run<S>(self, storage: S) -> Self::Output
    where
        S: Storage + Clone + Send + Sync + 'static;
}
612
/// A job that is generic over the key-value database type and receives the
/// database configuration and namespace instead of a connected storage.
///
/// `StoreConfig::run_with_store` selects the database type `D` that matches the
/// configuration and calls `run` with it.
#[async_trait]
pub trait RunnableWithStore {
    /// The value produced by the job on success.
    type Output;

    /// Runs the job for database type `D` with its configuration and the
    /// namespace to operate on.
    async fn run<D>(
        self,
        config: D::Config,
        namespace: String,
    ) -> Result<Self::Output, anyhow::Error>
    where
        D: KeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync;
}
627
628impl StoreConfig {
629 pub async fn run_with_storage<Job>(
630 self,
631 wasm_runtime: Option<WasmRuntime>,
632 allow_application_logs: bool,
633 job: Job,
634 ) -> Result<Job::Output, anyhow::Error>
635 where
636 Job: Runnable,
637 {
638 match self {
639 StoreConfig::Memory {
640 config,
641 namespace,
642 genesis_path,
643 } => {
644 let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
645 &config,
646 &namespace,
647 wasm_runtime,
648 )
649 .await?
650 .with_allow_application_logs(allow_application_logs);
651 let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
652 genesis_config.initialize_storage(&mut storage).await?;
654 Ok(job.run(storage).await)
655 }
656 #[cfg(feature = "storage-service")]
657 StoreConfig::StorageService { config, namespace } => {
658 let storage = DbStorage::<StorageServiceDatabase, _>::connect(
659 &config,
660 &namespace,
661 wasm_runtime,
662 )
663 .await?
664 .with_allow_application_logs(allow_application_logs);
665 Ok(job.run(storage).await)
666 }
667 #[cfg(feature = "rocksdb")]
668 StoreConfig::RocksDb { config, namespace } => {
669 let storage =
670 DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
671 .await?
672 .with_allow_application_logs(allow_application_logs);
673 Ok(job.run(storage).await)
674 }
675 #[cfg(feature = "dynamodb")]
676 StoreConfig::DynamoDb { config, namespace } => {
677 let storage =
678 DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
679 .await?
680 .with_allow_application_logs(allow_application_logs);
681 Ok(job.run(storage).await)
682 }
683 #[cfg(feature = "scylladb")]
684 StoreConfig::ScyllaDb { config, namespace } => {
685 let storage =
686 DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
687 .await?
688 .with_allow_application_logs(allow_application_logs);
689 Ok(job.run(storage).await)
690 }
691 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
692 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
693 let storage = DbStorage::<
694 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
695 _,
696 >::connect(&config, &namespace, wasm_runtime)
697 .await?
698 .with_allow_application_logs(allow_application_logs);
699 Ok(job.run(storage).await)
700 }
701 }
702 }
703
704 #[allow(unused_variables)]
705 pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
706 where
707 Job: RunnableWithStore,
708 {
709 match self {
710 StoreConfig::Memory { .. } => {
711 Err(anyhow!("Cannot run admin operations on the memory store"))
712 }
713 #[cfg(feature = "storage-service")]
714 StoreConfig::StorageService { config, namespace } => {
715 Ok(job.run::<StorageServiceDatabase>(config, namespace).await?)
716 }
717 #[cfg(feature = "rocksdb")]
718 StoreConfig::RocksDb { config, namespace } => {
719 Ok(job.run::<RocksDbDatabase>(config, namespace).await?)
720 }
721 #[cfg(feature = "dynamodb")]
722 StoreConfig::DynamoDb { config, namespace } => {
723 Ok(job.run::<DynamoDbDatabase>(config, namespace).await?)
724 }
725 #[cfg(feature = "scylladb")]
726 StoreConfig::ScyllaDb { config, namespace } => {
727 Ok(job.run::<ScyllaDbDatabase>(config, namespace).await?)
728 }
729 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
730 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
731 .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
732 config, namespace,
733 )
734 .await?),
735 }
736 }
737
738 pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
739 self.run_with_store(InitializeStorageJob(config)).await
740 }
741}
742
/// The job behind `StoreConfig::initialize`: it borrows the genesis
/// configuration that the storage is initialized from.
struct InitializeStorageJob<'a>(&'a GenesisConfig);
744
745#[async_trait]
746impl RunnableWithStore for InitializeStorageJob<'_> {
747 type Output = ();
748
749 async fn run<D>(
750 self,
751 config: D::Config,
752 namespace: String,
753 ) -> Result<Self::Output, anyhow::Error>
754 where
755 D: KeyValueDatabase + Clone + Send + Sync + 'static,
756 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
757 D::Error: Send + Sync,
758 {
759 let mut storage =
760 DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
761 self.0.initialize_storage(&mut storage).await?;
762 Ok(())
763 }
764}
765
/// Checks parsing of `memory:` storage strings, with and without a namespace.
#[test]
fn test_memory_storage_config_from_str() {
    // Expected result for the genesis path used below and the given namespace.
    let memory_config = |namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::Memory {
            genesis_path: PathBuf::from("path/to/genesis.json"),
        },
        namespace,
    };

    let without_namespace = StorageConfig::from_str("memory:path/to/genesis.json").unwrap();
    assert_eq!(without_namespace, memory_config(DEFAULT_NAMESPACE.into()));

    let with_namespace =
        StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap();
    assert_eq!(with_namespace, memory_config("namespace".into()));

    // The prefix must include the ':' separator.
    let missing_separator = StorageConfig::from_str("memory");
    assert!(missing_separator.is_err());
}
788
/// Checks parsing of `service:` storage strings.
#[cfg(feature = "storage-service")]
#[test]
fn test_shared_store_config_from_str() {
    let is_rejected = |input: &str| StorageConfig::from_str(input).is_err();

    let expected = StorageConfig {
        inner_storage_config: InnerStorageConfig::Service {
            endpoint: "127.0.0.1:8942".to_string(),
        },
        namespace: "linera".into(),
    };
    let parsed = StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap();
    assert_eq!(parsed, expected);

    // Both the port and the namespace are mandatory.
    assert!(is_rejected("service:tcp:127.0.0.1:8942"));
    assert!(is_rejected("service:tcp:127.0.0.1:linera"));
}
804
/// Checks parsing of `rocksdb:` storage strings with one, two and three segments.
#[cfg(feature = "rocksdb")]
#[test]
fn test_rocks_db_storage_config_from_str() {
    // Expected result for the `foo.db` directory used below.
    let rocks_db_config = |spawn_mode: RocksDbSpawnMode, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::RocksDb {
            path: "foo.db".into(),
            spawn_mode,
        },
        namespace,
    };

    // Without the ':' separator the prefix is not recognized.
    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());

    let path_only = StorageConfig::from_str("rocksdb:foo.db").unwrap();
    assert_eq!(
        path_only,
        rocks_db_config(
            RocksDbSpawnMode::SpawnBlocking,
            DEFAULT_NAMESPACE.to_string()
        )
    );

    let with_spawn_mode = StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap();
    assert_eq!(
        with_spawn_mode,
        rocks_db_config(
            RocksDbSpawnMode::BlockInPlace,
            DEFAULT_NAMESPACE.to_string()
        )
    );

    let with_namespace =
        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap();
    assert_eq!(
        with_namespace,
        rocks_db_config(RocksDbSpawnMode::BlockInPlace, "chosen_namespace".into())
    );
}
840
/// Checks parsing of `dynamodb:` storage strings, including the inputs that
/// are expected to be rejected.
#[cfg(feature = "dynamodb")]
#[test]
fn test_aws_storage_config_from_str() {
    // Expected result for the table named `table`.
    let dynamo_db_config = |use_dynamodb_local: bool| StorageConfig {
        inner_storage_config: InnerStorageConfig::DynamoDb { use_dynamodb_local },
        namespace: "table".to_string(),
    };
    let is_rejected = |input: &str| StorageConfig::from_str(input).is_err();

    let default_endpoint = StorageConfig::from_str("dynamodb:table").unwrap();
    assert_eq!(default_endpoint, dynamo_db_config(false));

    let env_endpoint = StorageConfig::from_str("dynamodb:table:env").unwrap();
    assert_eq!(env_endpoint, dynamo_db_config(false));

    let local_endpoint = StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap();
    assert_eq!(local_endpoint, dynamo_db_config(true));

    assert!(is_rejected("dynamodb"));
    assert!(is_rejected("dynamodb:"));
    assert!(is_rejected("dynamodb:1"));
    assert!(is_rejected("dynamodb:wrong:endpoint"));
}
876
/// Checks parsing of `scylladb:` storage strings: defaults, explicit address,
/// explicit namespace, and malformed inputs.
#[cfg(feature = "scylladb")]
#[test]
fn test_scylla_db_storage_config_from_str() {
    let scylla_db_config = |uri: &str, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::ScyllaDb {
            uri: uri.to_string(),
        },
        namespace,
    };
    let is_rejected = |input: &str| StorageConfig::from_str(input).is_err();

    let defaults = StorageConfig::from_str("scylladb:").unwrap();
    assert_eq!(
        defaults,
        scylla_db_config("localhost:9042", DEFAULT_NAMESPACE.to_string())
    );

    let address_and_namespace =
        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap();
    assert_eq!(
        address_and_namespace,
        scylla_db_config("db_hostname:230", "table_other_storage".to_string())
    );

    let address_only = StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap();
    assert_eq!(
        address_only,
        scylla_db_config("db_hostname:230", DEFAULT_NAMESPACE.to_string())
    );

    assert!(is_rejected("scylladb:-10"));
    assert!(is_rejected("scylladb:70000"));
    assert!(is_rejected("scylladb:230:234"));
    assert!(is_rejected("scylladb:tcp:address1"));
    assert!(is_rejected("scylladb:tcp:address1:tcp:/address2"));
    assert!(is_rejected("scylladb:wrong"));
}