alloy_rpc_client/
call.rs

1use alloy_json_rpc::{
2    transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcRecv,
3    RpcResult, RpcSend,
4};
5use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult};
6use core::panic;
7use futures::FutureExt;
8use serde_json::value::RawValue;
9use std::{
10    fmt,
11    future::Future,
12    marker::PhantomData,
13    pin::Pin,
14    task::{self, ready, Poll::Ready},
15};
16use tower::Service;
17
18/// The states of the [`RpcCall`] future.
19#[must_use = "futures do nothing unless you `.await` or poll them"]
20#[pin_project::pin_project(project = CallStateProj)]
21enum CallState<Params>
22where
23    Params: RpcSend,
24{
25    Prepared {
26        request: Option<Request<Params>>,
27        connection: BoxTransport,
28    },
29    AwaitingResponse {
30        #[pin]
31        fut: <BoxTransport as Service<RequestPacket>>::Future,
32    },
33    Complete,
34}
35
36impl<Params> Clone for CallState<Params>
37where
38    Params: RpcSend,
39{
40    fn clone(&self) -> Self {
41        match self {
42            Self::Prepared { request, connection } => {
43                Self::Prepared { request: request.clone(), connection: connection.clone() }
44            }
45            _ => panic!("cloned after dispatch"),
46        }
47    }
48}
49
50impl<Params> fmt::Debug for CallState<Params>
51where
52    Params: RpcSend,
53{
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55        f.write_str(match self {
56            Self::Prepared { .. } => "Prepared",
57            Self::AwaitingResponse { .. } => "AwaitingResponse",
58            Self::Complete => "Complete",
59        })
60    }
61}
62
63impl<Params> Future for CallState<Params>
64where
65    Params: RpcSend,
66{
67    type Output = TransportResult<Box<RawValue>>;
68
69    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
70        loop {
71            match self.as_mut().project() {
72                CallStateProj::Prepared { connection, request } => {
73                    if let Err(e) =
74                        task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
75                    {
76                        self.set(Self::Complete);
77                        return Ready(RpcResult::Err(e));
78                    }
79
80                    let request = request.take().expect("no request");
81                    debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
82                    trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
83                    let request = request.serialize();
84                    let fut = match request {
85                        Ok(request) => {
86                            trace!(request=%request.serialized(), "serialized request");
87                            connection.call(request.into())
88                        }
89                        Err(err) => {
90                            trace!(?err, "failed to serialize request");
91                            self.set(Self::Complete);
92                            return Ready(RpcResult::Err(TransportError::ser_err(err)));
93                        }
94                    };
95                    self.set(Self::AwaitingResponse { fut });
96                }
97                CallStateProj::AwaitingResponse { fut } => {
98                    let res = match task::ready!(fut.poll(cx)) {
99                        Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
100                        Err(e) => Ready(RpcResult::Err(e)),
101                        _ => panic!("received batch response from single request"),
102                    };
103                    self.set(Self::Complete);
104                    return res;
105                }
106                CallStateProj::Complete => {
107                    panic!("Polled after completion");
108                }
109            }
110        }
111    }
112}
113
114/// A prepared, but unsent, RPC call.
115///
116/// This is a future that will send the request when polled. It contains a
117/// [`Request`], a [`BoxTransport`], and knowledge of its expected response
118/// type. Upon awaiting, it will send the request and wait for the response. It
119/// will then deserialize the response into the expected type.
120///
121/// Errors are captured in the [`RpcResult`] type. Rpc Calls will result in
122/// either a successful response of the `Resp` type, an error response, or a
123/// transport error.
124///
125/// ### Note
126///
127/// Serializing the request is done lazily. The request is not serialized until
128/// the future is polled. This differs from the behavior of
129/// [`crate::BatchRequest`], which serializes greedily. This is because the
130/// batch request must immediately erase the `Param` type to allow batching of
131/// requests with different `Param` types, while the `RpcCall` may do so lazily.
132#[must_use = "futures do nothing unless you `.await` or poll them"]
133#[pin_project::pin_project]
134#[derive(Clone)]
135pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
136where
137    Params: RpcSend,
138    Map: FnOnce(Resp) -> Output,
139{
140    #[pin]
141    state: CallState<Params>,
142    map: Option<Map>,
143    _pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
144}
145
146impl<Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Params, Resp, Output, Map>
147where
148    Params: RpcSend,
149    Map: FnOnce(Resp) -> Output,
150{
151    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
152        f.debug_struct("RpcCall").field("state", &self.state).finish()
153    }
154}
155
156impl<Params, Resp> RpcCall<Params, Resp>
157where
158    Params: RpcSend,
159{
160    #[doc(hidden)]
161    pub fn new(req: Request<Params>, connection: impl IntoBoxTransport) -> Self {
162        Self {
163            state: CallState::Prepared {
164                request: Some(req),
165                connection: connection.into_box_transport(),
166            },
167            map: Some(std::convert::identity),
168            _pd: PhantomData,
169        }
170    }
171}
172
173impl<Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
174where
175    Params: RpcSend,
176    Map: FnOnce(Resp) -> Output,
177{
178    /// Map the response to a different type. This is usable for converting
179    /// the response to a more usable type, e.g. changing `U64` to `u64`.
180    ///
181    /// ## Note
182    ///
183    /// Carefully review the rust documentation on [fn pointers] before passing
184    /// them to this function. Unless the pointer is specifically coerced to a
185    /// `fn(_) -> _`, the `NewMap` will be inferred as that function's unique
186    /// type. This can lead to confusing error messages.
187    ///
188    /// [fn pointers]: https://doc.rust-lang.org/std/primitive.fn.html#creating-function-pointers
189    pub fn map_resp<NewOutput, NewMap>(
190        self,
191        map: NewMap,
192    ) -> RpcCall<Params, Resp, NewOutput, NewMap>
193    where
194        NewMap: FnOnce(Resp) -> NewOutput,
195    {
196        RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
197    }
198
199    /// Returns `true` if the request is a subscription.
200    ///
201    /// # Panics
202    ///
203    /// Panics if called after the request has been sent.
204    pub fn is_subscription(&self) -> bool {
205        self.request().meta.is_subscription()
206    }
207
208    /// Set the request to be a non-standard subscription (i.e. not
209    /// "eth_subscribe").
210    ///
211    /// # Panics
212    ///
213    /// Panics if called after the request has been sent.
214    pub fn set_is_subscription(&mut self) {
215        self.request_mut().meta.set_is_subscription();
216    }
217
218    /// Set the subscription status of the request.
219    pub fn set_subscription_status(&mut self, status: bool) {
220        self.request_mut().meta.set_subscription_status(status);
221    }
222
223    /// Get a mutable reference to the params of the request.
224    ///
225    /// This is useful for modifying the params after the request has been
226    /// prepared.
227    ///
228    /// # Panics
229    ///
230    /// Panics if called after the request has been sent.
231    pub fn params(&mut self) -> &mut Params {
232        &mut self.request_mut().params
233    }
234
235    /// Returns a reference to the request.
236    ///
237    /// # Panics
238    ///
239    /// Panics if called after the request has been sent.
240    pub fn request(&self) -> &Request<Params> {
241        let CallState::Prepared { request, .. } = &self.state else {
242            panic!("Cannot get request after request has been sent");
243        };
244        request.as_ref().expect("no request in prepared")
245    }
246
247    /// Returns the RPC method
248    pub fn method(&self) -> &str {
249        &self.request().meta.method
250    }
251
252    /// Returns a mutable reference to the request.
253    ///
254    /// # Panics
255    ///
256    /// Panics if called after the request has been sent.
257    pub fn request_mut(&mut self) -> &mut Request<Params> {
258        let CallState::Prepared { request, .. } = &mut self.state else {
259            panic!("Cannot get request after request has been sent");
260        };
261        request.as_mut().expect("no request in prepared")
262    }
263
264    /// Map the params of the request into a new type.
265    pub fn map_params<NewParams: RpcSend>(
266        self,
267        map: impl Fn(Params) -> NewParams,
268    ) -> RpcCall<NewParams, Resp, Output, Map> {
269        let CallState::Prepared { request, connection } = self.state else {
270            panic!("Cannot get request after request has been sent");
271        };
272        let request = request.expect("no request in prepared").map_params(map);
273        RpcCall {
274            state: CallState::Prepared { request: Some(request), connection },
275            map: self.map,
276            _pd: PhantomData,
277        }
278    }
279}
280
281impl<Params, Resp, Output, Map> RpcCall<&Params, Resp, Output, Map>
282where
283    Params: RpcSend + ToOwned,
284    Params::Owned: RpcSend,
285    Map: FnOnce(Resp) -> Output,
286{
287    /// Convert this call into one with owned params, by cloning the params.
288    ///
289    /// # Panics
290    ///
291    /// Panics if called after the request has been polled.
292    pub fn into_owned_params(self) -> RpcCall<Params::Owned, Resp, Output, Map> {
293        let CallState::Prepared { request, connection } = self.state else {
294            panic!("Cannot get params after request has been sent");
295        };
296        let request = request.expect("no request in prepared").into_owned_params();
297
298        RpcCall {
299            state: CallState::Prepared { request: Some(request), connection },
300            map: self.map,
301            _pd: PhantomData,
302        }
303    }
304}
305
306impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
307where
308    Params: RpcSend + 'a,
309    Resp: RpcRecv,
310    Output: 'static,
311    Map: FnOnce(Resp) -> Output + Send + 'a,
312{
313    /// Convert this future into a boxed, pinned future, erasing its type.
314    pub fn boxed(self) -> RpcFut<'a, Output> {
315        Box::pin(self)
316    }
317}
318
319impl<Params, Resp, Output, Map> Future for RpcCall<Params, Resp, Output, Map>
320where
321    Params: RpcSend,
322    Resp: RpcRecv,
323    Output: 'static,
324    Map: FnOnce(Resp) -> Output,
325{
326    type Output = TransportResult<Output>;
327
328    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
329        trace!(?self.state, "polling RpcCall");
330
331        let this = self.get_mut();
332        let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));
333
334        Ready(resp.map(this.map.take().expect("polled after completion")))
335    }
336}