1use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13 client::StorageServiceDatabase,
14 common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20 PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
21 RocksDbStoreInternalConfig,
22};
23use linera_views::{
24 lru_prefix_cache::StorageCacheConfig,
25 memory::{MemoryDatabase, MemoryStoreConfig},
26 store::{KeyValueDatabase, KeyValueStore},
27};
28use serde::{Deserialize, Serialize};
29use tracing::error;
30#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
31use {
32 linera_storage::ChainStatesFirstAssignment,
33 linera_views::backends::dual::{DualDatabase, DualStoreConfig},
34 std::path::Path,
35};
36#[cfg(feature = "scylladb")]
37use {
38 linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
39 std::num::NonZeroU16,
40 tracing::debug,
41};
42
/// Command-line options shared by all storage backends.
///
/// Every flag is declared `global = true`. The values are combined with a parsed
/// [`StorageConfig`] by [`StorageConfig::add_common_storage_options`] to build a
/// [`StoreConfig`].
#[derive(Clone, Debug, clap::Parser)]
pub struct CommonStorageOptions {
    /// Maximum number of concurrent storage queries (optional). Forwarded as
    /// `max_concurrent_queries` to the storage-service, DynamoDB and ScyllaDB stores.
    #[arg(long, global = true)]
    pub storage_max_concurrent_queries: Option<usize>,

    /// Maximum number of stream queries. Forwarded as `max_stream_queries` to every store.
    #[arg(long, default_value = "10", global = true)]
    pub storage_max_stream_queries: usize,

    /// Maximum size of the storage cache (`StorageCacheConfig::max_cache_size`).
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_size: usize,

    /// Maximum size of a value entry in the storage cache
    /// (`StorageCacheConfig::max_value_entry_size`).
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_value_entry_size: usize,

    /// Maximum size of a find-keys entry in the storage cache
    /// (`StorageCacheConfig::max_find_keys_entry_size`).
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_find_keys_entry_size: usize,

    /// Maximum size of a find-key-values entry in the storage cache
    /// (`StorageCacheConfig::max_find_key_values_entry_size`).
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_find_key_values_entry_size: usize,

    /// Maximum number of entries in the storage cache
    /// (`StorageCacheConfig::max_cache_entries`).
    #[arg(long, default_value = "1000", global = true)]
    pub storage_max_cache_entries: usize,

    /// Maximum size of cached values (`StorageCacheConfig::max_cache_value_size`).
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_value_size: usize,

    /// Maximum size of cached find-keys results
    /// (`StorageCacheConfig::max_cache_find_keys_size`).
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_find_keys_size: usize,

    /// Maximum size of cached find-key-values results
    /// (`StorageCacheConfig::max_cache_find_key_values_size`).
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_find_key_values_size: usize,

