oneshot/
lib.rs

1//! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
2//! can only transport a single message. This has a few nice outcomes. One thing is that
3//! the implementation can be very efficient, utilizing the knowledge that there will
4//! only be one message. But more importantly, it allows the API to be expressed in such
//! a way that certain edge cases that you don't want to care about when only sending a
//! single message on a channel do not exist. For example: The sender can't be copied
7//! or cloned, and the send method takes ownership and consumes the sender.
8//! So you are guaranteed, at the type level, that there can only be one message sent.
9//!
10//! The sender's send method is non-blocking, and potentially lock- and wait-free.
11//! See documentation on [Sender::send] for situations where it might not be fully wait-free.
12//! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
13//! limited thread blocking receive operations. The receiver also implements `Future` and
14//! supports asynchronously awaiting the message.
15//!
16//!
17//! # Examples
18//!
19//! This example sets up a background worker that processes requests coming in on a standard
20//! mpsc channel and replies on a oneshot channel provided with each request. The worker can
21//! be interacted with both from sync and async contexts since the oneshot receiver
22//! can receive both blocking and async.
23//!
24//! ```rust
25//! # #[cfg(not(feature = "loom"))] {
26//! use std::sync::mpsc;
27//! use std::thread;
28//! use std::time::Duration;
29//!
30//! type Request = String;
31//!
32//! // Starts a background thread performing some computation on requests sent to it.
33//! // Delivers the response back over a oneshot channel.
34//! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
35//!     let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
36//!     thread::spawn(move || {
37//!         for (request_data, response_sender) in request_receiver.iter() {
38//!             let compute_operation = || request_data.len();
39//!             let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
40//!         }
41//!     });
42//!     request_sender
43//! }
44//!
45//! let processor = spawn_processing_thread();
46//!
47//! // If compiled with `std` the library can receive messages with timeout on regular threads
48//! #[cfg(feature = "std")] {
49//!     let (response_sender, response_receiver) = oneshot::channel();
50//!     let request = Request::from("data from sync thread");
51//!
52//!     processor.send((request, response_sender)).expect("Processor down");
53//!     match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
54//!         Ok(result) => println!("Processor returned {}", result),
55//!         Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
56//!         Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
57//!     }
58//! }
59//!
60//! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
61//! #[cfg(feature = "async")] {
62//!     tokio::runtime::Runtime::new()
63//!         .unwrap()
64//!         .block_on(async move {
65//!             let (response_sender, response_receiver) = oneshot::channel();
66//!             let request = Request::from("data from sync thread");
67//!
68//!             processor.send((request, response_sender)).expect("Processor down");
69//!             match response_receiver.await { // <- Receive on the oneshot channel asynchronously
70//!                 Ok(result) => println!("Processor returned {}", result),
71//!                 Err(_e) => panic!("Processor exited"),
72//!             }
73//!         });
74//! }
75//! # }
76//! ```
77//!
78//! # Sync vs async
79//!
80//! The main motivation for writing this library was that there were no (known to me) channel
81//! implementations allowing you to seamlessly send messages between a normal thread and an async
82//! task, or the other way around. If message passing is the way you are communicating, of course
83//! that should work smoothly between the sync and async parts of the program!
84//!
85//! This library achieves that by having a fast and cheap send operation that can
86//! be used in both sync threads and async tasks. The receiver has both thread blocking
87//! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
88//!
89//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
90//! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
91//! be possible to use this library with any executor, or even pass messages between tasks running
92//! in different executors.
93//!
94
95// # Implementation description
96//
97// When a channel is created via the `channel` function, it creates a single heap allocation
98// containing:
99// * A one byte atomic integer that represents the current channel state,
100// * Uninitialized memory to fit the message,
101// * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
102//
103// The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1].
104// So with all features enabled each channel allocates 25 bytes plus the size of the
105// message, plus any padding needed to get correct memory alignment.
106//
// The Sender and Receiver only hold a raw pointer to the heap channel object. The last endpoint
// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
// be consumed or dropped signals via the state that it is gone, and the second one sees this
// and frees the memory.
111//
112// ## Footnotes
113//
114// [1]: Mind that the waker only takes zero bytes when all features are disabled, making it
115//      impossible to *wait* for the message. `try_recv` is the only available method in this scenario.
116
117#![deny(rust_2018_idioms)]
118#![cfg_attr(not(feature = "std"), no_std)]
119// Enables this nightly only feature for the documentation build on docs.rs.
120// To test this locally, build the docs with:
121// `RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features`
122#![cfg_attr(docsrs, feature(doc_cfg))]
123#![cfg_attr(docsrs, feature(doc_auto_cfg))]
124
125#[cfg(not(oneshot_loom))]
126extern crate alloc;
127
128use core::{
129    marker::PhantomData,
130    mem::{self, MaybeUninit},
131    ptr::{self, NonNull},
132};
133
134#[cfg(not(oneshot_loom))]
135use core::{
136    cell::UnsafeCell,
137    sync::atomic::{fence, AtomicU8, Ordering::*},
138};
139#[cfg(oneshot_loom)]
140use loom::{
141    cell::UnsafeCell,
142    sync::atomic::{fence, AtomicU8, Ordering::*},
143};
144
145#[cfg(all(any(feature = "std", feature = "async"), not(oneshot_loom)))]
146use core::hint;
147#[cfg(all(any(feature = "std", feature = "async"), oneshot_loom))]
148use loom::hint;
149
150#[cfg(feature = "async")]
151use core::{
152    pin::Pin,
153    task::{self, Poll},
154};
155#[cfg(feature = "std")]
156use std::time::{Duration, Instant};
157
158#[cfg(feature = "std")]
159mod thread {
160    #[cfg(not(oneshot_loom))]
161    pub use std::thread::{current, park, park_timeout, Thread};
162
163    #[cfg(oneshot_loom)]
164    pub use loom::thread::{current, park, Thread};
165
166    // loom does not support parking with a timeout. So we just
167    // yield. This means that the "park" will "spuriously" wake up
168    // way too early. But the code should properly handle this.
169    // One thing to note is that very short timeouts are needed
170    // when using loom, since otherwise the looping will cause
171    // an overflow in loom.
172    #[cfg(oneshot_loom)]
173    pub fn park_timeout(_timeout: std::time::Duration) {
174        loom::thread::yield_now()
175    }
176}
177
178#[cfg(oneshot_loom)]
179mod loombox;
180#[cfg(not(oneshot_loom))]
181use alloc::boxed::Box;
182#[cfg(oneshot_loom)]
183use loombox::Box;
184
185mod errors;
186// Wildcard imports are not nice. But since multiple errors have various conditional compilation,
187// this is easier than doing three different imports.
188pub use errors::*;
189
190/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
191pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
192    // Allocate the channel on the heap and get the pointer.
193    // The last endpoint of the channel to be alive is responsible for freeing the channel
194    // and dropping any object that might have been written to it.
195    let channel_ptr = NonNull::from(Box::leak(Box::new(Channel::new())));
196
197    (
198        Sender {
199            channel_ptr,
200            _invariant: PhantomData,
201        },
202        Receiver { channel_ptr },
203    )
204}
205
/// Sending end of a oneshot channel.
///
/// Created and returned from the [`channel`] function.
///
/// Can be used to send a message to the corresponding [`Receiver`].
#[derive(Debug)]
pub struct Sender<T> {
    // Pointer to the channel object that is shared with the corresponding `Receiver`.
    channel_ptr: NonNull<Channel<T>>,
    // Marker that makes `Sender<T>` invariant in `T`.
    //
    // In reality we want contravariance, however we can't obtain that.
    //
    // Consider the following scenario:
    // ```
    // let (mut tx, rx) = channel::<&'short u8>();
    // let (tx2, rx2) = channel::<&'long u8>();
    //
    // tx = tx2;
    //
    // // Pretend short_ref is some &'short u8
    // tx.send(short_ref).unwrap();
    // let long_ref = rx2.recv().unwrap();
    // ```
    //
    // If this type were covariant, the assignment `tx = tx2` above would compile, and safe code
    // could then extend the lifetime of `short_ref` to `'long`, which is not okay.
    // Hence, we enforce invariance.
    _invariant: PhantomData<fn(T) -> T>,
}
232
/// Receiving end of a oneshot channel.
///
/// Created and returned from the [`channel`] function.
///
/// Can be used to receive a message from the corresponding [`Sender`]. How the message
/// can be received depends on what features are enabled.
///
/// This type implements [`IntoFuture`](core::future::IntoFuture) when the `async` feature is
/// enabled. This allows awaiting it directly in an async context.
#[derive(Debug)]
pub struct Receiver<T> {
    // Pointer to the channel object that is shared with the corresponding `Sender`.
    //
    // Covariance is the right choice here. Consider the example presented in Sender, and you'll
    // see that if we replaced `rx` instead then we would get the expected behavior.
    channel_ptr: NonNull<Channel<T>>,
}
248
// A `Sender` moved to another thread can write a `T` into the channel from that thread, so the
// message type itself has to be `Send`.
unsafe impl<T: Send> Send for Sender<T> {}

