1use std::{fmt, path::PathBuf, str::FromStr};
5
6use anyhow::{anyhow, bail};
7use linera_storage::DEFAULT_NAMESPACE;
8#[cfg(feature = "rocksdb")]
9use linera_views::rocks_db::{PathWithGuard, RocksDbSpawnMode};
10use tracing::error;
11#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
12use {linera_views::backends::dual::DualStoreConfig, std::path::Path};
13#[cfg(feature = "scylladb")]
14use {std::num::NonZeroU16, tracing::debug};
15
16use crate::{CommonStorageOptions, StoreConfig};
17
18#[derive(Clone, Debug)]
20#[cfg_attr(any(test), derive(Eq, PartialEq))]
21pub enum InnerStorageConfig {
22 Memory {
24 genesis_path: PathBuf,
27 },
28 #[cfg(feature = "storage-service")]
30 Service {
31 endpoint: String,
33 },
34 #[cfg(feature = "rocksdb")]
36 RocksDb {
37 path: PathBuf,
39 spawn_mode: RocksDbSpawnMode,
41 },
42 #[cfg(feature = "dynamodb")]
44 DynamoDb {
45 use_dynamodb_local: bool,
47 },
48 #[cfg(feature = "scylladb")]
50 ScyllaDb {
51 uri: String,
53 },
54 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
55 DualRocksDbScyllaDb {
56 path_with_guard: PathWithGuard,
58 spawn_mode: RocksDbSpawnMode,
60 uri: String,
62 },
63}
64
65#[derive(Clone, Debug)]
67#[cfg_attr(any(test), derive(Eq, PartialEq))]
68pub struct StorageConfig {
69 pub inner_storage_config: InnerStorageConfig,
71 pub namespace: String,
73}
74
75const MEMORY: &str = "memory:";
76#[cfg(feature = "storage-service")]
77const STORAGE_SERVICE: &str = "service:";
78#[cfg(feature = "rocksdb")]
79const ROCKS_DB: &str = "rocksdb:";
80#[cfg(feature = "dynamodb")]
81const DYNAMO_DB: &str = "dynamodb:";
82#[cfg(feature = "scylladb")]
83const SCYLLA_DB: &str = "scylladb:";
84#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
85const DUAL_ROCKS_DB_SCYLLA_DB: &str = "dualrocksdbscylladb:";
86
87impl FromStr for StorageConfig {
88 type Err = anyhow::Error;
89
90 fn from_str(input: &str) -> Result<Self, Self::Err> {
91 if let Some(s) = input.strip_prefix(MEMORY) {
92 let parts = s.split(':').collect::<Vec<_>>();
93 if parts.len() == 1 {
94 let genesis_path = parts[0].to_string().into();
95 let namespace = DEFAULT_NAMESPACE.to_string();
96 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
97 return Ok(StorageConfig {
98 inner_storage_config,
99 namespace,
100 });
101 }
102 if parts.len() != 2 {
103 bail!("We should have one genesis config path and one optional namespace");
104 }
105 let genesis_path = parts[0].to_string().into();
106 let namespace = parts[1].to_string();
107 let inner_storage_config = InnerStorageConfig::Memory { genesis_path };
108 return Ok(StorageConfig {
109 inner_storage_config,
110 namespace,
111 });
112 }
113 #[cfg(feature = "storage-service")]
114 if let Some(s) = input.strip_prefix(STORAGE_SERVICE) {
115 if s.is_empty() {
116 bail!(
117 "For Storage service, the formatting has to be service:endpoint:namespace,\
118example service:tcp:127.0.0.1:7878:table_do_my_test"
119 );
120 }
121 let parts = s.split(':').collect::<Vec<_>>();
122 if parts.len() != 4 {
123 bail!("We should have one endpoint and one namespace");
124 }
125 let protocol = parts[0];
126 if protocol != "tcp" {
127 bail!("Only allowed protocol is tcp");
128 }
129 let endpoint = parts[1];
130 let port = parts[2];
131 let mut endpoint = endpoint.to_string();
132 endpoint.push(':');
133 endpoint.push_str(port);
134 let endpoint = endpoint.to_string();
135 let namespace = parts[3].to_string();
136 let inner_storage_config = InnerStorageConfig::Service { endpoint };
137 return Ok(StorageConfig {
138 inner_storage_config,
139 namespace,
140 });
141 }
142 #[cfg(feature = "rocksdb")]
143 if let Some(s) = input.strip_prefix(ROCKS_DB) {
144 if s.is_empty() {
145 bail!(
146 "For RocksDB, the formatting has to be rocksdb:directory or rocksdb:directory:spawn_mode:namespace");
147 }
148 let parts = s.split(':').collect::<Vec<_>>();
149 if parts.len() == 1 {
150 let path = parts[0].to_string().into();
151 let namespace = DEFAULT_NAMESPACE.to_string();
152 let spawn_mode = RocksDbSpawnMode::SpawnBlocking;
153 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
154 return Ok(StorageConfig {
155 inner_storage_config,
156 namespace,
157 });
158 }
159 if parts.len() == 2 || parts.len() == 3 {
160 let path = parts[0].to_string().into();
161 let spawn_mode_name = parts
162 .get(1)
163 .copied()
164 .expect("validated by the parts length check above");
165 let spawn_mode = match spawn_mode_name {
166 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
167 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
168 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
169 _ => Err(anyhow!(
170 "Failed to parse {} as a spawn_mode",
171 spawn_mode_name
172 )),
173 }?;
174 let namespace = if parts.len() == 2 {
175 DEFAULT_NAMESPACE.to_string()
176 } else {
177 parts[2].to_string()
178 };
179 let inner_storage_config = InnerStorageConfig::RocksDb { path, spawn_mode };
180 return Ok(StorageConfig {
181 inner_storage_config,
182 namespace,
183 });
184 }
185 bail!("We should have one, two or three parts");
186 }
187 #[cfg(feature = "dynamodb")]
188 if let Some(s) = input.strip_prefix(DYNAMO_DB) {
189 let mut parts = s.splitn(2, ':');
190 let namespace = parts
191 .next()
192 .ok_or_else(|| anyhow!("Missing DynamoDB table name, e.g. {DYNAMO_DB}TABLE"))?
193 .to_string();
194 let use_dynamodb_local = match parts.next() {
195 None | Some("env") => false,
196 Some("dynamodb_local") => true,
197 Some(unknown) => {
198 bail!(
199 "Invalid DynamoDB endpoint {unknown:?}. \
200 Expected {DYNAMO_DB}TABLE:[env|dynamodb_local]"
201 );
202 }
203 };
204 let inner_storage_config = InnerStorageConfig::DynamoDb { use_dynamodb_local };
205 return Ok(StorageConfig {
206 inner_storage_config,
207 namespace,
208 });
209 }
210 #[cfg(feature = "scylladb")]
211 if let Some(s) = input.strip_prefix(SCYLLA_DB) {
212 let mut uri: Option<String> = None;
213 let mut namespace: Option<String> = None;
214 let parse_error: &'static str = "Correct format is tcp:db_hostname:port.";
215 if !s.is_empty() {
216 let mut parts = s.split(':');
217 while let Some(part) = parts.next() {
218 match part {
219 "tcp" => {
220 let address = parts.next().ok_or_else(|| {
221 anyhow!("Failed to find address for {s}. {parse_error}")
222 })?;
223 let port_str = parts.next().ok_or_else(|| {
224 anyhow!("Failed to find port for {s}. {parse_error}")
225 })?;
226 let port = NonZeroU16::from_str(port_str).map_err(|_| {
227 anyhow!(
228 "Failed to find parse port {port_str} for {s}. {parse_error}",
229 )
230 })?;
231 if uri.is_some() {
232 bail!("The uri has already been assigned");
233 }
234 uri = Some(format!("{}:{}", &address, port));
235 }
236 _ if part.starts_with("table") => {
237 if namespace.is_some() {
238 bail!("The namespace has already been assigned");
239 }
240 namespace = Some(part.to_string());
241 }
242 _ => {
243 bail!("the entry \"{part}\" is not matching");
244 }
245 }
246 }
247 }
248 let uri = uri.unwrap_or_else(|| "localhost:9042".to_string());
249 let namespace = namespace.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
250 let inner_storage_config = InnerStorageConfig::ScyllaDb { uri };
251 debug!("ScyllaDB connection info: {:?}", inner_storage_config);
252 return Ok(StorageConfig {
253 inner_storage_config,
254 namespace,
255 });
256 }
257 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
258 if let Some(s) = input.strip_prefix(DUAL_ROCKS_DB_SCYLLA_DB) {
259 let parts = s.split(':').collect::<Vec<_>>();
260 if parts.len() != 5 && parts.len() != 6 {
261 bail!(
262 "For DualRocksDbScyllaDb, the formatting has to be dualrocksdbscylladb:directory:mode:tcp:hostname:port:namespace"
263 );
264 }
265 let path = Path::new(parts[0]);
266 let path = path.to_path_buf();
267 let path_with_guard = PathWithGuard::new(path);
268 let spawn_mode_name = parts
269 .get(1)
270 .copied()
271 .expect("validated by the parts length check above");
272 let spawn_mode = match spawn_mode_name {
273 "spawn_blocking" => Ok(RocksDbSpawnMode::SpawnBlocking),
274 "block_in_place" => Ok(RocksDbSpawnMode::BlockInPlace),
275 "runtime" => Ok(RocksDbSpawnMode::get_spawn_mode_from_runtime()),
276 _ => Err(anyhow!(
277 "Failed to parse {} as a spawn_mode",
278 spawn_mode_name
279 )),
280 }?;
281 let protocol = parts[2];
282 if protocol != "tcp" {
283 bail!("The only allowed protocol is tcp");
284 }
285 let address = parts[3];
286 let port_str = parts[4];
287 let port = NonZeroU16::from_str(port_str)
288 .map_err(|_| anyhow!("Failed to find parse port {port_str} for {s}"))?;
289 let uri = format!("{}:{}", &address, port);
290 let inner_storage_config = InnerStorageConfig::DualRocksDbScyllaDb {
291 path_with_guard,
292 spawn_mode,
293 uri,
294 };
295 let namespace = if parts.len() == 5 {
296 DEFAULT_NAMESPACE.to_string()
297 } else {
298 parts[5].to_string()
299 };
300 return Ok(StorageConfig {
301 inner_storage_config,
302 namespace,
303 });
304 }
305 error!("available storage: memory");
306 #[cfg(feature = "storage-service")]
307 error!("Also available is linera-storage-service");
308 #[cfg(feature = "rocksdb")]
309 error!("Also available is RocksDB");
310 #[cfg(feature = "dynamodb")]
311 error!("Also available is DynamoDB");
312 #[cfg(feature = "scylladb")]
313 error!("Also available is ScyllaDB");
314 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
315 error!("Also available is DualRocksDbScyllaDb");
316 Err(anyhow!("The input has not matched: {input}"))
317 }
318}
319
320impl StorageConfig {
321 #[allow(unused_variables)]
322 pub fn maybe_append_shard_path(&mut self, shard: usize) -> std::io::Result<()> {
323 match &mut self.inner_storage_config {
324 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
325 InnerStorageConfig::DualRocksDbScyllaDb {
326 path_with_guard,
327 spawn_mode: _,
328 uri: _,
329 } => {
330 let shard_str = format!("shard_{shard}");
331 path_with_guard.path_buf.push(shard_str);
332 std::fs::create_dir_all(&path_with_guard.path_buf)
333 }
334 _ => Ok(()),
335 }
336 }
337
338 pub fn add_common_storage_options(
340 &self,
341 options: &CommonStorageOptions,
342 ) -> Result<StoreConfig, anyhow::Error> {
343 let namespace = self.namespace.clone();
344 match &self.inner_storage_config {
345 InnerStorageConfig::Memory { genesis_path } => {
346 let config = linera_views::memory::MemoryStoreConfig {
347 max_stream_queries: options.storage_max_stream_queries,
348 kill_on_drop: false,
349 };
350 let genesis_path = genesis_path.clone();
351 Ok(StoreConfig::Memory {
352 config,
353 namespace,
354 genesis_path,
355 })
356 }
357 #[cfg(feature = "storage-service")]
358 InnerStorageConfig::Service { endpoint } => {
359 let inner_config =
360 linera_storage_service::common::StorageServiceStoreInternalConfig {
361 endpoint: endpoint.clone(),
362 max_concurrent_queries: options.storage_max_concurrent_queries,
363 max_stream_queries: options.storage_max_stream_queries,
364 };
365 let config = linera_storage_service::common::StorageServiceStoreConfig {
366 inner_config,
367 storage_cache_config: options.views_storage_cache_config(),
368 };
369 Ok(StoreConfig::StorageService { config, namespace })
370 }
371 #[cfg(feature = "rocksdb")]
372 InnerStorageConfig::RocksDb { path, spawn_mode } => {
373 let path_with_guard = PathWithGuard::new(path.to_path_buf());
374 let inner_config = linera_views::rocks_db::RocksDbStoreInternalConfig {
375 spawn_mode: *spawn_mode,
376 path_with_guard,
377 max_stream_queries: options.storage_max_stream_queries,
378 };
379 let config = linera_views::rocks_db::RocksDbStoreConfig {
380 inner_config,
381 storage_cache_config: options.views_storage_cache_config(),
382 };
383 Ok(StoreConfig::RocksDb { config, namespace })
384 }
385 #[cfg(feature = "dynamodb")]
386 InnerStorageConfig::DynamoDb { use_dynamodb_local } => {
387 let inner_config = linera_views::dynamo_db::DynamoDbStoreInternalConfig {
388 use_dynamodb_local: *use_dynamodb_local,
389 max_concurrent_queries: options.storage_max_concurrent_queries,
390 max_stream_queries: options.storage_max_stream_queries,
391 };
392 let config = linera_views::dynamo_db::DynamoDbStoreConfig {
393 inner_config,
394 storage_cache_config: options.views_storage_cache_config(),
395 };
396 Ok(StoreConfig::DynamoDb { config, namespace })
397 }
398 #[cfg(feature = "scylladb")]
399 InnerStorageConfig::ScyllaDb { uri } => {
400 let inner_config = linera_views::scylla_db::ScyllaDbStoreInternalConfig {
401 uri: uri.clone(),
402 max_stream_queries: options.storage_max_stream_queries,
403 max_concurrent_queries: options.storage_max_concurrent_queries,
404 replication_factor: options.storage_replication_factor,
405 };
406 let config = linera_views::scylla_db::ScyllaDbStoreConfig {
407 inner_config,
408 storage_cache_config: options.views_storage_cache_config(),
409 };
410 Ok(StoreConfig::ScyllaDb { config, namespace })
411 }
412 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
413 InnerStorageConfig::DualRocksDbScyllaDb {
414 path_with_guard,
415 spawn_mode,
416 uri,
417 } => {
418 let inner_config = linera_views::rocks_db::RocksDbStoreInternalConfig {
419 spawn_mode: *spawn_mode,
420 path_with_guard: path_with_guard.clone(),
421 max_stream_queries: options.storage_max_stream_queries,
422 };
423 let first_config = linera_views::rocks_db::RocksDbStoreConfig {
424 inner_config,
425 storage_cache_config: options.views_storage_cache_config(),
426 };
427
428 let inner_config = linera_views::scylla_db::ScyllaDbStoreInternalConfig {
429 uri: uri.clone(),
430 max_stream_queries: options.storage_max_stream_queries,
431 max_concurrent_queries: options.storage_max_concurrent_queries,
432 replication_factor: options.storage_replication_factor,
433 };
434 let second_config = linera_views::scylla_db::ScyllaDbStoreConfig {
435 inner_config,
436 storage_cache_config: options.views_storage_cache_config(),
437 };
438
439 let config = DualStoreConfig {
440 first_config,
441 second_config,
442 };
443 Ok(StoreConfig::DualRocksDbScyllaDb { config, namespace })
444 }
445 }
446 }
447}
448
449impl fmt::Display for StorageConfig {
450 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
451 let namespace = &self.namespace;
452 match &self.inner_storage_config {
453 #[cfg(feature = "storage-service")]
454 InnerStorageConfig::Service { endpoint } => {
455 write!(f, "service:tcp:{endpoint}:{namespace}")
456 }
457 InnerStorageConfig::Memory { genesis_path } => {
458 write!(f, "memory:{}:{}", genesis_path.display(), namespace)
459 }
460 #[cfg(feature = "rocksdb")]
461 InnerStorageConfig::RocksDb { path, spawn_mode } => {
462 let spawn_mode = spawn_mode.to_string();
463 write!(f, "rocksdb:{}:{}:{}", path.display(), spawn_mode, namespace)
464 }
465 #[cfg(feature = "dynamodb")]
466 InnerStorageConfig::DynamoDb { use_dynamodb_local } => match use_dynamodb_local {
467 true => write!(f, "dynamodb:{namespace}:dynamodb_local"),
468 false => write!(f, "dynamodb:{namespace}:env"),
469 },
470 #[cfg(feature = "scylladb")]
471 InnerStorageConfig::ScyllaDb { uri } => {
472 write!(f, "scylladb:tcp:{uri}:{namespace}")
473 }
474 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
475 InnerStorageConfig::DualRocksDbScyllaDb {
476 path_with_guard,
477 spawn_mode,
478 uri,
479 } => {
480 write!(
481 f,
482 "dualrocksdbscylladb:{}:{}:tcp:{}:{}",
483 path_with_guard.path_buf.display(),
484 spawn_mode,
485 uri,
486 namespace
487 )
488 }
489 }
490 }
491}
492
493#[test]
494fn test_memory_storage_config_from_str() {
495 assert_eq!(
496 StorageConfig::from_str("memory:path/to/genesis.json").unwrap(),
497 StorageConfig {
498 inner_storage_config: InnerStorageConfig::Memory {
499 genesis_path: PathBuf::from("path/to/genesis.json")
500 },
501 namespace: DEFAULT_NAMESPACE.into()
502 }
503 );
504 assert_eq!(
505 StorageConfig::from_str("memory:path/to/genesis.json:namespace").unwrap(),
506 StorageConfig {
507 inner_storage_config: InnerStorageConfig::Memory {
508 genesis_path: PathBuf::from("path/to/genesis.json")
509 },
510 namespace: "namespace".into()
511 }
512 );
513 assert!(StorageConfig::from_str("memory").is_err(),);
514}
515
516#[cfg(feature = "storage-service")]
517#[test]
518fn test_shared_store_config_from_str() {
519 assert_eq!(
520 StorageConfig::from_str("service:tcp:127.0.0.1:8942:linera").unwrap(),
521 StorageConfig {
522 inner_storage_config: InnerStorageConfig::Service {
523 endpoint: "127.0.0.1:8942".to_string()
524 },
525 namespace: "linera".into()
526 }
527 );
528 assert!(StorageConfig::from_str("service:tcp:127.0.0.1:8942").is_err());
529 assert!(StorageConfig::from_str("service:tcp:127.0.0.1:linera").is_err());
530}
531
532#[cfg(feature = "rocksdb")]
533#[test]
534fn test_rocks_db_storage_config_from_str() {
535 assert!(StorageConfig::from_str("rocksdb_foo.db").is_err());
536 assert_eq!(
537 StorageConfig::from_str("rocksdb:foo.db").unwrap(),
538 StorageConfig {
539 inner_storage_config: InnerStorageConfig::RocksDb {
540 path: "foo.db".into(),
541 spawn_mode: RocksDbSpawnMode::SpawnBlocking,
542 },
543 namespace: DEFAULT_NAMESPACE.to_string()
544 }
545 );
546 assert_eq!(
547 StorageConfig::from_str("rocksdb:foo.db:block_in_place").unwrap(),
548 StorageConfig {
549 inner_storage_config: InnerStorageConfig::RocksDb {
550 path: "foo.db".into(),
551 spawn_mode: RocksDbSpawnMode::BlockInPlace,
552 },
553 namespace: DEFAULT_NAMESPACE.to_string()
554 }
555 );
556 assert_eq!(
557 StorageConfig::from_str("rocksdb:foo.db:block_in_place:chosen_namespace").unwrap(),
558 StorageConfig {
559 inner_storage_config: InnerStorageConfig::RocksDb {
560 path: "foo.db".into(),
561 spawn_mode: RocksDbSpawnMode::BlockInPlace,
562 },
563 namespace: "chosen_namespace".into()
564 }
565 );
566}
567
568#[cfg(feature = "dynamodb")]
569#[test]
570fn test_aws_storage_config_from_str() {
571 assert_eq!(
572 StorageConfig::from_str("dynamodb:table").unwrap(),
573 StorageConfig {
574 inner_storage_config: InnerStorageConfig::DynamoDb {
575 use_dynamodb_local: false
576 },
577 namespace: "table".to_string()
578 }
579 );
580 assert_eq!(
581 StorageConfig::from_str("dynamodb:table:env").unwrap(),
582 StorageConfig {
583 inner_storage_config: InnerStorageConfig::DynamoDb {
584 use_dynamodb_local: false
585 },
586 namespace: "table".to_string()
587 }
588 );
589 assert_eq!(
590 StorageConfig::from_str("dynamodb:table:dynamodb_local").unwrap(),
591 StorageConfig {
592 inner_storage_config: InnerStorageConfig::DynamoDb {
593 use_dynamodb_local: true
594 },
595 namespace: "table".to_string()
596 }
597 );
598 assert!(StorageConfig::from_str("dynamodb").is_err());
599 assert!(StorageConfig::from_str("dynamodb:").is_err());
600 assert!(StorageConfig::from_str("dynamodb:1").is_err());
601 assert!(StorageConfig::from_str("dynamodb:wrong:endpoint").is_err());
602}
603
604#[cfg(feature = "scylladb")]
605#[test]
606fn test_scylla_db_storage_config_from_str() {
607 assert_eq!(
608 StorageConfig::from_str("scylladb:").unwrap(),
609 StorageConfig {
610 inner_storage_config: InnerStorageConfig::ScyllaDb {
611 uri: "localhost:9042".to_string()
612 },
613 namespace: DEFAULT_NAMESPACE.to_string()
614 }
615 );
616 assert_eq!(
617 StorageConfig::from_str("scylladb:tcp:db_hostname:230:table_other_storage").unwrap(),
618 StorageConfig {
619 inner_storage_config: InnerStorageConfig::ScyllaDb {
620 uri: "db_hostname:230".to_string()
621 },
622 namespace: "table_other_storage".to_string()
623 }
624 );
625 assert_eq!(
626 StorageConfig::from_str("scylladb:tcp:db_hostname:230").unwrap(),
627 StorageConfig {
628 inner_storage_config: InnerStorageConfig::ScyllaDb {
629 uri: "db_hostname:230".to_string()
630 },
631 namespace: DEFAULT_NAMESPACE.to_string()
632 }
633 );
634 assert!(StorageConfig::from_str("scylladb:-10").is_err());
635 assert!(StorageConfig::from_str("scylladb:70000").is_err());
636 assert!(StorageConfig::from_str("scylladb:230:234").is_err());
637 assert!(StorageConfig::from_str("scylladb:tcp:address1").is_err());
638 assert!(StorageConfig::from_str("scylladb:tcp:address1:tcp:/address2").is_err());
639 assert!(StorageConfig::from_str("scylladb:wrong").is_err());
640}