    /// Replication factor. Forwarded as `replication_factor` to the ScyllaDB store.
    #[arg(long, default_value = "1", global = true)]
    pub storage_replication_factor: u32,
}
89
90impl CommonStorageOptions {
91 pub fn storage_cache_config(&self) -> StorageCacheConfig {
92 StorageCacheConfig {
93 max_cache_size: self.storage_max_cache_size,
94 max_value_entry_size: self.storage_max_value_entry_size,
95 max_find_keys_entry_size: self.storage_max_find_keys_entry_size,
96 max_find_key_values_entry_size: self.storage_max_find_key_values_entry_size,
97 max_cache_entries: self.storage_max_cache_entries,
98 max_cache_value_size: self.storage_max_cache_value_size,
99 max_cache_find_keys_size: self.storage_max_cache_find_keys_size,
100 max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
101 }
102 }
103}
104
/// A complete store configuration: the backend-specific configuration together with the
/// namespace to use.
///
/// Values are produced by [`StorageConfig::add_common_storage_options`] and consumed by
/// `run_with_storage`, `run_with_store` and `initialize`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StoreConfig {
    /// The in-memory store.
    Memory {
        /// Configuration of the memory store.
        config: MemoryStoreConfig,
        /// Namespace passed to the store when connecting.
        namespace: String,
        /// Path of the genesis configuration; it is read as JSON and used to initialize
        /// the storage in `run_with_storage`.
        genesis_path: PathBuf,
    },
    /// The storage service (requires the `storage-service` feature).
    #[cfg(feature = "storage-service")]
    StorageService {
        /// Configuration of the storage-service store.
        config: StorageServiceStoreConfig,
        /// Namespace passed to the store when connecting.
        namespace: String,
    },
    /// The RocksDB store (requires the `rocksdb` feature).
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Configuration of the RocksDB store.
        config: RocksDbStoreConfig,
        /// Namespace passed to the store when connecting.
        namespace: String,
    },
    /// The DynamoDB store (requires the `dynamodb` feature).
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// Configuration of the DynamoDB store.
        config: DynamoDbStoreConfig,
        /// Namespace passed to the store when connecting.
        namespace: String,
    },
    /// The ScyllaDB store (requires the `scylladb` feature).
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Configuration of the ScyllaDB store.
        config: ScyllaDbStoreConfig,
        /// Namespace passed to the store when connecting.
        namespace: String,
    },
    /// The dual store combining RocksDB (first) and ScyllaDB (second); requires both the
    /// `rocksdb` and `scylladb` features.
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// Configuration of both underlying stores.
        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
        /// Namespace passed to the store when connecting.
        namespace: String,
    },
}
144
/// The backend-specific part of a [`StorageConfig`], as parsed from a storage description
/// string (the namespace is kept separately in [`StorageConfig`]).
///
/// `Eq` and `PartialEq` are only derived in test builds.
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub enum InnerStorageConfig {
    /// The in-memory store (`memory:` prefix).
    Memory {
        /// Path of the genesis configuration file.
        genesis_path: PathBuf,
    },
    /// The storage service (`service:` prefix).
    #[cfg(feature = "storage-service")]
    Service {
        /// Endpoint of the service, stored as `host:port`.
        endpoint: String,
    },
    /// The RocksDB store (`rocksdb:` prefix).
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Directory given in the storage description.
        path: PathBuf,
        /// Spawn mode; `SpawnBlocking` when the description does not specify one.
        spawn_mode: RocksDbSpawnMode,
    },
    /// The DynamoDB store (`dynamodb:` prefix).
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// `true` when the description selects `dynamodb_local`, `false` for `env` or when
        /// no endpoint is given.
        use_dynamodb_local: bool,
    },
    /// The ScyllaDB store (`scylladb:` prefix).
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Address of the database as `host:port`; `localhost:9042` when not specified.
        uri: String,
    },
    /// The dual RocksDB/ScyllaDB store (`dualrocksdbscylladb:` prefix).
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// RocksDB directory; `maybe_append_shard_path` appends a `shard_N` component to it.
        path_with_guard: PathWithGuard,
        /// Spawn mode of the RocksDB store.
        spawn_mode: RocksDbSpawnMode,
        /// Address of the ScyllaDB database as `host:port`.
        uri: String,
    },
}
191
/// A storage description as given by the user: which backend to use and under which
/// namespace.
///
/// It is parsed from a string through its `FromStr` implementation and printed back through
/// its `Display` implementation. `Eq` and `PartialEq` are only derived in test builds.
#[derive(Clone, Debug)]
#[cfg_attr(any(test), derive(Eq, PartialEq))]
pub struct StorageConfig {
    /// The backend and its backend-specific parameters.
    pub inner_storage_config: InnerStorageConfig,
    /// The namespace to use in the backend.
    pub namespace: String,
}
201
// Scheme prefixes recognized at the start of a storage description string. Each prefix
// includes the trailing `:` so that it can be removed with `str::strip_prefix`.

