1use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use linera_storage::DEFAULT_NAMESPACE;
8#[cfg(feature = "rocksdb")]
9use linera_views::rocks_db::{PathWithGuard, RocksDbSpawnMode};
10use tracing::error;
11#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
12use {linera_views::backends::dual::DualStoreConfig, std::path::Path};
13#[cfg(feature = "scylladb")]
14use {std::num::NonZeroU16, tracing::debug};
15
16use crate::{CommonStorageOptions, StoreConfig};
17
/// Backend-specific part of a storage configuration.
///
/// Every variant carries what is needed to locate one kind of store. All variants except
/// `Memory` are only compiled in when the matching Cargo feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InnerStorageConfig {
    /// The in-memory store.
    Memory {
        /// Path of the genesis configuration given in the `memory:` storage string.
        genesis_path: PathBuf,
    },
    /// The storage service.
    #[cfg(feature = "storage-service")]
    Service {
        /// Endpoint of the service, stored as `host:port`.
        endpoint: String,
    },
    /// The RocksDB store.
    #[cfg(feature = "rocksdb")]
    RocksDb {
        /// Directory given in the `rocksdb:` storage string.
        path: PathBuf,
        /// Spawn mode handed to the RocksDB store configuration.
        spawn_mode: RocksDbSpawnMode,
    },
    /// The ScyllaDB store.
    #[cfg(feature = "scylladb")]
    ScyllaDb {
        /// Address of the database, stored as `host:port`.
        uri: String,
    },
    /// The combination of a RocksDB store and a ScyllaDB store.
    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
    DualRocksDbScyllaDb {
        /// Directory of the RocksDB part, wrapped in a `PathWithGuard`.
        path_with_guard: PathWithGuard,
        /// Spawn mode handed to the RocksDB store configuration.
        spawn_mode: RocksDbSpawnMode,
        /// Address of the ScyllaDB part, stored as `host:port`.
        uri: String,
    },
}
59
60#[derive(Clone, Debug)]
62#[cfg_attr(any(test), derive(Eq, PartialEq))]
63pub struct StorageConfig {
64 pub inner_storage_config: InnerStorageConfig,
66 pub namespace: String,
68}
69
/// Prefix of a storage string selecting the in-memory store.
const MEMORY: &str = "memory:";
/// Prefix of a storage string selecting the storage service.
#[cfg(feature = "storage-service")]
const STORAGE_SERVICE: &str = "service:";
/// Prefix of a storage string selecting RocksDB.
#[cfg(feature = "rocksdb")]
const ROCKS_DB: &str = "rocksdb:";
/// Prefix of a storage string selecting ScyllaDB.
#[cfg(feature = "scylladb")]
const SCYLLA_DB: &str = "scylladb:";
/// Prefix of a storage string selecting the combined RocksDB/ScyllaDB store.
#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
79
80impl FromStr for StorageConfig {
81 type Err = anyhow::Error;
82
83 fn from_str(input: &str) -> Result<Self, Self::Err> {
84 if let Some(s) = input.strip_prefix(MEMORY) {
85 let parts = s.split(':').collect::<Vec<_>>();
86 if parts.len() == 1 {
87 let genesis_path = parts[0].to_string().into();
88 let namespace = DEFAULT_NAMESPACE.to_string();
89 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
90 return Ok(StorageConfig {
91 inner_storage_config,
92 namespace,
93 });
94 }
95 if parts.len() != 2 {
96 bail!("We should have one genesis config path and one optional namespace");
97 }
98 let genesis_path = parts[0].to_string().into();
99 let namespace = parts[1].to_string();
100 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
101 return Ok(StorageConfig {
102 inner_storage_config,
103 namespace,
104 });
105 }
106 #[cfg(feature = "storage-service")]
107 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
108 if s.is_empty() {
109 bail!(
110 "For Storage service, the formatting has to be service:endpoint:namespace,\
111example service:tcp:127.0.0.1:7878:table_do_my_test"
112 );
113 }
114 let parts = s.split(':').collect::<Vec<_>>();
115 if parts.len() != 4 {
116 bail!("We should have one endpoint and one namespace");
117 }
118 let protocol = parts[0];
119 if protocol != "tcp" {
120 bail!("Only allowed protocol is tcp");
121 }
122 let endpoint = parts[1];
123 let port = parts[2];
124 let mut endpoint = endpoint.to_string();
125 endpoint.push(':');
126 endpoint.push_str(port);
127 let endpoint = endpoint.to_string();
128 let namespace = parts[3].to_string();
129 let inner_storage_config = InnerStorageConfig::Service { endpoint };
130 return Ok(StorageConfig {
131 inner_storage_config,
132 namespace,
133 });
134 }
135 #[cfg(feature = "rocksdb")]
136 if let Some(s) = input.strip_prefix(ROCKS_DB) {
137 if s.is_empty() {
138 bail!(
139 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
140 }
141 let parts = s.split(':').collect::<Vec<_>>();
142 if parts.len() == 1 {
143 let path = parts[0].to_string().into();
144 let namespace = DEFAULT_NAMESPACE.to_string();
145 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
146 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
147 return Ok(StorageConfig {
148 inner_storage_config,
149 namespace,
150 });
151 }
152 if parts.len() == 2 || parts.len() == 3 {
153 let path = parts[0].to_string().into();
154 let spawn_mode_name = parts
155 .get(1)
156 .copied()
157 .expect("validated by the parts length check above");
158 let spawn_mode = match spawn_mode_name {
159 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
160 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
161 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
162 _ => Err(anyhow!("Failed to parse {spawn_mode_name} as a spawn_mode")),
163 }?;
164 let namespace = if parts.len() == 2 {
165 DEFAULT_NAMESPACE.to_string()
166 } else {
167 parts[2].to_string()
168 };
169 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
170 return Ok(StorageConfig {
171 inner_storage_config,
172 namespace,
173 });
174 }
175 bail!("We should have one, two or three parts");
176 }
177 #[cfg(feature = "scylladb")]
178 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
179 let mut uri: Option<String> = None;
180 let mut namespace: Option<String> = None;
181 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
182 if !s.is_empty() {
183 let mut parts = s.split(':');
184 while let Some(part) = parts.next() {
185 match part {
186 "tcp" => {
187 let address = parts.next().ok_or_else(|| {
188 anyhow!("Failed to find address for {s}. {parse_error}")
189 })?;
190 let port_str = parts.next().ok_or_else(|| {
191 anyhow!("Failed to find port for {s}. {parse_error}")
192 })?;
193 let port = NonZeroU16::from_str(port_str).map_err(|_| {
194 anyhow!(
195 "Failed to find parse port {port_str} for {s}. {parse_error}",
196 )
197 })?;
198 if uri.is_some() {
199 bail!("The uri has already been assigned");
200 }
201 uri = Some(format!("{address}:{port}"));
202 }
203 _ if part.starts_with("table") => {
204 if namespace.is_some() {
205 bail!("The namespace has already been assigned");
206 }
207 namespace = Some(part.to_string());
208 }
209 _ => {
210 bail!("the entry \"{part}\" is not matching");
211 }
212 }
213 }
214 }
215 let uri = uri.unwrap_or_else(|| "localhost:9042".to_string());
216 let namespace = namespace.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
217 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
218 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
219 return Ok(StorageConfig {
220 inner_storage_config,
221 namespace,
222 });
223 }
224 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
225 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
226 let parts = s.split(':').collect::<Vec<_>>();
227 if parts.len() != 5 && parts.len() != 6 {
228 bail!(
229 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
230 );
231 }
232 let path = Path::new(parts[0]);
233 let path = path.to_path_buf();
234 let path_with_guard = PathWithGuard::new(path);
235 let spawn_mode_name = parts
236 .get(1)
237 .copied()
238 .expect("validated by the parts length check above");
239 let spawn_mode = match spawn_mode_name {
240 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
241 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
242 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
243 _ => Err(anyhow!("Failed to parse {spawn_mode_name} as a spawn_mode",)),
244 }?;
245 let protocol = parts[2];
246 if protocol != "tcp" {
247 bail!("The only allowed protocol is tcp");
248 }
249 let address = parts[3];
250 let port_str = parts[4];
251 let port = NonZeroU16::from_str(port_str)
252 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
253 let uri = format!("{address}:{port}");
254 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
255 path_with_guard,
256 spawn_mode,
257 uri,
258 };
259 let namespace = if parts.len() == 5 {
260 DEFAULT_NAMESPACE.to_string()
261 } else {
262 parts[5].to_string()
263 };
264 return Ok(StorageConfig {
265 inner_storage_config,
266 namespace,
267 });
268 }
269 error!("available storage: memory");
270 #[cfg(feature = "storage-service")]
271 error!("Also available is linera-storage-service");
272 #[cfg(feature = "rocksdb")]
273 error!("Also available is RocksDB");
274 #[cfg(feature = "scylladb")]
275 error!("Also available is ScyllaDB");
276 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
277 error!("Also available is DualRocksDbScyllaDb");
278 Err(anyhow!("The input has not matched: {input}"))
279 }
280}
281
282impl StorageConfig {
283 #[allow(unused_variables)]
285 pub fn maybe_append_shard_path(&mut self, shard: usize) -> std::io::Result<()> {
286 match &mut self.inner_storage_config {
287 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
288 InnerStorageConfig::DualRocksDbScyllaDb {
289 path_with_guard,
290 spawn_mode: _,
291 uri: _,
292 } => {
293 let shard_str = format!("shard_{shard}");
294 path_with_guard.path_buf.push(shard_str);
295 std::fs::create_dir_all(&path_with_guard.path_buf)
296 }
297 _ => Ok(()),
298 }
299 }
300
301 pub fn add_common_storage_options(
303 &self,
304 options: &CommonStorageOptions,
305 ) -> Result<StoreConfig, anyhow::Error> {
306 let namespace = self.namespace.clone();
307 match &self.inner_storage_config {
308 InnerStorageConfig::Memory { genesis_path } => {
309 let config = linera_views::memory::MemoryStoreConfig {
310 max_stream_queries: options.storage_max_stream_queries,
311 kill_on_drop: false,
312 };
313 let genesis_path = genesis_path.clone();
314 Ok(StoreConfig::Memory {
315 config,
316 namespace,
317 genesis_path,
318 })
319 }
320 #[cfg(feature = "storage-service")]
321 InnerStorageConfig::Service { endpoint } => {
322 let inner_config =
323 linera_storage_service::common::StorageServiceStoreInternalConfig {
324 endpoint: endpoint.clone(),
325 max_concurrent_queries: options.storage_max_concurrent_queries,
326 max_stream_queries: options.storage_max_stream_queries,
327 };
328 let config = linera_storage_service::common::StorageServiceStoreConfig {
329 inner_config,
330 storage_cache_config: options.views_storage_cache_config(),
331 };
332 Ok(StoreConfig::StorageService { config, namespace })
333 }
334 #[cfg(feature = "rocksdb")]
335 InnerStorageConfig::RocksDb { path, spawn_mode } => {
336 let path_with_guard = PathWithGuard::new(path.to_path_buf());
337 let inner_config = linera_views::rocks_db::RocksDbStoreInternalConfig {
338 spawn_mode: *spawn_mode,
339 path_with_guard,
340 max_stream_queries: options.storage_max_stream_queries,
341 };
342 let config = linera_views::rocks_db::RocksDbStoreConfig {
343 inner_config,
344 storage_cache_config: options.views_storage_cache_config(),
345 };
346 Ok(StoreConfig::RocksDb { config, namespace })
347 }
348 #[cfg(feature = "scylladb")]
349 InnerStorageConfig::ScyllaDb { uri } => {
350 let inner_config = linera_views::scylla_db::ScyllaDbStoreInternalConfig {
351 uri: uri.clone(),
352 max_stream_queries: options.storage_max_stream_queries,
353 max_concurrent_queries: options.storage_max_concurrent_queries,
354 replication_factor: options.storage_replication_factor,
355 };
356 let config = linera_views::scylla_db::ScyllaDbStoreConfig {
357 inner_config,
358 storage_cache_config: options.views_storage_cache_config(),
359 };
360 Ok(StoreConfig::ScyllaDb { config, namespace })
361 }
362 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
363 InnerStorageConfig::DualRocksDbScyllaDb {
364 path_with_guard,
365 spawn_mode,
366 uri,
367 } => {
368 let inner_config = linera_views::rocks_db::RocksDbStoreInternalConfig {
369 spawn_mode: *spawn_mode,
370 path_with_guard: path_with_guard.clone(),
371 max_stream_queries: options.storage_max_stream_queries,
372 };
373 let first_config = linera_views::rocks_db::RocksDbStoreConfig {
374 inner_config,
375 storage_cache_config: options.views_storage_cache_config(),
376 };
377
378 let inner_config = linera_views::scylla_db::ScyllaDbStoreInternalConfig {
379 uri: uri.clone(),
380 max_stream_queries: options.storage_max_stream_queries,
381 max_concurrent_queries: options.storage_max_concurrent_queries,
382 replication_factor: options.storage_replication_factor,
383 };
384 let second_config = linera_views::scylla_db::ScyllaDbStoreConfig {
385 inner_config,
386 storage_cache_config: options.views_storage_cache_config(),
387 };
388
389 let config = DualStoreConfig {
390 first_config,
391 second_config,
392 };
393 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
394 }
395 }
396 }
397}
398
399impl fmt::Display for StorageConfig {
400 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
401 let namespace = &self.namespace;
402 match &self.inner_storage_config {
403 #[cfg(feature = "storage-service")]
404 InnerStorageConfig::Service { endpoint } => {
405 write!(f, "service:tcp:{endpoint}:{namespace}")
406 }
407 InnerStorageConfig::Memory { genesis_path } => {
408 write!(f, "memory:{}:{}", genesis_path.display(), namespace)
409 }
410 #[cfg(feature = "rocksdb")]
411 InnerStorageConfig::RocksDb { path, spawn_mode } => {
412 let spawn_mode = spawn_mode.to_string();
413 write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
414 }
415 #[cfg(feature = "scylladb")]
416 InnerStorageConfig::ScyllaDb { uri } => {
417 write!(f, "scylladb:tcp:{uri}:{namespace}")
418 }
419 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
420 InnerStorageConfig::DualRocksDbScyllaDb {
421 path_with_guard,
422 spawn_mode,
423 uri,
424 } => {
425 write!(
426 f,
427 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
428 path_with_guard.path_buf.display(),
429 spawn_mode,
430 uri,
431 namespace
432 )
433 }
434 }
435 }
436}
437
/// Checks `memory:` storage strings with and without an explicit namespace.
#[test]
fn test_memory_storage_config_from_str() {
    // Expected result for the genesis path used below, with the given namespace.
    let expected = |namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::Memory {
            genesis_path: PathBuf::from("path/to/genesis.json"),
        },
        namespace,
    };

    let without_namespace = StorageConfig::from_str("memory:path/to/genesis.json").unwrap();
    assert_eq!(without_namespace, expected(DEFAULT_NAMESPACE.into()));

    let with_namespace = StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap();
    assert_eq!(with_namespace, expected("namespace".into()));

    // Without the trailing colon the prefix does not match at all.
    assert!(StorageConfig::from_str("memory").is_err());
}
460
/// Checks `service:` storage strings: all four fields are mandatory.
#[cfg(feature = "storage-service")]
#[test]
fn test_shared_store_config_from_str() {
    let expected = StorageConfig {
        inner_storage_config: InnerStorageConfig::Service {
            endpoint: "127.0.0.1:8942".to_string(),
        },
        namespace: "linera".into(),
    };
    let parsed = StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap();
    assert_eq!(parsed, expected);

    // Too few fields: the namespace or the port is missing.
    for incomplete in ["service:tcp:127.0.0.1:8942", "service:tcp:127.0.0.1:linera"] {
        assert!(StorageConfig::from_str(incomplete).is_err());
    }
}
476
/// Checks `rocksdb:` storage strings with one, two and three fields.
#[cfg(feature = "rocksdb")]
#[test]
fn test_rocks_db_storage_config_from_str() {
    // Expected result for the `foo.db` directory used below.
    let expected = |spawn_mode: RocksDbSpawnMode, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::RocksDb {
            path: "foo.db".into(),
            spawn_mode,
        },
        namespace,
    };

    // An underscore instead of the colon is not a known prefix.
    assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());

    let directory_only = StorageConfig::from_str("rocksdb:foo.db").unwrap();
    assert_eq!(
        directory_only,
        expected(
            RocksDbSpawnMode::SpawnBlocking,
            DEFAULT_NAMESPACE.to_string()
        )
    );

    let with_spawn_mode = StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap();
    assert_eq!(
        with_spawn_mode,
        expected(RocksDbSpawnMode::BlockInPlace, DEFAULT_NAMESPACE.to_string())
    );

    let with_namespace =
        StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap();
    assert_eq!(
        with_namespace,
        expected(RocksDbSpawnMode::BlockInPlace, "chosen_namespace".into())
    );
}
512
/// Checks `scylladb:` storage strings: defaults, explicit address, explicit namespace and a
/// set of malformed inputs.
#[cfg(feature = "scylladb")]
#[test]
fn test_scylla_db_storage_config_from_str() {
    let expected = |uri: &str, namespace: String| StorageConfig {
        inner_storage_config: InnerStorageConfig::ScyllaDb {
            uri: uri.to_string(),
        },
        namespace,
    };

    let all_defaults = StorageConfig::from_str("scylladb:").unwrap();
    assert_eq!(
        all_defaults,
        expected("localhost:9042", DEFAULT_NAMESPACE.to_string())
    );

    let address_and_namespace =
        StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap();
    assert_eq!(
        address_and_namespace,
        expected("db_hostname:230", "table_other_storage".to_string())
    );

    let address_only = StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap();
    assert_eq!(
        address_only,
        expected("db_hostname:230", DEFAULT_NAMESPACE.to_string())
    );

    let malformed = [
        "scylladb:-10",
        "scylladb:70000",
        "scylladb:230:234",
        "scylladb:tcp:address1",
        "scylladb:tcp:address1:tcp:/address2",
        "scylladb:wrong",
    ];
    for input in malformed {
        assert!(StorageConfig::from_str(input).is_err());
    }
}