linera_service/
storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, DEFAULT_NAMESPACE};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::{
13    client::StorageServiceDatabase,
14    common::{StorageServiceStoreConfig, StorageServiceStoreInternalConfig},
15};
16#[cfg(feature = "dynamodb")]
17use linera_views::dynamo_db::{DynamoDbDatabase, DynamoDbStoreConfig, DynamoDbStoreInternalConfig};
18#[cfg(feature = "rocksdb")]
19use linera_views::rocks_db::{
20    PathWithGuard, RocksDbDatabase, RocksDbSpawnMode, RocksDbStoreConfig,
21    RocksDbStoreInternalConfig,
22};
23use linera_views::{
24    lru_prefix_cache::StorageCacheConfig,
25    memory::{MemoryDatabase, MemoryStoreConfig},
26    store::{KeyValueDatabase, KeyValueStore},
27};
28use serde::{Deserialize, Serialize};
29use tracing::error;
30#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
31use {
32    linera_storage::ChainStatesFirstAssignment,
33    linera_views::backends::dual::{DualDatabase, DualStoreConfig},
34    std::path::Path,
35};
36#[cfg(feature = "scylladb")]
37use {
38    linera_views::scylla_db::{ScyllaDbDatabase, ScyllaDbStoreConfig, ScyllaDbStoreInternalConfig},
39    std::num::NonZeroU16,
40    tracing::debug,
41};
42
43#[derive(Clone, Debug, clap::Parser)]
44pub struct CommonStorageOptions {
45    /// The maximal number of simultaneous queries to the database
46    #[arg(long, global = true)]
47    pub storage_max_concurrent_queries: Option<usize>,
48
49    /// The maximal number of simultaneous stream queries to the database
50    #[arg(long, default_value = "10", global = true)]
51    pub storage_max_stream_queries: usize,
52
53    /// The maximal memory used in the storage cache.
54    #[arg(long, default_value = "10000000", global = true)]
55    pub storage_max_cache_size: usize,
56
57    /// The maximal size of a value entry in the storage cache.
58    #[arg(long, default_value = "1000000", global = true)]
59    pub storage_max_value_entry_size: usize,
60
61    /// The maximal size of a find-keys entry in the storage cache.
62    #[arg(long, default_value = "1000000", global = true)]
63    pub storage_max_find_keys_entry_size: usize,
64
65    /// The maximal size of a find-key-values entry in the storage cache.
66    #[arg(long, default_value = "1000000", global = true)]
67    pub storage_max_find_key_values_entry_size: usize,
68
69    /// The maximal number of entries in the storage cache.
70    #[arg(long, default_value = "1000", global = true)]
71    pub storage_max_cache_entries: usize,
72
73    /// The maximal memory used in the value cache.
74    #[arg(long, default_value = "10000000", global = true)]
75    pub storage_max_cache_value_size: usize,
76
77    /// The maximal memory used in the find_keys_by_prefix cache.
78    #[arg(long, default_value = "10000000", global = true)]
79    pub storage_max_cache_find_keys_size: usize,
80
81    /// The maximal memory used in the find_key_values_by_prefix cache.
82    #[arg(long, default_value = "10000000", global = true)]
83    pub storage_max_cache_find_key_values_size: usize,
84
85    /// The replication factor for the keyspace
86    #[arg(long, default_value = "1", global = true)]
87    pub storage_replication_factor: u32,
88}
89
90impl CommonStorageOptions {
91    pub fn storage_cache_config(&self) -> StorageCacheConfig {
92        StorageCacheConfig {
93            max_cache_size: self.storage_max_cache_size,
94            max_value_entry_size: self.storage_max_value_entry_size,
95            max_find_keys_entry_size: self.storage_max_find_keys_entry_size,
96            max_find_key_values_entry_size: self.storage_max_find_key_values_entry_size,
97            max_cache_entries: self.storage_max_cache_entries,
98            max_cache_value_size: self.storage_max_cache_value_size,
99            max_cache_find_keys_size: self.storage_max_cache_find_keys_size,
100            max_cache_find_key_values_size: self.storage_max_cache_find_key_values_size,
101        }
102    }
103}
104
105/// The configuration of the key value store in use.
106#[derive(Clone, Debug, Deserialize, Serialize)]
107pub enum StoreConfig {
108    /// The memory key value store
109    Memory {
110        config: MemoryStoreConfig,
111        namespace: String,
112        genesis_path: PathBuf,
113    },
114    /// The storage service key-value store
115    #[cfg(feature = "storage-service")]
116    StorageService {
117        config: StorageServiceStoreConfig,
118        namespace: String,
119    },
120    /// The RocksDB key value store
121    #[cfg(feature = "rocksdb")]
122    RocksDb {
123        config: RocksDbStoreConfig,
124        namespace: String,
125    },
126    /// The DynamoDB key value store
127    #[cfg(feature = "dynamodb")]
128    DynamoDb {
129        config: DynamoDbStoreConfig,
130        namespace: String,
131    },
132    /// The ScyllaDB key value store
133    #[cfg(feature = "scylladb")]
134    ScyllaDb {
135        config: ScyllaDbStoreConfig,
136        namespace: String,
137    },
138    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
139    DualRocksDbScyllaDb {
140        config: DualStoreConfig<RocksDbStoreConfig, ScyllaDbStoreConfig>,
141        namespace: String,
142    },
143}
144
145/// The description of a storage implementation.
146#[derive(Clone, Debug)]
147#[cfg_attr(any(test), derive(Eq, PartialEq))]
148pub enum InnerStorageConfig {
149    /// The memory description.
150    Memory {
151        /// The path to the genesis configuration. This is needed because we reinitialize
152        /// memory databases from the genesis config everytime.
153        genesis_path: PathBuf,
154    },
155    /// The storage service description.
156    #[cfg(feature = "storage-service")]
157    Service {
158        /// The endpoint used.
159        endpoint: String,
160    },
161    /// The RocksDB description.
162    #[cfg(feature = "rocksdb")]
163    RocksDb {
164        /// The path used.
165        path: PathBuf,
166        /// Whether to use `block_in_place` or `spawn_blocking`.
167        spawn_mode: RocksDbSpawnMode,
168    },
169    /// The DynamoDB description.
170    #[cfg(feature = "dynamodb")]
171    DynamoDb {
172        /// Whether to use the DynamoDB Local system
173        use_dynamodb_local: bool,
174    },
175    /// The ScyllaDB description.
176    #[cfg(feature = "scylladb")]
177    ScyllaDb {
178        /// The URI for accessing the database.
179        uri: String,
180    },
181    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
182    DualRocksDbScyllaDb {
183        /// The path used.
184        path_with_guard: PathWithGuard,
185        /// Whether to use `block_in_place` or `spawn_blocking`.
186        spawn_mode: RocksDbSpawnMode,
187        /// The URI for accessing the database.
188        uri: String,
189    },
190}
191
192/// The description of a storage implementation.
193#[derive(Clone, Debug)]
194#[cfg_attr(any(test), derive(Eq, PartialEq))]
195pub struct StorageConfig {
196    /// The inner storage config.
197    pub inner_storage_config: InnerStorageConfig,
198    /// The namespace used
199    pub namespace: String,
200}
201
202const MEMORY: &str = "memory:";
203#[cfg(feature = "storage-service")]
204const STORAGE_SERVICE: &str = "service:";
205#[cfg(feature = "rocksdb")]
206const ROCKS_DB: &str = "rocksdb:";
207#[cfg(feature = "dynamodb")]
208const DYNAMO_DB: &str = "dynamodb:";
209#[cfg(feature = "scylladb")]
210const SCYLLA_DB: &str = "scylladb:";
211#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
212const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
213
214impl FromStr for StorageConfig {
215    type Err = anyhow::Error;
216
217    fn from_str(input: &str) -> Result<Self, Self::Err> {
218        if let Some(s) = input.strip_prefix(MEMORY) {
219            let parts = s.split(':').collect::<Vec<_>>();
220            if parts.len() == 1 {
221                let genesis_path = parts[0].to_string().into();
222                let namespace = DEFAULT_NAMESPACE.to_string();
223                let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
224                return Ok(StorageConfig {
225                    inner_storage_config,
226                    namespace,
227                });
228            }
229            if parts.len() != 2 {
230                bail!("We should have one genesis config path and one optional namespace");
231            }
232            let genesis_path = parts[0].to_string().into();
233            let namespace = parts[1].to_string();
234            let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
235            return Ok(StorageConfig {
236                inner_storage_config,
237                namespace,
238            });
239        }
240        #[cfg(feature = "storage-service")]
241        if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
242            if s.is_empty() {
243                bail!(
244                    "For Storage service, the formatting has to be service:endpoint:namespace,\
245example service:tcp:127.0.0.1:7878:table_do_my_test"
246                );
247            }
248            let parts = s.split(':').collect::<Vec<_>>();
249            if parts.len() != 4 {
250                bail!("We should have one endpoint and one namespace");
251            }
252            let protocol = parts[0];
253            if protocol != "tcp" {
254                bail!("Only allowed protocol is tcp");
255            }
256            let endpoint = parts[1];
257            let port = parts[2];
258            let mut endpoint = endpoint.to_string();
259            endpoint.push(':');
260            endpoint.push_str(port);
261            let endpoint = endpoint.to_string();
262            let namespace = parts[3].to_string();
263            let inner_storage_config = InnerStorageConfig::Service { endpoint };
264            return Ok(StorageConfig {
265                inner_storage_config,
266                namespace,
267            });
268        }
269        #[cfg(feature = "rocksdb")]
270        if let Some(s) = input.strip_prefix(ROCKS_DB) {
271            if s.is_empty() {
272                bail!(
273                    "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
274            }
275            let parts = s.split(':').collect::<Vec<_>>();
276            if parts.len() == 1 {
277                let path = parts[0].to_string().into();
278                let namespace = DEFAULT_NAMESPACE.to_string();
279                let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
280                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
281                return Ok(StorageConfig {
282                    inner_storage_config,
283                    namespace,
284                });
285            }
286            if parts.len() == 2 || parts.len() == 3 {
287                let path = parts[0].to_string().into();
288                let spawn_mode_name = parts
289                    .get(1)
290                    .copied()
291                    .expect("validated by the parts length check above");
292                let spawn_mode = match spawn_mode_name {
293                    "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
294                    "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
295                    "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
296                    _ => Err(anyhow!(
297                        "Failed to parse {} as a spawn_mode",
298                        spawn_mode_name
299                    )),
300                }?;
301                let namespace = if parts.len() == 2 {
302                    DEFAULT_NAMESPACE.to_string()
303                } else {
304                    parts[2].to_string()
305                };
306                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
307                return Ok(StorageConfig {
308                    inner_storage_config,
309                    namespace,
310                });
311            }
312            bail!("We should have one, two or three parts");
313        }
314        #[cfg(feature = "dynamodb")]
315        if let Some(s) = input.strip_prefix(DYNAMO_DB) {
316            let mut parts = s.splitn(2, ':');
317            let namespace = parts
318                .next()
319                .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
320                .to_string();
321            let use_dynamodb_local = match parts.next() {
322                None | Some("env") => false,
323                Some("dynamodb_local") => true,
324                Some(unknown) => {
325                    bail!(
326                        "Invalid DynamoDB endpoint {unknown:?}. \
327                        Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
328                    );
329                }
330            };
331            let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
332            return Ok(StorageConfig {
333                inner_storage_config,
334                namespace,
335            });
336        }
337        #[cfg(feature = "scylladb")]
338        if let Some(s) = input.strip_prefix(SCYLLA_DB) {
339            let mut uri: Option<String> = None;
340            let mut namespace: Option<String> = None;
341            let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
342            if !s.is_empty() {
343                let mut parts = s.split(':');
344                while let Some(part) = parts.next() {
345                    match part {
346                        "tcp" => {
347                            let address = parts.next().ok_or_else(|| {
348                                anyhow!("Failed to find address for {s}. {parse_error}")
349                            })?;
350                            let port_str = parts.next().ok_or_else(|| {
351                                anyhow!("Failed to find port for {s}. {parse_error}")
352                            })?;
353                            let port = NonZeroU16::from_str(port_str).map_err(|_| {
354                                anyhow!(
355                                    "Failed to find parse port {port_str} for {s}. {parse_error}",
356                                )
357                            })?;
358                            if uri.is_some() {
359                                bail!("The uri has already been assigned");
360                            }
361                            uri = Some(format!("{}:{}", &address, port));
362                        }
363                        _ if part.starts_with("table") => {
364                            if namespace.is_some() {
365                                bail!("The namespace has already been assigned");
366                            }
367                            namespace = Some(part.to_string());
368                        }
369                        _ => {
370                            bail!("the entry \"{part}\" is not matching");
371                        }
372                    }
373                }
374            }
375            let uri = uri.unwrap_or_else(|| "localhost:9042".to_string());
376            let namespace = namespace.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
377            let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
378            debug!("ScyllaDB connection info: {:?}", inner_storage_config);
379            return Ok(StorageConfig {
380                inner_storage_config,
381                namespace,
382            });
383        }
384        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
385        if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
386            let parts = s.split(':').collect::<Vec<_>>();
387            if parts.len() != 5 && parts.len() != 6 {
388                bail!(
389                    "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
390                );
391            }
392            let path = Path::new(parts[0]);
393            let path = path.to_path_buf();
394            let path_with_guard = PathWithGuard::new(path);
395            let spawn_mode_name = parts
396                .get(1)
397                .copied()
398                .expect("validated by the parts length check above");
399            let spawn_mode = match spawn_mode_name {
400                "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
401                "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
402                "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
403                _ => Err(anyhow!(
404                    "Failed to parse {} as a spawn_mode",
405                    spawn_mode_name
406                )),
407            }?;
408            let protocol = parts[2];
409            if protocol != "tcp" {
410                bail!("The only allowed protocol is tcp");
411            }
412            let address = parts[3];
413            let port_str = parts[4];
414            let port = NonZeroU16::from_str(port_str)
415                .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
416            let uri = format!("{}:{}", &address, port);
417            let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
418                path_with_guard,
419                spawn_mode,
420                uri,
421            };
422            let namespace = if parts.len() == 5 {
423                DEFAULT_NAMESPACE.to_string()
424            } else {
425                parts[5].to_string()
426            };
427            return Ok(StorageConfig {
428                inner_storage_config,
429                namespace,
430            });
431        }
432        error!("available storage: memory");
433        #[cfg(feature = "storage-service")]
434        error!("Also available is linera-storage-service");
435        #[cfg(feature = "rocksdb")]
436        error!("Also available is RocksDB");
437        #[cfg(feature = "dynamodb")]
438        error!("Also available is DynamoDB");
439        #[cfg(feature = "scylladb")]
440        error!("Also available is ScyllaDB");
441        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
442        error!("Also available is DualRocksDbScyllaDb");
443        Err(anyhow!("The input has not matched: {input}"))
444    }
445}
446
447impl StorageConfig {
448    #[allow(clippy::used_underscore_binding)]
449    pub fn maybe_append_shard_path(&mut self, _shard: usize) -> std::io::Result<()> {
450        match &mut self.inner_storage_config {
451            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
452            InnerStorageConfig::DualRocksDbScyllaDb {
453                path_with_guard,
454                spawn_mode: _,
455                uri: _,
456            } => {
457                let shard_str = format!("shard_{}", _shard);
458                path_with_guard.path_buf.push(shard_str);
459                std::fs::create_dir_all(&path_with_guard.path_buf)
460            }
461            _ => Ok(()),
462        }
463    }
464
465    /// The addition of the common config to get a full configuration
466    pub fn add_common_storage_options(
467        &self,
468        options: &CommonStorageOptions,
469    ) -> Result<StoreConfig, anyhow::Error> {
470        let namespace = self.namespace.clone();
471        match &self.inner_storage_config {
472            InnerStorageConfig::Memory { genesis_path } => {
473                let config = MemoryStoreConfig {
474                    max_stream_queries: options.storage_max_stream_queries,
475                    kill_on_drop: false,
476                };
477                let genesis_path = genesis_path.clone();
478                Ok(StoreConfig::Memory {
479                    config,
480                    namespace,
481                    genesis_path,
482                })
483            }
484            #[cfg(feature = "storage-service")]
485            InnerStorageConfig::Service { endpoint } => {
486                let inner_config = StorageServiceStoreInternalConfig {
487                    endpoint: endpoint.clone(),
488                    max_concurrent_queries: options.storage_max_concurrent_queries,
489                    max_stream_queries: options.storage_max_stream_queries,
490                };
491                let config = StorageServiceStoreConfig {
492                    inner_config,
493                    storage_cache_config: options.storage_cache_config(),
494                };
495                Ok(StoreConfig::StorageService { config, namespace })
496            }
497            #[cfg(feature = "rocksdb")]
498            InnerStorageConfig::RocksDb { path, spawn_mode } => {
499                let path_with_guard = PathWithGuard::new(path.to_path_buf());
500                let inner_config = RocksDbStoreInternalConfig {
501                    spawn_mode: *spawn_mode,
502                    path_with_guard,
503                    max_stream_queries: options.storage_max_stream_queries,
504                };
505                let config = RocksDbStoreConfig {
506                    inner_config,
507                    storage_cache_config: options.storage_cache_config(),
508                };
509                Ok(StoreConfig::RocksDb { config, namespace })
510            }
511            #[cfg(feature = "dynamodb")]
512            InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
513                let inner_config = DynamoDbStoreInternalConfig {
514                    use_dynamodb_local: *use_dynamodb_local,
515                    max_concurrent_queries: options.storage_max_concurrent_queries,
516                    max_stream_queries: options.storage_max_stream_queries,
517                };
518                let config = DynamoDbStoreConfig {
519                    inner_config,
520                    storage_cache_config: options.storage_cache_config(),
521                };
522                Ok(StoreConfig::DynamoDb { config, namespace })
523            }
524            #[cfg(feature = "scylladb")]
525            InnerStorageConfig::ScyllaDb { uri } => {
526                let inner_config = ScyllaDbStoreInternalConfig {
527                    uri: uri.clone(),
528                    max_stream_queries: options.storage_max_stream_queries,
529                    max_concurrent_queries: options.storage_max_concurrent_queries,
530                    replication_factor: options.storage_replication_factor,
531                };
532                let config = ScyllaDbStoreConfig {
533                    inner_config,
534                    storage_cache_config: options.storage_cache_config(),
535                };
536                Ok(StoreConfig::ScyllaDb { config, namespace })
537            }
538            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
539            InnerStorageConfig::DualRocksDbScyllaDb {
540                path_with_guard,
541                spawn_mode,
542                uri,
543            } => {
544                let inner_config = RocksDbStoreInternalConfig {
545                    spawn_mode: *spawn_mode,
546                    path_with_guard: path_with_guard.clone(),
547                    max_stream_queries: options.storage_max_stream_queries,
548                };
549                let first_config = RocksDbStoreConfig {
550                    inner_config,
551                    storage_cache_config: options.storage_cache_config(),
552                };
553
554                let inner_config = ScyllaDbStoreInternalConfig {
555                    uri: uri.clone(),
556                    max_stream_queries: options.storage_max_stream_queries,
557                    max_concurrent_queries: options.storage_max_concurrent_queries,
558                    replication_factor: options.storage_replication_factor,
559                };
560                let second_config = ScyllaDbStoreConfig {
561                    inner_config,
562                    storage_cache_config: options.storage_cache_config(),
563                };
564
565                let config = DualStoreConfig {
566                    first_config,
567                    second_config,
568                };
569                Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
570            }
571        }
572    }
573}
574
575impl fmt::Display for StorageConfig {
576    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
577        let namespace = &self.namespace;
578        match &self.inner_storage_config {
579            #[cfg(feature = "storage-service")]
580            InnerStorageConfig::Service { endpoint } => {
581                write!(f, "service:tcp:{}:{}", endpoint, namespace)
582            }
583            InnerStorageConfig::Memory { genesis_path } => {
584                write!(f, "memory:{}:{}", genesis_path.display(), namespace)
585            }
586            #[cfg(feature = "rocksdb")]
587            InnerStorageConfig::RocksDb { path, spawn_mode } => {
588                let spawn_mode = spawn_mode.to_string();
589                write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
590            }
591            #[cfg(feature = "dynamodb")]
592            InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
593                true => write!(f, "dynamodb:{}:dynamodb_local", namespace),
594                false => write!(f, "dynamodb:{}:env", namespace),
595            },
596            #[cfg(feature = "scylladb")]
597            InnerStorageConfig::ScyllaDb { uri } => {
598                write!(f, "scylladb:tcp:{}:{}", uri, namespace)
599            }
600            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
601            InnerStorageConfig::DualRocksDbScyllaDb {
602                path_with_guard,
603                spawn_mode,
604                uri,
605            } => {
606                write!(
607                    f,
608                    "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
609                    path_with_guard.path_buf.display(),
610                    spawn_mode,
611                    uri,
612                    namespace
613                )
614            }
615        }
616    }
617}
618
619#[async_trait]
620pub trait Runnable {
621    type Output;
622
623    async fn run<S>(self, storage: S) -> Self::Output
624    where
625        S: Storage + Clone + Send + Sync + 'static;
626}
627
628#[async_trait]
629pub trait RunnableWithStore {
630    type Output;
631
632    async fn run<D>(
633        self,
634        config: D::Config,
635        namespace: String,
636    ) -> Result<Self::Output, anyhow::Error>
637    where
638        D: KeyValueDatabase + Clone + Send + Sync + 'static,
639        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
640        D::Error: Send + Sync;
641}
642
643impl StoreConfig {
644    pub async fn run_with_storage<Job>(
645        self,
646        wasm_runtime: Option<WasmRuntime>,
647        allow_application_logs: bool,
648        job: Job,
649    ) -> Result<Job::Output, anyhow::Error>
650    where
651        Job: Runnable,
652    {
653        match self {
654            StoreConfig::Memory {
655                config,
656                namespace,
657                genesis_path,
658            } => {
659                let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
660                    &config,
661                    &namespace,
662                    wasm_runtime,
663                )
664                .await?
665                .with_allow_application_logs(allow_application_logs);
666                let genesis_config = crate::util::read_json::<GenesisConfig>(genesis_path)?;
667                // Memory storage must be initialized every time.
668                genesis_config.initialize_storage(&mut storage).await?;
669                Ok(job.run(storage).await)
670            }
671            #[cfg(feature = "storage-service")]
672            StoreConfig::StorageService { config, namespace } => {
673                let storage = DbStorage::<StorageServiceDatabase, _>::connect(
674                    &config,
675                    &namespace,
676                    wasm_runtime,
677                )
678                .await?
679                .with_allow_application_logs(allow_application_logs);
680                Ok(job.run(storage).await)
681            }
682            #[cfg(feature = "rocksdb")]
683            StoreConfig::RocksDb { config, namespace } => {
684                let storage =
685                    DbStorage::<RocksDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
686                        .await?
687                        .with_allow_application_logs(allow_application_logs);
688                Ok(job.run(storage).await)
689            }
690            #[cfg(feature = "dynamodb")]
691            StoreConfig::DynamoDb { config, namespace } => {
692                let storage =
693                    DbStorage::<DynamoDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
694                        .await?
695                        .with_allow_application_logs(allow_application_logs);
696                Ok(job.run(storage).await)
697            }
698            #[cfg(feature = "scylladb")]
699            StoreConfig::ScyllaDb { config, namespace } => {
700                let storage =
701                    DbStorage::<ScyllaDbDatabase, _>::connect(&config, &namespace, wasm_runtime)
702                        .await?
703                        .with_allow_application_logs(allow_application_logs);
704                Ok(job.run(storage).await)
705            }
706            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
707            StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
708                let storage = DbStorage::<
709                    DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
710                    _,
711                >::connect(&config, &namespace, wasm_runtime)
712                .await?
713                .with_allow_application_logs(allow_application_logs);
714                Ok(job.run(storage).await)
715            }
716        }
717    }
718
719    #[allow(unused_variables)]
720    pub async fn run_with_store<Job>(self, job: Job) -> Result<Job::Output, anyhow::Error>
721    where
722        Job: RunnableWithStore,
723    {
724        match self {
725            StoreConfig::Memory { .. } => {
726                Err(anyhow!("Cannot run admin operations on the memory store"))
727            }
728            #[cfg(feature = "storage-service")]
729            StoreConfig::StorageService { config, namespace } => {
730                Ok(job.run::<StorageServiceDatabase>(config, namespace).await?)
731            }
732            #[cfg(feature = "rocksdb")]
733            StoreConfig::RocksDb { config, namespace } => {
734                Ok(job.run::<RocksDbDatabase>(config, namespace).await?)
735            }
736            #[cfg(feature = "dynamodb")]
737            StoreConfig::DynamoDb { config, namespace } => {
738                Ok(job.run::<DynamoDbDatabase>(config, namespace).await?)
739            }
740            #[cfg(feature = "scylladb")]
741            StoreConfig::ScyllaDb { config, namespace } => {
742                Ok(job.run::<ScyllaDbDatabase>(config, namespace).await?)
743            }
744            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
745            StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
746                .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
747                    config, namespace,
748                )
749                .await?),
750        }
751    }
752
753    pub async fn initialize(self, config: &GenesisConfig) -> Result<(), anyhow::Error> {
754        self.run_with_store(InitializeStorageJob(config)).await
755    }
756}
757
758struct InitializeStorageJob<'a>(&'a GenesisConfig);
759
760#[async_trait]
761impl RunnableWithStore for InitializeStorageJob<'_> {
762    type Output = ();
763
764    async fn run<D>(
765        self,
766        config: D::Config,
767        namespace: String,
768    ) -> Result<Self::Output, anyhow::Error>
769    where
770        D: KeyValueDatabase + Clone + Send + Sync + 'static,
771        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
772        D::Error: Send + Sync,
773    {
774        let mut storage =
775            DbStorage::<D, _>::maybe_create_and_connect(&config, &namespace, None).await?;
776        self.0.initialize_storage(&mut storage).await?;
777        Ok(())
778    }
779}
780
781#[test]
782fn test_memory_storage_config_from_str() {
783    assert_eq!(
784        StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
785        StorageConfig {
786            inner_storage_config: InnerStorageConfig::Memory {
787                genesis_path: PathBuf::from("path/to/genesis.json")
788            },
789            namespace: DEFAULT_NAMESPACE.into()
790        }
791    );
792    assert_eq!(
793        StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
794        StorageConfig {
795            inner_storage_config: InnerStorageConfig::Memory {
796                genesis_path: PathBuf::from("path/to/genesis.json")
797            },
798            namespace: "namespace".into()
799        }
800    );
801    assert!(StorageConfig::from_str("memory").is_err(),);
802}
803
804#[cfg(feature = "storage-service")]
805#[test]
806fn test_shared_store_config_from_str() {
807    assert_eq!(
808        StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
809        StorageConfig {
810            inner_storage_config: InnerStorageConfig::Service {
811                endpoint: "127.0.0.1:8942".to_string()
812            },
813            namespace: "linera".into()
814        }
815    );
816    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
817    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
818}
819
820#[cfg(feature = "rocksdb")]
821#[test]
822fn test_rocks_db_storage_config_from_str() {
823    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
824    assert_eq!(
825        StorageConfig::from_str("rocksdb:foo.db").unwrap(),
826        StorageConfig {
827            inner_storage_config: InnerStorageConfig::RocksDb {
828                path: "foo.db".into(),
829                spawn_mode: RocksDbSpawnMode::SpawnBlocking,
830            },
831            namespace: DEFAULT_NAMESPACE.to_string()
832        }
833    );
834    assert_eq!(
835        StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
836        StorageConfig {
837            inner_storage_config: InnerStorageConfig::RocksDb {
838                path: "foo.db".into(),
839                spawn_mode: RocksDbSpawnMode::BlockInPlace,
840            },
841            namespace: DEFAULT_NAMESPACE.to_string()
842        }
843    );
844    assert_eq!(
845        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
846        StorageConfig {
847            inner_storage_config: InnerStorageConfig::RocksDb {
848                path: "foo.db".into(),
849                spawn_mode: RocksDbSpawnMode::BlockInPlace,
850            },
851            namespace: "chosen_namespace".into()
852        }
853    );
854}
855
856#[cfg(feature = "dynamodb")]
857#[test]
858fn test_aws_storage_config_from_str() {
859    assert_eq!(
860        StorageConfig::from_str("dynamodb:table").unwrap(),
861        StorageConfig {
862            inner_storage_config: InnerStorageConfig::DynamoDb {
863                use_dynamodb_local: false
864            },
865            namespace: "table".to_string()
866        }
867    );
868    assert_eq!(
869        StorageConfig::from_str("dynamodb:table:env").unwrap(),
870        StorageConfig {
871            inner_storage_config: InnerStorageConfig::DynamoDb {
872                use_dynamodb_local: false
873            },
874            namespace: "table".to_string()
875        }
876    );
877    assert_eq!(
878        StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap(),
879        StorageConfig {
880            inner_storage_config: InnerStorageConfig::DynamoDb {
881                use_dynamodb_local: true
882            },
883            namespace: "table".to_string()
884        }
885    );
886    assert!(StorageConfig::from_str("dynamodb").is_err());
887    assert!(StorageConfig::from_str("dynamodb:").is_err());
888    assert!(StorageConfig::from_str("dynamodb:1").is_err());
889    assert!(StorageConfig::from_str("dynamodb:wrong:endpoint").is_err());
890}
891
892#[cfg(feature = "scylladb")]
893#[test]
894fn test_scylla_db_storage_config_from_str() {
895    assert_eq!(
896        StorageConfig::from_str("scylladb:").unwrap(),
897        StorageConfig {
898            inner_storage_config: InnerStorageConfig::ScyllaDb {
899                uri: "localhost:9042".to_string()
900            },
901            namespace: DEFAULT_NAMESPACE.to_string()
902        }
903    );
904    assert_eq!(
905        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
906        StorageConfig {
907            inner_storage_config: InnerStorageConfig::ScyllaDb {
908                uri: "db_hostname:230".to_string()
909            },
910            namespace: "table_other_storage".to_string()
911        }
912    );
913    assert_eq!(
914        StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
915        StorageConfig {
916            inner_storage_config: InnerStorageConfig::ScyllaDb {
917                uri: "db_hostname:230".to_string()
918            },
919            namespace: DEFAULT_NAMESPACE.to_string()
920        }
921    );
922    assert!(StorageConfig::from_str("scylladb:-10").is_err());
923    assert!(StorageConfig::from_str("scylladb:70000").is_err());
924    assert!(StorageConfig::from_str("scylladb:230:234").is_err());
925    assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
926    assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
927    assert!(StorageConfig::from_str("scylladb:wrong").is_err());
928}