/// Prefix selecting the in-memory store.
const MEMORY: &str = "memory:";
/// Prefix selecting the storage service.
#[cfg(feature = "storage-service")]
const STORAGE_SERVICE: &str = "service:";
/// Prefix selecting the RocksDB store.
#[cfg(feature = "rocksdb")]
const ROCKS_DB: &str = "rocksdb:";
/// Prefix selecting the DynamoDB store.
#[cfg(feature = "dynamodb")]
const DYNAMO_DB: &str = "dynamodb:";
/// Prefix selecting the ScyllaDB store.
#[cfg(feature = "scylladb")]
const SCYLLA_DB: &str = "scylladb:";
/// Prefix selecting the dual RocksDB/ScyllaDB store.
#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
213
214impl FromStr for StorageConfig {
215 type Err = anyhow::Error;
216
217 fn from_str(input: &str) -> Result<Self, Self::Err> {
218 if let Some(s) = input.strip_prefix(MEMORY) {
219 let parts = s.split(':').collect::<Vec<_>>();
220 if parts.len() == 1 {
221 let genesis_path = parts[0].to_string().into();
222 let namespace = DEFAULT_NAMESPACE.to_string();
223 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
224 return Ok(StorageConfig {
225 inner_storage_config,
226 namespace,
227 });
228 }
229 if parts.len() != 2 {
230 bail!("We should have one genesis config path and one optional namespace");
231 }
232 let genesis_path = parts[0].to_string().into();
233 let namespace = parts[1].to_string();
234 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
235 return Ok(StorageConfig {
236 inner_storage_config,
237 namespace,
238 });
239 }
240 #[cfg(feature = "storage-service")]
241 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
242 if s.is_empty() {
243 bail!(
244 "For Storage service, the formatting has to be service:endpoint:namespace,\
245example service:tcp:127.0.0.1:7878:table_do_my_test"
246 );
247 }
248 let parts = s.split(':').collect::<Vec<_>>();
249 if parts.len() != 4 {
250 bail!("We should have one endpoint and one namespace");
251 }
252 let protocol = parts[0];
253 if protocol != "tcp" {
254 bail!("Only allowed protocol is tcp");
255 }
256 let endpoint = parts[1];
257 let port = parts[2];
258 let mut endpoint = endpoint.to_string();
259 endpoint.push(':');
260 endpoint.push_str(port);
261 let endpoint = endpoint.to_string();
262 let namespace = parts[3].to_string();
263 let inner_storage_config = InnerStorageConfig::Service { endpoint };
264 return Ok(StorageConfig {
265 inner_storage_config,
266 namespace,
267 });
268 }
269 #[cfg(feature = "rocksdb")]
270 if let Some(s) = input.strip_prefix(ROCKS_DB) {
271 if s.is_empty() {
272 bail!(
273 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
274 }
275 let parts = s.split(':').collect::<Vec<_>>();
276 if parts.len() == 1 {
277 let path = parts[0].to_string().into();
278 let namespace = DEFAULT_NAMESPACE.to_string();
279 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
280 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
281 return Ok(StorageConfig {
282 inner_storage_config,
283 namespace,
284 });
285 }
286 if parts.len() == 2 || parts.len() == 3 {
287 let path = parts[0].to_string().into();
288 let spawn_mode = match parts[1] {
289 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
290 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
291 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
292 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
293 }?;
294 let namespace = if parts.len() == 2 {
295 DEFAULT_NAMESPACE.to_string()
296 } else {
297 parts[2].to_string()
298 };
299 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
300 return Ok(StorageConfig {
301 inner_storage_config,
302 namespace,
303 });
304 }
305 bail!("We should have one, two or three parts");
306 }
307 #[cfg(feature = "dynamodb")]
308 if let Some(s) = input.strip_prefix(DYNAMO_DB) {
309 let mut parts = s.splitn(2, ':');
310 let namespace = parts
311 .next()
312 .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
313 .to_string();
314 let use_dynamodb_local = match parts.next() {
315 None | Some("env") => false,
316 Some("dynamodb_local") => true,
317 Some(unknown) => {
318 bail!(
319 "Invalid DynamoDB endpoint {unknown:?}. \
320 Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
321 );
322 }
323 };
324 let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
325 return Ok(StorageConfig {
326 inner_storage_config,
327 namespace,
328 });
329 }
330 #[cfg(feature = "scylladb")]
331 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
332 let mut uri: Option<String> = None;
333 let mut namespace: Option<String> = None;
334 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
335 if !s.is_empty() {
336 let mut parts = s.split(':');
337 while let Some(part) = parts.next() {
338 match part {
339 "tcp" => {
340 let address = parts.next().ok_or_else(|| {
341 anyhow!("Failed to find address for {s}. {parse_error}")
342 })?;
343 let port_str = parts.next().ok_or_else(|| {
344 anyhow!("Failed to find port for {s}. {parse_error}")
345 })?;
346 let port = NonZeroU16::from_str(port_str).map_err(|_| {
347 anyhow!(
348 "Failed to find parse port {port_str} for {s}. {parse_error}",
349 )
350 })?;
351 if uri.is_some() {
352 bail!("The uri has already been assigned");
353 }
354 uri = Some(format!("{}:{}", &address, port));
355 }
356 _ if part.starts_with("table") => {
357 if namespace.is_some() {
358 bail!("The namespace has already been assigned");
359 }
360 namespace = Some(part.to_string());
361 }
362 _ => {
363 bail!("the entry \"{part}\" is not matching");
364 }
365 }
366 }
367 }
368 let uri = uri.unwrap_or("localhost:9042".to_string());
369 let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string());
370 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
371 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
372 return Ok(StorageConfig {
373 inner_storage_config,
374 namespace,
375 });
376 }
377 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
378 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
379 let parts = s.split(':').collect::<Vec<_>>();
380 if parts.len() != 5 && parts.len() != 6 {
381 bail!(
382 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
383 );
384 }
385 let path = Path::new(parts[0]);
386 let path = path.to_path_buf();
387 let path_with_guard = PathWithGuard::new(path);
388 let spawn_mode = match parts[1] {
389 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
390 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
391 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
392 _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
393 }?;
394 let protocol = parts[2];
395 if protocol != "tcp" {
396 bail!("The only allowed protocol is tcp");
397 }
398 let address = parts[3];
399 let port_str = parts[4];
400 let port = NonZeroU16::from_str(port_str)
401 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
402 let uri = format!("{}:{}", &address, port);
403 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
404 path_with_guard,
405 spawn_mode,
406 uri,
407 };
408 let namespace = if parts.len() == 5 {
409 DEFAULT_NAMESPACE.to_string()
410 } else {
411 parts[5].to_string()
412 };
413 return Ok(StorageConfig {
414 inner_storage_config,
415 namespace,
416 });
417 }
418 error!("available storage: memory");
419 #[cfg(feature = "storage-service")]
420 error!("Also available is linera-storage-service");
421 #[cfg(feature = "rocksdb")]
422 error!("Also available is RocksDB");
423 #[cfg(feature = "dynamodb")]
424 error!("Also available is DynamoDB");
425 #[cfg(feature = "scylladb")]
426 error!("Also available is ScyllaDB");
427 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
428 error!("Also available is DualRocksDbScyllaDb");
429 Err(anyhow!("The input has not matched: {input}"))
430 }
431}
432
433impl StorageConfig {
434 pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
435 match &mut self.inner_storage_config {
436 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
437 InnerStorageConfig::DualRocksDbScyllaDb {
438 path_with_guard,
439 spawn_mode: _,
440 uri: _,
441 } => {
442 let shard_str = format!("shard_{}", _shard);
443 path_with_guard.path_buf.push(shard_str);
444 std::fs::create_dir_all(&path_with_guard.path_buf)
445 }
446 _ => Ok(()),
447 }
448 }
449
450 pub fn add_common_storage_options(
452 &self,
453 options: &CommonStorageOptions,
454 ) -> Result<StoreConfig, anyhow::Error> {
455 let namespace = self.namespace.clone();
456 match &self.inner_storage_config {
457 InnerStorageConfig::Memory { genesis_path } => {
458 let config = MemoryStoreConfig {
459 max_stream_queries: options.storage_max_stream_queries,
460 kill_on_drop: false,
461 };
462 let genesis_path = genesis_path.clone();
463 Ok(StoreConfig::Memory {
464 config,
465 namespace,
466 genesis_path,
467 })
468 }
469 #[cfg(feature = "storage-service")]
470 InnerStorageConfig::Service { endpoint } => {
471 let inner_config = StorageServiceStoreInternalConfig {
472 endpoint: endpoint.clone(),
473 max_concurrent_queries: options.storage_max_concurrent_queries,
474 max_stream_queries: options.storage_max_stream_queries,
475 };
476 let config = StorageServiceStoreConfig {
477 inner_config,
478 storage_cache_config: options.storage_cache_config(),
479 };
480 Ok(StoreConfig::StorageService { config, namespace })
481 }
482 #[cfg(feature = "rocksdb")]
483 InnerStorageConfig::RocksDb { path, spawn_mode } => {
484 let path_with_guard = PathWithGuard::new(path.to_path_buf());
485 let inner_config = RocksDbStoreInternalConfig {
486 spawn_mode: *spawn_mode,
487 path_with_guard,
488 max_stream_queries: options.storage_max_stream_queries,
489 };
490 let config = RocksDbStoreConfig {
491 inner_config,
492 storage_cache_config: options.storage_cache_config(),
493 };
494 Ok(StoreConfig::RocksDb { config, namespace })
495 }
496 #[cfg(feature = "dynamodb")]
497 InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
498 let inner_config = DynamoDbStoreInternalConfig {
499 use_dynamodb_local: *use_dynamodb_local,
500 max_concurrent_queries: options.storage_max_concurrent_queries,
501 max_stream_queries: options.storage_max_stream_queries,
502 };
503 let config = DynamoDbStoreConfig {
504 inner_config,
505 storage_cache_config: options.storage_cache_config(),
506 };
507 Ok(StoreConfig::DynamoDb { config, namespace })
508 }
509 #[cfg(feature = "scylladb")]
510 InnerStorageConfig::ScyllaDb { uri } => {
511 let inner_config = ScyllaDbStoreInternalConfig {
512 uri: uri.clone(),
513 max_stream_queries: options.storage_max_stream_queries,
514 max_concurrent_queries: options.storage_max_concurrent_queries,
515 replication_factor: options.storage_replication_factor,
516 };
517 let config = ScyllaDbStoreConfig {
518 inner_config,
519 storage_cache_config: options.storage_cache_config(),
520 };
521 Ok(StoreConfig::ScyllaDb { config, namespace })
522 }
523 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
524 InnerStorageConfig::DualRocksDbScyllaDb {
525 path_with_guard,
526 spawn_mode,
527 uri,
528 } => {
529 let inner_config = RocksDbStoreInternalConfig {
530 spawn_mode: *spawn_mode,
531 path_with_guard: path_with_guard.clone(),
532 max_stream_queries: options.storage_max_stream_queries,
533 };
534 let first_config = RocksDbStoreConfig {
535 inner_config,
536 storage_cache_config: options.storage_cache_config(),
537 };
538
539 let inner_config = ScyllaDbStoreInternalConfig {
540 uri: uri.clone(),
541 max_stream_queries: options.storage_max_stream_queries,
542 max_concurrent_queries: options.storage_max_concurrent_queries,
543 replication_factor: options.storage_replication_factor,
544 };
545 let second_config = ScyllaDbStoreConfig {
546 inner_config,
547 storage_cache_config: options.storage_cache_config(),
548 };
549
550 let config = DualStoreConfig {
551 first_config,
552 second_config,
553 };
554 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
555 }
556 }
557 }
558}
559
560impl fmt::Display for StorageConfig {
561 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
562 let namespace = &self.namespace;
563 match &self.inner_storage_config {
564 #[cfg(feature = "storage-service")]
565 InnerStorageConfig::Service { endpoint } => {
566 write!(f, "service:tcp:{}:{}", endpoint, namespace)
567 }
568 InnerStorageConfig::Memory { genesis_path } => {
569 write!(f, "memory:{}:{}", genesis_path.display(), namespace)
570 }
571 #[cfg(feature = "rocksdb")]
572 InnerStorageConfig::RocksDb { path, spawn_mode } => {
573 let spawn_mode = spawn_mode.to_string();
574 write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
575 }
576 #[cfg(feature = "dynamodb")]
577 InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
578 true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
579 false => write!(f, "dynamodb:{}:env", namespace),
580 },
581 #[cfg(feature = "scylladb")]
582 InnerStorageConfig::ScyllaDb { uri } => {
583 write!(f, "scylladb:tcp:{}:{}", uri, namespace)
584 }
585 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
586 InnerStorageConfig::DualRocksDbScyllaDb {
587 path_with_guard,
588 spawn_mode,
589 uri,
590 } => {
591 write!(
592 f,
593 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
594 path_with_guard.path_buf.display(),
595 spawn_mode,
596 uri,
597 namespace
598 )
599 }
600 }
601 }
602}
603
/// A job that runs against a connected storage.
///
/// `StoreConfig::run_with_storage` connects to the configured backend and hands the
/// resulting storage to [`Runnable::run`].
#[async_trait]
pub trait Runnable {
    /// The value produced by the job.
    type Output;

