1#![allow(clippy::missing_panics_doc)]
2
3use std::{
15 pin::Pin,
16 task::{Context, Poll},
17};
18
19use futures::{
20 channel::{mpsc, oneshot},
21 future::FutureExt as _,
22 task::LocalFutureObj,
23};
24
25#[derive(Debug, thiserror::Error)]
27#[non_exhaustive]
28pub enum Error {
29 #[error("thread killed before task completed")]
30 Killed(#[from] oneshot::Canceled),
31}
32
33pub type Result<T, E = Error> = std::result::Result<T, E>;
35
36pub struct Thread {
38 sender: mpsc::UnboundedSender<Request>,
39}
40
41type Request = Box<dyn FnOnce() -> LocalFutureObj<'static, ()> + Send>;
42
43pub struct Task<T> {
46 receiver: oneshot::Receiver<T>,
47}
48
49pub struct SendTask<T>(Task<T>);
52
53impl<T: Send> Future for SendTask<T> {
54 type Output = Result<T>;
55
56 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
57 self.0.poll_unpin(context)
58 }
59}
60
61impl<T> Future for Task<T> {
62 type Output = Result<T>;
63
64 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
65 self.receiver
66 .poll_unpin(context)
67 .map(|ready| ready.map_err(Into::into))
68 }
69}
70
71impl Thread {
72 #[must_use]
74 pub fn new() -> Self {
75 let (sender, mut receiver) = mpsc::unbounded::<Request>();
76 std::thread::spawn(|| {
77 use futures::{StreamExt as _, executor::LocalPool, task::LocalSpawn as _};
78 let mut executor = LocalPool::new();
79 let spawner = executor.spawner();
80 executor.run_until(async move {
81 while let Some(task) = receiver.next().await {
82 spawner
83 .spawn_local_obj(task())
84 .expect("executor should exist until destroyed");
85 }
86 });
87 });
88 Self { sender }
89 }
90
91 pub fn run<Context: Post, F: Future<Output: Post> + 'static>(
96 &self,
97 context: Context,
98 code: impl FnOnce(Context) -> F + Send + 'static,
99 ) -> Task<F::Output> {
100 let (sender, receiver) = oneshot::channel::<F::Output>();
101 self.sender
102 .unbounded_send(Box::new(move || {
103 Box::new(async move {
104 let _ = sender.send(code(context).await);
105 })
106 .into()
107 }))
108 .unwrap_or_else(|_| panic!("worker shouldn't die unless dropped"));
109 Task { receiver }
110 }
111
112 pub fn run_send<Context: Post, F: Future<Output: Send> + 'static>(
117 &self,
118 context: Context,
119 code: impl FnOnce(Context) -> F + Send + 'static,
120 ) -> SendTask<F::Output> {
121 SendTask(self.run(context, code))
122 }
123}
124
125impl Default for Thread {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
/// Marker for values that can be posted to another thread: anything that is
/// `Send` and `'static`.
pub trait Post: Send + 'static {}

/// Blanket implementation: every `Send + 'static` type is automatically `Post`.
impl<T> Post for T where T: Send + 'static {}
137
/// Smoke test: work submitted to a worker thread runs there and its output is
/// delivered back through the returned task.
#[test]
fn basic_functionality() {
    // `Thread::new` returns `Thread` directly, so there is nothing to unwrap.
    // Bind the handle to a local so it stays alive while the task is awaited;
    // dropping it would close the request channel the worker reads from.
    let worker = Thread::new();
    let task = worker.run(3u8, |three| async move { three + 5 });
    let sum = futures::executor::LocalPool::new()
        .run_until(task)
        .unwrap();
    assert_eq!(8u8, sum);
}