Skip to main content

linera_persistent/
file.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A [`Persist`] backend that atomically saves the value to a locked file on disk.
5
6use std::{
7    io::{self, BufRead as _, Write as _},
8    path::Path,
9};
10
11use fs4::FileExt;
12use thiserror_context::Context;
13
14use super::Persist;
15
16/// A guard that keeps an exclusive lock on a file.
17struct Lock(fs_err::File);
18
19#[derive(Debug, thiserror::Error)]
20enum ErrorInner {
21    #[error("I/O error: {0}")]
22    IoError(#[from] std::io::Error),
23    #[error("JSON error: {0}")]
24    JsonError(#[from] serde_json::Error),
25}
26
27pub use error::Error;
28
29mod error {
30    // `impl_context!` generates a public `Error` newtype (with accessors) that cannot carry
31    // doc comments, so this wrapper module is exempted from the crate's `missing_docs` policy.
32    // `expect` (rather than `allow`) flags this if the macro ever stops generating such items.
33    #![expect(missing_docs)]
34
35    use thiserror_context::Context;
36
37    use super::ErrorInner;
38
39    thiserror_context::impl_context!(Error(ErrorInner));
40}
41
42/// Utility: run a fallible cleanup function if an operation failed, attaching the
43/// original operation as context to its error.
44trait CleanupExt {
45    type Ok;
46    type Error;
47
48    fn or_cleanup<E>(self, f: impl FnOnce() -> Result<(), E>) -> Result<Self::Ok, Self::Error>
49    where
50        E: Into<Self::Error>,
51        Result<(), E>: Context<Self::Error, Self::Ok, E>;
52}
53
54impl<T, W> CleanupExt for Result<T, W>
55where
56    W: std::fmt::Display + Send + Sync + 'static,
57{
58    type Ok = T;
59    type Error = W;
60
61    fn or_cleanup<E>(self, cleanup: impl FnOnce() -> Result<(), E>) -> Self
62    where
63        E: Into<W>,
64        Result<(), E>: Context<W, T, E>,
65    {
66        self.or_else(|error| {
67            if let Err(cleanup_error) = cleanup() {
68                Err(cleanup_error).context(error)
69            } else {
70                Err(error)
71            }
72        })
73    }
74}
75
76impl Lock {
77    /// Acquires an exclusive lock on a provided `file`, returning a [`Lock`] which will
78    /// release the lock when dropped.
79    pub fn new(file: fs_err::File) -> std::io::Result<Self> {
80        file.file().try_lock_exclusive()?;
81        Ok(Lock(file))
82    }
83}
84
85impl Drop for Lock {
86    fn drop(&mut self) {
87        if let Err(error) = FileExt::unlock(self.0.file()) {
88            tracing::warn!("Failed to unlock wallet file: {error}");
89        }
90    }
91}
92
93/// An implementation of [`Persist`] based on an atomically-updated file at a given path.
94/// An exclusive lock is taken using `flock(2)` to ensure that concurrent updates cannot
95/// happen, and writes are saved to a staging file before being moved over the old file,
96/// an operation that is atomic on all Unixes.
97pub struct File<T> {
98    _lock: Lock,
99    path: std::path::PathBuf,
100    value: T,
101}
102
103impl<T> std::ops::Deref for File<T> {
104    type Target = T;
105    fn deref(&self) -> &T {
106        &self.value
107    }
108}
109
110impl<T> std::ops::DerefMut for File<T> {
111    fn deref_mut(&mut self) -> &mut T {
112        &mut self.value
113    }
114}
115
116/// Returns options for opening and writing to the file, creating it if it doesn't
117/// exist. On Unix, this restricts read and write permissions to the current user.
118// TODO(#1924): Implement better key management.
119// BUG(#2053): Use a separate lock file per staging file.
120fn open_options() -> fs_err::OpenOptions {
121    let mut options = fs_err::OpenOptions::new();
122    #[cfg(target_family = "unix")]
123    fs_err::os::unix::fs::OpenOptionsExt::mode(&mut options, 0o600);
124    options.create(true).read(true).write(true);
125    options
126}
127
128impl<T: serde::Serialize + serde::de::DeserializeOwned> File<T> {
129    /// Creates a new persistent file at `path` containing `value`.
130    pub fn new(path: &Path, value: T) -> Result<Self, Error> {
131        let this = Self {
132            _lock: Lock::new(
133                fs_err::OpenOptions::new()
134                    .read(true)
135                    .write(true)
136                    .create(true)
137                    .open(path)?,
138            )
139            .with_context(|| format!("locking path {}", path.display()))?,
140            path: path.into(),
141            value,
142        };
143        this.save()?;
144        Ok(this)
145    }
146
147    /// Reads the value from a file at `path`, returning an error if it does not exist.
148    pub fn read(path: &Path) -> Result<Self, Error> {
149        Self::read_or_create(path, || {
150            Err(std::io::Error::new(
151                std::io::ErrorKind::NotFound,
152                format!("file is empty or does not exist: {}", path.display()),
153            )
154            .into())
155        })
156    }
157
158    /// Reads the value from a file at `path`, calling the `value` function to create it
159    /// if it does not exist. If it does exist, `value` will not be called.
160    pub fn read_or_create(
161        path: &Path,
162        value: impl FnOnce() -> Result<T, Error>,
163    ) -> Result<Self, Error> {
164        let lock = Lock::new(open_options().read(true).open(path)?)?;
165        let mut reader = io::BufReader::new(&lock.0);
166        let file_is_empty = reader.fill_buf()?.is_empty();
167
168        let me = Self {
169            value: if file_is_empty {
170                value()?
171            } else {
172                serde_json::from_reader(reader)?
173            },
174            path: path.into(),
175            _lock: lock,
176        };
177
178        me.save()?;
179
180        Ok(me)
181    }
182
183    /// Atomically writes the current value to the file, via a temporary staging file.
184    pub fn save(&self) -> Result<(), Error> {
185        let mut temp_file_path = self.path.clone();
186        temp_file_path.set_extension("json.new");
187        let temp_file = open_options().open(&temp_file_path)?;
188        let mut temp_file_writer = std::io::BufWriter::new(temp_file);
189
190        let remove_temp_file = || fs_err::remove_file(&temp_file_path);
191
192        serde_json::to_writer_pretty(&mut temp_file_writer, &self.value)
193            .map_err(Error::from)
194            .or_cleanup(remove_temp_file)?;
195        temp_file_writer
196            .flush()
197            .map_err(Error::from)
198            .or_cleanup(remove_temp_file)?;
199        drop(temp_file_writer);
200        fs_err::rename(&temp_file_path, &self.path)?;
201        Ok(())
202    }
203}
204
205impl<T: serde::Serialize + serde::de::DeserializeOwned + Send> Persist for File<T> {
206    type Error = Error;
207
208    fn as_mut(&mut self) -> &mut T {
209        &mut self.value
210    }
211
212    /// Writes the value to disk.
213    ///
214    /// The contents of the file need to be over-written completely, so
215    /// a temporary file is created as a backup in case a crash occurs while
216    /// writing to disk.
217    ///
218    /// The temporary file is then renamed to the original filename. If
219    /// serialization or writing to disk fails, the temporary file is
220    /// deleted.
221    fn persist(&mut self) -> impl std::future::Future<Output = Result<(), Error>> {
222        let result = self.save();
223        async { result }
224    }
225
226    /// Takes the value out, releasing the lock on the persistent file.
227    fn into_value(self) -> T {
228        self.value
229    }
230}