1use std::path::PathBuf;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, StorageCacheConfig};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::client::StorageServiceDatabase;
13#[cfg(feature = "rocksdb")]
14use linera_views::rocks_db::RocksDbDatabase;
15#[cfg(feature = "scylladb")]
16use linera_views::scylla_db::ScyllaDbDatabase;
17use linera_views::{
18 memory::MemoryDatabase,
19 store::{KeyValueDatabase, KeyValueStore},
20};
21use serde::{Deserialize, Serialize};
22#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
23use {linera_storage::ChainStatesFirstAssignment, linera_views::backends::dual::DualDatabase};
24
25#[derive(Clone, Debug, Deserialize, Serialize)]
27pub enum StoreConfig {
28 Memory {
30 config: linera_views::memory::MemoryStoreConfig,
32 namespace: String,
34 genesis_path: PathBuf,
36 },
37 #[cfg(feature = "storage-service")]
39 StorageService {
40 config: linera_storage_service::common::StorageServiceStoreConfig,
42 namespace: String,
44 },
45 #[cfg(feature = "rocksdb")]
47 RocksDb {
48 config: linera_views::rocks_db::RocksDbStoreConfig,
50 namespace: String,
52 },
53 #[cfg(feature = "scylladb")]
55 ScyllaDb {
56 config: linera_views::scylla_db::ScyllaDbStoreConfig,
58 namespace: String,
60 },
61 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
63 DualRocksDbScyllaDb {
64 config: linera_views::backends::dual::DualStoreConfig<
66 linera_views::rocks_db::RocksDbStoreConfig,
67 linera_views::scylla_db::ScyllaDbStoreConfig,
68 >,
69 namespace: String,
71 },
72}
73
74#[async_trait]
76pub trait Runnable {
77 type Output;
79
80 async fn run<S>(self, storage: S) -> Self::Output
82 where
83 S: Storage + Clone + Send + Sync + 'static;
84}
85
86#[async_trait]
88pub trait RunnableWithStore {
89 type Output;
91
92 async fn run<D>(
94 self,
95 config: D::Config,
96 namespace: String,
97 cache_sizes: StorageCacheConfig,
98 ) -> Result<Self::Output, anyhow::Error>
99 where
100 D: KeyValueDatabase + Clone + Send + Sync + 'static,
101 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
102 D::Error: Send + Sync;
103}
104
105fn read_json<T: serde::de::DeserializeOwned>(path: impl Into<PathBuf>) -> anyhow::Result<T> {
107 Ok(serde_json::from_reader(fs_err::File::open(path.into())?)?)
108}
109
110impl StoreConfig {
111 pub async fn run_with_storage<Job>(
113 self,
114 wasm_runtime: Option<WasmRuntime>,
115 allow_application_logs: bool,
116 cache_sizes: StorageCacheConfig,
117 job: Job,
118 ) -> Result<Job::Output, anyhow::Error>
119 where
120 Job: Runnable,
121 {
122 match self {
123 StoreConfig::Memory {
124 config,
125 namespace,
126 genesis_path,
127 } => {
128 let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
129 &config,
130 &namespace,
131 wasm_runtime,
132 cache_sizes,
133 )
134 .await?
135 .with_allow_application_logs(allow_application_logs);
136 let genesis_config = read_json::<GenesisConfig>(genesis_path)?;
137 genesis_config.initialize_storage(&mut storage).await?;
139 Ok(job.run(storage).await)
140 }
141 #[cfg(feature = "storage-service")]
142 StoreConfig::StorageService { config, namespace } => {
143 let storage = DbStorage::<StorageServiceDatabase, _>::connect(
144 &config,
145 &namespace,
146 wasm_runtime,
147 cache_sizes,
148 )
149 .await?
150 .with_allow_application_logs(allow_application_logs);
151 Ok(job.run(storage).await)
152 }
153 #[cfg(feature = "rocksdb")]
154 StoreConfig::RocksDb { config, namespace } => {
155 let storage = DbStorage::<RocksDbDatabase, _>::connect(
156 &config,
157 &namespace,
158 wasm_runtime,
159 cache_sizes,
160 )
161 .await?
162 .with_allow_application_logs(allow_application_logs);
163 Ok(job.run(storage).await)
164 }
165 #[cfg(feature = "scylladb")]
166 StoreConfig::ScyllaDb { config, namespace } => {
167 let storage = DbStorage::<ScyllaDbDatabase, _>::connect(
168 &config,
169 &namespace,
170 wasm_runtime,
171 cache_sizes,
172 )
173 .await?
174 .with_allow_application_logs(allow_application_logs);
175 Ok(job.run(storage).await)
176 }
177 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
178 StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
179 let storage =
180 DbStorage::<
181 DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
182 _,
183 >::connect(&config, &namespace, wasm_runtime, cache_sizes)
184 .await?
185 .with_allow_application_logs(allow_application_logs);
186 Ok(job.run(storage).await)
187 }
188 }
189 }
190
191 #[allow(unused_variables)]
194 pub async fn run_with_store<Job>(
195 self,
196 cache_sizes: StorageCacheConfig,
197 job: Job,
198 ) -> Result<Job::Output, anyhow::Error>
199 where
200 Job: RunnableWithStore,
201 {
202 match self {
203 StoreConfig::Memory { .. } => {
204 Err(anyhow!("Cannot run admin operations on the memory store"))
205 }
206 #[cfg(feature = "storage-service")]
207 StoreConfig::StorageService { config, namespace } => Ok(job
208 .run::<StorageServiceDatabase>(config, namespace, cache_sizes)
209 .await?),
210 #[cfg(feature = "rocksdb")]
211 StoreConfig::RocksDb { config, namespace } => Ok(job
212 .run::<RocksDbDatabase>(config, namespace, cache_sizes)
213 .await?),
214 #[cfg(feature = "scylladb")]
215 StoreConfig::ScyllaDb { config, namespace } => Ok(job
216 .run::<ScyllaDbDatabase>(config, namespace, cache_sizes)
217 .await?),
218 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
219 StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
220 .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
221 config,
222 namespace,
223 cache_sizes,
224 )
225 .await?),
226 }
227 }
228}
229
230pub struct StorageMigration;
232
233#[async_trait]
234impl RunnableWithStore for StorageMigration {
235 type Output = ();
236
237 async fn run<D>(
238 self,
239 _config: D::Config,
240 _namespace: String,
241 _cache_sizes: StorageCacheConfig,
242 ) -> Result<Self::Output, anyhow::Error>
243 where
244 D: KeyValueDatabase + Clone + Send + Sync + 'static,
245 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
246 D::Error: Send + Sync,
247 {
248 Ok(())
250 }
251}
252
253pub struct AssertStorageV1;
255
256#[async_trait]
257impl RunnableWithStore for AssertStorageV1 {
258 type Output = ();
259
260 async fn run<D>(
261 self,
262 _config: D::Config,
263 _namespace: String,
264 _cache_sizes: StorageCacheConfig,
265 ) -> Result<Self::Output, anyhow::Error>
266 where
267 D: KeyValueDatabase + Clone + Send + Sync + 'static,
268 D::Store: KeyValueStore + Clone + Send + Sync + 'static,
269 D::Error: Send + Sync,
270 {
271 Ok(())
273 }
274}