// SAFETY: The only methods that assume there is only a single reference to the sender
// take `self` by value, guaranteeing that there is only one reference to the sender at
// the time they are called.
unsafe impl<T: Sync> Sync for Sender<T> {}

// A `Receiver` moved to another thread can take the `T` out of the channel on that thread, so
// the message type itself has to be `Send`.
unsafe impl<T: Send> Send for Receiver<T> {}
// The receiver only stores a pointer to the channel, never a `T` inline, so it is declared
// `Unpin` for every `T`.
impl<T> Unpin for Receiver<T> {}
258
259impl<T> Sender<T> {
260    /// Sends `message` over the channel to the corresponding [`Receiver`].
261    ///
262    /// Returns an error if the receiver has already been dropped. The message can
263    /// be extracted from the error.
264    ///
265    /// This method is lock-free and wait-free when sending on a channel that the
266    /// receiver is currently not receiving on. If the receiver is receiving during the send
267    /// operation this method includes waking up the thread/task. Unparking a thread involves
268    /// a mutex in Rust's standard library at the time of writing this.
269    /// How lock-free waking up an async task is
270    /// depends on your executor. If this method returns a `SendError`, please mind that dropping
271    /// the error involves running any drop implementation on the message type, and freeing the
272    /// channel's heap allocation, which might or might not be lock-free.
273    pub fn send(self, message: T) -> Result<(), SendError<T>> {
274        let channel_ptr = self.channel_ptr;
275
276        // Don't run our Drop implementation if send was called, any cleanup now happens here
277        mem::forget(self);
278
279        // SAFETY: The channel exists on the heap for the entire duration of this method and we
280        // only ever acquire shared references to it. Note that if the receiver disconnects it
281        // does not free the channel.
282        let channel = unsafe { channel_ptr.as_ref() };
283
284        // Write the message into the channel on the heap.
285        // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
286        // state, and since we're responsible for setting that state, we can guarantee that we have
287        // exclusive access to this memory location to perform this write.
288        unsafe { channel.write_message(message) };
289
290        // Set the state to signal there is a message on the channel.
291        // ORDERING: we use release ordering to ensure the write of the message is visible to the
292        // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
293        // and thus we do not need acquire ordering. The RECEIVING branch manages synchronization
294        // independent of this operation.
295        //
296        // EMPTY + 1 = MESSAGE
297        // RECEIVING + 1 = UNPARKING
298        // DISCONNECTED + 1 = invalid, however this state is never observed
299        match channel.state.fetch_add(1, Release) {
300            // The receiver is alive and has not started waiting. Send done.
301            EMPTY => Ok(()),
302            // The receiver is waiting. Wake it up so it can return the message.
303            RECEIVING => {
304                // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
305                // taking of the waker from being ordered before this operation.
306                fence(Acquire);
307
308                // Take the waker, but critically do not unpark it. If we unparked now, then the
309                // receiving thread could still observe the UNPARKING state and re-park, meaning
310                // that after we change to the MESSAGE state, it would remain parked indefinitely
311                // or until a spurious wakeup.
312                // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
313                // does not access the waker while in this state, nor does it free the channel
314                // allocation in this state.
315                let waker = unsafe { channel.take_waker() };
316
317                // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
318                // in the receiving thread, ensuring that both our read of the waker and write of
319                // the message happen-before the taking of the message and freeing of the channel.
320                // Furthermore, we need acquire ordering to ensure the unparking of the receiver
321                // happens after the channel state is updated.
322                channel.state.swap(MESSAGE, AcqRel);
323
324                // Note: it is possible that between the store above and this statement that
325                // the receiving thread is spuriously unparked, takes the message, and frees
326                // the channel allocation. However, we took ownership of the channel out of
327                // that allocation, and freeing the channel does not drop the waker since the
328                // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
329                // whether or not the receive has completed by this point.
330                waker.unpark();
331
332                Ok(())
333            }
334            // The receiver was already dropped. The error is responsible for freeing the channel.
335            // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
336            // we can transfer exclusive ownership of the channel's resources to the error.
337            // Moreover, since we just placed the message in the channel, the channel contains a
338            // valid message.
339            DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
340            _ => unreachable!(),
341        }
342    }
343
344    /// Returns true if the associated [`Receiver`] has been dropped.
345    ///
346    /// If true is returned, a future call to send is guaranteed to return an error.
347    pub fn is_closed(&self) -> bool {
348        // SAFETY: The channel exists on the heap for the entire duration of this method and we
349        // only ever acquire shared references to it. Note that if the receiver disconnects it
350        // does not free the channel.
351        let channel = unsafe { self.channel_ptr.as_ref() };
352
353        // ORDERING: We *chose* a Relaxed ordering here as it sufficient to
354        // enforce the method's contract: "if true is returned, a future
355        // call to send is guaranteed to return an error."
356        channel.state.load(Relaxed) == DISCONNECTED
357    }
358
359    /// Consumes the Sender, returning a raw pointer to the channel on the heap.
360    ///
361    /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
362    /// to do with the returned pointer is to later reconstruct the Sender with [Sender::from_raw].
363    /// Memory will leak if the Sender is never reconstructed.
364    pub fn into_raw(self) -> *mut () {
365        let raw = self.channel_ptr.as_ptr() as *mut ();
366        mem::forget(self);
367        raw
368    }
369
370    /// Consumes a raw pointer from [Sender::into_raw], recreating the Sender.
371    ///
372    /// # Safety
373    ///
374    /// This pointer must have come from [`Sender<T>::into_raw`] with the same message type, `T`.
375    /// At most one Sender must exist for a channel at any point in time.
376    /// Constructing multiple Senders from the same raw pointer leads to undefined behavior.
377    pub unsafe fn from_raw(raw: *mut ()) -> Self {
378        Self {
379            channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
380            _invariant: PhantomData,
381        }
382    }
383}
384
385impl<T> Drop for Sender<T> {
386    fn drop(&mut self) {
387        // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
388        // DISCONNECTED states. If we are in the MESSAGE state, then we called
389        // mem::forget(self), so we should not be in this function call. If we are in the
390        // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
391        // unreachable, or was dropped and observed that our side was still alive, and thus didn't
392        // free the channel.
393        let channel = unsafe { self.channel_ptr.as_ref() };
394
395        // Set the channel state to disconnected and read what state the receiver was in
396        // ORDERING: we don't need release ordering here since there are no modifications we
397        // need to make visible to other thread, and the Err(RECEIVING) branch handles
398        // synchronization independent of this cmpxchg
399        //
400        // EMPTY ^ 001 = DISCONNECTED
401        // RECEIVING ^ 001 = UNPARKING
402        // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
403        match channel.state.fetch_xor(0b001, Relaxed) {
404            // The receiver has not started waiting, nor is it dropped.
405            EMPTY => (),
406            // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
407            RECEIVING => {
408                // See comments in Sender::send
409
410                fence(Acquire);
411
412                let waker = unsafe { channel.take_waker() };
413
414                // We still need release ordering here to make sure our read of the waker happens
415                // before this, and acquire ordering to ensure the unparking of the receiver
416                // happens after this.
417                channel.state.swap(DISCONNECTED, AcqRel);
418
419                // The Acquire ordering above ensures that the write of the DISCONNECTED state
420                // happens-before unparking the receiver.
421                waker.unpark();
422            }
423            // The receiver was already dropped. We are responsible for freeing the channel.
424            DISCONNECTED => {
425                // SAFETY: when the receiver switches the state to DISCONNECTED they have received
426                // the message or will no longer be trying to receive the message, and have
427                // observed that the sender is still alive, meaning that we're responsible for
428                // freeing the channel allocation.
429                unsafe { dealloc(self.channel_ptr) };
430            }
431            _ => unreachable!(),
432        }
433    }
434}
435
436impl<T> Receiver<T> {
437    /// Checks if there is a message in the channel without blocking. Returns:
438    ///  * `Ok(message)` if there was a message in the channel.
439    ///  * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
440    ///  * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
441    ///    message has already been extracted by a previous receive call.
442    ///
443    /// If a message is returned, the channel is disconnected and any subsequent receive operation
444    /// using this receiver will return an error.
445    ///
446    /// This method is completely lock-free and wait-free. The only thing it does is an atomic
447    /// integer load of the channel state. And if there is a message in the channel it additionally
448    /// performs one atomic integer store and copies the message from the heap to the stack for
449    /// returning it.
450    pub fn try_recv(&self) -> Result<T, TryRecvError> {
451        // SAFETY: The channel will not be freed while this method is still running.
452        let channel = unsafe { self.channel_ptr.as_ref() };
453
454        // ORDERING: we use acquire ordering to synchronize with the store of the message.
455        match channel.state.load(Acquire) {
456            MESSAGE => {
457                // It's okay to break up the load and store since once we're in the message state
458                // the sender no longer modifies the state
459                // ORDERING: at this point the sender has done its job and is no longer active, so
460                // we don't need to make any side effects visible to it
461                channel.state.store(DISCONNECTED, Relaxed);
462
463                // SAFETY: we are in the MESSAGE state so the message is present
464                Ok(unsafe { channel.take_message() })
465            }
466            EMPTY => Err(TryRecvError::Empty),
467            DISCONNECTED => Err(TryRecvError::Disconnected),
468            #[cfg(feature = "async")]
469            RECEIVING | UNPARKING => Err(TryRecvError::Empty),
470            _ => unreachable!(),
471        }
472    }
473
474    /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
475    /// disconnected.
476    ///
477    /// This method will always block the current thread if there is no data available and it is
478    /// still possible for the message to be sent. Once the message is sent to the corresponding
479    /// [`Sender`], then this receiver will wake up and return that message.
480    ///
481    /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
482    /// this call is blocking, this call will wake up and return `Err` to indicate that the message
483    /// can never be received on this channel.
484    ///
485    /// If a sent message has already been extracted from this channel this method will return an
486    /// error.
487    ///
488    /// # Panics
489    ///
490    /// Panics if called after this receiver has been polled asynchronously.
491    #[cfg(feature = "std")]
492    pub fn recv(self) -> Result<T, RecvError> {
493        // Note that we don't need to worry about changing the state to disconnected or setting the
494        // state to an invalid value at any point in this function because we take ownership of
495        // self, and this function does not exit until the message has been received or both side
496        // of the channel are inactive and cleaned up.
497
498        let channel_ptr = self.channel_ptr;
499
500        // Don't run our Drop implementation. This consuming recv method is responsible for freeing.
501        mem::forget(self);
502
503        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
504        // is still alive, meaning that even if the sender was dropped then it would have observed
505        // the fact that we're still alive and left the responsibility of deallocating the
506        // channel to us, so channel_ptr is valid
507        let channel = unsafe { channel_ptr.as_ref() };
508
509        // ORDERING: we use acquire ordering to synchronize with the write of the message in the
510        // case that it's available
511        match channel.state.load(Acquire) {
512            // The sender is alive but has not sent anything yet. We prepare to park.
513            EMPTY => {
514                // Conditionally add a delay here to help the tests trigger the edge cases where
515                // the sender manages to be dropped or send something before we are able to store
516                // our waker object in the channel.
517                #[cfg(all(oneshot_test_delay, not(oneshot_loom)))]
518                std::thread::sleep(std::time::Duration::from_millis(10));
519
520                // Write our waker instance to the channel.
521                // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
522                // try to access the waker until it sees the state set to RECEIVING below
523                unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
524
525                // Switch the state to RECEIVING. We need to do this in one atomic step in case the
526                // sender disconnected or sent the message while we wrote the waker to memory. We
527                // don't need to do a compare exchange here however because if the original state
528                // was not EMPTY, then the sender has either finished sending the message or is
529                // being dropped, so the RECEIVING state will never be observed after we return.
530                // ORDERING: we use release ordering so the sender can synchronize with our writing
531                // of the waker to memory. The individual branches handle any additional
532                // synchronizaton
533                match channel.state.swap(RECEIVING, Release) {
534                    // We stored our waker, now we park until the sender has changed the state
535                    EMPTY => loop {
536                        thread::park();
537
538                        // ORDERING: synchronize with the write of the message
539                        match channel.state.load(Acquire) {
540                            // The sender sent the message while we were parked.
541                            MESSAGE => {
542                                // SAFETY: we are in the message state so the message is valid
543                                let message = unsafe { channel.take_message() };
544
545                                // SAFETY: the Sender delegates the responsibility of deallocating
546                                // the channel to us upon sending the message
547                                unsafe { dealloc(channel_ptr) };
548
549                                break Ok(message);
550                            }
551                            // The sender was dropped while we were parked.
552                            DISCONNECTED => {
553                                // SAFETY: the Sender doesn't deallocate the channel allocation in
554                                // its drop implementation if we're receiving
555                                unsafe { dealloc(channel_ptr) };
556
557                                break Err(RecvError);
558                            }
559                            // State did not change, spurious wakeup, park again.
560                            RECEIVING | UNPARKING => (),
561                            _ => unreachable!(),
562                        }
563                    },
564                    // The sender sent the message while we prepared to park.
565                    MESSAGE => {
566                        // ORDERING: Synchronize with the write of the message. This branch is
567                        // unlikely to be taken, so it's likely more efficient to use a fence here
568                        // instead of AcqRel ordering on the RMW operation
569                        fence(Acquire);
570
571                        // SAFETY: we started in the empty state and the sender switched us to the
572                        // message state. This means that it did not take the waker, so we're
573                        // responsible for dropping it.
574                        unsafe { channel.drop_waker() };
575
576                        // SAFETY: we are in the message state so the message is valid
577                        let message = unsafe { channel.take_message() };
578
579                        // SAFETY: the Sender delegates the responsibility of deallocating the
580                        // channel to us upon sending the message
581                        unsafe { dealloc(channel_ptr) };
582
583                        Ok(message)
584                    }
585                    // The sender was dropped before sending anything while we prepared to park.
586                    DISCONNECTED => {
587                        // SAFETY: we started in the empty state and the sender switched us to the
588                        // disconnected state. It does not take the waker when it does this so we
589                        // need to drop it.
590                        unsafe { channel.drop_waker() };
591
592                        // SAFETY: the sender does not deallocate the channel if it switches from
593                        // empty to disconnected so we need to free the allocation
594                        unsafe { dealloc(channel_ptr) };
595
596                        Err(RecvError)
597                    }
598                    _ => unreachable!(),
599                }
600            }
601            // The sender already sent the message.
602            MESSAGE => {
603                // SAFETY: we are in the message state so the message is valid
604                let message = unsafe { channel.take_message() };
605
606                // SAFETY: we are already in the message state so the sender has been forgotten
607                // and it's our job to clean up resources
608                unsafe { dealloc(channel_ptr) };
609
610                Ok(message)
611            }
612            // The sender was dropped before sending anything, or we already received the message.
613            DISCONNECTED => {
614                // SAFETY: the sender does not deallocate the channel if it switches from empty to
615                // disconnected so we need to free the allocation
616                unsafe { dealloc(channel_ptr) };
617
618                Err(RecvError)
619            }
620            // The receiver must have been `Future::poll`ed prior to this call.
621            #[cfg(feature = "async")]
622            RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
623            _ => unreachable!(),
624        }
625    }
626
627    /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
628    /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit
629    /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver.
630    ///
631    /// If a message is returned, the channel is disconnected and any subsequent receive operation
632    /// using this receiver will return an error.
633    ///
634    /// # Panics
635    ///
636    /// Panics if called after this receiver has been polled asynchronously.
637    #[cfg(feature = "std")]
638    pub fn recv_ref(&self) -> Result<T, RecvError> {
639        self.start_recv_ref(RecvError, |channel| {
640            loop {
641                thread::park();
642
643                // ORDERING: we use acquire ordering to synchronize with the write of the message
644                match channel.state.load(Acquire) {
645                    // The sender sent the message while we were parked.
646                    // We take the message and mark the channel disconnected.
647                    MESSAGE => {
648                        // ORDERING: the sender is inactive at this point so we don't need to make
649                        // any reads or writes visible to the sending thread
650                        channel.state.store(DISCONNECTED, Relaxed);
651
652                        // SAFETY: we were just in the message state so the message is valid
653                        break Ok(unsafe { channel.take_message() });
654                    }
655                    // The sender was dropped while we were parked.
656                    DISCONNECTED => break Err(RecvError),
657                    // State did not change, spurious wakeup, park again.
658                    RECEIVING | UNPARKING => (),
659                    _ => unreachable!(),
660                }
661            }
662        })
663    }
664
665    /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
666    ///  * `Ok(message)` if there was a message in the channel before the timeout was reached.
667    ///  * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
668    ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
669    ///    has already been extracted by a previous receive call.
670    ///
671    /// If a message is returned, the channel is disconnected and any subsequent receive operation
672    /// using this receiver will return an error.
673    ///
674    /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point
675    /// in the future this falls back to an indefinitely blocking receive operation.
676    ///
677    /// # Panics
678    ///
679    /// Panics if called after this receiver has been polled asynchronously.
680    #[cfg(feature = "std")]
681    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
682        match Instant::now().checked_add(timeout) {
683            Some(deadline) => self.recv_deadline(deadline),
684            None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected),
685        }
686    }
687
    /// Like [`Receiver::recv`], but will not block past `deadline`. Returns:
    ///  * `Ok(message)` if there was a message in the channel before the deadline was reached.
    ///  * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
    ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
    ///    has already been extracted by a previous receive call.
    ///
    /// If a message is returned, the channel is disconnected and any subsequent receive operation
    /// using this receiver will return an error.
    ///
    /// # Panics
    ///
    /// Panics if called after this receiver has been polled asynchronously.
    #[cfg(feature = "std")]
    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
        /// Slow path used when the deadline expired while the sender was already in the middle
        /// of unparking us. Parks until the sender has published its final state (`MESSAGE` or
        /// `DISCONNECTED`) and returns the corresponding result.
        ///
        /// # Safety
        ///
        /// If the sender is unparking us after a message send, the message must already have been
        /// written to the channel and an acquire memory barrier issued before calling this function
        #[cold]
        unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
            loop {
                thread::park();

                // ORDERING: The caller has already synchronized with any message write
                match channel.state.load(Relaxed) {
                    MESSAGE => {
                        // ORDERING: the sender has been dropped, so this update only
                        // needs to be visible to us
                        channel.state.store(DISCONNECTED, Relaxed);
                        // SAFETY: we observed the MESSAGE state, and per this function's
                        // contract the message write is already visible to us
                        break Ok(channel.take_message());
                    }
                    DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
                    // The sender is still unparking us. We continue on the empty state here since
                    // the current implementation eagerly sets the state to EMPTY upon timeout.
                    EMPTY => (),
                    _ => unreachable!(),
                }
            }
        }

        // The closure only runs once `start_recv_ref` has stored our waker and moved the channel
        // into the RECEIVING state.
        self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
            loop {
                // `Some` while there is still time left before the deadline, `None` once it has
                // passed.
                match deadline.checked_duration_since(Instant::now()) {
                    Some(timeout) => {
                        thread::park_timeout(timeout);

                        // ORDERING: synchronize with the write of the message
                        match channel.state.load(Acquire) {
                            // The sender sent the message while we were parked.
                            MESSAGE => {
                                // ORDERING: the sender has been `mem::forget`-ed so this update
                                // only needs to be visible to us.
                                channel.state.store(DISCONNECTED, Relaxed);

                                // SAFETY: we either are in the message state or were just in the
                                // message state
                                break Ok(unsafe { channel.take_message() });
                            }
                            // The sender was dropped while we were parked.
                            DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
                            // State did not change, spurious wakeup, park again.
                            RECEIVING | UNPARKING => (),
                            _ => unreachable!(),
                        }
                    }
                    None => {
                        // The deadline has passed. Eagerly leave the RECEIVING state and inspect
                        // what state the channel was in at that moment.
                        // ORDERING: synchronize with the write of the message
                        match channel.state.swap(EMPTY, Acquire) {
                            // We reached the end of the timeout without receiving a message
                            RECEIVING => {
                                // SAFETY: we were in the receiving state and are now in the empty
                                // state, so the sender has not and will not try to read the waker,
                                // so we have exclusive access to drop it.
                                unsafe { channel.drop_waker() };

                                break Err(RecvTimeoutError::Timeout);
                            }
                            // The sender sent the message while we were parked.
                            MESSAGE => {
                                // Same safety and ordering as the Some branch

                                channel.state.store(DISCONNECTED, Relaxed);
                                break Ok(unsafe { channel.take_message() });
                            }
                            // The sender was dropped while we were parked.
                            DISCONNECTED => {
                                // ORDERING: we were originally in the disconnected state meaning
                                // that the sender is inactive and no longer observing the state,
                                // so we only need to change it back to DISCONNECTED for if the
                                // receiver is dropped or a recv* method is called again
                                channel.state.store(DISCONNECTED, Relaxed);

                                break Err(RecvTimeoutError::Disconnected);
                            }
                            // The sender sent the message and started unparking us
                            UNPARKING => {
                                // We were in the UNPARKING state and are now in the EMPTY state.
                                // We wait to be properly unparked and to observe if the sender
                                // sets MESSAGE or DISCONNECTED state.
                                // SAFETY: The load above has synchronized with any message write.
                                break unsafe { wait_for_unpark(channel) };
                            }
                            _ => unreachable!(),
                        }
                    }
                }
            }
        })
    }
