1use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13 client::StorageServiceDatabase,
14 common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20 PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
21 RocksDbStoreInternalConfig,
22};
23use linera_views::{
24 lru_prefix_cache::StorageCacheConfig,
25 memory::{MemoryDatabase, MemoryStoreConfig},
26 store::{KeyValueDatabase, KeyValueStore},
27};
28use serde::{Deserialize, Serialize};
29use tracing::error;
30#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
31use {
32 linera_storage::ChainStatesFirstAssignment,
33 linera_views::backends::dual::{DualDatabase, DualStoreConfig},
34 std::path::Path,
35};
36#[cfg(feature = "scylladb")]
37use {
38 linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
39 std::num::NonZeroU16,
40 tracing::debug,
41};
42
43#[derive(Clone, Debug, clap::Parser)]
44pub struct CommonStorageOptions {
45 #[arg(long, global = true)]
47 pub storage_max_concurrent_queries: Option<usize>,
48
49 #[arg(long, default_value = "10", global = true)]
51 pub storage_max_stream_queries: usize,
52
53 #[arg(long, default_value = "10000000", global = true)]
55 pub storage_max_cache_size: usize,
56
57 #[arg(long, default_value = "1000000", global = true)]
59 pub storage_max_value_entry_size: usize,
60
61 #[arg(long, default_value = "1000000", global = true)]
63 pub storage_max_find_keys_entry_size: usize,
64
65 #[arg(long, default_value = "1000000", global = true)]
67 pub storage_max_find_key_values_entry_size: usize,
68
69 #[arg(long, default_value = "1000", global = true)]
71 pub storage_max_cache_entries: usize,
72
73 #[arg(long, default_value = "10000000", global = true)]
75 pub storage_max_cache_value_size: usize,
76
77 #[arg(long, default_value = "10000000", global = true)]
79 pub storage_max_cache_find_keys_size: usize,
80
81 #[arg(long, default_value = "10000000", global = true)]
83 pub storage_max_cache_find_key_values_size: usize,
84
85 #[arg(long, default_value = "1", global = true)]
87 pub storage_replication_factor: u32,
88}
89
90impl CommonStorageOptions {
91 pub fn storage_cache_config(&self) -> StorageCacheConfig {
92 StorageCacheConfig {
93 max_cache_size: self.storage_max_cache_size,
94 max_value_entry_size: self.storage_max_value_entry_size,
95 max_find_keys_entry_size: self.storage_max_find_keys_entry_size,
96 max_find_key_values_entry_size: self.storage_max_find_key_values_entry_size,
97 max_cache_entries: self.storage_max_cache_entries,
98 max_cache_value_size: self.storage_max_cache_value_size,
99 max_cache_find_keys_size: self.storage_max_cache_find_keys_size,
100 max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
101 }
102 }
103}
104
105#[derive(Clone, Debug, Deserialize, Serialize)]
107pub enum StoreConfig {
108 Memory {
110 config: MemoryStoreConfig,
111 namespace: String,
112 genesis_path: PathBuf,
113 },
114 #[cfg(feature = "storage-service")]
116 StorageService {
117 config: StorageServiceStoreConfig,
118 namespace: String,
119 },
120 #[cfg(feature = "rocksdb")]
122 RocksDb {
123 config: RocksDbStoreConfig,
124 namespace: String,
125 },
126 #[cfg(feature = "dynamodb")]
128 DynamoDb {
129 config: DynamoDbStoreConfig,
130 namespace: String,
131 },
132 #[cfg(feature = "scylladb")]
134 ScyllaDb {
135 config: ScyllaDbStoreConfig,
136 namespace: String,
137 },
138 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
139 DualRocksDbScyllaDb {
140 config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
141 namespace: String,
142 },
143}
144
145#[derive(Clone, Debug)]
147#[cfg_attr(any(test), derive(Eq, PartialEq))]
148pub enum InnerStorageConfig {
149 Memory {
151 genesis_path: PathBuf,
154 },
155 #[cfg(feature = "storage-service")]
157 Service {
158 endpoint: String,
160 },
161 #[cfg(feature = "rocksdb")]
163 RocksDb {
164 path: PathBuf,
166 spawn_mode: RocksDbSpawnMode,
168 },
169 #[cfg(feature = "dynamodb")]
171 DynamoDb {
172 use_dynamodb_local: bool,
174 },
175 #[cfg(feature = "scylladb")]
177 ScyllaDb {
178 uri: String,
180 },
181 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
182 DualRocksDbScyllaDb {
183 path_with_guard: PathWithGuard,
185 spawn_mode: RocksDbSpawnMode,
187 uri: String,
189 },
190}
191
192#[derive(Clone, Debug)]
194#[cfg_attr(any(test), derive(Eq, PartialEq))]
195pub struct StorageConfig {
196 pub inner_storage_config: InnerStorageConfig,
198 pub namespace: String,
200}
201
202const MEMORY: &str = "memory:";
203#[cfg(feature = "storage-service")]
204const STORAGE_SERVICE: &str = "service:";
205#[cfg(feature = "rocksdb")]
206const ROCKS_DB: &str = "rocksdb:";
207#[cfg(feature = "dynamodb")]
208const DYNAMO_DB: &str = "dynamodb:";
209#[cfg(feature = "scylladb")]
210const SCYLLA_DB: &str = "scylladb:";
211#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
212const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
213
214impl FromStr for StorageConfig {
215 type Err = anyhow::Error;
216
217 fn from_str(input: &str) -> Result<Self, Self::Err> {
218 if let Some(s) = input.strip_prefix(MEMORY) {
219 let parts = s.split(':').collect::<Vec<_>>();
220 if parts.len() == 1 {
221 let genesis_path = parts[0].to_string().into();
222 let namespace = DEFAULT_NAMESPACE.to_string();
223 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
224 return Ok(StorageConfig {
225 inner_storage_config,
226 namespace,
227 });
228 }
229 if parts.len() != 2 {
230 bail!("We should have one genesis config path and one optional namespace");
231 }
232 let genesis_path = parts[0].to_string().into();
233 let namespace = parts[1].to_string();
234 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
235 return Ok(StorageConfig {
236 inner_storage_config,
237 namespace,
238 });
239 }
240 #[cfg(feature = "storage-service")]
241 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
242 if s.is_empty() {
243 bail!(
244 "For Storage service, the formatting has to be service:endpoint:namespace,\
245example service:tcp:127.0.0.1:7878:table_do_my_test"
246 );
247 }
248 let parts = s.split(':').collect::<Vec<_>>();
249 if parts.len() != 4 {
250 bail!("We should have one endpoint and one namespace");
251 }
252 let protocol = parts[0];
253 if protocol != "tcp" {
254 bail!("Only allowed protocol is tcp");
255 }
256 let endpoint = parts[1];
257 let port = parts[2];
258 let mut endpoint = endpoint.to_string();
259 endpoint.push(':');
260 endpoint.push_str(port);
261 let endpoint = endpoint.to_string();
262 let namespace = parts[3].to_string();
263 let inner_storage_config = InnerStorageConfig::Service { endpoint };
264 return Ok(StorageConfig {
265 inner_storage_config,
266 namespace,
267 });
268 }
269 #[cfg(feature = "rocksdb")]
270 if let Some(s) = input.strip_prefix(ROCKS_DB) {
271 if s.is_empty() {
272 bail!(
273 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
274 }
275 let parts = s.split(':').collect::<Vec<_>>();
276 if parts.len() == 1 {
277 let path = parts[0].to_string().into();
278 let namespace = DEFAULT_NAMESPACE.to_string();
279 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
280 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
281 return Ok(StorageConfig {
282 inner_storage_config,
283 namespace,
284 });
285 }
286 if parts.len() == 2 || parts.len() == 3 {
287 let path = parts[0].to_string().into();
288 let spawn_mode_name = parts
289 .get(1)
290 .copied()
291 .expect("validated by the parts length check above");
292 let spawn_mode = match spawn_mode_name {
293 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
294 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
295 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
296 _ => Err(anyhow!(
297 "Failed to parse {} as a spawn_mode",
298 spawn_mode_name
299 )),
300 }?;
301 let namespace = if parts.len() == 2 {
302 DEFAULT_NAMESPACE.to_string()
303 } else {
304 parts[2].to_string()
305 };
306 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
307 return Ok(StorageConfig {
308 inner_storage_config,
309 namespace,
310 });
311 }
312 bail!("We should have one, two or three parts");
313 }
314 #[cfg(feature = "dynamodb")]
315 if let Some(s) = input.strip_prefix(DYNAMO_DB) {
316 let mut parts = s.splitn(2, ':');
317 let namespace = parts
318 .next()
319 .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
320 .to_string();
321 let use_dynamodb_local = match parts.next() {
322 None | Some("env") => false,
323 Some("dynamodb_local") => true,
324 Some(unknown) => {
325 bail!(
326 "Invalid DynamoDB endpoint {unknown:?}. \
327 Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
328 );
329 }
330 };
331 let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
332 return Ok(StorageConfig {
333 inner_storage_config,
334 namespace,
335 });
336 }
337 #[cfg(feature = "scylladb")]
338 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
339 let mut uri: Option<String> = None;
340 let mut namespace: Option<String> = None;
341 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
342 if !s.is_empty() {
343 let mut parts = s.split(':');
344 while let Some(part) = parts.next() {
345 match part {
346 "tcp" => {
347 let address = parts.next().ok_or_else(|| {
348 anyhow!("Failed to find address for {s}. {parse_error}")
349 })?;
350 let port_str = parts.next().ok_or_else(|| {
351 anyhow!("Failed to find port for {s}. {parse_error}")
352 })?;
353 let port = NonZeroU16::from_str(port_str).map_err(|_| {
354 anyhow!(
355 "Failed to find parse port {port_str} for {s}. {parse_error}",
356 )
357 })?;
358 if uri.is_some() {
359 bail!("The uri has already been assigned");
360 }
361 uri = Some(format!("{}:{}", &address, port));
362 }
363 _ if part.starts_with("table") => {
364 if namespace.is_some() {
365 bail!("The namespace has already been assigned");
366 }
367 namespace = Some(part.to_string());
368 }
369 _ => {
370 bail!("the entry \"{part}\" is not matching");
371 }
372 }
373 }
374 }
375 let uri = uri.unwrap_or_else(|| "localhost:9042".to_string());
376 let namespace = namespace.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
377 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
378 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
379 return Ok(StorageConfig {
380 inner_storage_config,
381 namespace,
382 });
383 }
384 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
385 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
386 let parts = s.split(':').collect::<Vec<_>>();
387 if parts.len() != 5 && parts.len() != 6 {
388 bail!(
389 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
390 );
391 }
392 let path = Path::new(parts[0]);
393 let path = path.to_path_buf();
394 let path_with_guard = PathWithGuard::new(path);
395 let spawn_mode_name = parts
396 .get(1)
397 .copied()
398 .expect("validated by the parts length check above");
399 let spawn_mode = match spawn_mode_name {
400 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
401 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
402 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
403 _ => Err(anyhow!(
404 "Failed to parse {} as a spawn_mode",
405 spawn_mode_name
406 )),
407 }?;
408 let protocol = parts[2];
409 if protocol != "tcp" {
410 bail!("The only allowed protocol is tcp");
411 }
412 let address = parts[3];
413 let port_str = parts[4];
414 let port = NonZeroU16::from_str(port_str)
415 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
416 let uri = format!("{}:{}", &address, port);
417 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
418 path_with_guard,
419 spawn_mode,
420 uri,
421 };
422 let namespace = if parts.len() == 5 {
423 DEFAULT_NAMESPACE.to_string()
424 } else {
425 parts[5].to_string()
426 };
427 return Ok(StorageConfig {
428 inner_storage_config,
429 namespace,
430 });
431 }
432 error!("available storage: memory");
433 #[cfg(feature = "storage-service")]
434 error!("Also available is linera-storage-service");
435 #[cfg(feature = "rocksdb")]
436 error!("Also available is RocksDB");
437 #[cfg(feature = "dynamodb")]
438 error!("Also available is DynamoDB");
439 #[cfg(feature = "scylladb")]
440 error!("Also available is ScyllaDB");
441 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
442 error!("Also available is DualRocksDbScyllaDb");
443 Err(anyhow!("The input has not matched: {input}"))
444 }
445}
446
447impl StorageConfig {
448 #[allow(clippy::used_underscore_binding)]
449 pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
450 match &mut self.inner_storage_config {
451 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
452 InnerStorageConfig::DualRocksDbScyllaDb {
453 path_with_guard,
454 spawn_mode: _,
455 uri: _,
456 } => {
457 let shard_str = format!("shard_{}", _shard);
458 path_with_guard.path_buf.push(shard_str);
459 std::fs::create_dir_all(&path_with_guard.path_buf)
460 }
461 _ => Ok(()),
462 }
463 }
464
465 pub fn add_common_storage_options(
467 &self,
468 options: &CommonStorageOptions,
469 ) -> Result<StoreConfig, anyhow::Error> {
470 let namespace = self.namespace.clone();
471 match &self.inner_storage_config {
472 InnerStorageConfig::Memory { genesis_path } => {
473 let config = MemoryStoreConfig {
474 max_stream_queries: options.storage_max_stream_queries,
475 kill_on_drop: false,
476 };
477 let genesis_path = genesis_path.clone();
478 Ok(StoreConfig::Memory {
479 config,
480 namespace,
481 genesis_path,
482 })
483 }
484 #[cfg(feature = "storage-service")]
485 InnerStorageConfig::Service { endpoint } => {
486 let inner_config = StorageServiceStoreInternalConfig {
487 endpoint: endpoint.clone(),
488 max_concurrent_queries: options.storage_max_concurrent_queries,
489 max_stream_queries: options.storage_max_stream_queries,
490 };
491 let config = StorageServiceStoreConfig {
492 inner_config,
493 storage_cache_config: options.storage_cache_config(),
494 };
495 Ok(StoreConfig::StorageService { config, namespace })
496 }
497 #[cfg(feature = "rocksdb")]
498 InnerStorageConfig::RocksDb { path, spawn_mode } => {
499 let path_with_guard = PathWithGuard::new(path.to_path_buf());
500 let inner_config = RocksDbStoreInternalConfig {
501 spawn_mode: *spawn_mode,
502 path_with_guard,
503 max_stream_queries: options.storage_max_stream_queries,
504 };
505 let config = RocksDbStoreConfig {
506 inner_config,
507 storage_cache_config: options.storage_cache_config(),
508 };
509 Ok(StoreConfig::RocksDb { config, namespace })
510 }
511 #[cfg(feature = "dynamodb")]
512 InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
513 let inner_config = DynamoDbStoreInternalConfig {
514 use_dynamodb_local: *use_dynamodb_local,
515 max_concurrent_queries: options.storage_max_concurrent_queries,
516 max_stream_queries: options.storage_max_stream_queries,
517 };
518 let config = DynamoDbStoreConfig {
519 inner_config,
520 storage_cache_config: options.storage_cache_config(),
521 };
522 Ok(StoreConfig::DynamoDb { config, namespace })
523 }
524 #[cfg(feature = "scylladb")]
525 InnerStorageConfig::ScyllaDb { uri } => {
526 let inner_config = ScyllaDbStoreInternalConfig {
527 uri: uri.clone(),
528 max_stream_queries: options.storage_max_stream_queries,
529 max_concurrent_queries: options.storage_max_concurrent_queries,
530 replication_factor: options.storage_replication_factor,
531 };
532 let config = ScyllaDbStoreConfig {
533 inner_config,
534 storage_cache_config: options.storage_cache_config(),
535 };
536 Ok(StoreConfig::ScyllaDb { config, namespace })
537 }
538 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
539 InnerStorageConfig::DualRocksDbScyllaDb {
540 path_with_guard,
541 spawn_mode,
542 uri,
543 } => {
544 let inner_config = RocksDbStoreInternalConfig {
545 spawn_mode: *spawn_mode,
546 path_with_guard: path_with_guard.clone(),
547 max_stream_queries: options.storage_max_stream_queries,
548 };
549 let first_config = RocksDbStoreConfig {
550 inner_config,
551 storage_cache_config: options.storage_cache_config(),
552 };
553
554 let inner_config = ScyllaDbStoreInternalConfig {
555 uri: uri.clone(),
556 max_stream_queries: options.storage_max_stream_queries,
557 max_concurrent_queries: options.storage_max_concurrent_queries,
558 replication_factor: options.storage_replication_factor,
559 };
560 let second_config = ScyllaDbStoreConfig {
561 inner_config,
562 storage_cache_config: options.storage_cache_config(),
563 };
564
565 let config = DualStoreConfig {
566 first_config,
567 second_config,
568 };
569 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
570 }
571 }
572 }
573}
574
575impl fmt::Display for StorageConfig {
576 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
577 let namespace = &self.namespace;
578 match &self.inner_storage_config {
579 #[cfg(feature = "storage-service")]
580 InnerStorageConfig::Service { endpoint } => {
581 write!(f, "service:tcp:{}:{}", endpoint, namespace)
582 }
583 InnerStorageConfig::Memory { genesis_path } => {
584 write!(f, "memory:{}:{}", genesis_path.display(), namespace)
585 }
586 #[cfg(feature = "rocksdb")]
587 InnerStorageConfig::RocksDb { path, spawn_mode } => {
588 let spawn_mode = spawn_mode.to_string();
589 write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
590 }
591 #[cfg(feature = "dynamodb")]
592 InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
593 true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
594 false => write!(f, "dynamodb:{}:env", namespace),
595 },
596 #[cfg(feature = "scylladb")]
597 InnerStorageConfig::ScyllaDb { uri } => {
598 write!(f, "scylladb:tcp:{}:{}", uri, namespace)
599 }
600 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
601 InnerStorageConfig::DualRocksDbScyllaDb {
602 path_with_guard,
603 spawn_mode,
604 uri,
605 } => {
606 write!(
607 f,
608 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
609 path_with_guard.path_buf.display(),
610 spawn_mode,
611 uri,
612 namespace
613 )
614 }
615 }
616 }
617}
618
619#[async_trait]
620pub trait Runnable {
621 type Output;
622
623 async fn run<S>(self, storage: S) -> Self::Output
624 where
625 S: Storage + Clone + Send + Sync + 'static;
626}
627
628#[async_trait]
629pub trait RunnableWithStore {
630 type Output;
631
632 async fn run<D>(
633 self,
634 config: D::Config,
635 namespace: String,
636 ) -> Result<Self::Output, anyhow::Error>
637 where
638 D: KeyValueDatabase + Clone + Send + Sync + 'static,
639 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
640 D::Error: Send + Sync;
641}
642
643impl StoreConfig {
644 pub async fn run_with_storage<Job>(
645 self,
646 wasm_runtime: Option<WasmRuntime>,
647 allow_application_logs: bool,
648 job: Job,
649 ) -> Result<Job::Output, anyhow::Error>
650 where
651 Job: Runnable,
652 {
653 match self {
654 StoreConfig::Memory {
655 config,
656 namespace,
657 genesis_path,
658 } => {
659 let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
660 &config,
661 &namespace,
662 wasm_runtime,
663 )
664 .await?
665 .with_allow_application_logs(allow_application_logs);
666 let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
667 genesis_config.initialize_storage(&mut storage).await?;
669 Ok(job.run(storage).await)
670 }
671 #[cfg(feature = "storage-service")]
672 StoreConfig::StorageService { config, namespace } => {
673 let storage = DbStorage::<StorageServiceDatabase, _>::connect(
674 &config,
675 &namespace,
676 wasm_runtime,
677 )
678 .await?
679 .with_allow_application_logs(allow_application_logs);
680 Ok(job.run(storage).await)
681 }
682 #[cfg(feature = "rocksdb")]
683 StoreConfig::RocksDb { config, namespace } => {
684 let storage =
685 DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
686 .await?
687 .with_allow_application_logs(allow_application_logs);
688 Ok(job.run(storage).await)
689 }
690 #[cfg(feature = "dynamodb")]
691 StoreConfig::DynamoDb { config, namespace } => {
692 let storage =
693 DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
694 .await?
695 .with_allow_application_logs(allow_application_logs);
696 Ok(job.run(storage).await)
697 }
698 #[cfg(feature = "scylladb")]
699 StoreConfig::ScyllaDb { config, namespace } => {
700 let storage =
701 DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
702 .await?
703 .with_allow_application_logs(allow_application_logs);
704 Ok(job.run(storage).await)
705 }
706 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
707 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
708 let storage = DbStorage::<
709 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
710 _,
711 >::connect(&config, &namespace, wasm_runtime)
712 .await?
713 .with_allow_application_logs(allow_application_logs);
714 Ok(job.run(storage).await)
715 }
716 }
717 }
718
719 #[allow(unused_variables)]
720 pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
721 where
722 Job: RunnableWithStore,
723 {
724 match self {
725 StoreConfig::Memory { .. } => {
726 Err(anyhow!("Cannot run admin operations on the memory store"))
727 }
728 #[cfg(feature = "storage-service")]
729 StoreConfig::StorageService { config, namespace } => {
730 Ok(job.run::<StorageServiceDatabase>(config, namespace).await?)
731 }
732 #[cfg(feature = "rocksdb")]
733 StoreConfig::RocksDb { config, namespace } => {
734 Ok(job.run::<RocksDbDatabase>(config, namespace).await?)
735 }
736 #[cfg(feature = "dynamodb")]
737 StoreConfig::DynamoDb { config, namespace } => {
738 Ok(job.run::<DynamoDbDatabase>(config, namespace).await?)
739 }
740 #[cfg(feature = "scylladb")]
741 StoreConfig::ScyllaDb { config, namespace } => {
742 Ok(job.run::<ScyllaDbDatabase>(config, namespace).await?)
743 }
744 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
745 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
746 .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
747 config, namespace,
748 )
749 .await?),
750 }
751 }
752
753 pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
754 self.run_with_store(InitializeStorageJob(config)).await
755 }
756}
757
758struct InitializeStorageJob<'a>(&'a GenesisConfig);
759
760#[async_trait]
761impl RunnableWithStore for InitializeStorageJob<'_> {
762 type Output = ();
763
764 async fn run<D>(
765 self,
766 config: D::Config,
767 namespace: String,
768 ) -> Result<Self::Output, anyhow::Error>
769 where
770 D: KeyValueDatabase + Clone + Send + Sync + 'static,
771 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
772 D::Error: Send + Sync,
773 {
774 let mut storage =
775 DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
776 self.0.initialize_storage(&mut storage).await?;
777 Ok(())
778 }
779}
780
781#[test]
782fn test_memory_storage_config_from_str() {
783 assert_eq!(
784 StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
785 StorageConfig {
786 inner_storage_config: InnerStorageConfig::Memory {
787 genesis_path: PathBuf::from("path/to/genesis.json")
788 },
789 namespace: DEFAULT_NAMESPACE.into()
790 }
791 );
792 assert_eq!(
793 StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
794 StorageConfig {
795 inner_storage_config: InnerStorageConfig::Memory {
796 genesis_path: PathBuf::from("path/to/genesis.json")
797 },
798 namespace: "namespace".into()
799 }
800 );
801 assert!(StorageConfig::from_str("memory").is_err(),);
802}
803
804#[cfg(feature = "storage-service")]
805#[test]
806fn test_shared_store_config_from_str() {
807 assert_eq!(
808 StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
809 StorageConfig {
810 inner_storage_config: InnerStorageConfig::Service {
811 endpoint: "127.0.0.1:8942".to_string()
812 },
813 namespace: "linera".into()
814 }
815 );
816 assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
817 assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
818}
819
820#[cfg(feature = "rocksdb")]
821#[test]
822fn test_rocks_db_storage_config_from_str() {
823 assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
824 assert_eq!(
825 StorageConfig::from_str("rocksdb:foo.db").unwrap(),
826 StorageConfig {
827 inner_storage_config: InnerStorageConfig::RocksDb {
828 path: "foo.db".into(),
829 spawn_mode: RocksDbSpawnMode::SpawnBlocking,
830 },
831 namespace: DEFAULT_NAMESPACE.to_string()
832 }
833 );
834 assert_eq!(
835 StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
836 StorageConfig {
837 inner_storage_config: InnerStorageConfig::RocksDb {
838 path: "foo.db".into(),
839 spawn_mode: RocksDbSpawnMode::BlockInPlace,
840 },
841 namespace: DEFAULT_NAMESPACE.to_string()
842 }
843 );
844 assert_eq!(
845 StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
846 StorageConfig {
847 inner_storage_config: InnerStorageConfig::RocksDb {
848 path: "foo.db".into(),
849 spawn_mode: RocksDbSpawnMode::BlockInPlace,
850 },
851 namespace: "chosen_namespace".into()
852 }
853 );
854}
855
856#[cfg(feature = "dynamodb")]
857#[test]
858fn test_aws_storage_config_from_str() {
859 assert_eq!(
860 StorageConfig::from_str("dynamodb:table").unwrap(),
861 StorageConfig {
862 inner_storage_config: InnerStorageConfig::DynamoDb {
863 use_dynamodb_local: false
864 },
865 namespace: "table".to_string()
866 }
867 );
868 assert_eq!(
869 StorageConfig::from_str("dynamodb:table:env").unwrap(),
870 StorageConfig {
871 inner_storage_config: InnerStorageConfig::DynamoDb {
872 use_dynamodb_local: false
873 },
874 namespace: "table".to_string()
875 }
876 );
877 assert_eq!(
878 StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap(),
879 StorageConfig {
880 inner_storage_config: InnerStorageConfig::DynamoDb {
881 use_dynamodb_local: true
882 },
883 namespace: "table".to_string()
884 }
885 );
886 assert!(StorageConfig::from_str("dynamodb").is_err());
887 assert!(StorageConfig::from_str("dynamodb:").is_err());
888 assert!(StorageConfig::from_str("dynamodb:1").is_err());
889 assert!(StorageConfig::from_str("dynamodb:wrong:endpoint").is_err());
890}
891
892#[cfg(feature = "scylladb")]
893#[test]
894fn test_scylla_db_storage_config_from_str() {
895 assert_eq!(
896 StorageConfig::from_str("scylladb:").unwrap(),
897 StorageConfig {
898 inner_storage_config: InnerStorageConfig::ScyllaDb {
899 uri: "localhost:9042".to_string()
900 },
901 namespace: DEFAULT_NAMESPACE.to_string()
902 }
903 );
904 assert_eq!(
905 StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
906 StorageConfig {
907 inner_storage_config: InnerStorageConfig::ScyllaDb {
908 uri: "db_hostname:230".to_string()
909 },
910 namespace: "table_other_storage".to_string()
911 }
912 );
913 assert_eq!(
914 StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
915 StorageConfig {
916 inner_storage_config: InnerStorageConfig::ScyllaDb {
917 uri: "db_hostname:230".to_string()
918 },
919 namespace: DEFAULT_NAMESPACE.to_string()
920 }
921 );
922 assert!(StorageConfig::from_str("scylladb:-10").is_err());
923 assert!(StorageConfig::from_str("scylladb:70000").is_err());
924 assert!(StorageConfig::from_str("scylladb:230:234").is_err());
925 assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
926 assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
927 assert!(StorageConfig::from_str("scylladb:wrong").is_err());
928}