linera_storage_runtime/
store_config.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::path::PathBuf;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, StorageCacheConfig};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::client::StorageServiceDatabase;
13#[cfg(feature = "dynamodb")]
14use linera_views::dynamo_db::DynamoDbDatabase;
15#[cfg(feature = "rocksdb")]
16use linera_views::rocks_db::RocksDbDatabase;
17#[cfg(feature = "scylladb")]
18use linera_views::scylla_db::ScyllaDbDatabase;
19use linera_views::{
20    memory::MemoryDatabase,
21    store::{KeyValueDatabase, KeyValueStore},
22};
23use serde::{Deserialize, Serialize};
24#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
25use {linera_storage::ChainStatesFirstAssignment, linera_views::backends::dual::DualDatabase};
26
27/// The configuration of the key value store in use.
28#[derive(Clone, Debug, Deserialize, Serialize)]
29pub enum StoreConfig {
30    /// The memory key value store
31    Memory {
32        config: linera_views::memory::MemoryStoreConfig,
33        namespace: String,
34        genesis_path: PathBuf,
35    },
36    /// The storage service key-value store
37    #[cfg(feature = "storage-service")]
38    StorageService {
39        config: linera_storage_service::common::StorageServiceStoreConfig,
40        namespace: String,
41    },
42    /// The RocksDB key value store
43    #[cfg(feature = "rocksdb")]
44    RocksDb {
45        config: linera_views::rocks_db::RocksDbStoreConfig,
46        namespace: String,
47    },
48    /// The DynamoDB key value store
49    #[cfg(feature = "dynamodb")]
50    DynamoDb {
51        config: linera_views::dynamo_db::DynamoDbStoreConfig,
52        namespace: String,
53    },
54    /// The ScyllaDB key value store
55    #[cfg(feature = "scylladb")]
56    ScyllaDb {
57        config: linera_views::scylla_db::ScyllaDbStoreConfig,
58        namespace: String,
59    },
60    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
61    DualRocksDbScyllaDb {
62        config: linera_views::backends::dual::DualStoreConfig<
63            linera_views::rocks_db::RocksDbStoreConfig,
64            linera_views::scylla_db::ScyllaDbStoreConfig,
65        >,
66        namespace: String,
67    },
68}
69
70#[async_trait]
71pub trait Runnable {
72    type Output;
73
74    async fn run<S>(self, storage: S) -> Self::Output
75    where
76        S: Storage + Clone + Send + Sync + 'static;
77}
78
79#[async_trait]
80pub trait RunnableWithStore {
81    type Output;
82
83    async fn run<D>(
84        self,
85        config: D::Config,
86        namespace: String,
87        cache_sizes: StorageCacheConfig,
88    ) -> Result<Self::Output, anyhow::Error>
89    where
90        D: KeyValueDatabase + Clone + Send + Sync + 'static,
91        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
92        D::Error: Send + Sync;
93}
94
95/// Reads a JSON value from a file at the given path.
96fn read_json<T: serde::de::DeserializeOwned>(path: impl Into<PathBuf>) -> anyhow::Result<T> {
97    Ok(serde_json::from_reader(fs_err::File::open(path.into())?)?)
98}
99
100impl StoreConfig {
101    pub async fn run_with_storage<Job>(
102        self,
103        wasm_runtime: Option<WasmRuntime>,
104        allow_application_logs: bool,
105        cache_sizes: StorageCacheConfig,
106        job: Job,
107    ) -> Result<Job::Output, anyhow::Error>
108    where
109        Job: Runnable,
110    {
111        match self {
112            StoreConfig::Memory {
113                config,
114                namespace,
115                genesis_path,
116            } => {
117                let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
118                    &config,
119                    &namespace,
120                    wasm_runtime,
121                    cache_sizes,
122                )
123                .await?
124                .with_allow_application_logs(allow_application_logs);
125                let genesis_config = read_json::<GenesisConfig>(genesis_path)?;
126                // Memory storage must be initialized every time.
127                genesis_config.initialize_storage(&mut storage).await?;
128                Ok(job.run(storage).await)
129            }
130            #[cfg(feature = "storage-service")]
131            StoreConfig::StorageService { config, namespace } => {
132                let storage = DbStorage::<StorageServiceDatabase, _>::connect(
133                    &config,
134                    &namespace,
135                    wasm_runtime,
136                    cache_sizes,
137                )
138                .await?
139                .with_allow_application_logs(allow_application_logs);
140                Ok(job.run(storage).await)
141            }
142            #[cfg(feature = "rocksdb")]
143            StoreConfig::RocksDb { config, namespace } => {
144                let storage = DbStorage::<RocksDbDatabase, _>::connect(
145                    &config,
146                    &namespace,
147                    wasm_runtime,
148                    cache_sizes,
149                )
150                .await?
151                .with_allow_application_logs(allow_application_logs);
152                Ok(job.run(storage).await)
153            }
154            #[cfg(feature = "dynamodb")]
155            StoreConfig::DynamoDb { config, namespace } => {
156                let storage = DbStorage::<DynamoDbDatabase, _>::connect(
157                    &config,
158                    &namespace,
159                    wasm_runtime,
160                    cache_sizes,
161                )
162                .await?
163                .with_allow_application_logs(allow_application_logs);
164                Ok(job.run(storage).await)
165            }
166            #[cfg(feature = "scylladb")]
167            StoreConfig::ScyllaDb { config, namespace } => {
168                let storage = DbStorage::<ScyllaDbDatabase, _>::connect(
169                    &config,
170                    &namespace,
171                    wasm_runtime,
172                    cache_sizes,
173                )
174                .await?
175                .with_allow_application_logs(allow_application_logs);
176                Ok(job.run(storage).await)
177            }
178            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
179            StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
180                let storage =
181                    DbStorage::<
182                        DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
183                        _,
184                    >::connect(&config, &namespace, wasm_runtime, cache_sizes)
185                    .await?
186                    .with_allow_application_logs(allow_application_logs);
187                Ok(job.run(storage).await)
188            }
189        }
190    }
191
192    #[allow(unused_variables)]
193    pub async fn run_with_store<Job>(
194        self,
195        cache_sizes: StorageCacheConfig,
196        job: Job,
197    ) -> Result<Job::Output, anyhow::Error>
198    where
199        Job: RunnableWithStore,
200    {
201        match self {
202            StoreConfig::Memory { .. } => {
203                Err(anyhow!("Cannot run admin operations on the memory store"))
204            }
205            #[cfg(feature = "storage-service")]
206            StoreConfig::StorageService { config, namespace } => Ok(job
207                .run::<StorageServiceDatabase>(config, namespace, cache_sizes)
208                .await?),
209            #[cfg(feature = "rocksdb")]
210            StoreConfig::RocksDb { config, namespace } => Ok(job
211                .run::<RocksDbDatabase>(config, namespace, cache_sizes)
212                .await?),
213            #[cfg(feature = "dynamodb")]
214            StoreConfig::DynamoDb { config, namespace } => Ok(job
215                .run::<DynamoDbDatabase>(config, namespace, cache_sizes)
216                .await?),
217            #[cfg(feature = "scylladb")]
218            StoreConfig::ScyllaDb { config, namespace } => Ok(job
219                .run::<ScyllaDbDatabase>(config, namespace, cache_sizes)
220                .await?),
221            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
222            StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
223                .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
224                    config,
225                    namespace,
226                    cache_sizes,
227                )
228                .await?),
229        }
230    }
231}
232
233pub struct StorageMigration;
234
235#[async_trait]
236impl RunnableWithStore for StorageMigration {
237    type Output = ();
238
239    async fn run<D>(
240        self,
241        _config: D::Config,
242        _namespace: String,
243        _cache_sizes: StorageCacheConfig,
244    ) -> Result<Self::Output, anyhow::Error>
245    where
246        D: KeyValueDatabase + Clone + Send + Sync + 'static,
247        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
248        D::Error: Send + Sync,
249    {
250        // Storage migration is not yet available on main.
251        Ok(())
252    }
253}
254
255pub struct AssertStorageV1;
256
257#[async_trait]
258impl RunnableWithStore for AssertStorageV1 {
259    type Output = ();
260
261    async fn run<D>(
262        self,
263        _config: D::Config,
264        _namespace: String,
265        _cache_sizes: StorageCacheConfig,
266    ) -> Result<Self::Output, anyhow::Error>
267    where
268        D: KeyValueDatabase + Clone + Send + Sync + 'static,
269        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
270        D::Error: Send + Sync,
271    {
272        // Storage migration assertion is not yet available on main.
273        Ok(())
274    }
275}