797
798    /// Returns true if the associated [`Sender`] was dropped before sending a message. Or if
799    /// the message has already been received.
800    ///
801    /// If `true` is returned, all future calls to receive methods are guaranteed to return
802    /// a disconnected error. And future calls to this method is guaranteed to also return `true`.
803    pub fn is_closed(&self) -> bool {
804        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
805        // is still alive, meaning that even if the sender was dropped then it would have observed
806        // the fact that we're still alive and left the responsibility of deallocating the
807        // channel to us, so `self.channel` is valid
808        let channel = unsafe { self.channel_ptr.as_ref() };
809
810        // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
811        // enforce the method's contract. Once true has been observed, it will remain true.
812        // However, if false is observed, the sender might have just disconnected but this thread
813        // has not observed it yet.
814        channel.state.load(Relaxed) == DISCONNECTED
815    }
816
817    /// Returns true if there is a message in the channel, ready to be received.
818    ///
819    /// If `true` is returned, the next call to a receive method is guaranteed to return
820    /// a message.
821    pub fn has_message(&self) -> bool {
822        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
823        // is still alive, meaning that even if the sender was dropped then it would have observed
824        // the fact that we're still alive and left the responsibility of deallocating the
825        // channel to us, so `self.channel` is valid
826        let channel = unsafe { self.channel_ptr.as_ref() };
827
828        // ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered
829        // before this one. This upholds the contract that if true is returned, the next call to
830        // a receive method is guaranteed to also abserve the `MESSAGE` state and return a message.
831        channel.state.load(Acquire) == MESSAGE
832    }
833
    /// Begins the process of receiving on the channel by reference. If the message is already
    /// ready, or the sender has disconnected, then this function will return the appropriate
    /// Result immediately. Otherwise, it will write the waker to memory, check to see if the
    /// sender has finished or disconnected again, and then will call `finish`. `finish` is
    /// thus responsible for cleaning up the channel's resources appropriately before it returns,
    /// such as destroying the waker, for instance.
    ///
    /// * `disconnected_error` is the error value returned whenever the channel turns out to be
    ///   disconnected before `finish` gets to run.
    /// * `finish` is only invoked after the current thread's waker has been stored and the
    ///   state has been switched from `EMPTY` to `RECEIVING`.
    ///
    /// # Panics
    ///
    /// With the `async` feature enabled, panics if the channel is found in the `RECEIVING` or
    /// `UNPARKING` state on entry, i.e. the receiver has previously been polled as a future.
    #[cfg(feature = "std")]
    #[inline]
    fn start_recv_ref<E>(
        &self,
        disconnected_error: E,
        finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
    ) -> Result<T, E> {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we're still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: synchronize with the write of the message
        match channel.state.load(Acquire) {
            // The sender is alive but has not sent anything yet. We prepare to park.
            EMPTY => {
                // Conditionally add a delay here to help the tests trigger the edge cases where
                // the sender manages to be dropped or send something before we are able to store
                // our waker object in the channel.
                #[cfg(all(oneshot_test_delay, not(oneshot_loom)))]
                std::thread::sleep(std::time::Duration::from_millis(10));

                // Write our waker instance to the channel.
                // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
                // try to access the waker until it sees the state set to RECEIVING below
                unsafe { channel.write_waker(ReceiverWaker::current_thread()) };

                // ORDERING: we use release ordering on success so the sender can synchronize with
                // our write of the waker. We use relaxed ordering on failure since the sender does
                // not need to synchronize with our write and the individual match arms handle any
                // additional synchronization
                match channel
                    .state
                    .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
                {
                    // We stored our waker, now we delegate to the callback to finish the receive
                    // operation
                    Ok(_) => finish(channel),
                    // The sender sent the message while we prepared to finish
                    Err(MESSAGE) => {
                        // See comments in `recv` for ordering and safety

                        // The failed compare_exchange above was only Relaxed, so this fence is
                        // what makes the sender's message write visible to us.
                        fence(Acquire);

                        // The sender never saw the RECEIVING state, so the waker we just wrote
                        // is still ours to destroy.
                        unsafe { channel.drop_waker() };

                        // ORDERING: the sender has been `mem::forget`-ed so this update only
                        // needs to be visible to us
                        channel.state.store(DISCONNECTED, Relaxed);

                        // SAFETY: The MESSAGE state tells us there is a correctly initialized
                        // message
                        Ok(unsafe { channel.take_message() })
                    }
                    // The sender was dropped before sending anything while we prepared to park.
                    Err(DISCONNECTED) => {
                        // See comments in `recv` for safety
                        unsafe { channel.drop_waker() };
                        Err(disconnected_error)
                    }
                    _ => unreachable!(),
                }
            }
            // The sender sent the message. We take the message and mark the channel disconnected.
            MESSAGE => {
                // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
                // visible to us
                channel.state.store(DISCONNECTED, Relaxed);

                // SAFETY: we are in the message state so the message is valid
                Ok(unsafe { channel.take_message() })
            }
            // The sender was dropped before sending anything, or we already received the message.
            DISCONNECTED => Err(disconnected_error),
            // The receiver must have been `Future::poll`ed prior to this call.
            #[cfg(feature = "async")]
            RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
            _ => unreachable!(),
        }
    }
