1use std::path::PathBuf;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, StorageCacheConfig};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::client::StorageServiceDatabase;
13#[cfg(feature = "dynamodb")]
14use linera_views::dynamo_db::DynamoDbDatabase;
15#[cfg(feature = "rocksdb")]
16use linera_views::rocks_db::RocksDbDatabase;
17#[cfg(feature = "scylladb")]
18use linera_views::scylla_db::ScyllaDbDatabase;
19use linera_views::{
20 memory::MemoryDatabase,
21 store::{KeyValueDatabase, KeyValueStore},
22};
23use serde::{Deserialize, Serialize};
24#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
25use {linera_storage::ChainStatesFirstAssignment, linera_views::backends::dual::DualDatabase};
26
27#[derive(Clone, Debug, Deserialize, Serialize)]
29pub enum StoreConfig {
30 Memory {
32 config: linera_views::memory::MemoryStoreConfig,
33 namespace: String,
34 genesis_path: PathBuf,
35 },
36 #[cfg(feature = "storage-service")]
38 StorageService {
39 config: linera_storage_service::common::StorageServiceStoreConfig,
40 namespace: String,
41 },
42 #[cfg(feature = "rocksdb")]
44 RocksDb {
45 config: linera_views::rocks_db::RocksDbStoreConfig,
46 namespace: String,
47 },
48 #[cfg(feature = "dynamodb")]
50 DynamoDb {
51 config: linera_views::dynamo_db::DynamoDbStoreConfig,
52 namespace: String,
53 },
54 #[cfg(feature = "scylladb")]
56 ScyllaDb {
57 config: linera_views::scylla_db::ScyllaDbStoreConfig,
58 namespace: String,
59 },
60 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
61 DualRocksDbScyllaDb {
62 config: linera_views::backends::dual::DualStoreConfig<
63 linera_views::rocks_db::RocksDbStoreConfig,
64 linera_views::scylla_db::ScyllaDbStoreConfig,
65 >,
66 namespace: String,
67 },
68}
69
70#[async_trait]
71pub trait Runnable {
72 type Output;
73
74 async fn run<S>(self, storage: S) -> Self::Output
75 where
76 S: Storage + Clone + Send + Sync + 'static;
77}
78
79#[async_trait]
80pub trait RunnableWithStore {
81 type Output;
82
83 async fn run<D>(
84 self,
85 config: D::Config,
86 namespace: String,
87 cache_sizes: StorageCacheConfig,
88 ) -> Result<Self::Output, anyhow::Error>
89 where
90 D: KeyValueDatabase + Clone + Send + Sync + 'static,
91 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
92 D::Error: Send + Sync;
93}
94
95fn read_json<T: serde::de::DeserializeOwned>(path: impl Into<PathBuf>) -> anyhow::Result<T> {
97 Ok(serde_json::from_reader(fs_err::File::open(path.into())?)?)
98}
99
100impl StoreConfig {
101 pub async fn run_with_storage<Job>(
102 self,
103 wasm_runtime: Option<WasmRuntime>,
104 allow_application_logs: bool,
105 cache_sizes: StorageCacheConfig,
106 job: Job,
107 ) -> Result<Job::Output, anyhow::Error>
108 where
109 Job: Runnable,
110 {
111 match self {
112 StoreConfig::Memory {
113 config,
114 namespace,
115 genesis_path,
116 } => {
117 let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
118 &config,
119 &namespace,
120 wasm_runtime,
121 cache_sizes,
122 )
123 .await?
124 .with_allow_application_logs(allow_application_logs);
125 let genesis_config = read_json::<GenesisConfig>(genesis_path)?;
126 genesis_config.initialize_storage(&mut storage).await?;
128 Ok(job.run(storage).await)
129 }
130 #[cfg(feature = "storage-service")]
131 StoreConfig::StorageService { config, namespace } => {
132 let storage = DbStorage::<StorageServiceDatabase, _>::connect(
133 &config,
134 &namespace,
135 wasm_runtime,
136 cache_sizes,
137 )
138 .await?
139 .with_allow_application_logs(allow_application_logs);
140 Ok(job.run(storage).await)
141 }
142 #[cfg(feature = "rocksdb")]
143 StoreConfig::RocksDb { config, namespace } => {
144 let storage = DbStorage::<RocksDbDatabase, _>::connect(
145 &config,
146 &namespace,
147 wasm_runtime,
148 cache_sizes,
149 )
150 .await?
151 .with_allow_application_logs(allow_application_logs);
152 Ok(job.run(storage).await)
153 }
154 #[cfg(feature = "dynamodb")]
155 StoreConfig::DynamoDb { config, namespace } => {
156 let storage = DbStorage::<DynamoDbDatabase, _>::connect(
157 &config,
158 &namespace,
159 wasm_runtime,
160 cache_sizes,
161 )
162 .await?
163 .with_allow_application_logs(allow_application_logs);
164 Ok(job.run(storage).await)
165 }
166 #[cfg(feature = "scylladb")]
167 StoreConfig::ScyllaDb { config, namespace } => {
168 let storage = DbStorage::<ScyllaDbDatabase, _>::connect(
169 &config,
170 &namespace,
171 wasm_runtime,
172 cache_sizes,
173 )
174 .await?
175 .with_allow_application_logs(allow_application_logs);
176 Ok(job.run(storage).await)
177 }
178 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
179 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
180 let storage =
181 DbStorage::<
182 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
183 _,
184 >::connect(&config, &namespace, wasm_runtime, cache_sizes)
185 .await?
186 .with_allow_application_logs(allow_application_logs);
187 Ok(job.run(storage).await)
188 }
189 }
190 }
191
192 #[allow(unused_variables)]
193 pub async fn run_with_store<Job>(
194 self,
195 cache_sizes: StorageCacheConfig,
196 job: Job,
197 ) -> Result<Job::Output, anyhow::Error>
198 where
199 Job: RunnableWithStore,
200 {
201 match self {
202 StoreConfig::Memory { .. } => {
203 Err(anyhow!("Cannot run admin operations on the memory store"))
204 }
205 #[cfg(feature = "storage-service")]
206 StoreConfig::StorageService { config, namespace } => Ok(job
207 .run::<StorageServiceDatabase>(config, namespace, cache_sizes)
208 .await?),
209 #[cfg(feature = "rocksdb")]
210 StoreConfig::RocksDb { config, namespace } => Ok(job
211 .run::<RocksDbDatabase>(config, namespace, cache_sizes)
212 .await?),
213 #[cfg(feature = "dynamodb")]
214 StoreConfig::DynamoDb { config, namespace } => Ok(job
215 .run::<DynamoDbDatabase>(config, namespace, cache_sizes)
216 .await?),
217 #[cfg(feature = "scylladb")]
218 StoreConfig::ScyllaDb { config, namespace } => Ok(job
219 .run::<ScyllaDbDatabase>(config, namespace, cache_sizes)
220 .await?),
221 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
222 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
223 .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
224 config,
225 namespace,
226 cache_sizes,
227 )
228 .await?),
229 }
230 }
231}
232
233pub struct StorageMigration;
234
235#[async_trait]
236impl RunnableWithStore for StorageMigration {
237 type Output = ();
238
239 async fn run<D>(
240 self,
241 _config: D::Config,
242 _namespace: String,
243 _cache_sizes: StorageCacheConfig,
244 ) -> Result<Self::Output, anyhow::Error>
245 where
246 D: KeyValueDatabase + Clone + Send + Sync + 'static,
247 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
248 D::Error: Send + Sync,
249 {
250 Ok(())
252 }
253}
254
255pub struct AssertStorageV1;
256
257#[async_trait]
258impl RunnableWithStore for AssertStorageV1 {
259 type Output = ();
260
261 async fn run<D>(
262 self,
263 _config: D::Config,
264 _namespace: String,
265 _cache_sizes: StorageCacheConfig,
266 ) -> Result<Self::Output, anyhow::Error>
267 where
268 D: KeyValueDatabase + Clone + Send + Sync + 'static,
269 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
270 D::Error: Send + Sync,
271 {
272 Ok(())
274 }
275}