linera_service/
storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13    client::StorageServiceDatabase,
14    common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20    PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
21    RocksDbStoreInternalConfig,
22};
23use linera_views::{
24    lru_caching::StorageCacheConfig,
25    memory::{MemoryDatabase, MemoryStoreConfig},
26    store::{KeyValueDatabase, KeyValueStore},
27};
28use serde::{Deserialize, Serialize};
29use tracing::error;
30#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
31use {
32    linera_storage::ChainStatesFirstAssignment,
33    linera_views::backends::dual::{DualDatabase, DualStoreConfig},
34    std::path::Path,
35};
36#[cfg(feature = "scylladb")]
37use {
38    linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
39    std::num::NonZeroU16,
40    tracing::debug,
41};
42
43#[derive(Clone, Debug, clap::Parser)]
44pub struct CommonStorageOptions {
45    /// The maximal number of simultaneous queries to the database
46    #[arg(long, global = true)]
47    pub storage_max_concurrent_queries: Option<usize>,
48
49    /// The maximal number of simultaneous stream queries to the database
50    #[arg(long, default_value = "10", global = true)]
51    pub storage_max_stream_queries: usize,
52
53    /// The maximal memory used in the storage cache.
54    #[arg(long, default_value = "10000000", global = true)]
55    pub storage_max_cache_size: usize,
56
57    /// The maximal size of an entry in the storage cache.
58    #[arg(long, default_value = "1000000", global = true)]
59    pub storage_max_entry_size: usize,
60
61    /// The maximal number of entries in the storage cache.
62    #[arg(long, default_value = "1000", global = true)]
63    pub storage_max_cache_entries: usize,
64
65    /// The replication factor for the keyspace
66    #[arg(long, default_value = "1", global = true)]
67    pub storage_replication_factor: u32,
68}
69
70impl CommonStorageOptions {
71    pub fn storage_cache_config(&self) -> StorageCacheConfig {
72        StorageCacheConfig {
73            max_cache_size: self.storage_max_cache_size,
74            max_entry_size: self.storage_max_entry_size,
75            max_cache_entries: self.storage_max_cache_entries,
76        }
77    }
78}
79
80/// The configuration of the key value store in use.
81#[derive(Clone, Debug, Deserialize, Serialize)]
82pub enum StoreConfig {
83    /// The memory key value store
84    Memory {
85        config: MemoryStoreConfig,
86        namespace: String,
87        genesis_path: PathBuf,
88    },
89    /// The storage service key-value store
90    #[cfg(feature = "storage-service")]
91    StorageService {
92        config: StorageServiceStoreConfig,
93        namespace: String,
94    },
95    /// The RocksDB key value store
96    #[cfg(feature = "rocksdb")]
97    RocksDb {
98        config: RocksDbStoreConfig,
99        namespace: String,
100    },
101    /// The DynamoDB key value store
102    #[cfg(feature = "dynamodb")]
103    DynamoDb {
104        config: DynamoDbStoreConfig,
105        namespace: String,
106    },
107    /// The ScyllaDB key value store
108    #[cfg(feature = "scylladb")]
109    ScyllaDb {
110        config: ScyllaDbStoreConfig,
111        namespace: String,
112    },
113    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
114    DualRocksDbScyllaDb {
115        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
116        namespace: String,
117    },
118}
119
120/// The description of a storage implementation.
121#[derive(Clone, Debug)]
122#[cfg_attr(any(test), derive(Eq, PartialEq))]
123pub enum InnerStorageConfig {
124    /// The memory description.
125    Memory {
126        /// The path to the genesis configuration. This is needed because we reinitialize
127        /// memory databases from the genesis config everytime.
128        genesis_path: PathBuf,
129    },
130    /// The storage service description.
131    #[cfg(feature = "storage-service")]
132    Service {
133        /// The endpoint used.
134        endpoint: String,
135    },
136    /// The RocksDB description.
137    #[cfg(feature = "rocksdb")]
138    RocksDb {
139        /// The path used.
140        path: PathBuf,
141        /// Whether to use `block_in_place` or `spawn_blocking`.
142        spawn_mode: RocksDbSpawnMode,
143    },
144    /// The DynamoDB description.
145    #[cfg(feature = "dynamodb")]
146    DynamoDb {
147        /// Whether to use the DynamoDB Local system
148        use_dynamodb_local: bool,
149    },
150    /// The ScyllaDB description.
151    #[cfg(feature = "scylladb")]
152    ScyllaDb {
153        /// The URI for accessing the database.
154        uri: String,
155    },
156    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
157    DualRocksDbScyllaDb {
158        /// The path used.
159        path_with_guard: PathWithGuard,
160        /// Whether to use `block_in_place` or `spawn_blocking`.
161        spawn_mode: RocksDbSpawnMode,
162        /// The URI for accessing the database.
163        uri: String,
164    },
165}
166
167/// The description of a storage implementation.
168#[derive(Clone, Debug)]
169#[cfg_attr(any(test), derive(Eq, PartialEq))]
170pub struct StorageConfig {
171    /// The inner storage config.
172    pub inner_storage_config: InnerStorageConfig,
173    /// The namespace used
174    pub namespace: String,
175}
176
177const MEMORY: &str = "memory:";
178#[cfg(feature = "storage-service")]
179const STORAGE_SERVICE: &str = "service:";
180#[cfg(feature = "rocksdb")]
181const ROCKS_DB: &str = "rocksdb:";
182#[cfg(feature = "dynamodb")]
183const DYNAMO_DB: &str = "dynamodb:";
184#[cfg(feature = "scylladb")]
185const SCYLLA_DB: &str = "scylladb:";
186#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
187const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
188
189impl FromStr for StorageConfig {
190    type Err = anyhow::Error;
191
192    fn from_str(input: &str) -> Result<Self, Self::Err> {
193        if let Some(s) = input.strip_prefix(MEMORY) {
194            let parts = s.split(':').collect::<Vec<_>>();
195            if parts.len() == 1 {
196                let genesis_path = parts[0].to_string().into();
197                let namespace = DEFAULT_NAMESPACE.to_string();
198                let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
199                return Ok(StorageConfig {
200                    inner_storage_config,
201                    namespace,
202                });
203            }
204            if parts.len() != 2 {
205                bail!("We should have one genesis config path and one optional namespace");
206            }
207            let genesis_path = parts[0].to_string().into();
208            let namespace = parts[1].to_string();
209            let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
210            return Ok(StorageConfig {
211                inner_storage_config,
212                namespace,
213            });
214        }
215        #[cfg(feature = "storage-service")]
216        if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
217            if s.is_empty() {
218                bail!(
219                    "For Storage service, the formatting has to be service:endpoint:namespace,\
220example service:tcp:127.0.0.1:7878:table_do_my_test"
221                );
222            }
223            let parts = s.split(':').collect::<Vec<_>>();
224            if parts.len() != 4 {
225                bail!("We should have one endpoint and one namespace");
226            }
227            let protocol = parts[0];
228            if protocol != "tcp" {
229                bail!("Only allowed protocol is tcp");
230            }
231            let endpoint = parts[1];
232            let port = parts[2];
233            let mut endpoint = endpoint.to_string();
234            endpoint.push(':');
235            endpoint.push_str(port);
236            let endpoint = endpoint.to_string();
237            let namespace = parts[3].to_string();
238            let inner_storage_config = InnerStorageConfig::Service { endpoint };
239            return Ok(StorageConfig {
240                inner_storage_config,
241                namespace,
242            });
243        }
244        #[cfg(feature = "rocksdb")]
245        if let Some(s) = input.strip_prefix(ROCKS_DB) {
246            if s.is_empty() {
247                bail!(
248                    "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
249            }
250            let parts = s.split(':').collect::<Vec<_>>();
251            if parts.len() == 1 {
252                let path = parts[0].to_string().into();
253                let namespace = DEFAULT_NAMESPACE.to_string();
254                let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
255                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
256                return Ok(StorageConfig {
257                    inner_storage_config,
258                    namespace,
259                });
260            }
261            if parts.len() == 2 || parts.len() == 3 {
262                let path = parts[0].to_string().into();
263                let spawn_mode = match parts[1] {
264                    "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
265                    "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
266                    "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
267                    _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
268                }?;
269                let namespace = if parts.len() == 2 {
270                    DEFAULT_NAMESPACE.to_string()
271                } else {
272                    parts[2].to_string()
273                };
274                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
275                return Ok(StorageConfig {
276                    inner_storage_config,
277                    namespace,
278                });
279            }
280            bail!("We should have one, two or three parts");
281        }
282        #[cfg(feature = "dynamodb")]
283        if let Some(s) = input.strip_prefix(DYNAMO_DB) {
284            let mut parts = s.splitn(2, ':');
285            let namespace = parts
286                .next()
287                .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
288                .to_string();
289            let use_dynamodb_local = match parts.next() {
290                None | Some("env") => false,
291                Some("dynamodb_local") => true,
292                Some(unknown) => {
293                    bail!(
294                        "Invalid DynamoDB endpoint {unknown:?}. \
295                        Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
296                    );
297                }
298            };
299            let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
300            return Ok(StorageConfig {
301                inner_storage_config,
302                namespace,
303            });
304        }
305        #[cfg(feature = "scylladb")]
306        if let Some(s) = input.strip_prefix(SCYLLA_DB) {
307            let mut uri: Option<String> = None;
308            let mut namespace: Option<String> = None;
309            let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
310            if !s.is_empty() {
311                let mut parts = s.split(':');
312                while let Some(part) = parts.next() {
313                    match part {
314                        "tcp" => {
315                            let address = parts.next().ok_or_else(|| {
316                                anyhow!("Failed to find address for {s}. {parse_error}")
317                            })?;
318                            let port_str = parts.next().ok_or_else(|| {
319                                anyhow!("Failed to find port for {s}. {parse_error}")
320                            })?;
321                            let port = NonZeroU16::from_str(port_str).map_err(|_| {
322                                anyhow!(
323                                    "Failed to find parse port {port_str} for {s}. {parse_error}",
324                                )
325                            })?;
326                            if uri.is_some() {
327                                bail!("The uri has already been assigned");
328                            }
329                            uri = Some(format!("{}:{}", &address, port));
330                        }
331                        _ if part.starts_with("table") => {
332                            if namespace.is_some() {
333                                bail!("The namespace has already been assigned");
334                            }
335                            namespace = Some(part.to_string());
336                        }
337                        _ => {
338                            bail!("the entry \"{part}\" is not matching");
339                        }
340                    }
341                }
342            }
343            let uri = uri.unwrap_or("localhost:9042".to_string());
344            let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string());
345            let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
346            debug!("ScyllaDB connection info: {:?}", inner_storage_config);
347            return Ok(StorageConfig {
348                inner_storage_config,
349                namespace,
350            });
351        }
352        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
353        if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
354            let parts = s.split(':').collect::<Vec<_>>();
355            if parts.len() != 5 && parts.len() != 6 {
356                bail!(
357                    "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
358                );
359            }
360            let path = Path::new(parts[0]);
361            let path = path.to_path_buf();
362            let path_with_guard = PathWithGuard::new(path);
363            let spawn_mode = match parts[1] {
364                "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
365                "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
366                "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
367                _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
368            }?;
369            let protocol = parts[2];
370            if protocol != "tcp" {
371                bail!("The only allowed protocol is tcp");
372            }
373            let address = parts[3];
374            let port_str = parts[4];
375            let port = NonZeroU16::from_str(port_str)
376                .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
377            let uri = format!("{}:{}", &address, port);
378            let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
379                path_with_guard,
380                spawn_mode,
381                uri,
382            };
383            let namespace = if parts.len() == 5 {
384                DEFAULT_NAMESPACE.to_string()
385            } else {
386                parts[5].to_string()
387            };
388            return Ok(StorageConfig {
389                inner_storage_config,
390                namespace,
391            });
392        }
393        error!("available storage: memory");
394        #[cfg(feature = "storage-service")]
395        error!("Also available is linera-storage-service");
396        #[cfg(feature = "rocksdb")]
397        error!("Also available is RocksDB");
398        #[cfg(feature = "dynamodb")]
399        error!("Also available is DynamoDB");
400        #[cfg(feature = "scylladb")]
401        error!("Also available is ScyllaDB");
402        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
403        error!("Also available is DualRocksDbScyllaDb");
404        Err(anyhow!("The input has not matched: {input}"))
405    }
406}
407
408impl StorageConfig {
409    pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
410        match &mut self.inner_storage_config {
411            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
412            InnerStorageConfig::DualRocksDbScyllaDb {
413                path_with_guard,
414                spawn_mode: _,
415                uri: _,
416            } => {
417                let shard_str = format!("shard_{}", _shard);
418                path_with_guard.path_buf.push(shard_str);
419                std::fs::create_dir_all(&path_with_guard.path_buf)
420            }
421            _ => Ok(()),
422        }
423    }
424
425    /// The addition of the common config to get a full configuration
426    pub async fn add_common_storage_options(
427        &self,
428        options: &CommonStorageOptions,
429    ) -> Result<StoreConfig, anyhow::Error> {
430        let namespace = self.namespace.clone();
431        match &self.inner_storage_config {
432            InnerStorageConfig::Memory { genesis_path } => {
433                let config = MemoryStoreConfig {
434                    max_stream_queries: options.storage_max_stream_queries,
435                    kill_on_drop: false,
436                };
437                let genesis_path = genesis_path.clone();
438                Ok(StoreConfig::Memory {
439                    config,
440                    namespace,
441                    genesis_path,
442                })
443            }
444            #[cfg(feature = "storage-service")]
445            InnerStorageConfig::Service { endpoint } => {
446                let inner_config = StorageServiceStoreInternalConfig {
447                    endpoint: endpoint.clone(),
448                    max_concurrent_queries: options.storage_max_concurrent_queries,
449                    max_stream_queries: options.storage_max_stream_queries,
450                };
451                let config = StorageServiceStoreConfig {
452                    inner_config,
453                    storage_cache_config: options.storage_cache_config(),
454                };
455                Ok(StoreConfig::StorageService { config, namespace })
456            }
457            #[cfg(feature = "rocksdb")]
458            InnerStorageConfig::RocksDb { path, spawn_mode } => {
459                let path_with_guard = PathWithGuard::new(path.to_path_buf());
460                let inner_config = RocksDbStoreInternalConfig {
461                    spawn_mode: *spawn_mode,
462                    path_with_guard,
463                    max_stream_queries: options.storage_max_stream_queries,
464                };
465                let config = RocksDbStoreConfig {
466                    inner_config,
467                    storage_cache_config: options.storage_cache_config(),
468                };
469                Ok(StoreConfig::RocksDb { config, namespace })
470            }
471            #[cfg(feature = "dynamodb")]
472            InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
473                let inner_config = DynamoDbStoreInternalConfig {
474                    use_dynamodb_local: *use_dynamodb_local,
475                    max_concurrent_queries: options.storage_max_concurrent_queries,
476                    max_stream_queries: options.storage_max_stream_queries,
477                };
478                let config = DynamoDbStoreConfig {
479                    inner_config,
480                    storage_cache_config: options.storage_cache_config(),
481                };
482                Ok(StoreConfig::DynamoDb { config, namespace })
483            }
484            #[cfg(feature = "scylladb")]
485            InnerStorageConfig::ScyllaDb { uri } => {
486                let inner_config = ScyllaDbStoreInternalConfig {
487                    uri: uri.clone(),
488                    max_stream_queries: options.storage_max_stream_queries,
489                    max_concurrent_queries: options.storage_max_concurrent_queries,
490                    replication_factor: options.storage_replication_factor,
491                };
492                let config = ScyllaDbStoreConfig {
493                    inner_config,
494                    storage_cache_config: options.storage_cache_config(),
495                };
496                Ok(StoreConfig::ScyllaDb { config, namespace })
497            }
498            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
499            InnerStorageConfig::DualRocksDbScyllaDb {
500                path_with_guard,
501                spawn_mode,
502                uri,
503            } => {
504                let inner_config = RocksDbStoreInternalConfig {
505                    spawn_mode: *spawn_mode,
506                    path_with_guard: path_with_guard.clone(),
507                    max_stream_queries: options.storage_max_stream_queries,
508                };
509                let first_config = RocksDbStoreConfig {
510                    inner_config,
511                    storage_cache_config: options.storage_cache_config(),
512                };
513
514                let inner_config = ScyllaDbStoreInternalConfig {
515                    uri: uri.clone(),
516                    max_stream_queries: options.storage_max_stream_queries,
517                    max_concurrent_queries: options.storage_max_concurrent_queries,
518                    replication_factor: options.storage_replication_factor,
519                };
520                let second_config = ScyllaDbStoreConfig {
521                    inner_config,
522                    storage_cache_config: options.storage_cache_config(),
523                };
524
525                let config = DualStoreConfig {
526                    first_config,
527                    second_config,
528                };
529                Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
530            }
531        }
532    }
533}
534
535impl fmt::Display for StorageConfig {
536    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
537        let namespace = &self.namespace;
538        match &self.inner_storage_config {
539            #[cfg(feature = "storage-service")]
540            InnerStorageConfig::Service { endpoint } => {
541                write!(f, "service:tcp:{}:{}", endpoint, namespace)
542            }
543            InnerStorageConfig::Memory { genesis_path } => {
544                write!(f, "memory:{}:{}", genesis_path.display(), namespace)
545            }
546            #[cfg(feature = "rocksdb")]
547            InnerStorageConfig::RocksDb { path, spawn_mode } => {
548                let spawn_mode = spawn_mode.to_string();
549                write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
550            }
551            #[cfg(feature = "dynamodb")]
552            InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
553                true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
554                false => write!(f, "dynamodb:{}:env", namespace),
555            },
556            #[cfg(feature = "scylladb")]
557            InnerStorageConfig::ScyllaDb { uri } => {
558                write!(f, "scylladb:tcp:{}:{}", uri, namespace)
559            }
560            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
561            InnerStorageConfig::DualRocksDbScyllaDb {
562                path_with_guard,
563                spawn_mode,
564                uri,
565            } => {
566                write!(
567                    f,
568                    "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
569                    path_with_guard.path_buf.display(),
570                    spawn_mode,
571                    uri,
572                    namespace
573                )
574            }
575        }
576    }
577}
578
579#[async_trait]
580pub trait Runnable {
581    type Output;
582
583    async fn run<S>(self, storage: S) -> Self::Output
584    where
585        S: Storage + Clone + Send + Sync + 'static;
586}
587
588#[async_trait]
589pub trait RunnableWithStore {
590    type Output;
591
592    async fn run<D>(
593        self,
594        config: D::Config,
595        namespace: String,
596    ) -> Result<Self::Output, anyhow::Error>
597    where
598        D: KeyValueDatabase + Clone + Send + Sync + 'static,
599        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
600        D::Error: Send + Sync;
601}
602
603impl StoreConfig {
604    pub async fn run_with_storage<Job>(
605        self,
606        wasm_runtime: Option<WasmRuntime>,
607        job: Job,
608    ) -> Result<Job::Output, anyhow::Error>
609    where
610        Job: Runnable,
611    {
612        match self {
613            StoreConfig::Memory {
614                config,
615                namespace,
616                genesis_path,
617            } => {
618                let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
619                    &config,
620                    &namespace,
621                    wasm_runtime,
622                )
623                .await?;
624                let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
625                // Memory storage must be initialized every time.
626                genesis_config.initialize_storage(&mut storage).await?;
627                Ok(job.run(storage).await)
628            }
629            #[cfg(feature = "storage-service")]
630            StoreConfig::StorageService { config, namespace } => {
631                let storage = DbStorage::<StorageServiceDatabase, _>::connect(
632                    &config,
633                    &namespace,
634                    wasm_runtime,
635                )
636                .await?;
637                Ok(job.run(storage).await)
638            }
639            #[cfg(feature = "rocksdb")]
640            StoreConfig::RocksDb { config, namespace } => {
641                let storage =
642                    DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
643                        .await?;
644                Ok(job.run(storage).await)
645            }
646            #[cfg(feature = "dynamodb")]
647            StoreConfig::DynamoDb { config, namespace } => {
648                let storage =
649                    DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
650                        .await?;
651                Ok(job.run(storage).await)
652            }
653            #[cfg(feature = "scylladb")]
654            StoreConfig::ScyllaDb { config, namespace } => {
655                let storage =
656                    DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
657                        .await?;
658                Ok(job.run(storage).await)
659            }
660            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
661            StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
662                let storage = DbStorage::<
663                    DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
664                    _,
665                >::connect(&config, &namespace, wasm_runtime)
666                .await?;
667                Ok(job.run(storage).await)
668            }
669        }
670    }
671
672    #[allow(unused_variables)]
673    pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
674    where
675        Job: RunnableWithStore,
676    {
677        match self {
678            StoreConfig::Memory { .. } => {
679                Err(anyhow!("Cannot run admin operations on the memory store"))
680            }
681            #[cfg(feature = "storage-service")]
682            StoreConfig::StorageService { config, namespace } => {
683                Ok(job.run::<StorageServiceDatabase>(config, namespace).await?)
684            }
685            #[cfg(feature = "rocksdb")]
686            StoreConfig::RocksDb { config, namespace } => {
687                Ok(job.run::<RocksDbDatabase>(config, namespace).await?)
688            }
689            #[cfg(feature = "dynamodb")]
690            StoreConfig::DynamoDb { config, namespace } => {
691                Ok(job.run::<DynamoDbDatabase>(config, namespace).await?)
692            }
693            #[cfg(feature = "scylladb")]
694            StoreConfig::ScyllaDb { config, namespace } => {
695                Ok(job.run::<ScyllaDbDatabase>(config, namespace).await?)
696            }
697            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
698            StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
699                .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
700                    config, namespace,
701                )
702                .await?),
703        }
704    }
705
706    pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
707        self.run_with_store(InitializeStorageJob(config)).await
708    }
709}
710
711struct InitializeStorageJob<'a>(&'a GenesisConfig);
712
713#[async_trait]
714impl RunnableWithStore for InitializeStorageJob<'_> {
715    type Output = ();
716
717    async fn run<D>(
718        self,
719        config: D::Config,
720        namespace: String,
721    ) -> Result<Self::Output, anyhow::Error>
722    where
723        D: KeyValueDatabase + Clone + Send + Sync + 'static,
724        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
725        D::Error: Send + Sync,
726    {
727        let mut storage =
728            DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
729        self.0.initialize_storage(&mut storage).await?;
730        Ok(())
731    }
732}
733
734#[test]
735fn test_memory_storage_config_from_str() {
736    assert_eq!(
737        StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
738        StorageConfig {
739            inner_storage_config: InnerStorageConfig::Memory {
740                genesis_path: PathBuf::from("path/to/genesis.json")
741            },
742            namespace: DEFAULT_NAMESPACE.into()
743        }
744    );
745    assert_eq!(
746        StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
747        StorageConfig {
748            inner_storage_config: InnerStorageConfig::Memory {
749                genesis_path: PathBuf::from("path/to/genesis.json")
750            },
751            namespace: "namespace".into()
752        }
753    );
754    assert!(StorageConfig::from_str("memory").is_err(),);
755}
756
757#[cfg(feature = "storage-service")]
758#[test]
759fn test_shared_store_config_from_str() {
760    assert_eq!(
761        StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
762        StorageConfig {
763            inner_storage_config: InnerStorageConfig::Service {
764                endpoint: "127.0.0.1:8942".to_string()
765            },
766            namespace: "linera".into()
767        }
768    );
769    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
770    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
771}
772
773#[cfg(feature = "rocksdb")]
774#[test]
775fn test_rocks_db_storage_config_from_str() {
776    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
777    assert_eq!(
778        StorageConfig::from_str("rocksdb:foo.db").unwrap(),
779        StorageConfig {
780            inner_storage_config: InnerStorageConfig::RocksDb {
781                path: "foo.db".into(),
782                spawn_mode: RocksDbSpawnMode::SpawnBlocking,
783            },
784            namespace: DEFAULT_NAMESPACE.to_string()
785        }
786    );
787    assert_eq!(
788        StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
789        StorageConfig {
790            inner_storage_config: InnerStorageConfig::RocksDb {
791                path: "foo.db".into(),
792                spawn_mode: RocksDbSpawnMode::BlockInPlace,
793            },
794            namespace: DEFAULT_NAMESPACE.to_string()
795        }
796    );
797    assert_eq!(
798        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
799        StorageConfig {
800            inner_storage_config: InnerStorageConfig::RocksDb {
801                path: "foo.db".into(),
802                spawn_mode: RocksDbSpawnMode::BlockInPlace,
803            },
804            namespace: "chosen_namespace".into()
805        }
806    );
807}
808
809#[cfg(feature = "dynamodb")]
810#[test]
811fn test_aws_storage_config_from_str() {
812    assert_eq!(
813        StorageConfig::from_str("dynamodb:table").unwrap(),
814        StorageConfig {
815            inner_storage_config: InnerStorageConfig::DynamoDb {
816                use_dynamodb_local: false
817            },
818            namespace: "table".to_string()
819        }
820    );
821    assert_eq!(
822        StorageConfig::from_str("dynamodb:table:env").unwrap(),
823        StorageConfig {
824            inner_storage_config: InnerStorageConfig::DynamoDb {
825                use_dynamodb_local: false
826            },
827            namespace: "table".to_string()
828        }
829    );
830    assert_eq!(
831        StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap(),
832        StorageConfig {
833            inner_storage_config: InnerStorageConfig::DynamoDb {
834                use_dynamodb_local: true
835            },
836            namespace: "table".to_string()
837        }
838    );
839    assert!(StorageConfig::from_str("dynamodb").is_err());
840    assert!(StorageConfig::from_str("dynamodb:").is_err());
841    assert!(StorageConfig::from_str("dynamodb:1").is_err());
842    assert!(StorageConfig::from_str("dynamodb:wrong:endpoint").is_err());
843}
844
845#[cfg(feature = "scylladb")]
846#[test]
847fn test_scylla_db_storage_config_from_str() {
848    assert_eq!(
849        StorageConfig::from_str("scylladb:").unwrap(),
850        StorageConfig {
851            inner_storage_config: InnerStorageConfig::ScyllaDb {
852                uri: "localhost:9042".to_string()
853            },
854            namespace: DEFAULT_NAMESPACE.to_string()
855        }
856    );
857    assert_eq!(
858        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
859        StorageConfig {
860            inner_storage_config: InnerStorageConfig::ScyllaDb {
861                uri: "db_hostname:230".to_string()
862            },
863            namespace: "table_other_storage".to_string()
864        }
865    );
866    assert_eq!(
867        StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
868        StorageConfig {
869            inner_storage_config: InnerStorageConfig::ScyllaDb {
870                uri: "db_hostname:230".to_string()
871            },
872            namespace: DEFAULT_NAMESPACE.to_string()
873        }
874    );
875    assert!(StorageConfig::from_str("scylladb:-10").is_err());
876    assert!(StorageConfig::from_str("scylladb:70000").is_err());
877    assert!(StorageConfig::from_str("scylladb:230:234").is_err());
878    assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
879    assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
880    assert!(StorageConfig::from_str("scylladb:wrong").is_err());
881}