linera_service/
storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13    client::StorageServiceDatabase,
14    common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20    PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
21    RocksDbStoreInternalConfig,
22};
23use linera_views::{
24    lru_prefix_cache::StorageCacheConfig,
25    memory::{MemoryDatabase, MemoryStoreConfig},
26    store::{KeyValueDatabase, KeyValueStore},
27};
28use serde::{Deserialize, Serialize};
29use tracing::error;
30#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
31use {
32    linera_storage::ChainStatesFirstAssignment,
33    linera_views::backends::dual::{DualDatabase, DualStoreConfig},
34    std::path::Path,
35};
36#[cfg(feature = "scylladb")]
37use {
38    linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
39    std::num::NonZeroU16,
40    tracing::debug,
41};
42
43#[derive(Clone, Debug, clap::Parser)]
44pub struct CommonStorageOptions {
45    /// The maximal number of simultaneous queries to the database
46    #[arg(long, global = true)]
47    pub storage_max_concurrent_queries: Option<usize>,
48
49    /// The maximal number of simultaneous stream queries to the database
50    #[arg(long, default_value = "10", global = true)]
51    pub storage_max_stream_queries: usize,
52
53    /// The maximal memory used in the storage cache.
54    #[arg(long, default_value = "10000000", global = true)]
55    pub storage_max_cache_size: usize,
56
57    /// The maximal size of a value entry in the storage cache.
58    #[arg(long, default_value = "1000000", global = true)]
59    pub storage_max_value_entry_size: usize,
60
61    /// The maximal size of a find-keys entry in the storage cache.
62    #[arg(long, default_value = "1000000", global = true)]
63    pub storage_max_find_keys_entry_size: usize,
64
65    /// The maximal size of a find-key-values entry in the storage cache.
66    #[arg(long, default_value = "1000000", global = true)]
67    pub storage_max_find_key_values_entry_size: usize,
68
69    /// The maximal number of entries in the storage cache.
70    #[arg(long, default_value = "1000", global = true)]
71    pub storage_max_cache_entries: usize,
72
73    /// The maximal memory used in the value cache.
74    #[arg(long, default_value = "10000000", global = true)]
75    pub storage_max_cache_value_size: usize,
76
77    /// The maximal memory used in the find_keys_by_prefix cache.
78    #[arg(long, default_value = "10000000", global = true)]
79    pub storage_max_cache_find_keys_size: usize,
80
81    /// The maximal memory used in the find_key_values_by_prefix cache.
82    #[arg(long, default_value = "10000000", global = true)]
83    pub storage_max_cache_find_key_values_size: usize,
84
85    /// The replication factor for the keyspace
86    #[arg(long, default_value = "1", global = true)]
87    pub storage_replication_factor: u32,
88}
89
90impl CommonStorageOptions {
91    pub fn storage_cache_config(&self) -> StorageCacheConfig {
92        StorageCacheConfig {
93            max_cache_size: self.storage_max_cache_size,
94            max_value_entry_size: self.storage_max_value_entry_size,
95            max_find_keys_entry_size: self.storage_max_find_keys_entry_size,
96            max_find_key_values_entry_size: self.storage_max_find_key_values_entry_size,
97            max_cache_entries: self.storage_max_cache_entries,
98            max_cache_value_size: self.storage_max_cache_value_size,
99            max_cache_find_keys_size: self.storage_max_cache_find_keys_size,
100            max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
101        }
102    }
103}
104
105/// The configuration of the key value store in use.
106#[derive(Clone, Debug, Deserialize, Serialize)]
107pub enum StoreConfig {
108    /// The memory key value store
109    Memory {
110        config: MemoryStoreConfig,
111        namespace: String,
112        genesis_path: PathBuf,
113    },
114    /// The storage service key-value store
115    #[cfg(feature = "storage-service")]
116    StorageService {
117        config: StorageServiceStoreConfig,
118        namespace: String,
119    },
120    /// The RocksDB key value store
121    #[cfg(feature = "rocksdb")]
122    RocksDb {
123        config: RocksDbStoreConfig,
124        namespace: String,
125    },
126    /// The DynamoDB key value store
127    #[cfg(feature = "dynamodb")]
128    DynamoDb {
129        config: DynamoDbStoreConfig,
130        namespace: String,
131    },
132    /// The ScyllaDB key value store
133    #[cfg(feature = "scylladb")]
134    ScyllaDb {
135        config: ScyllaDbStoreConfig,
136        namespace: String,
137    },
138    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
139    DualRocksDbScyllaDb {
140        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
141        namespace: String,
142    },
143}
144
145/// The description of a storage implementation.
146#[derive(Clone, Debug)]
147#[cfg_attr(any(test), derive(Eq, PartialEq))]
148pub enum InnerStorageConfig {
149    /// The memory description.
150    Memory {
151        /// The path to the genesis configuration. This is needed because we reinitialize
152        /// memory databases from the genesis config everytime.
153        genesis_path: PathBuf,
154    },
155    /// The storage service description.
156    #[cfg(feature = "storage-service")]
157    Service {
158        /// The endpoint used.
159        endpoint: String,
160    },
161    /// The RocksDB description.
162    #[cfg(feature = "rocksdb")]
163    RocksDb {
164        /// The path used.
165        path: PathBuf,
166        /// Whether to use `block_in_place` or `spawn_blocking`.
167        spawn_mode: RocksDbSpawnMode,
168    },
169    /// The DynamoDB description.
170    #[cfg(feature = "dynamodb")]
171    DynamoDb {
172        /// Whether to use the DynamoDB Local system
173        use_dynamodb_local: bool,
174    },
175    /// The ScyllaDB description.
176    #[cfg(feature = "scylladb")]
177    ScyllaDb {
178        /// The URI for accessing the database.
179        uri: String,
180    },
181    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
182    DualRocksDbScyllaDb {
183        /// The path used.
184        path_with_guard: PathWithGuard,
185        /// Whether to use `block_in_place` or `spawn_blocking`.
186        spawn_mode: RocksDbSpawnMode,
187        /// The URI for accessing the database.
188        uri: String,
189    },
190}
191
192/// The description of a storage implementation.
193#[derive(Clone, Debug)]
194#[cfg_attr(any(test), derive(Eq, PartialEq))]
195pub struct StorageConfig {
196    /// The inner storage config.
197    pub inner_storage_config: InnerStorageConfig,
198    /// The namespace used
199    pub namespace: String,
200}
201
202const MEMORY: &str = "memory:";
203#[cfg(feature = "storage-service")]
204const STORAGE_SERVICE: &str = "service:";
205#[cfg(feature = "rocksdb")]
206const ROCKS_DB: &str = "rocksdb:";
207#[cfg(feature = "dynamodb")]
208const DYNAMO_DB: &str = "dynamodb:";
209#[cfg(feature = "scylladb")]
210const SCYLLA_DB: &str = "scylladb:";
211#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
212const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
213
214impl FromStr for StorageConfig {
215    type Err = anyhow::Error;
216
217    fn from_str(input: &str) -> Result<Self, Self::Err> {
218        if let Some(s) = input.strip_prefix(MEMORY) {
219            let parts = s.split(':').collect::<Vec<_>>();
220            if parts.len() == 1 {
221                let genesis_path = parts[0].to_string().into();
222                let namespace = DEFAULT_NAMESPACE.to_string();
223                let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
224                return Ok(StorageConfig {
225                    inner_storage_config,
226                    namespace,
227                });
228            }
229            if parts.len() != 2 {
230                bail!("We should have one genesis config path and one optional namespace");
231            }
232            let genesis_path = parts[0].to_string().into();
233            let namespace = parts[1].to_string();
234            let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
235            return Ok(StorageConfig {
236                inner_storage_config,
237                namespace,
238            });
239        }
240        #[cfg(feature = "storage-service")]
241        if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
242            if s.is_empty() {
243                bail!(
244                    "For Storage service, the formatting has to be service:endpoint:namespace,\
245example service:tcp:127.0.0.1:7878:table_do_my_test"
246                );
247            }
248            let parts = s.split(':').collect::<Vec<_>>();
249            if parts.len() != 4 {
250                bail!("We should have one endpoint and one namespace");
251            }
252            let protocol = parts[0];
253            if protocol != "tcp" {
254                bail!("Only allowed protocol is tcp");
255            }
256            let endpoint = parts[1];
257            let port = parts[2];
258            let mut endpoint = endpoint.to_string();
259            endpoint.push(':');
260            endpoint.push_str(port);
261            let endpoint = endpoint.to_string();
262            let namespace = parts[3].to_string();
263            let inner_storage_config = InnerStorageConfig::Service { endpoint };
264            return Ok(StorageConfig {
265                inner_storage_config,
266                namespace,
267            });
268        }
269        #[cfg(feature = "rocksdb")]
270        if let Some(s) = input.strip_prefix(ROCKS_DB) {
271            if s.is_empty() {
272                bail!(
273                    "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
274            }
275            let parts = s.split(':').collect::<Vec<_>>();
276            if parts.len() == 1 {
277                let path = parts[0].to_string().into();
278                let namespace = DEFAULT_NAMESPACE.to_string();
279                let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
280                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
281                return Ok(StorageConfig {
282                    inner_storage_config,
283                    namespace,
284                });
285            }
286            if parts.len() == 2 || parts.len() == 3 {
287                let path = parts[0].to_string().into();
288                let spawn_mode = match parts[1] {
289                    "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
290                    "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
291                    "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
292                    _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
293                }?;
294                let namespace = if parts.len() == 2 {
295                    DEFAULT_NAMESPACE.to_string()
296                } else {
297                    parts[2].to_string()
298                };
299                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
300                return Ok(StorageConfig {
301                    inner_storage_config,
302                    namespace,
303                });
304            }
305            bail!("We should have one, two or three parts");
306        }
307        #[cfg(feature = "dynamodb")]
308        if let Some(s) = input.strip_prefix(DYNAMO_DB) {
309            let mut parts = s.splitn(2, ':');
310            let namespace = parts
311                .next()
312                .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
313                .to_string();
314            let use_dynamodb_local = match parts.next() {
315                None | Some("env") => false,
316                Some("dynamodb_local") => true,
317                Some(unknown) => {
318                    bail!(
319                        "Invalid DynamoDB endpoint {unknown:?}. \
320                        Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
321                    );
322                }
323            };
324            let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
325            return Ok(StorageConfig {
326                inner_storage_config,
327                namespace,
328            });
329        }
330        #[cfg(feature = "scylladb")]
331        if let Some(s) = input.strip_prefix(SCYLLA_DB) {
332            let mut uri: Option<String> = None;
333            let mut namespace: Option<String> = None;
334            let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
335            if !s.is_empty() {
336                let mut parts = s.split(':');
337                while let Some(part) = parts.next() {
338                    match part {
339                        "tcp" => {
340                            let address = parts.next().ok_or_else(|| {
341                                anyhow!("Failed to find address for {s}. {parse_error}")
342                            })?;
343                            let port_str = parts.next().ok_or_else(|| {
344                                anyhow!("Failed to find port for {s}. {parse_error}")
345                            })?;
346                            let port = NonZeroU16::from_str(port_str).map_err(|_| {
347                                anyhow!(
348                                    "Failed to find parse port {port_str} for {s}. {parse_error}",
349                                )
350                            })?;
351                            if uri.is_some() {
352                                bail!("The uri has already been assigned");
353                            }
354                            uri = Some(format!("{}:{}", &address, port));
355                        }
356                        _ if part.starts_with("table") => {
357                            if namespace.is_some() {
358                                bail!("The namespace has already been assigned");
359                            }
360                            namespace = Some(part.to_string());
361                        }
362                        _ => {
363                            bail!("the entry \"{part}\" is not matching");
364                        }
365                    }
366                }
367            }
368            let uri = uri.unwrap_or("localhost:9042".to_string());
369            let namespace = namespace.unwrap_or(DEFAULT_NAMESPACE.to_string());
370            let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
371            debug!("ScyllaDB connection info: {:?}", inner_storage_config);
372            return Ok(StorageConfig {
373                inner_storage_config,
374                namespace,
375            });
376        }
377        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
378        if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
379            let parts = s.split(':').collect::<Vec<_>>();
380            if parts.len() != 5 && parts.len() != 6 {
381                bail!(
382                    "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
383                );
384            }
385            let path = Path::new(parts[0]);
386            let path = path.to_path_buf();
387            let path_with_guard = PathWithGuard::new(path);
388            let spawn_mode = match parts[1] {
389                "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
390                "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
391                "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
392                _ => Err(anyhow!("Failed to parse {} as a spawn_mode", parts[1])),
393            }?;
394            let protocol = parts[2];
395            if protocol != "tcp" {
396                bail!("The only allowed protocol is tcp");
397            }
398            let address = parts[3];
399            let port_str = parts[4];
400            let port = NonZeroU16::from_str(port_str)
401                .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
402            let uri = format!("{}:{}", &address, port);
403            let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
404                path_with_guard,
405                spawn_mode,
406                uri,
407            };
408            let namespace = if parts.len() == 5 {
409                DEFAULT_NAMESPACE.to_string()
410            } else {
411                parts[5].to_string()
412            };
413            return Ok(StorageConfig {
414                inner_storage_config,
415                namespace,
416            });
417        }
418        error!("available storage: memory");
419        #[cfg(feature = "storage-service")]
420        error!("Also available is linera-storage-service");
421        #[cfg(feature = "rocksdb")]
422        error!("Also available is RocksDB");
423        #[cfg(feature = "dynamodb")]
424        error!("Also available is DynamoDB");
425        #[cfg(feature = "scylladb")]
426        error!("Also available is ScyllaDB");
427        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
428        error!("Also available is DualRocksDbScyllaDb");
429        Err(anyhow!("The input has not matched: {input}"))
430    }
431}
432
433impl StorageConfig {
434    pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
435        match &mut self.inner_storage_config {
436            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
437            InnerStorageConfig::DualRocksDbScyllaDb {
438                path_with_guard,
439                spawn_mode: _,
440                uri: _,
441            } => {
442                let shard_str = format!("shard_{}", _shard);
443                path_with_guard.path_buf.push(shard_str);
444                std::fs::create_dir_all(&path_with_guard.path_buf)
445            }
446            _ => Ok(()),
447        }
448    }
449
450    /// The addition of the common config to get a full configuration
451    pub fn add_common_storage_options(
452        &self,
453        options: &CommonStorageOptions,
454    ) -> Result<StoreConfig, anyhow::Error> {
455        let namespace = self.namespace.clone();
456        match &self.inner_storage_config {
457            InnerStorageConfig::Memory { genesis_path } => {
458                let config = MemoryStoreConfig {
459                    max_stream_queries: options.storage_max_stream_queries,
460                    kill_on_drop: false,
461                };
462                let genesis_path = genesis_path.clone();
463                Ok(StoreConfig::Memory {
464                    config,
465                    namespace,
466                    genesis_path,
467                })
468            }
469            #[cfg(feature = "storage-service")]
470            InnerStorageConfig::Service { endpoint } => {
471                let inner_config = StorageServiceStoreInternalConfig {
472                    endpoint: endpoint.clone(),
473                    max_concurrent_queries: options.storage_max_concurrent_queries,
474                    max_stream_queries: options.storage_max_stream_queries,
475                };
476                let config = StorageServiceStoreConfig {
477                    inner_config,
478                    storage_cache_config: options.storage_cache_config(),
479                };
480                Ok(StoreConfig::StorageService { config, namespace })
481            }
482            #[cfg(feature = "rocksdb")]
483            InnerStorageConfig::RocksDb { path, spawn_mode } => {
484                let path_with_guard = PathWithGuard::new(path.to_path_buf());
485                let inner_config = RocksDbStoreInternalConfig {
486                    spawn_mode: *spawn_mode,
487                    path_with_guard,
488                    max_stream_queries: options.storage_max_stream_queries,
489                };
490                let config = RocksDbStoreConfig {
491                    inner_config,
492                    storage_cache_config: options.storage_cache_config(),
493                };
494                Ok(StoreConfig::RocksDb { config, namespace })
495            }
496            #[cfg(feature = "dynamodb")]
497            InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
498                let inner_config = DynamoDbStoreInternalConfig {
499                    use_dynamodb_local: *use_dynamodb_local,
500                    max_concurrent_queries: options.storage_max_concurrent_queries,
501                    max_stream_queries: options.storage_max_stream_queries,
502                };
503                let config = DynamoDbStoreConfig {
504                    inner_config,
505                    storage_cache_config: options.storage_cache_config(),
506                };
507                Ok(StoreConfig::DynamoDb { config, namespace })
508            }
509            #[cfg(feature = "scylladb")]
510            InnerStorageConfig::ScyllaDb { uri } => {
511                let inner_config = ScyllaDbStoreInternalConfig {
512                    uri: uri.clone(),
513                    max_stream_queries: options.storage_max_stream_queries,
514                    max_concurrent_queries: options.storage_max_concurrent_queries,
515                    replication_factor: options.storage_replication_factor,
516                };
517                let config = ScyllaDbStoreConfig {
518                    inner_config,
519                    storage_cache_config: options.storage_cache_config(),
520                };
521                Ok(StoreConfig::ScyllaDb { config, namespace })
522            }
523            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
524            InnerStorageConfig::DualRocksDbScyllaDb {
525                path_with_guard,
526                spawn_mode,
527                uri,
528            } => {
529                let inner_config = RocksDbStoreInternalConfig {
530                    spawn_mode: *spawn_mode,
531                    path_with_guard: path_with_guard.clone(),
532                    max_stream_queries: options.storage_max_stream_queries,
533                };
534                let first_config = RocksDbStoreConfig {
535                    inner_config,
536                    storage_cache_config: options.storage_cache_config(),
537                };
538
539                let inner_config = ScyllaDbStoreInternalConfig {
540                    uri: uri.clone(),
541                    max_stream_queries: options.storage_max_stream_queries,
542                    max_concurrent_queries: options.storage_max_concurrent_queries,
543                    replication_factor: options.storage_replication_factor,
544                };
545                let second_config = ScyllaDbStoreConfig {
546                    inner_config,
547                    storage_cache_config: options.storage_cache_config(),
548                };
549
550                let config = DualStoreConfig {
551                    first_config,
552                    second_config,
553                };
554                Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
555            }
556        }
557    }
558}
559
560impl fmt::Display for StorageConfig {
561    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
562        let namespace = &self.namespace;
563        match &self.inner_storage_config {
564            #[cfg(feature = "storage-service")]
565            InnerStorageConfig::Service { endpoint } => {
566                write!(f, "service:tcp:{}:{}", endpoint, namespace)
567            }
568            InnerStorageConfig::Memory { genesis_path } => {
569                write!(f, "memory:{}:{}", genesis_path.display(), namespace)
570            }
571            #[cfg(feature = "rocksdb")]
572            InnerStorageConfig::RocksDb { path, spawn_mode } => {
573                let spawn_mode = spawn_mode.to_string();
574                write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
575            }
576            #[cfg(feature = "dynamodb")]
577            InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
578                true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
579                false => write!(f, "dynamodb:{}:env", namespace),
580            },
581            #[cfg(feature = "scylladb")]
582            InnerStorageConfig::ScyllaDb { uri } => {
583                write!(f, "scylladb:tcp:{}:{}", uri, namespace)
584            }
585            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
586            InnerStorageConfig::DualRocksDbScyllaDb {
587                path_with_guard,
588                spawn_mode,
589                uri,
590            } => {
591                write!(
592                    f,
593                    "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
594                    path_with_guard.path_buf.display(),
595                    spawn_mode,
596                    uri,
597                    namespace
598                )
599            }
600        }
601    }
602}
603
604#[async_trait]
605pub trait Runnable {
606    type Output;
607
608    async fn run<S>(self, storage: S) -> Self::Output
609    where
610        S: Storage + Clone + Send + Sync + 'static;
611}
612
613#[async_trait]
614pub trait RunnableWithStore {
615    type Output;
616
617    async fn run<D>(
618        self,
619        config: D::Config,
620        namespace: String,
621    ) -> Result<Self::Output, anyhow::Error>
622    where
623        D: KeyValueDatabase + Clone + Send + Sync + 'static,
624        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
625        D::Error: Send + Sync;
626}
627
628impl StoreConfig {
629    pub async fn run_with_storage<Job>(
630        self,
631        wasm_runtime: Option<WasmRuntime>,
632        job: Job,
633    ) -> Result<Job::Output, anyhow::Error>
634    where
635        Job: Runnable,
636    {
637        match self {
638            StoreConfig::Memory {
639                config,
640                namespace,
641                genesis_path,
642            } => {
643                let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
644                    &config,
645                    &namespace,
646                    wasm_runtime,
647                )
648                .await?;
649                let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
650                // Memory storage must be initialized every time.
651                genesis_config.initialize_storage(&mut storage).await?;
652                Ok(job.run(storage).await)
653            }
654            #[cfg(feature = "storage-service")]
655            StoreConfig::StorageService { config, namespace } => {
656                let storage = DbStorage::<StorageServiceDatabase, _>::connect(
657                    &config,
658                    &namespace,
659                    wasm_runtime,
660                )
661                .await?;
662                Ok(job.run(storage).await)
663            }
664            #[cfg(feature = "rocksdb")]
665            StoreConfig::RocksDb { config, namespace } => {
666                let storage =
667                    DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
668                        .await?;
669                Ok(job.run(storage).await)
670            }
671            #[cfg(feature = "dynamodb")]
672            StoreConfig::DynamoDb { config, namespace } => {
673                let storage =
674                    DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
675                        .await?;
676                Ok(job.run(storage).await)
677            }
678            #[cfg(feature = "scylladb")]
679            StoreConfig::ScyllaDb { config, namespace } => {
680                let storage =
681                    DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
682                        .await?;
683                Ok(job.run(storage).await)
684            }
685            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
686            StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
687                let storage = DbStorage::<
688                    DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
689                    _,
690                >::connect(&config, &namespace, wasm_runtime)
691                .await?;
692                Ok(job.run(storage).await)
693            }
694        }
695    }
696
697    #[allow(unused_variables)]
698    pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
699    where
700        Job: RunnableWithStore,
701    {
702        match self {
703            StoreConfig::Memory { .. } => {
704                Err(anyhow!("Cannot run admin operations on the memory store"))
705            }
706            #[cfg(feature = "storage-service")]
707            StoreConfig::StorageService { config, namespace } => {
708                Ok(job.run::<StorageServiceDatabase>(config, namespace).await?)
709            }
710            #[cfg(feature = "rocksdb")]
711            StoreConfig::RocksDb { config, namespace } => {
712                Ok(job.run::<RocksDbDatabase>(config, namespace).await?)
713            }
714            #[cfg(feature = "dynamodb")]
715            StoreConfig::DynamoDb { config, namespace } => {
716                Ok(job.run::<DynamoDbDatabase>(config, namespace).await?)
717            }
718            #[cfg(feature = "scylladb")]
719            StoreConfig::ScyllaDb { config, namespace } => {
720                Ok(job.run::<ScyllaDbDatabase>(config, namespace).await?)
721            }
722            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
723            StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
724                .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
725                    config, namespace,
726                )
727                .await?),
728        }
729    }
730
731    pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
732        self.run_with_store(InitializeStorageJob(config)).await
733    }
734}
735
736struct InitializeStorageJob<'a>(&'a GenesisConfig);
737
738#[async_trait]
739impl RunnableWithStore for InitializeStorageJob<'_> {
740    type Output = ();
741
742    async fn run<D>(
743        self,
744        config: D::Config,
745        namespace: String,
746    ) -> Result<Self::Output, anyhow::Error>
747    where
748        D: KeyValueDatabase + Clone + Send + Sync + 'static,
749        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
750        D::Error: Send + Sync,
751    {
752        let mut storage =
753            DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
754        self.0.initialize_storage(&mut storage).await?;
755        Ok(())
756    }
757}
758
759#[test]
760fn test_memory_storage_config_from_str() {
761    assert_eq!(
762        StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
763        StorageConfig {
764            inner_storage_config: InnerStorageConfig::Memory {
765                genesis_path: PathBuf::from("path/to/genesis.json")
766            },
767            namespace: DEFAULT_NAMESPACE.into()
768        }
769    );
770    assert_eq!(
771        StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
772        StorageConfig {
773            inner_storage_config: InnerStorageConfig::Memory {
774                genesis_path: PathBuf::from("path/to/genesis.json")
775            },
776            namespace: "namespace".into()
777        }
778    );
779    assert!(StorageConfig::from_str("memory").is_err(),);
780}
781
782#[cfg(feature = "storage-service")]
783#[test]
784fn test_shared_store_config_from_str() {
785    assert_eq!(
786        StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
787        StorageConfig {
788            inner_storage_config: InnerStorageConfig::Service {
789                endpoint: "127.0.0.1:8942".to_string()
790            },
791            namespace: "linera".into()
792        }
793    );
794    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
795    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
796}
797
798#[cfg(feature = "rocksdb")]
799#[test]
800fn test_rocks_db_storage_config_from_str() {
801    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
802    assert_eq!(
803        StorageConfig::from_str("rocksdb:foo.db").unwrap(),
804        StorageConfig {
805            inner_storage_config: InnerStorageConfig::RocksDb {
806                path: "foo.db".into(),
807                spawn_mode: RocksDbSpawnMode::SpawnBlocking,
808            },
809            namespace: DEFAULT_NAMESPACE.to_string()
810        }
811    );
812    assert_eq!(
813        StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
814        StorageConfig {
815            inner_storage_config: InnerStorageConfig::RocksDb {
816                path: "foo.db".into(),
817                spawn_mode: RocksDbSpawnMode::BlockInPlace,
818            },
819            namespace: DEFAULT_NAMESPACE.to_string()
820        }
821    );
822    assert_eq!(
823        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
824        StorageConfig {
825            inner_storage_config: InnerStorageConfig::RocksDb {
826                path: "foo.db".into(),
827                spawn_mode: RocksDbSpawnMode::BlockInPlace,
828            },
829            namespace: "chosen_namespace".into()
830        }
831    );
832}
833
834#[cfg(feature = "dynamodb")]
835#[test]
836fn test_aws_storage_config_from_str() {
837    assert_eq!(
838        StorageConfig::from_str("dynamodb:table").unwrap(),
839        StorageConfig {
840            inner_storage_config: InnerStorageConfig::DynamoDb {
841                use_dynamodb_local: false
842            },
843            namespace: "table".to_string()
844        }
845    );
846    assert_eq!(
847        StorageConfig::from_str("dynamodb:table:env").unwrap(),
848        StorageConfig {
849            inner_storage_config: InnerStorageConfig::DynamoDb {
850                use_dynamodb_local: false
851            },
852            namespace: "table".to_string()
853        }
854    );
855    assert_eq!(
856        StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap(),
857        StorageConfig {
858            inner_storage_config: InnerStorageConfig::DynamoDb {
859                use_dynamodb_local: true
860            },
861            namespace: "table".to_string()
862        }
863    );
864    assert!(StorageConfig::from_str("dynamodb").is_err());
865    assert!(StorageConfig::from_str("dynamodb:").is_err());
866    assert!(StorageConfig::from_str("dynamodb:1").is_err());
867    assert!(StorageConfig::from_str("dynamodb:wrong:endpoint").is_err());
868}
869
870#[cfg(feature = "scylladb")]
871#[test]
872fn test_scylla_db_storage_config_from_str() {
873    assert_eq!(
874        StorageConfig::from_str("scylladb:").unwrap(),
875        StorageConfig {
876            inner_storage_config: InnerStorageConfig::ScyllaDb {
877                uri: "localhost:9042".to_string()
878            },
879            namespace: DEFAULT_NAMESPACE.to_string()
880        }
881    );
882    assert_eq!(
883        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
884        StorageConfig {
885            inner_storage_config: InnerStorageConfig::ScyllaDb {
886                uri: "db_hostname:230".to_string()
887            },
888            namespace: "table_other_storage".to_string()
889        }
890    );
891    assert_eq!(
892        StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
893        StorageConfig {
894            inner_storage_config: InnerStorageConfig::ScyllaDb {
895                uri: "db_hostname:230".to_string()
896            },
897            namespace: DEFAULT_NAMESPACE.to_string()
898        }
899    );
900    assert!(StorageConfig::from_str("scylladb:-10").is_err());
901    assert!(StorageConfig::from_str("scylladb:70000").is_err());
902    assert!(StorageConfig::from_str("scylladb:230:234").is_err());
903    assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
904    assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
905    assert!(StorageConfig::from_str("scylladb:wrong").is_err());
906}