linera_service/
storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13    client::StorageServiceStore,
14    common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbStore, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20    PathWithGuard, RocksDbSpawnMode, RocksDbStore, RocksDbStoreConfig, RocksDbStoreInternalConfig,
21};
22use linera_views::{
23    lru_caching::StorageCacheConfig,
24    memory::{MemoryStore, MemoryStoreConfig},
25    store::KeyValueStore,
26};
27use serde::{Deserialize, Serialize};
28use tracing::error;
29#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
30use {
31    linera_storage::ChainStatesFirstAssignment,
32    linera_views::backends::dual::{DualStore, DualStoreConfig},
33    std::path::Path,
34};
35#[cfg(feature = "scylladb")]
36use {
37    linera_views::scylla_db::{ScyllaDbStore, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
38    std::num::NonZeroU16,
39    tracing::debug,
40};
41
// Command-line options that apply to the storage layer regardless of the chosen backend.
//
// NOTE: explanatory remarks in this struct are deliberately written as plain `//`
// comments: with clap's derive, `///` doc comments become user-visible help text, so the
// existing `///` lines are left exactly as they are.
#[derive(Clone, Debug, clap::Parser)]
pub struct CommonStorageOptions {
    /// The maximal number of simultaneous queries to the database
    // Optional. Forwarded as `max_concurrent_queries` to the storage-service, DynamoDB and
    // ScyllaDB store configurations; the memory and RocksDB configurations do not take it.
    #[arg(long, global = true)]
    pub storage_max_concurrent_queries: Option<usize>,

    /// The maximal number of simultaneous stream queries to the database
    // Forwarded as `max_stream_queries` to every backend's store configuration.
    #[arg(long, default_value = "10", global = true)]
    pub storage_max_stream_queries: usize,

    /// The maximal memory used in the storage cache.
    // Becomes `StorageCacheConfig::max_cache_size` (see `storage_cache_config`).
    #[arg(long, default_value = "10000000", global = true)]
    pub storage_max_cache_size: usize,

    /// The maximal size of an entry in the storage cache.
    // Becomes `StorageCacheConfig::max_entry_size` (see `storage_cache_config`).
    #[arg(long, default_value = "1000000", global = true)]
    pub storage_max_entry_size: usize,

    /// The maximal number of entries in the storage cache.
    // Becomes `StorageCacheConfig::max_cache_entries` (see `storage_cache_config`).
    #[arg(long, default_value = "1000", global = true)]
    pub storage_max_cache_entries: usize,

    /// The replication factor for the keyspace
    // Only the ScyllaDB store configurations (plain and dual) receive this value.
    #[arg(long, default_value = "1", global = true)]
    pub storage_replication_factor: u32,
}
68
69impl CommonStorageOptions {
70    pub fn storage_cache_config(&self) -> StorageCacheConfig {
71        StorageCacheConfig {
72            max_cache_size: self.storage_max_cache_size,
73            max_entry_size: self.storage_max_entry_size,
74            max_cache_entries: self.storage_max_cache_entries,
75        }
76    }
77}
78
/// The configuration of the key value store in use.
///
/// Every variant pairs the backend-specific store configuration with the namespace that
/// is handed to the store when connecting. Variants other than `Memory` only exist when
/// the corresponding Cargo feature is enabled.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StoreConfig {
    /// The memory key value store
    Memory {
        /// The configuration of the in-memory store.
        config: MemoryStoreConfig,
        /// The namespace passed to the store.
        namespace: String,
        /// The path of the genesis configuration; `run_with_storage` reads it to
        /// initialize the memory storage on every run.
        genesis_path: PathBuf,
    },
    /// The storage service key-value store
    #[cfg(feature = "storage-service")]
    StorageService {
        /// The configuration of the storage-service client.
        config: StorageServiceStoreConfig,
        /// The namespace passed to the store.
        namespace: String,
    },
    /// The RocksDB key value store
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// The configuration of the RocksDB store.
        config: RocksDbStoreConfig,
        /// The namespace passed to the store.
        namespace: String,
    },
    /// The DynamoDB key value store
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// The configuration of the DynamoDB store.
        config: DynamoDbStoreConfig,
        /// The namespace passed to the store.
        namespace: String,
    },
    /// The ScyllaDB key value store
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// The configuration of the ScyllaDB store.
        config: ScyllaDbStoreConfig,
        /// The namespace passed to the store.
        namespace: String,
    },
    /// The dual key value store made of a RocksDB store and a ScyllaDB store
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// The RocksDB configuration (first) and the ScyllaDB configuration (second).
        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
        /// The namespace passed to the store.
        namespace: String,
    },
}
118
/// The backend-specific part of a storage description.
///
/// This is what `StorageConfig::from_str` extracts from the text that follows the backend
/// prefix; the namespace is kept separately in `StorageConfig`. Variants other than
/// `Memory` only exist when the corresponding Cargo feature is enabled.
#[derive(Clone, Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum InnerStorageConfig {
    /// The memory description.
    Memory {
        /// The path to the genesis configuration. This is needed because we reinitialize
        /// memory databases from the genesis config every time.
        genesis_path: PathBuf,
    },
    /// The storage service description.
    #[cfg(feature = "storage-service")]
    Service {
        /// The endpoint used.
        endpoint: String,
    },
    /// The RocksDB description.
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// The path used.
        path: PathBuf,
        /// Whether to use `block_in_place` or `spawn_blocking`.
        spawn_mode: RocksDbSpawnMode,
    },
    /// The DynamoDB description.
    #[cfg(feature = "dynamodb")]
    DynamoDb {
        /// Whether to use the DynamoDB Local system
        use_dynamodb_local: bool,
    },
    /// The ScyllaDB description.
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// The URI for accessing the database.
        uri: String,
    },
    /// The description of the dual store combining RocksDB and ScyllaDB.
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// The RocksDB path used.
        path_with_guard: PathWithGuard,
        /// Whether RocksDB uses `block_in_place` or `spawn_blocking`.
        spawn_mode: RocksDbSpawnMode,
        /// The URI for accessing the ScyllaDB database.
        uri: String,
    },
}
165
166/// The description of a storage implementation.
167#[derive(Clone, Debug)]
168#[cfg_attr(any(test), derive(Eq, PartialEq))]
169pub struct StorageConfig {
170    /// The inner storage config.
171    pub inner_storage_config: InnerStorageConfig,
172    /// The namespace used
173    pub namespace: String,
174}
175
// Prefixes that `StorageConfig::from_str` strips from its input to pick a backend.
// Each one is only defined when the backend's Cargo feature is enabled; the memory
// backend is always available.