921
922    /// Consumes the Receiver, returning a raw pointer to the channel on the heap.
923    ///
924    /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
925    /// to do with the returned pointer is to later reconstruct the Receiver with
926    /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed.
927    pub fn into_raw(self) -> *mut () {
928        let raw = self.channel_ptr.as_ptr() as *mut ();
929        mem::forget(self);
930        raw
931    }
932
933    /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver.
934    ///
935    /// # Safety
936    ///
937    /// This pointer must have come from [`Receiver<T>::into_raw`] with the same message type, `T`.
938    /// At most one Receiver must exist for a channel at any point in time.
939    /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior.
940    pub unsafe fn from_raw(raw: *mut ()) -> Self {
941        Self {
942            channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
943        }
944    }
945}
946
/// Asynchronous receive. Polling resolves to `Ok(message)` once the sender has sent, or to
/// `Err(RecvError)` if the channel is disconnected.
#[cfg(feature = "async")]
impl<T> core::future::Future for Receiver<T> {
    type Output = Result<T, RecvError>;

    /// Checks the channel state once and either completes, or (re-)registers the waker from
    /// `cx` so that the sender can wake this task up later.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
        // is still alive, meaning that even if the sender was dropped then it would have observed
        // the fact that we're still alive and left the responsibility of deallocating the
        // channel to us, so `self.channel` is valid
        let channel = unsafe { self.channel_ptr.as_ref() };

