Skip to main content

linera_storage_runtime/
store_config.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::path::PathBuf;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use linera_client::config::GenesisConfig;
9use linera_execution::WasmRuntime;
10use linera_storage::{DbStorage, Storage, StorageCacheConfig};
11#[cfg(feature = "storage-service")]
12use linera_storage_service::client::StorageServiceDatabase;
13#[cfg(feature = "rocksdb")]
14use linera_views::rocks_db::RocksDbDatabase;
15#[cfg(feature = "scylladb")]
16use linera_views::scylla_db::ScyllaDbDatabase;
17use linera_views::{
18    memory::MemoryDatabase,
19    store::{KeyValueDatabase, KeyValueStore},
20};
21use serde::{Deserialize, Serialize};
22#[cfg(all(feature = "rocksdb", feature = "scylladb"))]
23use {linera_storage::ChainStatesFirstAssignment, linera_views::backends::dual::DualDatabase};
24
25/// The configuration of the key value store in use.
26#[derive(Clone, Debug, Deserialize, Serialize)]
27pub enum StoreConfig {
28    /// The memory key value store
29    Memory {
30        /// The store configuration.
31        config: linera_views::memory::MemoryStoreConfig,
32        /// The namespace used.
33        namespace: String,
34        /// The path to the genesis configuration used to initialize the store.
35        genesis_path: PathBuf,
36    },
37    /// The storage service key-value store
38    #[cfg(feature = "storage-service")]
39    StorageService {
40        /// The store configuration.
41        config: linera_storage_service::common::StorageServiceStoreConfig,
42        /// The namespace used.
43        namespace: String,
44    },
45    /// The RocksDB key value store
46    #[cfg(feature = "rocksdb")]
47    RocksDb {
48        /// The store configuration.
49        config: linera_views::rocks_db::RocksDbStoreConfig,
50        /// The namespace used.
51        namespace: String,
52    },
53    /// The ScyllaDB key value store
54    #[cfg(feature = "scylladb")]
55    ScyllaDb {
56        /// The store configuration.
57        config: linera_views::scylla_db::ScyllaDbStoreConfig,
58        /// The namespace used.
59        namespace: String,
60    },
61    /// The dual RocksDB / ScyllaDB key value store
62    #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
63    DualRocksDbScyllaDb {
64        /// The store configuration.
65        config: linera_views::backends::dual::DualStoreConfig<
66            linera_views::rocks_db::RocksDbStoreConfig,
67            linera_views::scylla_db::ScyllaDbStoreConfig,
68        >,
69        /// The namespace used.
70        namespace: String,
71    },
72}
73
74/// A job that can be run against a high-level [`Storage`].
75#[async_trait]
76pub trait Runnable {
77    /// The type produced by running the job.
78    type Output;
79
80    /// Runs the job against the given `storage`.
81    async fn run<S>(self, storage: S) -> Self::Output
82    where
83        S: Storage + Clone + Send + Sync + 'static;
84}
85
86/// A job that can be run against a low-level key-value store.
87#[async_trait]
88pub trait RunnableWithStore {
89    /// The type produced by running the job.
90    type Output;
91
92    /// Runs the job against a store built from the given configuration.
93    async fn run<D>(
94        self,
95        config: D::Config,
96        namespace: String,
97        cache_sizes: StorageCacheConfig,
98    ) -> Result<Self::Output, anyhow::Error>
99    where
100        D: KeyValueDatabase + Clone + Send + Sync + 'static,
101        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
102        D::Error: Send + Sync;
103}
104
105/// Reads a JSON value from a file at the given path.
106fn read_json<T: serde::de::DeserializeOwned>(path: impl Into<PathBuf>) -> anyhow::Result<T> {
107    Ok(serde_json::from_reader(fs_err::File::open(path.into())?)?)
108}
109
110impl StoreConfig {
111    /// Connects to the configured storage and runs the given [`Runnable`] job against it.
112    pub async fn run_with_storage<Job>(
113        self,
114        wasm_runtime: Option<WasmRuntime>,
115        allow_application_logs: bool,
116        cache_sizes: StorageCacheConfig,
117        job: Job,
118    ) -> Result<Job::Output, anyhow::Error>
119    where
120        Job: Runnable,
121    {
122        match self {
123            StoreConfig::Memory {
124                config,
125                namespace,
126                genesis_path,
127            } => {
128                let mut storage = DbStorage::<MemoryDatabase, _>::maybe_create_and_connect(
129                    &config,
130                    &namespace,
131                    wasm_runtime,
132                    cache_sizes,
133                )
134                .await?
135                .with_allow_application_logs(allow_application_logs);
136                let genesis_config = read_json::<GenesisConfig>(genesis_path)?;
137                // Memory storage must be initialized every time.
138                genesis_config.initialize_storage(&mut storage).await?;
139                Ok(job.run(storage).await)
140            }
141            #[cfg(feature = "storage-service")]
142            StoreConfig::StorageService { config, namespace } => {
143                let storage = DbStorage::<StorageServiceDatabase, _>::connect(
144                    &config,
145                    &namespace,
146                    wasm_runtime,
147                    cache_sizes,
148                )
149                .await?
150                .with_allow_application_logs(allow_application_logs);
151                Ok(job.run(storage).await)
152            }
153            #[cfg(feature = "rocksdb")]
154            StoreConfig::RocksDb { config, namespace } => {
155                let storage = DbStorage::<RocksDbDatabase, _>::connect(
156                    &config,
157                    &namespace,
158                    wasm_runtime,
159                    cache_sizes,
160                )
161                .await?
162                .with_allow_application_logs(allow_application_logs);
163                Ok(job.run(storage).await)
164            }
165            #[cfg(feature = "scylladb")]
166            StoreConfig::ScyllaDb { config, namespace } => {
167                let storage = DbStorage::<ScyllaDbDatabase, _>::connect(
168                    &config,
169                    &namespace,
170                    wasm_runtime,
171                    cache_sizes,
172                )
173                .await?
174                .with_allow_application_logs(allow_application_logs);
175                Ok(job.run(storage).await)
176            }
177            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
178            StoreConfig::DualRocksDbScyllaDb { config, namespace } => {
179                let storage =
180                    DbStorage::<
181                        DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>,
182                        _,
183                    >::connect(&config, &namespace, wasm_runtime, cache_sizes)
184                    .await?
185                    .with_allow_application_logs(allow_application_logs);
186                Ok(job.run(storage).await)
187            }
188        }
189    }
190
191    /// Connects to the configured key-value store and runs the given
192    /// [`RunnableWithStore`] job against it.
193    #[allow(unused_variables)]
194    pub async fn run_with_store<Job>(
195        self,
196        cache_sizes: StorageCacheConfig,
197        job: Job,
198    ) -> Result<Job::Output, anyhow::Error>
199    where
200        Job: RunnableWithStore,
201    {
202        match self {
203            StoreConfig::Memory { .. } => {
204                Err(anyhow!("Cannot run admin operations on the memory store"))
205            }
206            #[cfg(feature = "storage-service")]
207            StoreConfig::StorageService { config, namespace } => Ok(job
208                .run::<StorageServiceDatabase>(config, namespace, cache_sizes)
209                .await?),
210            #[cfg(feature = "rocksdb")]
211            StoreConfig::RocksDb { config, namespace } => Ok(job
212                .run::<RocksDbDatabase>(config, namespace, cache_sizes)
213                .await?),
214            #[cfg(feature = "scylladb")]
215            StoreConfig::ScyllaDb { config, namespace } => Ok(job
216                .run::<ScyllaDbDatabase>(config, namespace, cache_sizes)
217                .await?),
218            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
219            StoreConfig::DualRocksDbScyllaDb { config, namespace } => Ok(job
220                .run::<DualDatabase<RocksDbDatabase, ScyllaDbDatabase, ChainStatesFirstAssignment>>(
221                    config,
222                    namespace,
223                    cache_sizes,
224                )
225                .await?),
226        }
227    }
228}
229
230/// A [`RunnableWithStore`] job that migrates the storage schema.
231pub struct StorageMigration;
232
233#[async_trait]
234impl RunnableWithStore for StorageMigration {
235    type Output = ();
236
237    async fn run<D>(
238        self,
239        _config: D::Config,
240        _namespace: String,
241        _cache_sizes: StorageCacheConfig,
242    ) -> Result<Self::Output, anyhow::Error>
243    where
244        D: KeyValueDatabase + Clone + Send + Sync + 'static,
245        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
246        D::Error: Send + Sync,
247    {
248        // Storage migration is not yet available on main.
249        Ok(())
250    }
251}
252
253/// A [`RunnableWithStore`] job that asserts the storage is at schema version 1.
254pub struct AssertStorageV1;
255
256#[async_trait]
257impl RunnableWithStore for AssertStorageV1 {
258    type Output = ();
259
260    async fn run<D>(
261        self,
262        _config: D::Config,
263        _namespace: String,
264        _cache_sizes: StorageCacheConfig,
265    ) -> Result<Self::Output, anyhow::Error>
266    where
267        D: KeyValueDatabase + Clone + Send + Sync + 'static,
268        D::Store: KeyValueStore + Clone + Send + Sync + 'static,
269        D::Error: Send + Sync,
270    {
271        // Storage migration assertion is not yet available on main.
272        Ok(())
273    }
274}