/// Prefix selecting the in-memory store.
const MEMORY: &str = "memory:";
/// Prefix selecting the storage-service store.
#[cfg(feature = "storage-service")]
const STORAGE_SERVICE: &str = "service:";
/// Prefix selecting the RocksDB store.
#[cfg(feature = "rocksdb")]
const ROCKS_DB: &str = "rocksdb:";
/// Prefix selecting the DynamoDB store.
#[cfg(feature = "dynamodb")]
const DYNAMO_DB: &str = "dynamodb:";
/// Prefix selecting the ScyllaDB store.
#[cfg(feature = "scylladb")]
const SCYLLA_DB: &str = "scylladb:";
/// Prefix selecting the dual RocksDB + ScyllaDB store.
#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
187
188impl FromStr for StorageConfig {
189    type Err = anyhow::Error;
190
191    fn from_str(input: &str) -> Result<Self, Self::Err> {
192        if let Some(s) = input.strip_prefix(MEMORY) {
193            let parts = s.split(':').collect::<Vec<_>>();
194            if parts.len() == 1 {
195                let genesis_path = parts[0].to_string().into();
196                let namespace = DEFAULT_NAMESPACE.to_string();
197                let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
198                return Ok(StorageConfig {
199                    inner_storage_config,
200                    namespace,
201                });
202            }
203            if parts.len() != 2 {
204                bail!("We should have one genesis config path and one optional namespace");
205            }
206            let genesis_path = parts[0].to_string().into();
207            let namespace = parts[1].to_string();
208            let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
209            return Ok(StorageConfig {
210                inner_storage_config,
211                namespace,
212            });
213        }
214        #[cfg(feature = "storage-service")]
215        if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
216            if s.is_empty() {
217                bail!(
218                    "For Storage service, the formatting has to be service:endpoint:namespace,\
219example service:tcp:127.0.0.1:7878:table_do_my_test"
220                );
221            }
222            let parts = s.split(':').collect::<Vec<_>>();
223            if parts.len() != 4 {
224                bail!("We should have one endpoint and one namespace");
225            }
226            let protocol = parts[0];
227            if protocol != "tcp" {
228                bail!("Only allowed protocol is tcp");
229            }
230            let endpoint = parts[1];
231            let port = parts[2];
232            let mut endpoint = endpoint.to_string();
233            endpoint.push(':');
234            endpoint.push_str(port);
235            let endpoint = endpoint.to_string();
236            let namespace = parts[3].to_string();
237            let inner_storage_config = InnerStorageConfig::Service { endpoint };
238            return Ok(StorageConfig {
239                inner_storage_config,
240                namespace,
241            });
242        }
243        #[cfg(feature = "rocksdb")]
244        if let Some(s) = input.strip_prefix(ROCKS_DB) {
245            if s.is_empty() {
246                bail!(
247                    "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
248            }
249            let parts = s.split(':').collect::<Vec<_>>();
250            if parts.len() == 1 {
251                let path = parts[0].to_string().into();
252                let namespace = DEFAULT_NAMESPACE.to_string();
253                let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
254                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
255                return Ok(StorageConfig {
256                    inner_storage_config,
257                    namespace,
258                });
259            }
260            if parts.len() == 2 || parts.len() == 3 {
261                let path = parts[0].to_string().into();
262                let spawn_mode = match parts[1] {
263                    "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
264                    "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
265                    "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
266                    _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
267                }?;
268                let namespace = if parts.len() == 2 {
269                    DEFAULT_NAMESPACE.to_string()
270                } else {
271                    parts[2].to_string()
272                };
273                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
274                return Ok(StorageConfig {
275                    inner_storage_config,
276                    namespace,
277                });
278            }
279            bail!("We should have one, two or three parts");
280        }
281        #[cfg(feature = "dynamodb")]
282        if let Some(s) = input.strip_prefix(DYNAMO_DB) {
283            let mut parts = s.splitn(2, ':');
284            let namespace = parts
285                .next()
286                .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
287                .to_string();
288            let use_dynamodb_local = match parts.next() {
289                None | Some("env") => false,
290                Some("dynamodb_local") => true,
291                Some(unknown) => {
292                    bail!(
293                        "Invalid DynamoDB endpoint {unknown:?}. \
294                        Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
295                    );
296                }
297            };
298            let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
299            return Ok(StorageConfig {
300                inner_storage_config,
301                namespace,
302            });
303        }
304        #[cfg(feature = "scylladb")]
305        if let Some(s) = input.strip_prefix(SCYLLA_DB) {
306            let mut uri: Option<String> = None;
307            let mut namespace: Option<String> = None;
308            let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
309            if !s.is_empty() {
310                let mut parts = s.split(':');
311                while let Some(part) = parts.next() {
312                    match part {
313                        "tcp" => {
314                            let address = parts.next().ok_or_else(|| {
315                                anyhow!("Failed to find address for {s}. {parse_error}")
316                            })?;
317                            let port_str = parts.next().ok_or_else(|| {
318                                anyhow!("Failed to find port for {s}. {parse_error}")
319                            })?;
320                            let port = NonZeroU16::from_str(port_str).map_err(|_| {
321                                anyhow!(
322                                    "Failed to find parse port {port_str} for {s}. {parse_error}",
323                                )
324                            })?;
325                            if uri.is_some() {
326                                bail!("The uri has already been assigned");
327                            }
328                            uri = Some(format!("{}:{}", &address, port));
329                        }
330                        _ if part.starts_with("table") => {
331                            if namespace.is_some() {
332                                bail!("The namespace has already been assigned");
333                            }
334                            namespace = Some(part.to_string());
335                        }
336                        _ => {
337                            bail!("the entry \"{part}\" is not matching");
338                        }
339                    }
340                }
341            }
342            let uri = uri.unwrap_or("localhost:9042".to_string());
343            let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string());
344            let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
345            debug!("ScyllaDB connection info: {:?}", inner_storage_config);
346            return Ok(StorageConfig {
347                inner_storage_config,
348                namespace,
349            });
350        }
351        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
352        if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
353            let parts = s.split(':').collect::<Vec<_>>();
354            if parts.len() != 5 && parts.len() != 6 {
355                bail!(
356                    "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
357                );
358            }
359            let path = Path::new(parts[0]);
360            let path = path.to_path_buf();
361            let path_with_guard = PathWithGuard::new(path);
362            let spawn_mode = match parts[1] {
363                "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
364                "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
365                "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
366                _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
367            }?;
368            let protocol = parts[2];
369            if protocol != "tcp" {
370                bail!("The only allowed protocol is tcp");
371            }
372            let address = parts[3];
373            let port_str = parts[4];
374            let port = NonZeroU16::from_str(port_str)
375                .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
376            let uri = format!("{}:{}", &address, port);
377            let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
378                path_with_guard,
379                spawn_mode,
380                uri,
381            };
382            let namespace = if parts.len() == 5 {
383                DEFAULT_NAMESPACE.to_string()
384            } else {
385                parts[5].to_string()
386            };
387            return Ok(StorageConfig {
388                inner_storage_config,
389                namespace,
390            });
391        }
392        error!("available storage: memory");
393        #[cfg(feature = "storage-service")]
394        error!("Also available is linera-storage-service");
395        #[cfg(feature = "rocksdb")]
396        error!("Also available is RocksDB");
397        #[cfg(feature = "dynamodb")]
398        error!("Also available is DynamoDB");
399        #[cfg(feature = "scylladb")]
400        error!("Also available is ScyllaDB");
401        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
402        error!("Also available is DualRocksDbScyllaDb");
403        Err(anyhow!("The input has not matched: {input}"))
404    }
405}
406
407impl StorageConfig {
408    pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
409        match &mut self.inner_storage_config {
410            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
411            InnerStorageConfig::DualRocksDbScyllaDb {
412                path_with_guard,
413                spawn_mode: _,
414                uri: _,
415            } => {
416                let shard_str = format!("shard_{}", _shard);
417                path_with_guard.path_buf.push(shard_str);
418                std::fs::create_dir_all(&path_with_guard.path_buf)
419            }
420            _ => Ok(()),
421        }
422    }
423
424    /// The addition of the common config to get a full configuration
425    pub async fn add_common_storage_options(
426        &self,
427        options: &CommonStorageOptions,
428    ) -> Result<StoreConfig, anyhow::Error> {
429        let namespace = self.namespace.clone();
430        match &self.inner_storage_config {
431            InnerStorageConfig::Memory { genesis_path } => {
432                let config = MemoryStoreConfig {
433                    max_stream_queries: options.storage_max_stream_queries,
434                };
435                let genesis_path = genesis_path.clone();
436                Ok(StoreConfig::Memory {
437                    config,
438                    namespace,
439                    genesis_path,
440                })
441            }
442            #[cfg(feature = "storage-service")]
443            InnerStorageConfig::Service { endpoint } => {
444                let inner_config = StorageServiceStoreInternalConfig {
445                    endpoint: endpoint.clone(),
446                    max_concurrent_queries: options.storage_max_concurrent_queries,
447                    max_stream_queries: options.storage_max_stream_queries,
448                };
449                let config = StorageServiceStoreConfig {
450                    inner_config,
451                    storage_cache_config: options.storage_cache_config(),
452                };
453                Ok(StoreConfig::StorageService { config, namespace })
454            }
455            #[cfg(feature = "rocksdb")]
456            InnerStorageConfig::RocksDb { path, spawn_mode } => {
457                let path_with_guard = PathWithGuard::new(path.to_path_buf());
458                let inner_config = RocksDbStoreInternalConfig {
459                    spawn_mode: *spawn_mode,
460                    path_with_guard,
461                    max_stream_queries: options.storage_max_stream_queries,
462                };
463                let config = RocksDbStoreConfig {
464                    inner_config,
465                    storage_cache_config: options.storage_cache_config(),
466                };
467                Ok(StoreConfig::RocksDb { config, namespace })
468            }
469            #[cfg(feature = "dynamodb")]
470            InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
471                let inner_config = DynamoDbStoreInternalConfig {
472                    use_dynamodb_local: *use_dynamodb_local,
473                    max_concurrent_queries: options.storage_max_concurrent_queries,
474                    max_stream_queries: options.storage_max_stream_queries,
475                };
476                let config = DynamoDbStoreConfig {
477                    inner_config,
478                    storage_cache_config: options.storage_cache_config(),
479                };
480                Ok(StoreConfig::DynamoDb { config, namespace })
481            }
482            #[cfg(feature = "scylladb")]
483            InnerStorageConfig::ScyllaDb { uri } => {
484                let inner_config = ScyllaDbStoreInternalConfig {
485                    uri: uri.clone(),
486                    max_stream_queries: options.storage_max_stream_queries,
487                    max_concurrent_queries: options.storage_max_concurrent_queries,
488                    replication_factor: options.storage_replication_factor,
489                };
490                let config = ScyllaDbStoreConfig {
491                    inner_config,
492                    storage_cache_config: options.storage_cache_config(),
493                };
494                Ok(StoreConfig::ScyllaDb { config, namespace })
495            }
496            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
497            InnerStorageConfig::DualRocksDbScyllaDb {
498                path_with_guard,
499                spawn_mode,
500                uri,
501            } => {
502                let inner_config = RocksDbStoreInternalConfig {
503                    spawn_mode: *spawn_mode,
504                    path_with_guard: path_with_guard.clone(),
505                    max_stream_queries: options.storage_max_stream_queries,
506                };
507                let first_config = RocksDbStoreConfig {
508                    inner_config,
509                    storage_cache_config: options.storage_cache_config(),
510                };
511
512                let inner_config = ScyllaDbStoreInternalConfig {
513                    uri: uri.clone(),
514                    max_stream_queries: options.storage_max_stream_queries,
515                    max_concurrent_queries: options.storage_max_concurrent_queries,
516                    replication_factor: options.storage_replication_factor,
517                };
518                let second_config = ScyllaDbStoreConfig {
519                    inner_config,
520                    storage_cache_config: options.storage_cache_config(),
521                };
522
523                let config = DualStoreConfig {
524                    first_config,
525                    second_config,
526                };
527                Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
528            }
529        }
530    }
531}
532
533impl fmt::Display for StorageConfig {
534    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
535        let namespace = &self.namespace;
536        match &self.inner_storage_config {
537            #[cfg(feature = "storage-service")]
538            InnerStorageConfig::Service { endpoint } => {
539                write!(f, "service:tcp:{}:{}", endpoint, namespace)
540            }
541            InnerStorageConfig::Memory { genesis_path } => {
542                write!(f, "memory:{}:{}", genesis_path.display(), namespace)
543            }
544            #[cfg(feature = "rocksdb")]
545            InnerStorageConfig::RocksDb { path, spawn_mode } => {
546                let spawn_mode = spawn_mode.to_string();
547                write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
548            }
549            #[cfg(feature = "dynamodb")]
550            InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
551                true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
552                false => write!(f, "dynamodb:{}:env", namespace),
553            },
554            #[cfg(feature = "scylladb")]
555            InnerStorageConfig::ScyllaDb { uri } => {
556                write!(f, "scylladb:tcp:{}:{}", uri, namespace)
557            }
558            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
559            InnerStorageConfig::DualRocksDbScyllaDb {
560                path_with_guard,
561                spawn_mode,
562                uri,
563            } => {
564                write!(
565                    f,
566                    "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
567                    path_with_guard.path_buf.display(),
568                    spawn_mode,
569                    uri,
570                    namespace
571                )
572            }
573        }
574    }
575}
576
/// A job that runs against a connected storage.
///
/// `StoreConfig::run_with_storage` connects to the configured backend and then hands the
/// resulting storage to `run`.
#[async_trait]
pub trait Runnable {
    /// The value produced by the job.
    type Output;