        // ORDERING: we use acquire ordering to synchronize with the store of the message.
        match channel.state.load(Acquire) {
            // The sender is alive but has not sent anything yet.
            EMPTY => {
                // SAFETY: We can't be in the forbidden states, and no waker in the channel.
                unsafe { channel.write_async_waker(cx) }
            }
            // We were polled again while waiting for the sender. Replace the waker with the new one.
            RECEIVING => {
                // ORDERING: We use relaxed ordering on both success and failure since we have not
                // written anything above that must be released, and the individual match arms
                // handle any additional synchronization.
                match channel
                    .state
                    .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
                {
                    // We successfully changed the state back to EMPTY. Replace the waker.
                    // This is the most likely branch to be taken, which is why we don't use any
                    // memory barriers in the compare_exchange above.
                    Ok(_) => {
                        // SAFETY: We wrote the waker in a previous call to poll. We do not need
                        // a memory barrier since the previous write here was by ourselves.
                        unsafe { channel.drop_waker() };
                        // SAFETY: We can't be in the forbidden states, and no waker in the channel.
                        unsafe { channel.write_async_waker(cx) }
                    }
                    // The sender sent the message while we prepared to replace the waker.
                    // We take the message and mark the channel disconnected.
                    // The sender has already taken the waker.
                    Err(MESSAGE) => {
                        // ORDERING: Synchronize with the write of the message. This branch is
                        // unlikely to be taken.
                        channel.state.swap(DISCONNECTED, Acquire);
                        // SAFETY: The state tells us the sender has initialized the message.
                        Poll::Ready(Ok(unsafe { channel.take_message() }))
                    }
                    // The sender was dropped before sending anything while we prepared to park.
                    // The sender has taken the waker already.
                    Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
                    // The sender is currently waking us up.
                    Err(UNPARKING) => {
                        // We can't trust that the old waker that the sender has access to
                        // is honored by the async runtime at this point. So we wake ourselves
                        // up to get polled instantly again.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    _ => unreachable!(),
                }
            }
            // The sender sent the message.
            MESSAGE => {
                // ORDERING: the sender has been dropped so this update only needs to be
                // visible to us
                channel.state.store(DISCONNECTED, Relaxed);
                // SAFETY: we observed the MESSAGE state with the acquire load above, so the
                // message is initialized and visible to us.
                Poll::Ready(Ok(unsafe { channel.take_message() }))
            }
            // The sender was dropped before sending anything, or we already received the message.
            DISCONNECTED => Poll::Ready(Err(RecvError)),
            // The sender has observed the RECEIVING state and is currently reading the waker from
            // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
            // state. We busy loop here since we know the sender is done very soon.
            UNPARKING => loop {
                hint::spin_loop();
                // ORDERING: The load above has already synchronized with the write of the message.
                match channel.state.load(Relaxed) {
                    MESSAGE => {
                        // ORDERING: the sender has been dropped, so this update only
                        // needs to be visible to us
                        channel.state.store(DISCONNECTED, Relaxed);
                        // SAFETY: We observed the MESSAGE state
                        break Poll::Ready(Ok(unsafe { channel.take_message() }));
                    }
                    DISCONNECTED => break Poll::Ready(Err(RecvError)),
                    // The sender has not published its final state yet. Keep spinning.
                    UNPARKING => (),
                    _ => unreachable!(),
                }
            },
            _ => unreachable!(),
        }
    }
}
1040
1041impl<T> Drop for Receiver<T> {
1042    fn drop(&mut self) {
1043        // SAFETY: since the receiving side is still alive the sender would have observed that and
1044        // left deallocating the channel allocation to us.
1045        let channel = unsafe { self.channel_ptr.as_ref() };
1046
1047        // Set the channel state to disconnected and read what state the receiver was in
1048        match channel.state.swap(DISCONNECTED, Acquire) {
1049            // The sender has not sent anything, nor is it dropped.
1050            EMPTY => (),
1051            // The sender already sent something. We must drop it, and free the channel.
1052            MESSAGE => {
1053                // SAFETY: we are in the message state so the message is initialized
1054                unsafe { channel.drop_message() };
1055
1056                // SAFETY: see safety comment at top of function
1057                unsafe { dealloc(self.channel_ptr) };
1058            }
1059            // The receiver has been polled.
1060            #[cfg(feature = "async")]
1061            RECEIVING => {
1062                // TODO: figure this out when async is fixed
1063                unsafe { channel.drop_waker() };
1064            }
1065            // The sender was already dropped. We are responsible for freeing the channel.
1066            DISCONNECTED => {
1067                // SAFETY: see safety comment at top of function
1068                unsafe { dealloc(self.channel_ptr) };
1069            }
1070            // This receiver was previously polled, so the channel was in the RECEIVING state.
1071            // But the sender has observed the RECEIVING state and is currently reading the waker
1072            // to wake us up. We need to loop here until we observe the MESSAGE or DISCONNECTED state.
1073            // We busy loop here since we know the sender is done very soon.
1074            #[cfg(any(feature = "std", feature = "async"))]
1075            UNPARKING => {
1076                loop {
1077                    hint::spin_loop();
1078                    // ORDERING: The swap above has already synchronized with the write of the message.
1079                    match channel.state.load(Relaxed) {
1080                        MESSAGE => {
1081                            // SAFETY: we are in the message state so the message is initialized
1082                            unsafe { channel.drop_message() };
1083                            break;
1084                        }
1085                        DISCONNECTED => break,
1086                        UNPARKING => (),
1087                        _ => unreachable!(),
1088                    }
1089                }
1090                // SAFETY: see safety comment at top of function
1091                unsafe { dealloc(self.channel_ptr) };
1092            }
1093            _ => unreachable!(),
1094        }
1095    }
1096}
1097
/// All the values that the `Channel::state` field can have during the lifetime of a channel.
mod states {
    // These values are very explicitly chosen so that we can replace some cmpxchg calls with
    // fetch_* calls.
    //
    // Relationships between the values as defined below:
    //   EMPTY + 1 == MESSAGE            RECEIVING + 1 == UNPARKING
    //   EMPTY ^ 0b001 == DISCONNECTED   RECEIVING ^ 0b001 == UNPARKING
    // NOTE(review): presumably these are the single-instruction transitions the sender relies
    // on; confirm against the `Sender` implementation.