    /// Runs the job, consuming it, with the given `storage`.
    async fn run<S>(self, storage: S) -> Self::Output
    where
        S: Storage + Clone + Send + Sync + 'static;
}
612
/// A job that is generic over the key-value database type.
///
/// Unlike [`Runnable`], the job does not receive a connected storage: it is given the
/// database configuration and the namespace. `StoreConfig::run_with_store` selects the
/// database type `D` matching the configured backend.
#[async_trait]
pub trait RunnableWithStore {
    /// The value produced by the job on success.
    type Output;

    /// Runs the job, consuming it, for the database type `D` with its `config` and the
    /// `namespace` to operate on.
    async fn run<D>(
        self,
        config: D::Config,
        namespace: String,
    ) -> Result<Self::Output, anyhow::Error>
    where
        D: KeyValueDatabase + Clone + Send + Sync + 'static,
        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
        D::Error: Send + Sync;
}
627
628impl StoreConfig {
629 pub async fn run_with_storage<Job>(
630 self,
631 wasm_runtime: Option<WasmRuntime>,
632 job: Job,
633 ) -> Result<Job::Output, anyhow::Error>
634 where
635 Job: Runnable,
636 {
637 match self {
638 StoreConfig::Memory {
639 config,
640 namespace,
641 genesis_path,
642 } => {
643 let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
644 &config,
645 &namespace,
646 wasm_runtime,
647 )
648 .await?;
649 let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
650 genesis_config.initialize_storage(&mut storage).await?;
652 Ok(job.run(storage).await)
653 }
654 #[cfg(feature = "storage-service")]
655 StoreConfig::StorageService { config, namespace } => {
656 let storage = DbStorage::<StorageServiceDatabase, _>::connect(
657 &config,
658 &namespace,
659 wasm_runtime,
660 )
661 .await?;
662 Ok(job.run(storage).await)
663 }
664 #[cfg(feature = "rocksdb")]
665 StoreConfig::RocksDb { config, namespace } => {
666 let storage =
667 DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
668 .await?;
669 Ok(job.run(storage).await)
670 }
671 #[cfg(feature = "dynamodb")]
672 StoreConfig::DynamoDb { config, namespace } => {
673 let storage =
674 DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
675 .await?;
676 Ok(job.run(storage).await)
677 }
678 #[cfg(feature = "scylladb")]
679 StoreConfig::ScyllaDb { config, namespace } => {
680 let storage =
681 DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
682 .await?;
683 Ok(job.run(storage).await)
684 }
685 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
686 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
687 let storage = DbStorage::<
688 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
689 _,
690 >::connect(&config, &namespace, wasm_runtime)
691 .await?;
692 Ok(job.run(storage).await)
693 }
694 }
695 }
696
697 #[allow(unused_variables)]
698 pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
699 where
700 Job: RunnableWithStore,
701 {
702 match self {
703 StoreConfig::Memory { .. } => {
704 Err(anyhow!("Cannot run admin operations on the memory store"))
705 }
706 #[cfg(feature = "storage-service")]
707 StoreConfig::StorageService { config, namespace } => {
708 Ok(job.run::<StorageServiceDatabase>(config, namespace).await?)
709 }
710 #[cfg(feature = "rocksdb")]
711 StoreConfig::RocksDb { config, namespace } => {
712 Ok(job.run::<RocksDbDatabase>(config, namespace).await?)
713 }
714 #[cfg(feature = "dynamodb")]
715 StoreConfig::DynamoDb { config, namespace } => {
716 Ok(job.run::<DynamoDbDatabase>(config, namespace).await?)
717 }
718 #[cfg(feature = "scylladb")]
719 StoreConfig::ScyllaDb { config, namespace } => {
720 Ok(job.run::<ScyllaDbDatabase>(config, namespace).await?)
721 }
722 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
723 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
724 .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
725 config, namespace,
726 )
727 .await?),
728 }
729 }
730
731 pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
732 self.run_with_store(InitializeStorageJob(config)).await
733 }
734}
735
736struct InitializeStorageJob<'a>(&'a GenesisConfig);
737
738#[async_trait]
739impl RunnableWithStore for InitializeStorageJob<'_> {
740 type Output = ();
741
742 async fn run<D>(
743 self,
744 config: D::Config,
745 namespace: String,
746 ) -> Result<Self::Output, anyhow::Error>
747 where
748 D: KeyValueDatabase + Clone + Send + Sync + 'static,
749 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
750 D::Error: Send + Sync,
751 {
752 let mut storage =
753 DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
754 self.0.initialize_storage(&mut storage).await?;
755 Ok(())
756 }
757}
758
/// Checks parsing of `memory:` descriptions, with and without an explicit namespace.
#[test]
fn test_memory_storage_config_from_str() {
    let expected = |namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::Memory {
            genesis_path: PathBuf::from("path/to/genesis.json"),
        },
        namespace,
    };

    let parsed = StorageConfig::from_str("memory:path/to/genesis.json").unwrap();
    assert_eq!(parsed, expected(DEFAULT_NAMESPACE.to_string()));

    let parsed = StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap();
    assert_eq!(parsed, expected("namespace".to_string()));

    // Without the `:` separator the prefix is not recognized.
    assert!(StorageConfig::from_str("memory").is_err());
}
781
/// Checks parsing of `service:` descriptions, which need both an endpoint and a namespace.
#[cfg(feature = "storage-service")]
#[test]
fn test_shared_store_config_from_str() {
    let parsed = StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap();
    let expected = StorageConfig {
        inner_storage_config: InnerStorageConfig::Service {
            endpoint: String::from("127.0.0.1:8942"),
        },
        namespace: String::from("linera"),
    };
    assert_eq!(parsed, expected);

    // Descriptions missing either the namespace or the port are rejected.
    for invalid in &["service:tcp:127.0.0.1:8942", "service:tcp:127.0.0.1:linera"] {
        assert!(
            StorageConfig::from_str(invalid).is_err(),
            "{} should be rejected",
            invalid
        );
    }
}
797
/// Checks parsing of `rocksdb:` descriptions with one, two and three parts.
#[cfg(feature = "rocksdb")]
#[test]
fn test_rocks_db_storage_config_from_str() {
    let expected = |spawn_mode: RocksDbSpawnMode, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::RocksDb {
            path: PathBuf::from("foo.db"),
            spawn_mode,
        },
        namespace,
    };

    // The prefix needs its `:` separator.
    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());

    let parsed = StorageConfig::from_str("rocksdb:foo.db").unwrap();
    assert_eq!(
        parsed,
        expected(
            RocksDbSpawnMode::SpawnBlocking,
            DEFAULT_NAMESPACE.to_string()
        )
    );

    let parsed = StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap();
    assert_eq!(
        parsed,
        expected(RocksDbSpawnMode::BlockInPlace, DEFAULT_NAMESPACE.to_string())
    );

    let parsed =
        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap();
    assert_eq!(
        parsed,
        expected(
            RocksDbSpawnMode::BlockInPlace,
            "chosen_namespace".to_string()
        )
    );
}
833
/// Checks parsing of `dynamodb:` descriptions: endpoint selection and rejected inputs.
#[cfg(feature = "dynamodb")]
#[test]
fn test_aws_storage_config_from_str() {
    let expected = |use_dynamodb_local: bool| StorageConfig {
        inner_storage_config: InnerStorageConfig::DynamoDb { use_dynamodb_local },
        namespace: String::from("table"),
    };

    // (description, expected value of `use_dynamodb_local`)
    let valid_cases = [
        ("dynamodb:table", false),
        ("dynamodb:table:env", false),
        ("dynamodb:table:dynamodb_local", true),
    ];
    for (description, use_dynamodb_local) in &valid_cases {
        assert_eq!(
            StorageConfig::from_str(description).unwrap(),
            expected(*use_dynamodb_local)
        );
    }

    let invalid_cases = [
        "dynamodb",
        "dynamodb:",
        "dynamodb:1",
        "dynamodb:wrong:endpoint",
    ];
    for description in &invalid_cases {
        assert!(
            StorageConfig::from_str(description).is_err(),
            "{} should be rejected",
            description
        );
    }
}
869
/// Checks parsing of `scylladb:` descriptions: defaults, explicit values and rejected inputs.
#[cfg(feature = "scylladb")]
#[test]
fn test_scylla_db_storage_config_from_str() {
    let expected = |uri: &str, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::ScyllaDb {
            uri: uri.to_string(),
        },
        namespace,
    };

    // An empty description selects the default address and namespace.
    let parsed = StorageConfig::from_str("scylladb:").unwrap();
    assert_eq!(
        parsed,
        expected("localhost:9042", DEFAULT_NAMESPACE.to_string())
    );

    let parsed =
        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap();
    assert_eq!(
        parsed,
        expected("db_hostname:230", "table_other_storage".to_string())
    );

    let parsed = StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap();
    assert_eq!(
        parsed,
        expected("db_hostname:230", DEFAULT_NAMESPACE.to_string())
    );

    let invalid_cases = [
        "scylladb:-10",
        "scylladb:70000",
        "scylladb:230:234",
        "scylladb:tcp:address1",
        "scylladb:tcp:address1:tcp:/address2",
        "scylladb:wrong",
    ];
    for description in &invalid_cases {
        assert!(
            StorageConfig::from_str(description).is_err(),
            "{} should be rejected",
            description
        );
    }
}