linera_base/
task.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*!
5Abstractions over tasks that can be used natively or on the Web.
6 */
7
8use futures::{future, Future, FutureExt as _};
9
10/// `Send` on native targets; no bound on web (where there's only one thread).
11///
12/// Use this in generic bounds that need `Send` on native but should compile on
13/// web without the bound. Combined with [`run_detached`], this lets a single
14/// function body support both targets.
15#[cfg(not(web))]
16pub trait MaybeSend: Send {}
17#[cfg(not(web))]
18impl<T: Send> MaybeSend for T {}
19
20/// `Sync` on native targets; no bound on web (where there's only one thread).
21///
22/// Use this in generic bounds that need `Sync` on native but should compile on
23/// web without the bound.
24#[cfg(not(web))]
25pub trait MaybeSync: Sync {}
26#[cfg(not(web))]
27impl<T: Sync> MaybeSync for T {}
28
29/// `Send` on native targets; no bound on web (where there's only one thread).
30#[cfg(web)]
31pub trait MaybeSend {}
32#[cfg(web)]
33impl<T> MaybeSend for T {}
34
35/// `Sync` on native targets; no bound on web (where there's only one thread).
36#[cfg(web)]
37pub trait MaybeSync {}
38#[cfg(web)]
39impl<T> MaybeSync for T {}
40
41/// Spawns `future` on the runtime and awaits its completion.
42///
43/// Dropping the returned future does *not* cancel the spawned task — it runs
44/// to completion in the background. Use this when the spawned work (e.g. a
45/// storage write paired with its in-memory finalization) must not be torn
46/// apart mid-flight by caller cancellation.
47pub async fn run_detached<F, R>(future: F) -> R
48where
49    F: Future<Output = R> + MaybeSend + 'static,
50    R: MaybeSend + 'static,
51{
52    // On native, `tokio::task::spawn` returns a `JoinHandle` that already
53    // detaches on drop. On web, `wasm_bindgen_futures::spawn_local` is
54    // fire-and-forget, so we deliver the output through a oneshot channel.
55    #[cfg(not(web))]
56    {
57        tokio::task::spawn(future)
58            .await
59            .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))
60    }
61    #[cfg(web)]
62    {
63        let (tx, rx) = futures::channel::oneshot::channel();
64        wasm_bindgen_futures::spawn_local(async move {
65            if tx.send(future.await).is_err() {
66                tracing::debug!("run_detached: receiver dropped before result was delivered");
67            }
68        });
69        rx.await
70            .expect("spawned task dropped without sending its result")
71    }
72}
73
74/// The type of a future awaiting another task.
75///
76/// On drop, the remote task will be asynchronously cancelled, but will remain
77/// alive until it reaches a yield point.
78///
79/// To wait for the task to be fully cancelled, use [`Task::cancel`].
80pub struct Task<R> {
81    abort_handle: future::AbortHandle,
82    output: future::RemoteHandle<Result<R, future::Aborted>>,
83}
84
85impl<R: 'static> Task<R> {
86    fn spawn_<F: Future<Output = R>, T>(
87        future: F,
88        spawn: impl FnOnce(future::Remote<future::Abortable<F>>) -> T,
89    ) -> Self {
90        let (abortable_future, abort_handle) = future::abortable(future);
91        let (task, output) = abortable_future.remote_handle();
92        spawn(task);
93        Self {
94            abort_handle,
95            output,
96        }
97    }
98
99    /// Spawns a new task, potentially on the current thread.
100    #[cfg(not(web))]
101    pub fn spawn<F: Future<Output = R> + Send + 'static>(future: F) -> Self
102    where
103        R: Send,
104    {
105        Self::spawn_(future, tokio::task::spawn)
106    }
107
108    /// Spawns a new task on the current thread.
109    #[cfg(web)]
110    pub fn spawn<F: Future<Output = R> + 'static>(future: F) -> Self {
111        Self::spawn_(future, wasm_bindgen_futures::spawn_local)
112    }
113
114    /// Creates a [`Task`] that is immediately ready.
115    pub fn ready(value: R) -> Self {
116        Self::spawn_(async { value }, |fut| {
117            fut.now_or_never().expect("the future is ready")
118        })
119    }
120
121    /// Cancels the task, resolving only when the wrapped future is completely dropped.
122    pub async fn cancel(self) {
123        self.abort_handle.abort();
124        // We just want to wait for the task to finish unwinding; an `Aborted` error is the expected outcome.
125        self.output.await.ok();
126    }
127
128    /// Forgets the task. The task will continue to run to completion in the
129    /// background, but will no longer be joinable or cancelable.
130    pub fn forget(self) {
131        self.output.forget();
132    }
133}
134
135impl<R: 'static> std::future::IntoFuture for Task<R> {
136    type Output = R;
137    type IntoFuture = future::Map<
138        future::RemoteHandle<Result<R, future::Aborted>>,
139        fn(Result<R, future::Aborted>) -> R,
140    >;
141
142    fn into_future(self) -> Self::IntoFuture {
143        self.output
144            .map(|result| result.expect("we have the only AbortHandle"))
145    }
146}