1use alloy_json_rpc::{
2 transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcRecv,
3 RpcResult, RpcSend,
4};
5use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult};
6use core::panic;
7use futures::FutureExt;
8use serde_json::value::RawValue;
9use std::{
10 fmt,
11 future::Future,
12 marker::PhantomData,
13 pin::Pin,
14 task::{self, ready, Poll::Ready},
15};
16use tower::Service;
17
18#[must_use = "futures do nothing unless you `.await` or poll them"]
20#[pin_project::pin_project(project = CallStateProj)]
21enum CallState<Params>
22where
23 Params: RpcSend,
24{
25 Prepared {
26 request: Option<Request<Params>>,
27 connection: BoxTransport,
28 },
29 AwaitingResponse {
30 #[pin]
31 fut: <BoxTransport as Service<RequestPacket>>::Future,
32 },
33 Complete,
34}
35
36impl<Params> Clone for CallState<Params>
37where
38 Params: RpcSend,
39{
40 fn clone(&self) -> Self {
41 match self {
42 Self::Prepared { request, connection } => {
43 Self::Prepared { request: request.clone(), connection: connection.clone() }
44 }
45 _ => panic!("cloned after dispatch"),
46 }
47 }
48}
49
50impl<Params> fmt::Debug for CallState<Params>
51where
52 Params: RpcSend,
53{
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 f.write_str(match self {
56 Self::Prepared { .. } => "Prepared",
57 Self::AwaitingResponse { .. } => "AwaitingResponse",
58 Self::Complete => "Complete",
59 })
60 }
61}
62
63impl<Params> Future for CallState<Params>
64where
65 Params: RpcSend,
66{
67 type Output = TransportResult<Box<RawValue>>;
68
69 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
70 loop {
71 match self.as_mut().project() {
72 CallStateProj::Prepared { connection, request } => {
73 if let Err(e) =
74 task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
75 {
76 self.set(Self::Complete);
77 return Ready(RpcResult::Err(e));
78 }
79
80 let request = request.take().expect("no request");
81 debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
82 trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
83 let request = request.serialize();
84 let fut = match request {
85 Ok(request) => {
86 trace!(request=%request.serialized(), "serialized request");
87 connection.call(request.into())
88 }
89 Err(err) => {
90 trace!(?err, "failed to serialize request");
91 self.set(Self::Complete);
92 return Ready(RpcResult::Err(TransportError::ser_err(err)));
93 }
94 };
95 self.set(Self::AwaitingResponse { fut });
96 }
97 CallStateProj::AwaitingResponse { fut } => {
98 let res = match task::ready!(fut.poll(cx)) {
99 Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
100 Err(e) => Ready(RpcResult::Err(e)),
101 _ => panic!("received batch response from single request"),
102 };
103 self.set(Self::Complete);
104 return res;
105 }
106 CallStateProj::Complete => {
107 panic!("Polled after completion");
108 }
109 }
110 }
111 }
112}
113
114#[must_use = "futures do nothing unless you `.await` or poll them"]
133#[pin_project::pin_project]
134#[derive(Clone)]
135pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
136where
137 Params: RpcSend,
138 Map: FnOnce(Resp) -> Output,
139{
140 #[pin]
141 state: CallState<Params>,
142 map: Option<Map>,
143 _pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
144}
145
146impl<Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Params, Resp, Output, Map>
147where
148 Params: RpcSend,
149 Map: FnOnce(Resp) -> Output,
150{
151 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
152 f.debug_struct("RpcCall").field("state", &self.state).finish()
153 }
154}
155
156impl<Params, Resp> RpcCall<Params, Resp>
157where
158 Params: RpcSend,
159{
160 #[doc(hidden)]
161 pub fn new(req: Request<Params>, connection: impl IntoBoxTransport) -> Self {
162 Self {
163 state: CallState::Prepared {
164 request: Some(req),
165 connection: connection.into_box_transport(),
166 },
167 map: Some(std::convert::identity),
168 _pd: PhantomData,
169 }
170 }
171}
172
173impl<Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
174where
175 Params: RpcSend,
176 Map: FnOnce(Resp) -> Output,
177{
178 pub fn map_resp<NewOutput, NewMap>(
190 self,
191 map: NewMap,
192 ) -> RpcCall<Params, Resp, NewOutput, NewMap>
193 where
194 NewMap: FnOnce(Resp) -> NewOutput,
195 {
196 RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
197 }
198
199 pub fn is_subscription(&self) -> bool {
205 self.request().meta.is_subscription()
206 }
207
208 pub fn set_is_subscription(&mut self) {
215 self.request_mut().meta.set_is_subscription();
216 }
217
218 pub fn set_subscription_status(&mut self, status: bool) {
220 self.request_mut().meta.set_subscription_status(status);
221 }
222
223 pub fn params(&mut self) -> &mut Params {
232 &mut self.request_mut().params
233 }
234
235 pub fn request(&self) -> &Request<Params> {
241 let CallState::Prepared { request, .. } = &self.state else {
242 panic!("Cannot get request after request has been sent");
243 };
244 request.as_ref().expect("no request in prepared")
245 }
246
247 pub fn method(&self) -> &str {
249 &self.request().meta.method
250 }
251
252 pub fn request_mut(&mut self) -> &mut Request<Params> {
258 let CallState::Prepared { request, .. } = &mut self.state else {
259 panic!("Cannot get request after request has been sent");
260 };
261 request.as_mut().expect("no request in prepared")
262 }
263
264 pub fn map_params<NewParams: RpcSend>(
266 self,
267 map: impl Fn(Params) -> NewParams,
268 ) -> RpcCall<NewParams, Resp, Output, Map> {
269 let CallState::Prepared { request, connection } = self.state else {
270 panic!("Cannot get request after request has been sent");
271 };
272 let request = request.expect("no request in prepared").map_params(map);
273 RpcCall {
274 state: CallState::Prepared { request: Some(request), connection },
275 map: self.map,
276 _pd: PhantomData,
277 }
278 }
279}
280
281impl<Params, Resp, Output, Map> RpcCall<&Params, Resp, Output, Map>
282where
283 Params: RpcSend + ToOwned,
284 Params::Owned: RpcSend,
285 Map: FnOnce(Resp) -> Output,
286{
287 pub fn into_owned_params(self) -> RpcCall<Params::Owned, Resp, Output, Map> {
293 let CallState::Prepared { request, connection } = self.state else {
294 panic!("Cannot get params after request has been sent");
295 };
296 let request = request.expect("no request in prepared").into_owned_params();
297
298 RpcCall {
299 state: CallState::Prepared { request: Some(request), connection },
300 map: self.map,
301 _pd: PhantomData,
302 }
303 }
304}
305
306impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
307where
308 Params: RpcSend + 'a,
309 Resp: RpcRecv,
310 Output: 'static,
311 Map: FnOnce(Resp) -> Output + Send + 'a,
312{
313 pub fn boxed(self) -> RpcFut<'a, Output> {
315 Box::pin(self)
316 }
317}
318
319impl<Params, Resp, Output, Map> Future for RpcCall<Params, Resp, Output, Map>
320where
321 Params: RpcSend,
322 Resp: RpcRecv,
323 Output: 'static,
324 Map: FnOnce(Resp) -> Output,
325{
326 type Output = TransportResult<Output>;
327
328 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
329 trace!(?self.state, "polling RpcCall");
330
331 let this = self.get_mut();
332 let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));
333
334 Ready(resp.map(this.map.take().expect("polled after completion")))
335 }
336}