Skip to main content

linera_storage_runtime/
storage_config.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use linera_storage::DEFAULT_NAMESPACE;
8#[cfg(feature = "rocksdb")]
9use linera_views::rocks_db::{PathWithGuard, RocksDbSpawnMode};
10use tracing::error;
11#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
12use {linera_views::backends::dual::DualStoreConfig, std::path::Path};
13#[cfg(feature = "scylladb")]
14use {std::num::NonZeroU16, tracing::debug};
15
16use crate::{CommonStorageOptions, StoreConfig};
17
18/// The description of a storage implementation.
19#[derive(Clone, Debug)]
20#[cfg_attr(any(test), derive(Eq, PartialEq))]
21pub enum InnerStorageConfig {
22    /// The memory description.
23    Memory {
24        /// The path to the genesis configuration. This is needed because we reinitialize
25        /// memory databases from the genesis config everytime.
26        genesis_path: PathBuf,
27    },
28    /// The storage service description.
29    #[cfg(feature = "storage-service")]
30    Service {
31        /// The endpoint used.
32        endpoint: String,
33    },
34    /// The RocksDB description.
35    #[cfg(feature = "rocksdb")]
36    RocksDb {
37        /// The path used.
38        path: PathBuf,
39        /// Whether to use `block_in_place` or `spawn_blocking`.
40        spawn_mode: RocksDbSpawnMode,
41    },
42    /// The ScyllaDB description.
43    #[cfg(feature = "scylladb")]
44    ScyllaDb {
45        /// The URI for accessing the database.
46        uri: String,
47    },
48    /// The dual RocksDB / ScyllaDB description.
49    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
50    DualRocksDbScyllaDb {
51        /// The path used.
52        path_with_guard: PathWithGuard,
53        /// Whether to use `block_in_place` or `spawn_blocking`.
54        spawn_mode: RocksDbSpawnMode,
55        /// The URI for accessing the database.
56        uri: String,
57    },
58}
59
60/// The description of a storage implementation.
61#[derive(Clone, Debug)]
62#[cfg_attr(any(test), derive(Eq, PartialEq))]
63pub struct StorageConfig {
64    /// The inner storage config.
65    pub inner_storage_config: InnerStorageConfig,
66    /// The namespace used
67    pub namespace: String,
68}
69
70const MEMORY: &str = "memory:";
71#[cfg(feature = "storage-service")]
72const STORAGE_SERVICE: &str = "service:";
73#[cfg(feature = "rocksdb")]
74const ROCKS_DB: &str = "rocksdb:";
75#[cfg(feature = "scylladb")]
76const SCYLLA_DB: &str = "scylladb:";
77#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
78const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
79
80impl FromStr for StorageConfig {
81    type Err = anyhow::Error;
82
83    fn from_str(input: &str) -> Result<Self, Self::Err> {
84        if let Some(s) = input.strip_prefix(MEMORY) {
85            let parts = s.split(':').collect::<Vec<_>>();
86            if parts.len() == 1 {
87                let genesis_path = parts[0].to_string().into();
88                let namespace = DEFAULT_NAMESPACE.to_string();
89                let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
90                return Ok(StorageConfig {
91                    inner_storage_config,
92                    namespace,
93                });
94            }
95            if parts.len() != 2 {
96                bail!("We should have one genesis config path and one optional namespace");
97            }
98            let genesis_path = parts[0].to_string().into();
99            let namespace = parts[1].to_string();
100            let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
101            return Ok(StorageConfig {
102                inner_storage_config,
103                namespace,
104            });
105        }
106        #[cfg(feature = "storage-service")]
107        if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
108            if s.is_empty() {
109                bail!(
110                    "For Storage service, the formatting has to be service:endpoint:namespace,\
111example service:tcp:127.0.0.1:7878:table_do_my_test"
112                );
113            }
114            let parts = s.split(':').collect::<Vec<_>>();
115            if parts.len() != 4 {
116                bail!("We should have one endpoint and one namespace");
117            }
118            let protocol = parts[0];
119            if protocol != "tcp" {
120                bail!("Only allowed protocol is tcp");
121            }
122            let endpoint = parts[1];
123            let port = parts[2];
124            let mut endpoint = endpoint.to_string();
125            endpoint.push(':');
126            endpoint.push_str(port);
127            let endpoint = endpoint.to_string();
128            let namespace = parts[3].to_string();
129            let inner_storage_config = InnerStorageConfig::Service { endpoint };
130            return Ok(StorageConfig {
131                inner_storage_config,
132                namespace,
133            });
134        }
135        #[cfg(feature = "rocksdb")]
136        if let Some(s) = input.strip_prefix(ROCKS_DB) {
137            if s.is_empty() {
138                bail!(
139                    "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
140            }
141            let parts = s.split(':').collect::<Vec<_>>();
142            if parts.len() == 1 {
143                let path = parts[0].to_string().into();
144                let namespace = DEFAULT_NAMESPACE.to_string();
145                let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
146                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
147                return Ok(StorageConfig {
148                    inner_storage_config,
149                    namespace,
150                });
151            }
152            if parts.len() == 2 || parts.len() == 3 {
153                let path = parts[0].to_string().into();
154                let spawn_mode_name = parts
155                    .get(1)
156                    .copied()
157                    .expect("validated by the parts length check above");
158                let spawn_mode = match spawn_mode_name {
159                    "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
160                    "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
161                    "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
162                    _ => Err(anyhow!("Failed to parse {spawn_mode_name} as a spawn_mode")),
163                }?;
164                let namespace = if parts.len() == 2 {
165                    DEFAULT_NAMESPACE.to_string()
166                } else {
167                    parts[2].to_string()
168                };
169                let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
170                return Ok(StorageConfig {
171                    inner_storage_config,
172                    namespace,
173                });
174            }
175            bail!("We should have one, two or three parts");
176        }
177        #[cfg(feature = "scylladb")]
178        if let Some(s) = input.strip_prefix(SCYLLA_DB) {
179            let mut uri: Option<String> = None;
180            let mut namespace: Option<String> = None;
181            let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
182            if !s.is_empty() {
183                let mut parts = s.split(':');
184                while let Some(part) = parts.next() {
185                    match part {
186                        "tcp" => {
187                            let address = parts.next().ok_or_else(|| {
188                                anyhow!("Failed to find address for {s}. {parse_error}")
189                            })?;
190                            let port_str = parts.next().ok_or_else(|| {
191                                anyhow!("Failed to find port for {s}. {parse_error}")
192                            })?;
193                            let port = NonZeroU16::from_str(port_str).map_err(|_| {
194                                anyhow!(
195                                    "Failed to find parse port {port_str} for {s}. {parse_error}",
196                                )
197                            })?;
198                            if uri.is_some() {
199                                bail!("The uri has already been assigned");
200                            }
201                            uri = Some(format!("{address}:{port}"));
202                        }
203                        _ if part.starts_with("table") => {
204                            if namespace.is_some() {
205                                bail!("The namespace has already been assigned");
206                            }
207                            namespace = Some(part.to_string());
208                        }
209                        _ => {
210                            bail!("the entry \"{part}\" is not matching");
211                        }
212                    }
213                }
214            }
215            let uri = uri.unwrap_or_else(|| "localhost:9042".to_string());
216            let namespace = namespace.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
217            let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
218            debug!("ScyllaDB connection info: {:?}", inner_storage_config);
219            return Ok(StorageConfig {
220                inner_storage_config,
221                namespace,
222            });
223        }
224        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
225        if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
226            let parts = s.split(':').collect::<Vec<_>>();
227            if parts.len() != 5 && parts.len() != 6 {
228                bail!(
229                    "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
230                );
231            }
232            let path = Path::new(parts[0]);
233            let path = path.to_path_buf();
234            let path_with_guard = PathWithGuard::new(path);
235            let spawn_mode_name = parts
236                .get(1)
237                .copied()
238                .expect("validated by the parts length check above");
239            let spawn_mode = match spawn_mode_name {
240                "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
241                "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
242                "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
243                _ => Err(anyhow!("Failed to parse {spawn_mode_name} as a spawn_mode",)),
244            }?;
245            let protocol = parts[2];
246            if protocol != "tcp" {
247                bail!("The only allowed protocol is tcp");
248            }
249            let address = parts[3];
250            let port_str = parts[4];
251            let port = NonZeroU16::from_str(port_str)
252                .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
253            let uri = format!("{address}:{port}");
254            let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
255                path_with_guard,
256                spawn_mode,
257                uri,
258            };
259            let namespace = if parts.len() == 5 {
260                DEFAULT_NAMESPACE.to_string()
261            } else {
262                parts[5].to_string()
263            };
264            return Ok(StorageConfig {
265                inner_storage_config,
266                namespace,
267            });
268        }
269        error!("available storage: memory");
270        #[cfg(feature = "storage-service")]
271        error!("Also available is linera-storage-service");
272        #[cfg(feature = "rocksdb")]
273        error!("Also available is RocksDB");
274        #[cfg(feature = "scylladb")]
275        error!("Also available is ScyllaDB");
276        #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
277        error!("Also available is DualRocksDbScyllaDb");
278        Err(anyhow!("The input has not matched: {input}"))
279    }
280}
281
282impl StorageConfig {
283    /// Appends a shard-specific subdirectory to the storage path, if applicable.
284    #[allow(unused_variables)]
285    pub fn maybe_append_shard_path(&mut self, shard: usize) -> std::io::Result<()> {
286        match &mut self.inner_storage_config {
287            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
288            InnerStorageConfig::DualRocksDbScyllaDb {
289                path_with_guard,
290                spawn_mode: _,
291                uri: _,
292            } => {
293                let shard_str = format!("shard_{shard}");
294                path_with_guard.path_buf.push(shard_str);
295                std::fs::create_dir_all(&path_with_guard.path_buf)
296            }
297            _ => Ok(()),
298        }
299    }
300
301    /// The addition of the common config to get a full configuration
302    pub fn add_common_storage_options(
303        &self,
304        options: &CommonStorageOptions,
305    ) -> Result<StoreConfig, anyhow::Error> {
306        let namespace = self.namespace.clone();
307        match &self.inner_storage_config {
308            InnerStorageConfig::Memory { genesis_path } => {
309                let config = linera_views::memory::MemoryStoreConfig {
310                    max_stream_queries: options.storage_max_stream_queries,
311                    kill_on_drop: false,
312                };
313                let genesis_path = genesis_path.clone();
314                Ok(StoreConfig::Memory {
315                    config,
316                    namespace,
317                    genesis_path,
318                })
319            }
320            #[cfg(feature = "storage-service")]
321            InnerStorageConfig::Service { endpoint } => {
322                let inner_config =
323                    linera_storage_service::common::StorageServiceStoreInternalConfig {
324                        endpoint: endpoint.clone(),
325                        max_concurrent_queries: options.storage_max_concurrent_queries,
326                        max_stream_queries: options.storage_max_stream_queries,
327                    };
328                let config = linera_storage_service::common::StorageServiceStoreConfig {
329                    inner_config,
330                    storage_cache_config: options.views_storage_cache_config(),
331                };
332                Ok(StoreConfig::StorageService { config, namespace })
333            }
334            #[cfg(feature = "rocksdb")]
335            InnerStorageConfig::RocksDb { path, spawn_mode } => {
336                let path_with_guard = PathWithGuard::new(path.to_path_buf());
337                let inner_config = linera_views::rocks_db::RocksDbStoreInternalConfig {
338                    spawn_mode: *spawn_mode,
339                    path_with_guard,
340                    max_stream_queries: options.storage_max_stream_queries,
341                };
342                let config = linera_views::rocks_db::RocksDbStoreConfig {
343                    inner_config,
344                    storage_cache_config: options.views_storage_cache_config(),
345                };
346                Ok(StoreConfig::RocksDb { config, namespace })
347            }
348            #[cfg(feature = "scylladb")]
349            InnerStorageConfig::ScyllaDb { uri } => {
350                let inner_config = linera_views::scylla_db::ScyllaDbStoreInternalConfig {
351                    uri: uri.clone(),
352                    max_stream_queries: options.storage_max_stream_queries,
353                    max_concurrent_queries: options.storage_max_concurrent_queries,
354                    replication_factor: options.storage_replication_factor,
355                };
356                let config = linera_views::scylla_db::ScyllaDbStoreConfig {
357                    inner_config,
358                    storage_cache_config: options.views_storage_cache_config(),
359                };
360                Ok(StoreConfig::ScyllaDb { config, namespace })
361            }
362            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
363            InnerStorageConfig::DualRocksDbScyllaDb {
364                path_with_guard,
365                spawn_mode,
366                uri,
367            } => {
368                let inner_config = linera_views::rocks_db::RocksDbStoreInternalConfig {
369                    spawn_mode: *spawn_mode,
370                    path_with_guard: path_with_guard.clone(),
371                    max_stream_queries: options.storage_max_stream_queries,
372                };
373                let first_config = linera_views::rocks_db::RocksDbStoreConfig {
374                    inner_config,
375                    storage_cache_config: options.views_storage_cache_config(),
376                };
377
378                let inner_config = linera_views::scylla_db::ScyllaDbStoreInternalConfig {
379                    uri: uri.clone(),
380                    max_stream_queries: options.storage_max_stream_queries,
381                    max_concurrent_queries: options.storage_max_concurrent_queries,
382                    replication_factor: options.storage_replication_factor,
383                };
384                let second_config = linera_views::scylla_db::ScyllaDbStoreConfig {
385                    inner_config,
386                    storage_cache_config: options.views_storage_cache_config(),
387                };
388
389                let config = DualStoreConfig {
390                    first_config,
391                    second_config,
392                };
393                Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
394            }
395        }
396    }
397}
398
399impl fmt::Display for StorageConfig {
400    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
401        let namespace = &self.namespace;
402        match &self.inner_storage_config {
403            #[cfg(feature = "storage-service")]
404            InnerStorageConfig::Service { endpoint } => {
405                write!(f, "service:tcp:{endpoint}:{namespace}")
406            }
407            InnerStorageConfig::Memory { genesis_path } => {
408                write!(f, "memory:{}:{}", genesis_path.display(), namespace)
409            }
410            #[cfg(feature = "rocksdb")]
411            InnerStorageConfig::RocksDb { path, spawn_mode } => {
412                let spawn_mode = spawn_mode.to_string();
413                write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
414            }
415            #[cfg(feature = "scylladb")]
416            InnerStorageConfig::ScyllaDb { uri } => {
417                write!(f, "scylladb:tcp:{uri}:{namespace}")
418            }
419            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
420            InnerStorageConfig::DualRocksDbScyllaDb {
421                path_with_guard,
422                spawn_mode,
423                uri,
424            } => {
425                write!(
426                    f,
427                    "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
428                    path_with_guard.path_buf.display(),
429                    spawn_mode,
430                    uri,
431                    namespace
432                )
433            }
434        }
435    }
436}
437
438#[test]
439fn test_memory_storage_config_from_str() {
440    assert_eq!(
441        StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
442        StorageConfig {
443            inner_storage_config: InnerStorageConfig::Memory {
444                genesis_path: PathBuf::from("path/to/genesis.json")
445            },
446            namespace: DEFAULT_NAMESPACE.into()
447        }
448    );
449    assert_eq!(
450        StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
451        StorageConfig {
452            inner_storage_config: InnerStorageConfig::Memory {
453                genesis_path: PathBuf::from("path/to/genesis.json")
454            },
455            namespace: "namespace".into()
456        }
457    );
458    assert!(StorageConfig::from_str("memory").is_err(),);
459}
460
461#[cfg(feature = "storage-service")]
462#[test]
463fn test_shared_store_config_from_str() {
464    assert_eq!(
465        StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
466        StorageConfig {
467            inner_storage_config: InnerStorageConfig::Service {
468                endpoint: "127.0.0.1:8942".to_string()
469            },
470            namespace: "linera".into()
471        }
472    );
473    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
474    assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
475}
476
477#[cfg(feature = "rocksdb")]
478#[test]
479fn test_rocks_db_storage_config_from_str() {
480    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
481    assert_eq!(
482        StorageConfig::from_str("rocksdb:foo.db").unwrap(),
483        StorageConfig {
484            inner_storage_config: InnerStorageConfig::RocksDb {
485                path: "foo.db".into(),
486                spawn_mode: RocksDbSpawnMode::SpawnBlocking,
487            },
488            namespace: DEFAULT_NAMESPACE.to_string()
489        }
490    );
491    assert_eq!(
492        StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
493        StorageConfig {
494            inner_storage_config: InnerStorageConfig::RocksDb {
495                path: "foo.db".into(),
496                spawn_mode: RocksDbSpawnMode::BlockInPlace,
497            },
498            namespace: DEFAULT_NAMESPACE.to_string()
499        }
500    );
501    assert_eq!(
502        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
503        StorageConfig {
504            inner_storage_config: InnerStorageConfig::RocksDb {
505                path: "foo.db".into(),
506                spawn_mode: RocksDbSpawnMode::BlockInPlace,
507            },
508            namespace: "chosen_namespace".into()
509        }
510    );
511}
512
513#[cfg(feature = "scylladb")]
514#[test]
515fn test_scylla_db_storage_config_from_str() {
516    assert_eq!(
517        StorageConfig::from_str("scylladb:").unwrap(),
518        StorageConfig {
519            inner_storage_config: InnerStorageConfig::ScyllaDb {
520                uri: "localhost:9042".to_string()
521            },
522            namespace: DEFAULT_NAMESPACE.to_string()
523        }
524    );
525    assert_eq!(
526        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
527        StorageConfig {
528            inner_storage_config: InnerStorageConfig::ScyllaDb {
529                uri: "db_hostname:230".to_string()
530            },
531            namespace: "table_other_storage".to_string()
532        }
533    );
534    assert_eq!(
535        StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
536        StorageConfig {
537            inner_storage_config: InnerStorageConfig::ScyllaDb {
538                uri: "db_hostname:230".to_string()
539            },
540            namespace: DEFAULT_NAMESPACE.to_string()
541        }
542    );
543    assert!(StorageConfig::from_str("scylladb:-10").is_err());
544    assert!(StorageConfig::from_str("scylladb:70000").is_err());
545    assert!(StorageConfig::from_str("scylladb:230:234").is_err());
546    assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
547    assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
548    assert!(StorageConfig::from_str("scylladb:wrong").is_err());
549}