    /// Executes the job with the given storage and returns its output.
    async fn run<S>(self, storage: S) -> Self::Output
    where
        S: Storage + Clone + Send + Sync + 'static;
}
585
/// A job that runs against a key-value store type, given only its configuration.
///
/// Unlike `Runnable`, the job receives the store configuration and namespace rather than
/// a connected storage; `StoreConfig::run_with_store` selects the store type `S`
/// matching the configured backend.
#[async_trait]
pub trait RunnableWithStore {
    /// The value produced by the job on success.
    type Output;

    /// Executes the job for the store type `S` with its configuration and namespace.
    async fn run<S>(
        self,
        config: S::Config,
        namespace: String,
    ) -> Result<Self::Output, anyhow::Error>
    where
        S: KeyValueStore + Clone + Send + Sync + 'static,
        S::Error: Send + Sync;
}
599
600impl StoreConfig {
601    pub async fn run_with_storage<Job>(
602        self,
603        wasm_runtime: Option<WasmRuntime>,
604        job: Job,
605    ) -> Result<Job::Output, anyhow::Error>
606    where
607        Job: Runnable,
608    {
609        match self {
610            StoreConfig::Memory {
611                config,
612                namespace,
613                genesis_path,
614            } => {
615                let mut storage = DbStorage::<MemoryStore, _>::maybe_create_and_connect(
616                    &config,
617                    &namespace,
618                    wasm_runtime,
619                )
620                .await?;
621                let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
622                // Memory storage must be initialized every time.
623                genesis_config.initialize_storage(&mut storage).await?;
624                Ok(job.run(storage).await)
625            }
626            #[cfg(feature = "storage-service")]
627            StoreConfig::StorageService { config, namespace } => {
628                let storage =
629                    DbStorage::<StorageServiceStore, _>::connect(&config, &namespace, wasm_runtime)
630                        .await?;
631                Ok(job.run(storage).await)
632            }
633            #[cfg(feature = "rocksdb")]
634            StoreConfig::RocksDb { config, namespace } => {
635                let storage =
636                    DbStorage::<RocksDbStore, _>::connect(&config, &namespace, wasm_runtime)
637                        .await?;
638                Ok(job.run(storage).await)
639            }
640            #[cfg(feature = "dynamodb")]
641            StoreConfig::DynamoDb { config, namespace } => {
642                let storage =
643                    DbStorage::<DynamoDbStore, _>::connect(&config, &namespace, wasm_runtime)
644                        .await?;
645                Ok(job.run(storage).await)
646            }
647            #[cfg(feature = "scylladb")]
648            StoreConfig::ScyllaDb { config, namespace } => {
649                let storage =
650                    DbStorage::<ScyllaDbStore, _>::connect(&config, &namespace, wasm_runtime)
651                        .await?;
652                Ok(job.run(storage).await)
653            }
654            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
655            StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
656                let storage = DbStorage::<
657                    DualStore<RocksDbStore, ScyllaDbStore, ChainStatesFirstAssignment>,
658                    _,
659                >::connect(&config, &namespace, wasm_runtime)
660                .await?;
661                Ok(job.run(storage).await)
662            }
663        }
664    }
665
666    #[allow(unused_variables)]
667    pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
668    where
669        Job: RunnableWithStore,
670    {
671        match self {
672            StoreConfig::Memory { .. } => {
673                Err(anyhow!("Cannot run admin operations on the memory store"))
674            }
675            #[cfg(feature = "storage-service")]
676            StoreConfig::StorageService { config, namespace } => {
677                Ok(job.run::<StorageServiceStore>(config, namespace).await?)
678            }
679            #[cfg(feature = "rocksdb")]
680            StoreConfig::RocksDb { config, namespace } => {
681                Ok(job.run::<RocksDbStore>(config, namespace).await?)
682            }
683            #[cfg(feature = "dynamodb")]
684            StoreConfig::DynamoDb { config, namespace } => {
685                Ok(job.run::<DynamoDbStore>(config, namespace).await?)
686            }
687            #[cfg(feature = "scylladb")]
688            StoreConfig::ScyllaDb { config, namespace } => {
689                Ok(job.run::<ScyllaDbStore>(config, namespace).await?)
690            }
691            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
692            StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
693                .run::<DualStore<RocksDbStore, ScyllaDbStore, ChainStatesFirstAssignment>>(
694                    config, namespace,
695                )
696                .await?),
697        }
698    }
699
700    pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
701        self.run_with_store(InitializeStorageJob(config)).await
702    }
703}
704
705struct InitializeStorageJob<'a>(&'a GenesisConfig);
706
707#[async_trait]
708impl RunnableWithStore for InitializeStorageJob<'_> {
709    type Output = ();
710
711    async fn run<S>(
712        self,
713        config: S::Config,
714        namespace: String,
715    ) -> Result<Self::Output, anyhow::Error>
716    where
717        S: KeyValueStore + Clone + Send + Sync + 'static,
718        S::Error: Send + Sync,
719    {
720        let mut storage =
721            DbStorage::<S, _>::maybe_create_and_connect(&config, &namespace, None).await?;
722        self.0.initialize_storage(&mut storage).await?;
723        Ok(())
724    }
725}
726
#[test]
fn test_memory_storage_config_from_str() {
    // Expected result for the shared genesis path with the given namespace.
    let memory_config = |namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::Memory {
            genesis_path: PathBuf::from("path/to/genesis.json"),
        },
        namespace,
    };

    // Without a namespace the default one is used.
    let without_namespace: StorageConfig = "memory:path/to/genesis.json".parse().unwrap();
    assert_eq!(
        without_namespace,
        memory_config(DEFAULT_NAMESPACE.to_string())
    );

    // An explicit namespace is taken verbatim.
    let with_namespace: StorageConfig = "memory:path/to/genesis.json:namespace"
        .parse()
        .unwrap();
    assert_eq!(with_namespace, memory_config("namespace".to_string()));

    // The prefix requires its trailing colon.
    assert!("memory".parse::<StorageConfig>().is_err());
}
749
#[cfg(feature = "storage-service")]
#[test]
fn test_shared_store_config_from_str() {
    // A complete description: protocol, host, port and namespace.
    let parsed: StorageConfig = "service:tcp:127.0.0.1:8942:linera".parse().unwrap();
    let expected = StorageConfig {
        inner_storage_config: InnerStorageConfig::Service {
            endpoint: String::from("127.0.0.1:8942"),
        },
        namespace: String::from("linera"),
    };
    assert_eq!(parsed, expected);

    // Dropping either the namespace or the port leaves too few parts.
    for incomplete in ["service:tcp:127.0.0.1:8942", "service:tcp:127.0.0.1:linera"] {
        assert!(incomplete.parse::<StorageConfig>().is_err());
    }
}
765
#[cfg(feature = "rocksdb")]
#[test]
fn test_rocks_db_storage_config_from_str() {
    // Expected result for the `foo.db` directory with the given mode and namespace.
    let rocks_db_config = |spawn_mode: RocksDbSpawnMode, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::RocksDb {
            path: PathBuf::from("foo.db"),
            spawn_mode,
        },
        namespace,
    };

    // The `rocksdb:` prefix is mandatory.
    assert!("rocksdb_foo.db".parse::<StorageConfig>().is_err());

    // Directory only: default spawn mode and namespace.
    let directory_only: StorageConfig = "rocksdb:foo.db".parse().unwrap();
    assert_eq!(
        directory_only,
        rocks_db_config(
            RocksDbSpawnMode::SpawnBlocking,
            DEFAULT_NAMESPACE.to_string()
        )
    );

    // Directory and spawn mode: default namespace.
    let with_mode: StorageConfig = "rocksdb:foo.db:block_in_place".parse().unwrap();
    assert_eq!(
        with_mode,
        rocks_db_config(
            RocksDbSpawnMode::BlockInPlace,
            DEFAULT_NAMESPACE.to_string()
        )
    );

    // Directory, spawn mode and namespace.
    let fully_specified: StorageConfig = "rocksdb:foo.db:block_in_place:chosen_namespace"
        .parse()
        .unwrap();
    assert_eq!(
        fully_specified,
        rocks_db_config(
            RocksDbSpawnMode::BlockInPlace,
            "chosen_namespace".to_string()
        )
    );
}
801
/// Checks parsing of `dynamodb:` storage strings: `dynamodb:table` and
/// `dynamodb:table:env` are both expected to give
/// `use_dynamodb_local: false`, `dynamodb:table:dynamodb_local` is expected
/// to give `true`, and the namespace is `table` in every case. The four
/// inputs at the end must all be errors.
#[cfg(feature = "dynamodb")]
#[test]
fn test_aws_storage_config_from_str() {
    // All valid inputs use the namespace `table`; only the local flag varies.
    let dynamo_config = |use_dynamodb_local: bool| StorageConfig {
        inner_storage_config: InnerStorageConfig::DynamoDb { use_dynamodb_local },
        namespace: "table".to_string(),
    };
    // True when the given input fails to parse.
    let rejects = |input: &str| StorageConfig::from_str(input).is_err();

    // Namespace only.
    let table_only = StorageConfig::from_str("dynamodb:table").unwrap();
    assert_eq!(table_only, dynamo_config(false));

    // Namespace followed by `env`.
    let table_env = StorageConfig::from_str("dynamodb:table:env").unwrap();
    assert_eq!(table_env, dynamo_config(false));

    // Namespace followed by `dynamodb_local`.
    let table_local = StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap();
    assert_eq!(table_local, dynamo_config(true));

    // Inputs that must not parse.
    assert!(rejects("dynamodb"));
    assert!(rejects("dynamodb:"));
    assert!(rejects("dynamodb:1"));
    assert!(rejects("dynamodb:wrong:endpoint"));
}
837
/// Checks parsing of `scylladb:` storage strings: an empty suffix is
/// expected to give the URI `localhost:9042` with `DEFAULT_NAMESPACE`, a
/// `tcp:<host>:<port>` suffix sets the URI, and a further segment is
/// expected to become the namespace. The six inputs at the end must all be
/// errors.
#[cfg(feature = "scylladb")]
#[test]
fn test_scylla_db_storage_config_from_str() {
    // Builds the expected configuration for a given URI and namespace.
    let scylla_config = |uri: &str, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::ScyllaDb {
            uri: uri.to_string(),
        },
        namespace,
    };
    // True when the given input fails to parse.
    let rejects = |input: &str| StorageConfig::from_str(input).is_err();

    // Nothing after the scheme separator.
    let scheme_only = StorageConfig::from_str("scylladb:").unwrap();
    assert_eq!(
        scheme_only,
        scylla_config("localhost:9042", DEFAULT_NAMESPACE.to_string())
    );

    // Address followed by a namespace segment.
    let address_and_namespace =
        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap();
    assert_eq!(
        address_and_namespace,
        scylla_config("db_hostname:230", "table_other_storage".to_string())
    );

    // Address only.
    let address_only = StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap();
    assert_eq!(
        address_only,
        scylla_config("db_hostname:230", DEFAULT_NAMESPACE.to_string())
    );

    // Inputs that must not parse.
    assert!(rejects("scylladb:-10"));
    assert!(rejects("scylladb:70000"));
    assert!(rejects("scylladb:230:234"));
    assert!(rejects("scylladb:tcp:address1"));
    assert!(rejects("scylladb:tcp:address1:tcp:/address2"));
    assert!(rejects("scylladb:wrong"));
}