oneshot/lib.rs
1//! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
2//! can only transport a single message. This has a few nice outcomes. One thing is that
3//! the implementation can be very efficient, utilizing the knowledge that there will
4//! only be one message. But more importantly, it allows the API to be expressed in such
5//! a way that certain edge cases that you don't want to care about when only sending a
6//! single message on a channel does not exist. For example: The sender can't be copied
7//! or cloned, and the send method takes ownership and consumes the sender.
8//! So you are guaranteed, at the type level, that there can only be one message sent.
9//!
10//! The sender's send method is non-blocking, and potentially lock- and wait-free.
11//! See documentation on [Sender::send] for situations where it might not be fully wait-free.
12//! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
13//! limited thread blocking receive operations. The receiver also implements `Future` and
14//! supports asynchronously awaiting the message.
15//!
16//!
17//! # Examples
18//!
19//! This example sets up a background worker that processes requests coming in on a standard
20//! mpsc channel and replies on a oneshot channel provided with each request. The worker can
21//! be interacted with both from sync and async contexts since the oneshot receiver
22//! can receive both blocking and async.
23//!
24//! ```rust
25//! # #[cfg(not(feature = "loom"))] {
26//! use std::sync::mpsc;
27//! use std::thread;
28//! use std::time::Duration;
29//!
30//! type Request = String;
31//!
32//! // Starts a background thread performing some computation on requests sent to it.
33//! // Delivers the response back over a oneshot channel.
34//! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
35//! let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
36//! thread::spawn(move || {
37//! for (request_data, response_sender) in request_receiver.iter() {
38//! let compute_operation = || request_data.len();
39//! let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
40//! }
41//! });
42//! request_sender
43//! }
44//!
45//! let processor = spawn_processing_thread();
46//!
47//! // If compiled with `std` the library can receive messages with timeout on regular threads
48//! #[cfg(feature = "std")] {
49//! let (response_sender, response_receiver) = oneshot::channel();
50//! let request = Request::from("data from sync thread");
51//!
52//! processor.send((request, response_sender)).expect("Processor down");
53//! match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
54//! Ok(result) => println!("Processor returned {}", result),
55//! Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
56//! Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
57//! }
58//! }
59//!
60//! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
61//! #[cfg(feature = "async")] {
62//! tokio::runtime::Runtime::new()
63//! .unwrap()
64//! .block_on(async move {
65//! let (response_sender, response_receiver) = oneshot::channel();
66//! let request = Request::from("data from sync thread");
67//!
68//! processor.send((request, response_sender)).expect("Processor down");
69//! match response_receiver.await { // <- Receive on the oneshot channel asynchronously
70//! Ok(result) => println!("Processor returned {}", result),
71//! Err(_e) => panic!("Processor exited"),
72//! }
73//! });
74//! }
75//! # }
76//! ```
77//!
78//! # Sync vs async
79//!
80//! The main motivation for writing this library was that there were no (known to me) channel
81//! implementations allowing you to seamlessly send messages between a normal thread and an async
82//! task, or the other way around. If message passing is the way you are communicating, of course
83//! that should work smoothly between the sync and async parts of the program!
84//!
85//! This library achieves that by having a fast and cheap send operation that can
86//! be used in both sync threads and async tasks. The receiver has both thread blocking
87//! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
88//!
89//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
90//! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
91//! be possible to use this library with any executor, or even pass messages between tasks running
92//! in different executors.
93//!
94
95// # Implementation description
96//
97// When a channel is created via the `channel` function, it creates a single heap allocation
98// containing:
99// * A one byte atomic integer that represents the current channel state,
100// * Uninitialized memory to fit the message,
101// * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
102//
103// The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1].
104// So with all features enabled each channel allocates 25 bytes plus the size of the
105// message, plus any padding needed to get correct memory alignment.
106//
107// The Sender and Receiver only holds a raw pointer to the heap channel object. The last endpoint
108// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
109// be consumed or dropped signal via the state that it is gone. And the second one see this and
110// frees the memory.
111//
112// ## Footnotes
113//
114// [1]: Mind that the waker only takes zero bytes when all features are disabled, making it
115// impossible to *wait* for the message. `try_recv` is the only available method in this scenario.
116
117#![deny(rust_2018_idioms)]
118#![cfg_attr(not(feature = "std"), no_std)]
119// Enables this nightly only feature for the documentation build on docs.rs.
120// To test this locally, build the docs with:
121// `RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features`
122#![cfg_attr(docsrs, feature(doc_cfg))]
123#![cfg_attr(docsrs, feature(doc_auto_cfg))]
124
125#[cfg(not(oneshot_loom))]
126extern crate alloc;
127
128use core::{
129 marker::PhantomData,
130 mem::{self, MaybeUninit},
131 ptr::{self, NonNull},
132};
133
134#[cfg(not(oneshot_loom))]
135use core::{
136 cell::UnsafeCell,
137 sync::atomic::{fence, AtomicU8, Ordering::*},
138};
139#[cfg(oneshot_loom)]
140use loom::{
141 cell::UnsafeCell,
142 sync::atomic::{fence, AtomicU8, Ordering::*},
143};
144
145#[cfg(all(any(feature = "std", feature = "async"), not(oneshot_loom)))]
146use core::hint;
147#[cfg(all(any(feature = "std", feature = "async"), oneshot_loom))]
148use loom::hint;
149
150#[cfg(feature = "async")]
151use core::{
152 pin::Pin,
153 task::{self, Poll},
154};
155#[cfg(feature = "std")]
156use std::time::{Duration, Instant};
157
158#[cfg(feature = "std")]
159mod thread {
160 #[cfg(not(oneshot_loom))]
161 pub use std::thread::{current, park, park_timeout, Thread};
162
163 #[cfg(oneshot_loom)]
164 pub use loom::thread::{current, park, Thread};
165
166 // loom does not support parking with a timeout. So we just
167 // yield. This means that the "park" will "spuriously" wake up
168 // way too early. But the code should properly handle this.
169 // One thing to note is that very short timeouts are needed
170 // when using loom, since otherwise the looping will cause
171 // an overflow in loom.
172 #[cfg(oneshot_loom)]
173 pub fn park_timeout(_timeout: std::time::Duration) {
174 loom::thread::yield_now()
175 }
176}
177
178#[cfg(oneshot_loom)]
179mod loombox;
180#[cfg(not(oneshot_loom))]
181use alloc::boxed::Box;
182#[cfg(oneshot_loom)]
183use loombox::Box;
184
185mod errors;
186// Wildcard imports are not nice. But since multiple errors have various conditional compilation,
187// this is easier than doing three different imports.
188pub use errors::*;
189
190/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
191pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
192 // Allocate the channel on the heap and get the pointer.
193 // The last endpoint of the channel to be alive is responsible for freeing the channel
194 // and dropping any object that might have been written to it.
195 let channel_ptr = NonNull::from(Box::leak(Box::new(Channel::new())));
196
197 (
198 Sender {
199 channel_ptr,
200 _invariant: PhantomData,
201 },
202 Receiver { channel_ptr },
203 )
204}
205
206/// Sending end of a oneshot channel.
207///
208/// Created and returned from the [`channel`] function.
209///
210/// Can be used to send a message to the corresponding [`Receiver`].
211#[derive(Debug)]
212pub struct Sender<T> {
213 channel_ptr: NonNull<Channel<T>>,
214 // In reality we want contravariance, however we can't obtain that.
215 //
216 // Consider the following scenario:
217 // ```
218 // let (mut tx, rx) = channel::<&'short u8>();
219 // let (tx2, rx2) = channel::<&'long u8>();
220 //
221 // tx = tx2;
222 //
223 // // Pretend short_ref is some &'short u8
224 // tx.send(short_ref).unwrap();
225 // let long_ref = rx2.recv().unwrap();
226 // ```
227 //
228 // If this type were covariant then we could safely extend lifetimes, which is not okay.
229 // Hence, we enforce invariance.
230 _invariant: PhantomData<fn(T) -> T>,
231}
232
233/// Receiving end of a oneshot channel.
234///
235/// Created and returned from the [`channel`] function.
236///
237/// Can be used to receive a message from the corresponding [`Sender`]. How the message
238/// can be received depends on what features are enabled.
239///
240/// This type implement [`IntoFuture`](core::future::IntoFuture) when the `async` feature is enabled.
241/// This allows awaiting it directly in an async context.
242#[derive(Debug)]
243pub struct Receiver<T> {
244 // Covariance is the right choice here. Consider the example presented in Sender, and you'll
245 // see that if we replaced `rx` instead then we would get the expected behavior
246 channel_ptr: NonNull<Channel<T>>,
247}
248
249unsafe impl<T: Send> Send for Sender<T> {}
250
251// SAFETY: The only methods that assumes there is only a single reference to the sender
252// takes `self` by value, guaranteeing that there is only one reference to the sender at
253// the time it is called.
254unsafe impl<T: Sync> Sync for Sender<T> {}
255
256unsafe impl<T: Send> Send for Receiver<T> {}
257impl<T> Unpin for Receiver<T> {}
258
259impl<T> Sender<T> {
260 /// Sends `message` over the channel to the corresponding [`Receiver`].
261 ///
262 /// Returns an error if the receiver has already been dropped. The message can
263 /// be extracted from the error.
264 ///
265 /// This method is lock-free and wait-free when sending on a channel that the
266 /// receiver is currently not receiving on. If the receiver is receiving during the send
267 /// operation this method includes waking up the thread/task. Unparking a thread involves
268 /// a mutex in Rust's standard library at the time of writing this.
269 /// How lock-free waking up an async task is
270 /// depends on your executor. If this method returns a `SendError`, please mind that dropping
271 /// the error involves running any drop implementation on the message type, and freeing the
272 /// channel's heap allocation, which might or might not be lock-free.
273 pub fn send(self, message: T) -> Result<(), SendError<T>> {
274 let channel_ptr = self.channel_ptr;
275
276 // Don't run our Drop implementation if send was called, any cleanup now happens here
277 mem::forget(self);
278
279 // SAFETY: The channel exists on the heap for the entire duration of this method and we
280 // only ever acquire shared references to it. Note that if the receiver disconnects it
281 // does not free the channel.
282 let channel = unsafe { channel_ptr.as_ref() };
283
284 // Write the message into the channel on the heap.
285 // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
286 // state, and since we're responsible for setting that state, we can guarantee that we have
287 // exclusive access to this memory location to perform this write.
288 unsafe { channel.write_message(message) };
289
290 // Set the state to signal there is a message on the channel.
291 // ORDERING: we use release ordering to ensure the write of the message is visible to the
292 // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
293 // and thus we do not need acquire ordering. The RECEIVING branch manages synchronization
294 // independent of this operation.
295 //
296 // EMPTY + 1 = MESSAGE
297 // RECEIVING + 1 = UNPARKING
298 // DISCONNECTED + 1 = invalid, however this state is never observed
299 match channel.state.fetch_add(1, Release) {
300 // The receiver is alive and has not started waiting. Send done.
301 EMPTY => Ok(()),
302 // The receiver is waiting. Wake it up so it can return the message.
303 RECEIVING => {
304 // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
305 // taking of the waker from being ordered before this operation.
306 fence(Acquire);
307
308 // Take the waker, but critically do not unpark it. If we unparked now, then the
309 // receiving thread could still observe the UNPARKING state and re-park, meaning
310 // that after we change to the MESSAGE state, it would remain parked indefinitely
311 // or until a spurious wakeup.
312 // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
313 // does not access the waker while in this state, nor does it free the channel
314 // allocation in this state.
315 let waker = unsafe { channel.take_waker() };
316
317 // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
318 // in the receiving thread, ensuring that both our read of the waker and write of
319 // the message happen-before the taking of the message and freeing of the channel.
320 // Furthermore, we need acquire ordering to ensure the unparking of the receiver
321 // happens after the channel state is updated.
322 channel.state.swap(MESSAGE, AcqRel);
323
324 // Note: it is possible that between the store above and this statement that
325 // the receiving thread is spuriously unparked, takes the message, and frees
326 // the channel allocation. However, we took ownership of the channel out of
327 // that allocation, and freeing the channel does not drop the waker since the
328 // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
329 // whether or not the receive has completed by this point.
330 waker.unpark();
331
332 Ok(())
333 }
334 // The receiver was already dropped. The error is responsible for freeing the channel.
335 // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
336 // we can transfer exclusive ownership of the channel's resources to the error.
337 // Moreover, since we just placed the message in the channel, the channel contains a
338 // valid message.
339 DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
340 _ => unreachable!(),
341 }
342 }
343
344 /// Returns true if the associated [`Receiver`] has been dropped.
345 ///
346 /// If true is returned, a future call to send is guaranteed to return an error.
347 pub fn is_closed(&self) -> bool {
348 // SAFETY: The channel exists on the heap for the entire duration of this method and we
349 // only ever acquire shared references to it. Note that if the receiver disconnects it
350 // does not free the channel.
351 let channel = unsafe { self.channel_ptr.as_ref() };
352
353 // ORDERING: We *chose* a Relaxed ordering here as it sufficient to
354 // enforce the method's contract: "if true is returned, a future
355 // call to send is guaranteed to return an error."
356 channel.state.load(Relaxed) == DISCONNECTED
357 }
358
359 /// Consumes the Sender, returning a raw pointer to the channel on the heap.
360 ///
361 /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
362 /// to do with the returned pointer is to later reconstruct the Sender with [Sender::from_raw].
363 /// Memory will leak if the Sender is never reconstructed.
364 pub fn into_raw(self) -> *mut () {
365 let raw = self.channel_ptr.as_ptr() as *mut ();
366 mem::forget(self);
367 raw
368 }
369
370 /// Consumes a raw pointer from [Sender::into_raw], recreating the Sender.
371 ///
372 /// # Safety
373 ///
374 /// This pointer must have come from [`Sender<T>::into_raw`] with the same message type, `T`.
375 /// At most one Sender must exist for a channel at any point in time.
376 /// Constructing multiple Senders from the same raw pointer leads to undefined behavior.
377 pub unsafe fn from_raw(raw: *mut ()) -> Self {
378 Self {
379 channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
380 _invariant: PhantomData,
381 }
382 }
383}
384
385impl<T> Drop for Sender<T> {
386 fn drop(&mut self) {
387 // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
388 // DISCONNECTED states. If we are in the MESSAGE state, then we called
389 // mem::forget(self), so we should not be in this function call. If we are in the
390 // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
391 // unreachable, or was dropped and observed that our side was still alive, and thus didn't
392 // free the channel.
393 let channel = unsafe { self.channel_ptr.as_ref() };
394
395 // Set the channel state to disconnected and read what state the receiver was in
396 // ORDERING: we don't need release ordering here since there are no modifications we
397 // need to make visible to other thread, and the Err(RECEIVING) branch handles
398 // synchronization independent of this cmpxchg
399 //
400 // EMPTY ^ 001 = DISCONNECTED
401 // RECEIVING ^ 001 = UNPARKING
402 // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
403 match channel.state.fetch_xor(0b001, Relaxed) {
404 // The receiver has not started waiting, nor is it dropped.
405 EMPTY => (),
406 // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
407 RECEIVING => {
408 // See comments in Sender::send
409
410 fence(Acquire);
411
412 let waker = unsafe { channel.take_waker() };
413
414 // We still need release ordering here to make sure our read of the waker happens
415 // before this, and acquire ordering to ensure the unparking of the receiver
416 // happens after this.
417 channel.state.swap(DISCONNECTED, AcqRel);
418
419 // The Acquire ordering above ensures that the write of the DISCONNECTED state
420 // happens-before unparking the receiver.
421 waker.unpark();
422 }
423 // The receiver was already dropped. We are responsible for freeing the channel.
424 DISCONNECTED => {
425 // SAFETY: when the receiver switches the state to DISCONNECTED they have received
426 // the message or will no longer be trying to receive the message, and have
427 // observed that the sender is still alive, meaning that we're responsible for
428 // freeing the channel allocation.
429 unsafe { dealloc(self.channel_ptr) };
430 }
431 _ => unreachable!(),
432 }
433 }
434}
435
436impl<T> Receiver<T> {
437 /// Checks if there is a message in the channel without blocking. Returns:
438 /// * `Ok(message)` if there was a message in the channel.
439 /// * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
440 /// * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
441 /// message has already been extracted by a previous receive call.
442 ///
443 /// If a message is returned, the channel is disconnected and any subsequent receive operation
444 /// using this receiver will return an error.
445 ///
446 /// This method is completely lock-free and wait-free. The only thing it does is an atomic
447 /// integer load of the channel state. And if there is a message in the channel it additionally
448 /// performs one atomic integer store and copies the message from the heap to the stack for
449 /// returning it.
450 pub fn try_recv(&self) -> Result<T, TryRecvError> {
451 // SAFETY: The channel will not be freed while this method is still running.
452 let channel = unsafe { self.channel_ptr.as_ref() };
453
454 // ORDERING: we use acquire ordering to synchronize with the store of the message.
455 match channel.state.load(Acquire) {
456 MESSAGE => {
457 // It's okay to break up the load and store since once we're in the message state
458 // the sender no longer modifies the state
459 // ORDERING: at this point the sender has done its job and is no longer active, so
460 // we don't need to make any side effects visible to it
461 channel.state.store(DISCONNECTED, Relaxed);
462
463 // SAFETY: we are in the MESSAGE state so the message is present
464 Ok(unsafe { channel.take_message() })
465 }
466 EMPTY => Err(TryRecvError::Empty),
467 DISCONNECTED => Err(TryRecvError::Disconnected),
468 #[cfg(feature = "async")]
469 RECEIVING | UNPARKING => Err(TryRecvError::Empty),
470 _ => unreachable!(),
471 }
472 }
473
474 /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
475 /// disconnected.
476 ///
477 /// This method will always block the current thread if there is no data available and it is
478 /// still possible for the message to be sent. Once the message is sent to the corresponding
479 /// [`Sender`], then this receiver will wake up and return that message.
480 ///
481 /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
482 /// this call is blocking, this call will wake up and return `Err` to indicate that the message
483 /// can never be received on this channel.
484 ///
485 /// If a sent message has already been extracted from this channel this method will return an
486 /// error.
487 ///
488 /// # Panics
489 ///
490 /// Panics if called after this receiver has been polled asynchronously.
491 #[cfg(feature = "std")]
492 pub fn recv(self) -> Result<T, RecvError> {
493 // Note that we don't need to worry about changing the state to disconnected or setting the
494 // state to an invalid value at any point in this function because we take ownership of
495 // self, and this function does not exit until the message has been received or both side
496 // of the channel are inactive and cleaned up.
497
498 let channel_ptr = self.channel_ptr;
499
500 // Don't run our Drop implementation. This consuming recv method is responsible for freeing.
501 mem::forget(self);
502
503 // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
504 // is still alive, meaning that even if the sender was dropped then it would have observed
505 // the fact that we're still alive and left the responsibility of deallocating the
506 // channel to us, so channel_ptr is valid
507 let channel = unsafe { channel_ptr.as_ref() };
508
509 // ORDERING: we use acquire ordering to synchronize with the write of the message in the
510 // case that it's available
511 match channel.state.load(Acquire) {
512 // The sender is alive but has not sent anything yet. We prepare to park.
513 EMPTY => {
514 // Conditionally add a delay here to help the tests trigger the edge cases where
515 // the sender manages to be dropped or send something before we are able to store
516 // our waker object in the channel.
517 #[cfg(all(oneshot_test_delay, not(oneshot_loom)))]
518 std::thread::sleep(std::time::Duration::from_millis(10));
519
520 // Write our waker instance to the channel.
521 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
522 // try to access the waker until it sees the state set to RECEIVING below
523 unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
524
525 // Switch the state to RECEIVING. We need to do this in one atomic step in case the
526 // sender disconnected or sent the message while we wrote the waker to memory. We
527 // don't need to do a compare exchange here however because if the original state
528 // was not EMPTY, then the sender has either finished sending the message or is
529 // being dropped, so the RECEIVING state will never be observed after we return.
530 // ORDERING: we use release ordering so the sender can synchronize with our writing
531 // of the waker to memory. The individual branches handle any additional
532 // synchronizaton
533 match channel.state.swap(RECEIVING, Release) {
534 // We stored our waker, now we park until the sender has changed the state
535 EMPTY => loop {
536 thread::park();
537
538 // ORDERING: synchronize with the write of the message
539 match channel.state.load(Acquire) {
540 // The sender sent the message while we were parked.
541 MESSAGE => {
542 // SAFETY: we are in the message state so the message is valid
543 let message = unsafe { channel.take_message() };
544
545 // SAFETY: the Sender delegates the responsibility of deallocating
546 // the channel to us upon sending the message
547 unsafe { dealloc(channel_ptr) };
548
549 break Ok(message);
550 }
551 // The sender was dropped while we were parked.
552 DISCONNECTED => {
553 // SAFETY: the Sender doesn't deallocate the channel allocation in
554 // its drop implementation if we're receiving
555 unsafe { dealloc(channel_ptr) };
556
557 break Err(RecvError);
558 }
559 // State did not change, spurious wakeup, park again.
560 RECEIVING | UNPARKING => (),
561 _ => unreachable!(),
562 }
563 },
564 // The sender sent the message while we prepared to park.
565 MESSAGE => {
566 // ORDERING: Synchronize with the write of the message. This branch is
567 // unlikely to be taken, so it's likely more efficient to use a fence here
568 // instead of AcqRel ordering on the RMW operation
569 fence(Acquire);
570
571 // SAFETY: we started in the empty state and the sender switched us to the
572 // message state. This means that it did not take the waker, so we're
573 // responsible for dropping it.
574 unsafe { channel.drop_waker() };
575
576 // SAFETY: we are in the message state so the message is valid
577 let message = unsafe { channel.take_message() };
578
579 // SAFETY: the Sender delegates the responsibility of deallocating the
580 // channel to us upon sending the message
581 unsafe { dealloc(channel_ptr) };
582
583 Ok(message)
584 }
585 // The sender was dropped before sending anything while we prepared to park.
586 DISCONNECTED => {
587 // SAFETY: we started in the empty state and the sender switched us to the
588 // disconnected state. It does not take the waker when it does this so we
589 // need to drop it.
590 unsafe { channel.drop_waker() };
591
592 // SAFETY: the sender does not deallocate the channel if it switches from
593 // empty to disconnected so we need to free the allocation
594 unsafe { dealloc(channel_ptr) };
595
596 Err(RecvError)
597 }
598 _ => unreachable!(),
599 }
600 }
601 // The sender already sent the message.
602 MESSAGE => {
603 // SAFETY: we are in the message state so the message is valid
604 let message = unsafe { channel.take_message() };
605
606 // SAFETY: we are already in the message state so the sender has been forgotten
607 // and it's our job to clean up resources
608 unsafe { dealloc(channel_ptr) };
609
610 Ok(message)
611 }
612 // The sender was dropped before sending anything, or we already received the message.
613 DISCONNECTED => {
614 // SAFETY: the sender does not deallocate the channel if it switches from empty to
615 // disconnected so we need to free the allocation
616 unsafe { dealloc(channel_ptr) };
617
618 Err(RecvError)
619 }
620 // The receiver must have been `Future::poll`ed prior to this call.
621 #[cfg(feature = "async")]
622 RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
623 _ => unreachable!(),
624 }
625 }
626
627 /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
628 /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit
629 /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver.
630 ///
631 /// If a message is returned, the channel is disconnected and any subsequent receive operation
632 /// using this receiver will return an error.
633 ///
634 /// # Panics
635 ///
636 /// Panics if called after this receiver has been polled asynchronously.
637 #[cfg(feature = "std")]
638 pub fn recv_ref(&self) -> Result<T, RecvError> {
639 self.start_recv_ref(RecvError, |channel| {
640 loop {
641 thread::park();
642
643 // ORDERING: we use acquire ordering to synchronize with the write of the message
644 match channel.state.load(Acquire) {
645 // The sender sent the message while we were parked.
646 // We take the message and mark the channel disconnected.
647 MESSAGE => {
648 // ORDERING: the sender is inactive at this point so we don't need to make
649 // any reads or writes visible to the sending thread
650 channel.state.store(DISCONNECTED, Relaxed);
651
652 // SAFETY: we were just in the message state so the message is valid
653 break Ok(unsafe { channel.take_message() });
654 }
655 // The sender was dropped while we were parked.
656 DISCONNECTED => break Err(RecvError),
657 // State did not change, spurious wakeup, park again.
658 RECEIVING | UNPARKING => (),
659 _ => unreachable!(),
660 }
661 }
662 })
663 }
664
665 /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
666 /// * `Ok(message)` if there was a message in the channel before the timeout was reached.
667 /// * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
668 /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
669 /// has already been extracted by a previous receive call.
670 ///
671 /// If a message is returned, the channel is disconnected and any subsequent receive operation
672 /// using this receiver will return an error.
673 ///
674 /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point
675 /// in the future this falls back to an indefinitely blocking receive operation.
676 ///
677 /// # Panics
678 ///
679 /// Panics if called after this receiver has been polled asynchronously.
680 #[cfg(feature = "std")]
681 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
682 match Instant::now().checked_add(timeout) {
683 Some(deadline) => self.recv_deadline(deadline),
684 None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected),
685 }
686 }
687
688 /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns:
689 /// * `Ok(message)` if there was a message in the channel before the deadline was reached.
690 /// * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
691 /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
692 /// has already been extracted by a previous receive call.
693 ///
694 /// If a message is returned, the channel is disconnected and any subsequent receive operation
695 /// using this receiver will return an error.
696 ///
697 /// # Panics
698 ///
699 /// Panics if called after this receiver has been polled asynchronously.
700 #[cfg(feature = "std")]
701 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
702 /// # Safety
703 ///
704 /// If the sender is unparking us after a message send, the message must already have been
705 /// written to the channel and an acquire memory barrier issued before calling this function
706 #[cold]
707 unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
708 loop {
709 thread::park();
710
711 // ORDERING: The callee has already synchronized with any message write
712 match channel.state.load(Relaxed) {
713 MESSAGE => {
714 // ORDERING: the sender has been dropped, so this update only
715 // needs to be visible to us
716 channel.state.store(DISCONNECTED, Relaxed);
717 break Ok(channel.take_message());
718 }
719 DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
720 // The sender is still unparking us. We continue on the empty state here since
721 // the current implementation eagerly sets the state to EMPTY upon timeout.
722 EMPTY => (),
723 _ => unreachable!(),
724 }
725 }
726 }
727
728 self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
729 loop {
730 match deadline.checked_duration_since(Instant::now()) {
731 Some(timeout) => {
732 thread::park_timeout(timeout);
733
734 // ORDERING: synchronize with the write of the message
735 match channel.state.load(Acquire) {
736 // The sender sent the message while we were parked.
737 MESSAGE => {
738 // ORDERING: the sender has been `mem::forget`-ed so this update
739 // only needs to be visible to us.
740 channel.state.store(DISCONNECTED, Relaxed);
741
742 // SAFETY: we either are in the message state or were just in the
743 // message state
744 break Ok(unsafe { channel.take_message() });
745 }
746 // The sender was dropped while we were parked.
747 DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
748 // State did not change, spurious wakeup, park again.
749 RECEIVING | UNPARKING => (),
750 _ => unreachable!(),
751 }
752 }
753 None => {
754 // ORDERING: synchronize with the write of the message
755 match channel.state.swap(EMPTY, Acquire) {
756 // We reached the end of the timeout without receiving a message
757 RECEIVING => {
758 // SAFETY: we were in the receiving state and are now in the empty
759 // state, so the sender has not and will not try to read the waker,
760 // so we have exclusive access to drop it.
761 unsafe { channel.drop_waker() };
762
763 break Err(RecvTimeoutError::Timeout);
764 }
765 // The sender sent the message while we were parked.
766 MESSAGE => {
767 // Same safety and ordering as the Some branch
768
769 channel.state.store(DISCONNECTED, Relaxed);
770 break Ok(unsafe { channel.take_message() });
771 }
772 // The sender was dropped while we were parked.
773 DISCONNECTED => {
774 // ORDERING: we were originally in the disconnected state meaning
775 // that the sender is inactive and no longer observing the state,
776 // so we only need to change it back to DISCONNECTED for if the
777 // receiver is dropped or a recv* method is called again
778 channel.state.store(DISCONNECTED, Relaxed);
779
780 break Err(RecvTimeoutError::Disconnected);
781 }
782 // The sender sent the message and started unparking us
783 UNPARKING => {
784 // We were in the UNPARKING state and are now in the EMPTY state.
785 // We wait to be properly unparked and to observe if the sender
786 // sets MESSAGE or DISCONNECTED state.
787 // SAFETY: The load above has synchronized with any message write.
788 break unsafe { wait_for_unpark(channel) };
789 }
790 _ => unreachable!(),
791 }
792 }
793 }
794 }
795 })
796 }
797
798 /// Returns true if the associated [`Sender`] was dropped before sending a message. Or if
799 /// the message has already been received.
800 ///
801 /// If `true` is returned, all future calls to receive methods are guaranteed to return
802 /// a disconnected error. And future calls to this method is guaranteed to also return `true`.
803 pub fn is_closed(&self) -> bool {
804 // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
805 // is still alive, meaning that even if the sender was dropped then it would have observed
806 // the fact that we're still alive and left the responsibility of deallocating the
807 // channel to us, so `self.channel` is valid
808 let channel = unsafe { self.channel_ptr.as_ref() };
809
810 // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
811 // enforce the method's contract. Once true has been observed, it will remain true.
812 // However, if false is observed, the sender might have just disconnected but this thread
813 // has not observed it yet.
814 channel.state.load(Relaxed) == DISCONNECTED
815 }
816
817 /// Returns true if there is a message in the channel, ready to be received.
818 ///
819 /// If `true` is returned, the next call to a receive method is guaranteed to return
820 /// a message.
821 pub fn has_message(&self) -> bool {
822 // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
823 // is still alive, meaning that even if the sender was dropped then it would have observed
824 // the fact that we're still alive and left the responsibility of deallocating the
825 // channel to us, so `self.channel` is valid
826 let channel = unsafe { self.channel_ptr.as_ref() };
827
828 // ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered
829 // before this one. This upholds the contract that if true is returned, the next call to
830 // a receive method is guaranteed to also abserve the `MESSAGE` state and return a message.
831 channel.state.load(Acquire) == MESSAGE
832 }
833
834 /// Begins the process of receiving on the channel by reference. If the message is already
835 /// ready, or the sender has disconnected, then this function will return the appropriate
836 /// Result immediately. Otherwise, it will write the waker to memory, check to see if the
837 /// sender has finished or disconnected again, and then will call `finish`. `finish` is
838 /// thus responsible for cleaning up the channel's resources appropriately before it returns,
839 /// such as destroying the waker, for instance.
840 #[cfg(feature = "std")]
841 #[inline]
842 fn start_recv_ref<E>(
843 &self,
844 disconnected_error: E,
845 finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
846 ) -> Result<T, E> {
847 // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
848 // is still alive, meaning that even if the sender was dropped then it would have observed
849 // the fact that we're still alive and left the responsibility of deallocating the
850 // channel to us, so `self.channel` is valid
851 let channel = unsafe { self.channel_ptr.as_ref() };
852
853 // ORDERING: synchronize with the write of the message
854 match channel.state.load(Acquire) {
855 // The sender is alive but has not sent anything yet. We prepare to park.
856 EMPTY => {
857 // Conditionally add a delay here to help the tests trigger the edge cases where
858 // the sender manages to be dropped or send something before we are able to store
859 // our waker object in the channel.
860 #[cfg(all(oneshot_test_delay, not(oneshot_loom)))]
861 std::thread::sleep(std::time::Duration::from_millis(10));
862
863 // Write our waker instance to the channel.
864 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
865 // try to access the waker until it sees the state set to RECEIVING below
866 unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
867
868 // ORDERING: we use release ordering on success so the sender can synchronize with
869 // our write of the waker. We use relaxed ordering on failure since the sender does
870 // not need to synchronize with our write and the individual match arms handle any
871 // additional synchronization
872 match channel
873 .state
874 .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
875 {
876 // We stored our waker, now we delegate to the callback to finish the receive
877 // operation
878 Ok(_) => finish(channel),
879 // The sender sent the message while we prepared to finish
880 Err(MESSAGE) => {
881 // See comments in `recv` for ordering and safety
882
883 fence(Acquire);
884
885 unsafe { channel.drop_waker() };
886
887 // ORDERING: the sender has been `mem::forget`-ed so this update only
888 // needs to be visible to us
889 channel.state.store(DISCONNECTED, Relaxed);
890
891 // SAFETY: The MESSAGE state tells us there is a correctly initialized
892 // message
893 Ok(unsafe { channel.take_message() })
894 }
895 // The sender was dropped before sending anything while we prepared to park.
896 Err(DISCONNECTED) => {
897 // See comments in `recv` for safety
898 unsafe { channel.drop_waker() };
899 Err(disconnected_error)
900 }
901 _ => unreachable!(),
902 }
903 }
904 // The sender sent the message. We take the message and mark the channel disconnected.
905 MESSAGE => {
906 // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
907 // visible to us
908 channel.state.store(DISCONNECTED, Relaxed);
909
910 // SAFETY: we are in the message state so the message is valid
911 Ok(unsafe { channel.take_message() })
912 }
913 // The sender was dropped before sending anything, or we already received the message.
914 DISCONNECTED => Err(disconnected_error),
915 // The receiver must have been `Future::poll`ed prior to this call.
916 #[cfg(feature = "async")]
917 RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
918 _ => unreachable!(),
919 }
920 }
921
922 /// Consumes the Receiver, returning a raw pointer to the channel on the heap.
923 ///
924 /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing
925 /// to do with the returned pointer is to later reconstruct the Receiver with
926 /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed.
927 pub fn into_raw(self) -> *mut () {
928 let raw = self.channel_ptr.as_ptr() as *mut ();
929 mem::forget(self);
930 raw
931 }
932
933 /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver.
934 ///
935 /// # Safety
936 ///
937 /// This pointer must have come from [`Receiver<T>::into_raw`] with the same message type, `T`.
938 /// At most one Receiver must exist for a channel at any point in time.
939 /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior.
940 pub unsafe fn from_raw(raw: *mut ()) -> Self {
941 Self {
942 channel_ptr: NonNull::new_unchecked(raw as *mut Channel<T>),
943 }
944 }
945}
946
947#[cfg(feature = "async")]
948impl<T> core::future::Future for Receiver<T> {
949 type Output = Result<T, RecvError>;
950
951 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
952 // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
953 // is still alive, meaning that even if the sender was dropped then it would have observed
954 // the fact that we're still alive and left the responsibility of deallocating the
955 // channel to us, so `self.channel` is valid
956 let channel = unsafe { self.channel_ptr.as_ref() };
957
958 // ORDERING: we use acquire ordering to synchronize with the store of the message.
959 match channel.state.load(Acquire) {
960 // The sender is alive but has not sent anything yet.
961 EMPTY => {
962 // SAFETY: We can't be in the forbidden states, and no waker in the channel.
963 unsafe { channel.write_async_waker(cx) }
964 }
965 // We were polled again while waiting for the sender. Replace the waker with the new one.
966 RECEIVING => {
967 // ORDERING: We use relaxed ordering on both success and failure since we have not
968 // written anything above that must be released, and the individual match arms
969 // handle any additional synchronization.
970 match channel
971 .state
972 .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
973 {
974 // We successfully changed the state back to EMPTY. Replace the waker.
975 // This is the most likely branch to be taken, which is why we don't use any
976 // memory barriers in the compare_exchange above.
977 Ok(_) => {
978 // SAFETY: We wrote the waker in a previous call to poll. We do not need
979 // a memory barrier since the previous write here was by ourselves.
980 unsafe { channel.drop_waker() };
981 // SAFETY: We can't be in the forbidden states, and no waker in the channel.
982 unsafe { channel.write_async_waker(cx) }
983 }
984 // The sender sent the message while we prepared to replace the waker.
985 // We take the message and mark the channel disconnected.
986 // The sender has already taken the waker.
987 Err(MESSAGE) => {
988 // ORDERING: Synchronize with the write of the message. This branch is
989 // unlikely to be taken.
990 channel.state.swap(DISCONNECTED, Acquire);
991 // SAFETY: The state tells us the sender has initialized the message.
992 Poll::Ready(Ok(unsafe { channel.take_message() }))
993 }
994 // The sender was dropped before sending anything while we prepared to park.
995 // The sender has taken the waker already.
996 Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
997 // The sender is currently waking us up.
998 Err(UNPARKING) => {
999 // We can't trust that the old waker that the sender has access to
1000 // is honored by the async runtime at this point. So we wake ourselves
1001 // up to get polled instantly again.
1002 cx.waker().wake_by_ref();
1003 Poll::Pending
1004 }
1005 _ => unreachable!(),
1006 }
1007 }
1008 // The sender sent the message.
1009 MESSAGE => {
1010 // ORDERING: the sender has been dropped so this update only needs to be
1011 // visible to us
1012 channel.state.store(DISCONNECTED, Relaxed);
1013 Poll::Ready(Ok(unsafe { channel.take_message() }))
1014 }
1015 // The sender was dropped before sending anything, or we already received the message.
1016 DISCONNECTED => Poll::Ready(Err(RecvError)),
1017 // The sender has observed the RECEIVING state and is currently reading the waker from
1018 // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
1019 // state. We busy loop here since we know the sender is done very soon.
1020 UNPARKING => loop {
1021 hint::spin_loop();
1022 // ORDERING: The load above has already synchronized with the write of the message.
1023 match channel.state.load(Relaxed) {
1024 MESSAGE => {
1025 // ORDERING: the sender has been dropped, so this update only
1026 // needs to be visible to us
1027 channel.state.store(DISCONNECTED, Relaxed);
1028 // SAFETY: We observed the MESSAGE state
1029 break Poll::Ready(Ok(unsafe { channel.take_message() }));
1030 }
1031 DISCONNECTED => break Poll::Ready(Err(RecvError)),
1032 UNPARKING => (),
1033 _ => unreachable!(),
1034 }
1035 },
1036 _ => unreachable!(),
1037 }
1038 }
1039}
1040
1041impl<T> Drop for Receiver<T> {
1042 fn drop(&mut self) {
1043 // SAFETY: since the receiving side is still alive the sender would have observed that and
1044 // left deallocating the channel allocation to us.
1045 let channel = unsafe { self.channel_ptr.as_ref() };
1046
1047 // Set the channel state to disconnected and read what state the receiver was in
1048 match channel.state.swap(DISCONNECTED, Acquire) {
1049 // The sender has not sent anything, nor is it dropped.
1050 EMPTY => (),
1051 // The sender already sent something. We must drop it, and free the channel.
1052 MESSAGE => {
1053 // SAFETY: we are in the message state so the message is initialized
1054 unsafe { channel.drop_message() };
1055
1056 // SAFETY: see safety comment at top of function
1057 unsafe { dealloc(self.channel_ptr) };
1058 }
1059 // The receiver has been polled.
1060 #[cfg(feature = "async")]
1061 RECEIVING => {
1062 // TODO: figure this out when async is fixed
1063 unsafe { channel.drop_waker() };
1064 }
1065 // The sender was already dropped. We are responsible for freeing the channel.
1066 DISCONNECTED => {
1067 // SAFETY: see safety comment at top of function
1068 unsafe { dealloc(self.channel_ptr) };
1069 }
1070 // This receiver was previously polled, so the channel was in the RECEIVING state.
1071 // But the sender has observed the RECEIVING state and is currently reading the waker
1072 // to wake us up. We need to loop here until we observe the MESSAGE or DISCONNECTED state.
1073 // We busy loop here since we know the sender is done very soon.
1074 #[cfg(any(feature = "std", feature = "async"))]
1075 UNPARKING => {
1076 loop {
1077 hint::spin_loop();
1078 // ORDERING: The swap above has already synchronized with the write of the message.
1079 match channel.state.load(Relaxed) {
1080 MESSAGE => {
1081 // SAFETY: we are in the message state so the message is initialized
1082 unsafe { channel.drop_message() };
1083 break;
1084 }
1085 DISCONNECTED => break,
1086 UNPARKING => (),
1087 _ => unreachable!(),
1088 }
1089 }
1090 // SAFETY: see safety comment at top of function
1091 unsafe { dealloc(self.channel_ptr) };
1092 }
1093 _ => unreachable!(),
1094 }
1095 }
1096}
1097
1098/// All the values that the `Channel::state` field can have during the lifetime of a channel.
1099mod states {
1100 // These values are very explicitly chosen so that we can replace some cmpxchg calls with
1101 // fetch_* calls.
1102
1103 /// The initial channel state. Active while both endpoints are still alive, no message has been
1104 /// sent, and the receiver is not receiving.
1105 pub const EMPTY: u8 = 0b011;
1106 /// A message has been sent to the channel, but the receiver has not yet read it.
1107 pub const MESSAGE: u8 = 0b100;
1108 /// No message has yet been sent on the channel, but the receiver is currently receiving.
1109 pub const RECEIVING: u8 = 0b000;
1110 #[cfg(any(feature = "std", feature = "async"))]
1111 pub const UNPARKING: u8 = 0b001;
1112 /// The channel has been closed. This means that either the sender or receiver has been dropped,
1113 /// or the message sent to the channel has already been received. Since this is a oneshot
1114 /// channel, it is disconnected after the one message it is supposed to hold has been
1115 /// transmitted.
1116 pub const DISCONNECTED: u8 = 0b010;
1117}
1118use states::*;
1119
1120/// Internal channel data structure structure. the `channel` method allocates and puts one instance
1121/// of this struct on the heap for each oneshot channel instance. The struct holds:
1122/// * The current state of the channel.
1123/// * The message in the channel. This memory is uninitialized until the message is sent.
1124/// * The waker instance for the thread or task that is currently receiving on this channel.
1125/// This memory is uninitialized until the receiver starts receiving.
1126struct Channel<T> {
1127 state: AtomicU8,
1128 message: UnsafeCell<MaybeUninit<T>>,
1129 waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
1130}
1131
1132impl<T> Channel<T> {
1133 pub fn new() -> Self {
1134 Self {
1135 state: AtomicU8::new(EMPTY),
1136 message: UnsafeCell::new(MaybeUninit::uninit()),
1137 waker: UnsafeCell::new(MaybeUninit::uninit()),
1138 }
1139 }
1140
1141 #[inline(always)]
1142 unsafe fn message(&self) -> &MaybeUninit<T> {
1143 #[cfg(oneshot_loom)]
1144 {
1145 self.message.with(|ptr| &*ptr)
1146 }
1147
1148 #[cfg(not(oneshot_loom))]
1149 {
1150 &*self.message.get()
1151 }
1152 }
1153
1154 #[inline(always)]
1155 unsafe fn with_message_mut<F>(&self, op: F)
1156 where
1157 F: FnOnce(&mut MaybeUninit<T>),
1158 {
1159 #[cfg(oneshot_loom)]
1160 {
1161 self.message.with_mut(|ptr| op(&mut *ptr))
1162 }
1163
1164 #[cfg(not(oneshot_loom))]
1165 {
1166 op(&mut *self.message.get())
1167 }
1168 }
1169
1170 #[inline(always)]
1171 #[cfg(any(feature = "std", feature = "async"))]
1172 unsafe fn with_waker_mut<F>(&self, op: F)
1173 where
1174 F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
1175 {
1176 #[cfg(oneshot_loom)]
1177 {
1178 self.waker.with_mut(|ptr| op(&mut *ptr))
1179 }
1180
1181 #[cfg(not(oneshot_loom))]
1182 {
1183 op(&mut *self.waker.get())
1184 }
1185 }
1186
1187 #[inline(always)]
1188 unsafe fn write_message(&self, message: T) {
1189 self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
1190 }
1191
1192 #[inline(always)]
1193 unsafe fn take_message(&self) -> T {
1194 #[cfg(oneshot_loom)]
1195 {
1196 self.message.with(|ptr| ptr::read(ptr)).assume_init()
1197 }
1198
1199 #[cfg(not(oneshot_loom))]
1200 {
1201 ptr::read(self.message.get()).assume_init()
1202 }
1203 }
1204
1205 #[inline(always)]
1206 unsafe fn drop_message(&self) {
1207 self.with_message_mut(|slot| slot.assume_init_drop());
1208 }
1209
1210 #[cfg(any(feature = "std", feature = "async"))]
1211 #[inline(always)]
1212 unsafe fn write_waker(&self, waker: ReceiverWaker) {
1213 self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
1214 }
1215
1216 #[inline(always)]
1217 unsafe fn take_waker(&self) -> ReceiverWaker {
1218 #[cfg(oneshot_loom)]
1219 {
1220 self.waker.with(|ptr| ptr::read(ptr)).assume_init()
1221 }
1222
1223 #[cfg(not(oneshot_loom))]
1224 {
1225 ptr::read(self.waker.get()).assume_init()
1226 }
1227 }
1228
1229 #[cfg(any(feature = "std", feature = "async"))]
1230 #[inline(always)]
1231 unsafe fn drop_waker(&self) {
1232 self.with_waker_mut(|slot| slot.assume_init_drop());
1233 }
1234
1235 /// # Safety
1236 ///
1237 /// * `Channel::waker` must not have a waker stored in it when calling this method.
1238 /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
1239 #[cfg(feature = "async")]
1240 unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
1241 // Write our thread instance to the channel.
1242 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
1243 // try to access the waker until it sees the state set to RECEIVING below
1244 self.write_waker(ReceiverWaker::task_waker(cx));
1245
1246 // ORDERING: we use release ordering on success so the sender can synchronize with
1247 // our write of the waker. We use relaxed ordering on failure since the sender does
1248 // not need to synchronize with our write and the individual match arms handle any
1249 // additional synchronization
1250 match self
1251 .state
1252 .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
1253 {
1254 // We stored our waker, now we return and let the sender wake us up
1255 Ok(_) => Poll::Pending,
1256 // The sender sent the message while we prepared to park.
1257 // We take the message and mark the channel disconnected.
1258 Err(MESSAGE) => {
1259 // ORDERING: Synchronize with the write of the message. This branch is
1260 // unlikely to be taken, so it's likely more efficient to use a fence here
1261 // instead of AcqRel ordering on the compare_exchange operation
1262 fence(Acquire);
1263
1264 // SAFETY: we started in the EMPTY state and the sender switched us to the
1265 // MESSAGE state. This means that it did not take the waker, so we're
1266 // responsible for dropping it.
1267 self.drop_waker();
1268
1269 // ORDERING: sender does not exist, so this update only needs to be visible to us
1270 self.state.store(DISCONNECTED, Relaxed);
1271
1272 // SAFETY: The MESSAGE state tells us there is a correctly initialized message
1273 Poll::Ready(Ok(self.take_message()))
1274 }
1275 // The sender was dropped before sending anything while we prepared to park.
1276 Err(DISCONNECTED) => {
1277 // SAFETY: we started in the EMPTY state and the sender switched us to the
1278 // DISCONNECTED state. This means that it did not take the waker, so we're
1279 // responsible for dropping it.
1280 self.drop_waker();
1281 Poll::Ready(Err(RecvError))
1282 }
1283 _ => unreachable!(),
1284 }
1285 }
1286}
1287
1288enum ReceiverWaker {
1289 /// The receiver is waiting synchronously. Its thread is parked.
1290 #[cfg(feature = "std")]
1291 Thread(thread::Thread),
1292 /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
1293 #[cfg(feature = "async")]
1294 Task(task::Waker),
1295 /// A little hack to not make this enum an uninhibitable type when no features are enabled.
1296 #[cfg(not(any(feature = "async", feature = "std")))]
1297 _Uninhabited,
1298}
1299
1300impl ReceiverWaker {
1301 #[cfg(feature = "std")]
1302 pub fn current_thread() -> Self {
1303 Self::Thread(thread::current())
1304 }
1305
1306 #[cfg(feature = "async")]
1307 pub fn task_waker(cx: &task::Context<'_>) -> Self {
1308 Self::Task(cx.waker().clone())
1309 }
1310
1311 pub fn unpark(self) {
1312 match self {
1313 #[cfg(feature = "std")]
1314 ReceiverWaker::Thread(thread) => thread.unpark(),
1315 #[cfg(feature = "async")]
1316 ReceiverWaker::Task(waker) => waker.wake(),
1317 #[cfg(not(any(feature = "async", feature = "std")))]
1318 ReceiverWaker::_Uninhabited => unreachable!(),
1319 }
1320 }
1321}
1322
1323#[cfg(not(oneshot_loom))]
1324#[test]
1325#[ignore = "Unstable test. Different Rust versions have different sizes for Thread"]
1326fn receiver_waker_size() {
1327 let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) {
1328 (false, false) => 0,
1329 (false, true) => 16,
1330 (true, false) => 16,
1331 (true, true) => 24,
1332 };
1333 assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
1334}
1335
1336#[cfg(all(feature = "std", feature = "async"))]
1337const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
1338 "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
1339
1340#[inline]
1341pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
1342 drop(Box::from_raw(channel.as_ptr()))
1343}