linera_base/task.rs
1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4/*!
5Abstractions over tasks that can be used natively or on the Web.
6 */
7
8use futures::{future, Future, FutureExt as _};
9
/// Marker bound that implies `Send` on native targets and imposes nothing on
/// web (where there's only one thread).
///
/// Write `MaybeSend` instead of `Send` in generic bounds when the same code has
/// to build for both targets. Together with [`run_detached`], this allows one
/// function body to serve native and web alike.
#[cfg(not(web))]
pub trait MaybeSend: Send {}
// Blanket impl: on native, every `Send` type is automatically `MaybeSend`.
#[cfg(not(web))]
impl<T> MaybeSend for T where T: Send {}
19
/// Marker bound that implies `Sync` on native targets and imposes nothing on
/// web (where there's only one thread).
///
/// Write `MaybeSync` instead of `Sync` in generic bounds when the same code has
/// to build for both targets.
#[cfg(not(web))]
pub trait MaybeSync: Sync {}
// Blanket impl: on native, every `Sync` type is automatically `MaybeSync`.
#[cfg(not(web))]
impl<T> MaybeSync for T where T: Sync {}
28
/// Web flavour of `MaybeSend`: an empty marker with no `Send` requirement,
/// because web builds have only one thread.
#[cfg(web)]
pub trait MaybeSend {}
// Blanket impl: on web, every type is `MaybeSend`.
#[cfg(web)]
impl<T> MaybeSend for T {}
34
/// Web flavour of `MaybeSync`: an empty marker with no `Sync` requirement,
/// because web builds have only one thread.
#[cfg(web)]
pub trait MaybeSync {}
// Blanket impl: on web, every type is `MaybeSync`.
#[cfg(web)]
impl<T> MaybeSync for T {}
40
41/// Spawns `future` on the runtime and awaits its completion.
42///
43/// Dropping the returned future does *not* cancel the spawned task — it runs
44/// to completion in the background. Use this when the spawned work (e.g. a
45/// storage write paired with its in-memory finalization) must not be torn
46/// apart mid-flight by caller cancellation.
47pub async fn run_detached<F, R>(future: F) -> R
48where
49 F: Future<Output = R> + MaybeSend + 'static,
50 R: MaybeSend + 'static,
51{
52 // On native, `tokio::task::spawn` returns a `JoinHandle` that already
53 // detaches on drop. On web, `wasm_bindgen_futures::spawn_local` is
54 // fire-and-forget, so we deliver the output through a oneshot channel.
55 #[cfg(not(web))]
56 {
57 tokio::task::spawn(future)
58 .await
59 .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))
60 }
61 #[cfg(web)]
62 {
63 let (tx, rx) = futures::channel::oneshot::channel();
64 wasm_bindgen_futures::spawn_local(async move {
65 if tx.send(future.await).is_err() {
66 tracing::debug!("run_detached: receiver dropped before result was delivered");
67 }
68 });
69 rx.await
70 .expect("spawned task dropped without sending its result")
71 }
72}
73
74/// The type of a future awaiting another task.
75///
76/// On drop, the remote task will be asynchronously cancelled, but will remain
77/// alive until it reaches a yield point.
78///
79/// To wait for the task to be fully cancelled, use [`Task::cancel`].
80pub struct Task<R> {
81 abort_handle: future::AbortHandle,
82 output: future::RemoteHandle<Result<R, future::Aborted>>,
83}
84
85impl<R: 'static> Task<R> {
86 fn spawn_<F: Future<Output = R>, T>(
87 future: F,
88 spawn: impl FnOnce(future::Remote<future::Abortable<F>>) -> T,
89 ) -> Self {
90 let (abortable_future, abort_handle) = future::abortable(future);
91 let (task, output) = abortable_future.remote_handle();
92 spawn(task);
93 Self {
94 abort_handle,
95 output,
96 }
97 }
98
99 /// Spawns a new task, potentially on the current thread.
100 #[cfg(not(web))]
101 pub fn spawn<F: Future<Output = R> + Send + 'static>(future: F) -> Self
102 where
103 R: Send,
104 {
105 Self::spawn_(future, tokio::task::spawn)
106 }
107
108 /// Spawns a new task on the current thread.
109 #[cfg(web)]
110 pub fn spawn<F: Future<Output = R> + 'static>(future: F) -> Self {
111 Self::spawn_(future, wasm_bindgen_futures::spawn_local)
112 }
113
114 /// Creates a [`Task`] that is immediately ready.
115 pub fn ready(value: R) -> Self {
116 Self::spawn_(async { value }, |fut| {
117 fut.now_or_never().expect("the future is ready")
118 })
119 }
120
121 /// Cancels the task, resolving only when the wrapped future is completely dropped.
122 pub async fn cancel(self) {
123 self.abort_handle.abort();
124 // We just want to wait for the task to finish unwinding; an `Aborted` error is the expected outcome.
125 self.output.await.ok();
126 }
127
128 /// Forgets the task. The task will continue to run to completion in the
129 /// background, but will no longer be joinable or cancelable.
130 pub fn forget(self) {
131 self.output.forget();
132 }
133}
134
135impl<R: 'static> std::future::IntoFuture for Task<R> {
136 type Output = R;
137 type IntoFuture = future::Map<
138 future::RemoteHandle<Result<R, future::Aborted>>,
139 fn(Result<R, future::Aborted>) -> R,
140 >;
141
142 fn into_future(self) -> Self::IntoFuture {
143 self.output
144 .map(|result| result.expect("we have the only AbortHandle"))
145 }
146}