web_thread_shim/
lib.rs

1#![allow(clippy::missing_panics_doc)]
2
3/*!
4# `web-thread-shim`
5
6This crate mimics the public API of `web-thread`, but using native
7futures and channels, to be substituted in when conditionally
8compiling cross-platform software.
9
10If you aren't using `web-thread`, you probably don't want this crate!
11Just use `std::thread`.
12 */
13
14use std::{
15    pin::Pin,
16    task::{Context, Poll},
17};
18
19use futures::{
20    channel::{mpsc, oneshot},
21    future::FutureExt as _,
22    task::LocalFutureObj,
23};
24
/// The type of errors that may arise from operations in this crate.
///
/// The enum is `#[non_exhaustive]`, so code matching on it from another
/// crate must include a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// The channel carrying a task's result was canceled before a value
    /// arrived.  Built automatically from [`oneshot::Canceled`] via `#[from]`.
    #[error("thread killed before task completed")]
    Killed(#[from] oneshot::Canceled),
}
32
/// Convenience alias for `Result<T, Error>`.
///
/// The error parameter defaults to this crate's [`Error`], but may be
/// overridden by supplying a second type argument.
pub type Result<T, E = Error> = std::result::Result<T, E>;
35
/// A thread running a local future executor ([`futures::executor::LocalPool`]).
///
/// Work is submitted with [`Thread::run`] or [`Thread::run_send`].
pub struct Thread {
    /// Sending half of the unbounded request channel; the receiving half is
    /// moved into the background thread by `Thread::new`.
    sender: mpsc::UnboundedSender<Request>,
}
40
/// A unit of work queued for the background thread: a `Send` closure that,
/// when called, builds the local future to spawn on the executor.  Only the
/// closure has to be `Send`; the future it returns is a `LocalFutureObj`.
type Request = Box<dyn FnOnce() -> LocalFutureObj<'static, ()> + Send>;
42
/// A task that's been spawned on a [`Thread`] that should eventually
/// compute a `T`.
///
/// `Task` is a future resolving to `Result<T>`; it yields [`Error::Killed`]
/// if the result channel is canceled before a value is delivered.
pub struct Task<T> {
    /// Receiving half of the one-shot channel that carries the task's output.
    receiver: oneshot::Receiver<T>,
}
48
/// A [`Task`] with a `Send` output.
/// See [`Thread::run_send`] for usage.
///
/// This is a thin newtype around [`Task`]; polling it polls the inner task.
pub struct SendTask<T>(Task<T>);
52
53impl<T: Send> Future for SendTask<T> {
54    type Output = Result<T>;
55
56    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
57        self.0.poll_unpin(context)
58    }
59}
60
61impl<T> Future for Task<T> {
62    type Output = Result<T>;
63
64    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
65        self.receiver
66            .poll_unpin(context)
67            .map(|ready| ready.map_err(Into::into))
68    }
69}
70
71impl Thread {
72    /// Create a new background thread to run tasks.
73    #[must_use]
74    pub fn new() -> Self {
75        let (sender, mut receiver) = mpsc::unbounded::<Request>();
76        std::thread::spawn(|| {
77            use futures::{StreamExt as _, executor::LocalPool, task::LocalSpawn as _};
78            let mut executor = LocalPool::new();
79            let spawner = executor.spawner();
80            executor.run_until(async move {
81                while let Some(task) = receiver.next().await {
82                    spawner
83                        .spawn_local_obj(task())
84                        .expect("executor should exist until destroyed");
85                }
86            });
87        });
88        Self { sender }
89    }
90
91    /// Execute a function on a thread.
92    ///
93    /// The function will begin executing immediately.  The resulting
94    /// [`Task`] can be awaited to retrieve the result.
95    pub fn run<Context: Post, F: Future<Output: Post> + 'static>(
96        &self,
97        context: Context,
98        code: impl FnOnce(Context) -> F + Send + 'static,
99    ) -> Task<F::Output> {
100        let (sender, receiver) = oneshot::channel::<F::Output>();
101        self.sender
102            .unbounded_send(Box::new(move || {
103                Box::new(async move {
104                    let _ = sender.send(code(context).await);
105                })
106                .into()
107            }))
108            .unwrap_or_else(|_| panic!("worker shouldn't die unless dropped"));
109        Task { receiver }
110    }
111
112    /// Like [`Thread::run`], but the output can be sent through Rust
113    /// memory without `Post`ing.
114    ///
115    /// In this shim, this is equivalent to [`Thread::run`].
116    pub fn run_send<Context: Post, F: Future<Output: Send> + 'static>(
117        &self,
118        context: Context,
119        code: impl FnOnce(Context) -> F + Send + 'static,
120    ) -> SendTask<F::Output> {
121        SendTask(self.run(context, code))
122    }
123}
124
125impl Default for Thread {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
/// Types that can be sent to another thread.  In this shim, this
/// trait is just an alias for `Send + 'static`, but in `web-thread`
/// some types can be sent only by performing an explicit transfer
/// operation.
pub trait Post: Send + 'static {}
// Blanket impl: every `Send + 'static` type is `Post`, so callers never
// need to implement the trait by hand.
impl<T: Send + 'static> Post for T {}
137
/// Smoke test: a value posted to a worker thread is transformed by the
/// task and its result can be awaited from the calling thread.
#[test]
fn basic_functionality() {
    // `Thread::new` returns the `Thread` directly (not a `Result`), so there
    // is nothing to unwrap.  Bind it to a local so the worker stays alive
    // until the task has completed.
    let thread = Thread::new();
    let task = thread.run(3u8, |three| async move { three + 5 });
    let outcome = futures::executor::LocalPool::new().run_until(task);
    assert_eq!(8u8, outcome.unwrap());
}