    /// The initial channel state. Active while both endpoints are still alive, no message has been
    /// sent, and the receiver is not receiving.
    pub const EMPTY: u8 = 0b011;
    /// A message has been sent to the channel, but the receiver has not yet read it.
    pub const MESSAGE: u8 = 0b100;
    /// No message has yet been sent on the channel, but the receiver is currently receiving.
    pub const RECEIVING: u8 = 0b000;
    /// The sender has observed the `RECEIVING` state and is currently reading the waker in order
    /// to wake the receiver up. The sender follows this up with `MESSAGE` or `DISCONNECTED`.
    #[cfg(any(feature = "std", feature = "async"))]
    pub const UNPARKING: u8 = 0b001;
    /// The channel has been closed. This means that either the sender or receiver has been dropped,
    /// or the message sent to the channel has already been received. Since this is a oneshot
    /// channel, it is disconnected after the one message it is supposed to hold has been
    /// transmitted.
    pub const DISCONNECTED: u8 = 0b010;
}
1118use states::*;
1119
/// Internal channel data structure. The `channel` method allocates and puts one instance
/// of this struct on the heap for each oneshot channel instance. The struct holds:
/// * The current state of the channel.
/// * The message in the channel. This memory is uninitialized until the message is sent.
/// * The waker instance for the thread or task that is currently receiving on this channel.
///   This memory is uninitialized until the receiver starts receiving.
struct Channel<T> {
    // One of the constants in the `states` module.
    state: AtomicU8,
    // Only initialized while the channel is in the MESSAGE state.
    message: UnsafeCell<MaybeUninit<T>>,
    // Written by the receiver before it enters the RECEIVING state.
    waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
}
1131
1132impl<T> Channel<T> {
1133    pub fn new() -> Self {
1134        Self {
1135            state: AtomicU8::new(EMPTY),
1136            message: UnsafeCell::new(MaybeUninit::uninit()),
1137            waker: UnsafeCell::new(MaybeUninit::uninit()),
1138        }
1139    }
1140
1141    #[inline(always)]
1142    unsafe fn message(&self) -> &MaybeUninit<T> {
1143        #[cfg(oneshot_loom)]
1144        {
1145            self.message.with(|ptr| &*ptr)
1146        }
1147
1148        #[cfg(not(oneshot_loom))]
1149        {
1150            &*self.message.get()
1151        }
1152    }
1153
1154    #[inline(always)]
1155    unsafe fn with_message_mut<F>(&self, op: F)
1156    where
1157        F: FnOnce(&mut MaybeUninit<T>),
1158    {
1159        #[cfg(oneshot_loom)]
1160        {
1161            self.message.with_mut(|ptr| op(&mut *ptr))
1162        }
1163
1164        #[cfg(not(oneshot_loom))]
1165        {
1166            op(&mut *self.message.get())
1167        }
1168    }
1169
1170    #[inline(always)]
1171    #[cfg(any(feature = "std", feature = "async"))]
1172    unsafe fn with_waker_mut<F>(&self, op: F)
1173    where
1174        F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
1175    {
1176        #[cfg(oneshot_loom)]
1177        {
1178            self.waker.with_mut(|ptr| op(&mut *ptr))
1179        }
1180
1181        #[cfg(not(oneshot_loom))]
1182        {
1183            op(&mut *self.waker.get())
1184        }
1185    }
1186
1187    #[inline(always)]
1188    unsafe fn write_message(&self, message: T) {
1189        self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
1190    }
1191
1192    #[inline(always)]
1193    unsafe fn take_message(&self) -> T {
1194        #[cfg(oneshot_loom)]
1195        {
1196            self.message.with(|ptr| ptr::read(ptr)).assume_init()
1197        }
1198
1199        #[cfg(not(oneshot_loom))]
1200        {
1201            ptr::read(self.message.get()).assume_init()
1202        }
1203    }
1204
1205    #[inline(always)]
1206    unsafe fn drop_message(&self) {
1207        self.with_message_mut(|slot| slot.assume_init_drop());
1208    }
1209
1210    #[cfg(any(feature = "std", feature = "async"))]
1211    #[inline(always)]
1212    unsafe fn write_waker(&self, waker: ReceiverWaker) {
1213        self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
1214    }
1215
1216    #[inline(always)]
1217    unsafe fn take_waker(&self) -> ReceiverWaker {
1218        #[cfg(oneshot_loom)]
1219        {
1220            self.waker.with(|ptr| ptr::read(ptr)).assume_init()
1221        }
1222
1223        #[cfg(not(oneshot_loom))]
1224        {
1225            ptr::read(self.waker.get()).assume_init()
1226        }
1227    }
1228
1229    #[cfg(any(feature = "std", feature = "async"))]
1230    #[inline(always)]
1231    unsafe fn drop_waker(&self) {
1232        self.with_waker_mut(|slot| slot.assume_init_drop());
1233    }
1234
1235    /// # Safety
1236    ///
1237    /// * `Channel::waker` must not have a waker stored in it when calling this method.
1238    /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
1239    #[cfg(feature = "async")]
1240    unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
1241        // Write our thread instance to the channel.
1242        // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
1243        // try to access the waker until it sees the state set to RECEIVING below
1244        self.write_waker(ReceiverWaker::task_waker(cx));
1245
1246        // ORDERING: we use release ordering on success so the sender can synchronize with
1247        // our write of the waker. We use relaxed ordering on failure since the sender does
1248        // not need to synchronize with our write and the individual match arms handle any
1249        // additional synchronization
1250        match self
1251            .state
1252            .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
1253        {
1254            // We stored our waker, now we return and let the sender wake us up
1255            Ok(_) => Poll::Pending,
1256            // The sender sent the message while we prepared to park.
1257            // We take the message and mark the channel disconnected.
1258            Err(MESSAGE) => {
1259                // ORDERING: Synchronize with the write of the message. This branch is
1260                // unlikely to be taken, so it's likely more efficient to use a fence here
1261                // instead of AcqRel ordering on the compare_exchange operation
1262                fence(Acquire);
1263
1264                // SAFETY: we started in the EMPTY state and the sender switched us to the
1265                // MESSAGE state. This means that it did not take the waker, so we're
1266                // responsible for dropping it.
1267                self.drop_waker();
1268
1269                // ORDERING: sender does not exist, so this update only needs to be visible to us
1270                self.state.store(DISCONNECTED, Relaxed);
1271
1272                // SAFETY: The MESSAGE state tells us there is a correctly initialized message
1273                Poll::Ready(Ok(self.take_message()))
1274            }
1275            // The sender was dropped before sending anything while we prepared to park.
1276            Err(DISCONNECTED) => {
1277                // SAFETY: we started in the EMPTY state and the sender switched us to the
1278                // DISCONNECTED state. This means that it did not take the waker, so we're
1279                // responsible for dropping it.
1280                self.drop_waker();
1281                Poll::Ready(Err(RecvError))
1282            }
1283            _ => unreachable!(),
1284        }
1285    }
1286}
1287
/// Handle stored in the channel that lets a waiting receiver be woken up.
///
/// Which variants exist depends on the enabled crate features (`std`, `async`).
enum ReceiverWaker {
    /// The receiver is waiting synchronously. Its thread is parked.
    #[cfg(feature = "std")]
    Thread(thread::Thread),
    /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
    #[cfg(feature = "async")]
    Task(task::Waker),
    /// A little hack to not make this enum an uninhabited type when no features are enabled.
    #[cfg(not(any(feature = "async", feature = "std")))]
    _Uninhabited,
}
1299
1300impl ReceiverWaker {
1301    #[cfg(feature = "std")]
1302    pub fn current_thread() -> Self {
1303        Self::Thread(thread::current())
1304    }
1305
1306    #[cfg(feature = "async")]
1307    pub fn task_waker(cx: &task::Context<'_>) -> Self {
1308        Self::Task(cx.waker().clone())
1309    }
1310
1311    pub fn unpark(self) {
1312        match self {
1313            #[cfg(feature = "std")]
1314            ReceiverWaker::Thread(thread) => thread.unpark(),
1315            #[cfg(feature = "async")]
1316            ReceiverWaker::Task(waker) => waker.wake(),
1317            #[cfg(not(any(feature = "async", feature = "std")))]
1318            ReceiverWaker::_Uninhabited => unreachable!(),
1319        }
1320    }
1321}
1322
/// Checks the in-memory size of `ReceiverWaker` for each feature combination.
#[cfg(not(oneshot_loom))]
#[test]
#[ignore = "Unstable test. Different Rust versions have different sizes for Thread"]
fn receiver_waker_size() {
    // Count how many of the waker-carrying features are enabled.
    let enabled_features = usize::from(cfg!(feature = "std")) + usize::from(cfg!(feature = "async"));

    // No feature: 0 bytes. Exactly one feature: 16 bytes. Both features: 24 bytes.
    let expected: usize = match enabled_features {
        0 => 0,
        1 => 16,
        _ => 24,
    };

    assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
}
1335
/// Message describing the misuse of calling a blocking receive method on a receiver that
/// has already been polled. Only compiled when both `std` and `async` are enabled.
#[cfg(all(feature = "std", feature = "async"))]
const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
    "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
1339
1340#[inline]
1341pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
1342    drop(Box::from_raw(channel.as_ptr()))
1343}