alloy_json_rpc/
packet.rs

1use crate::{ErrorPayload, Id, Response, ResponsePayload, SerializedRequest};
2use alloy_primitives::map::HashSet;
3use serde::{
4    de::{self, Deserializer, MapAccess, SeqAccess, Visitor},
5    Deserialize, Serialize,
6};
7use serde_json::value::RawValue;
8use std::{fmt, marker::PhantomData};
9
/// A [`RequestPacket`] is either a single [`SerializedRequest`] or a batch of
/// serialized requests.
#[derive(Clone, Debug)]
pub enum RequestPacket {
    /// A single request.
    Single(SerializedRequest),
    /// A batch of requests, stored in a `Vec`.
    Batch(Vec<SerializedRequest>),
}
19
20impl FromIterator<SerializedRequest> for RequestPacket {
21    fn from_iter<T: IntoIterator<Item = SerializedRequest>>(iter: T) -> Self {
22        Self::Batch(iter.into_iter().collect())
23    }
24}
25
26impl From<SerializedRequest> for RequestPacket {
27    fn from(req: SerializedRequest) -> Self {
28        Self::Single(req)
29    }
30}
31
32impl Serialize for RequestPacket {
33    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
34    where
35        S: serde::Serializer,
36    {
37        match self {
38            Self::Single(single) => single.serialize(serializer),
39            Self::Batch(batch) => batch.serialize(serializer),
40        }
41    }
42}
43
44impl RequestPacket {
45    /// Create a new empty packet with the given capacity.
46    pub fn with_capacity(capacity: usize) -> Self {
47        Self::Batch(Vec::with_capacity(capacity))
48    }
49
50    /// Returns the [`SerializedRequest`] if this packet is [`ResponsePacket::Single`]
51    pub const fn as_single(&self) -> Option<&SerializedRequest> {
52        match self {
53            Self::Single(req) => Some(req),
54            Self::Batch(_) => None,
55        }
56    }
57
58    /// Returns the batch of [`SerializedRequest`] if this packet is [`ResponsePacket::Batch`]
59    pub fn as_batch(&self) -> Option<&[SerializedRequest]> {
60        match self {
61            Self::Batch(req) => Some(req.as_slice()),
62            Self::Single(_) => None,
63        }
64    }
65
66    /// Serialize the packet as a boxed [`RawValue`].
67    pub fn serialize(self) -> serde_json::Result<Box<RawValue>> {
68        match self {
69            Self::Single(single) => Ok(single.take_request()),
70            Self::Batch(batch) => serde_json::value::to_raw_value(&batch),
71        }
72    }
73
74    /// Get the request IDs of all subscription requests in the packet.
75    pub fn subscription_request_ids(&self) -> HashSet<&Id> {
76        match self {
77            Self::Single(single) => {
78                let id = (single.method() == "eth_subscribe").then(|| single.id());
79                HashSet::from_iter(id)
80            }
81            Self::Batch(batch) => batch
82                .iter()
83                .filter(|req| req.method() == "eth_subscribe")
84                .map(|req| req.id())
85                .collect(),
86        }
87    }
88
89    /// Get the number of requests in the packet.
90    pub fn len(&self) -> usize {
91        match self {
92            Self::Single(_) => 1,
93            Self::Batch(batch) => batch.len(),
94        }
95    }
96
97    /// Check if the packet is empty.
98    pub fn is_empty(&self) -> bool {
99        self.len() == 0
100    }
101
102    /// Push a request into the packet.
103    pub fn push(&mut self, req: SerializedRequest) {
104        match self {
105            Self::Batch(batch) => batch.push(req),
106            Self::Single(_) => {
107                let old = std::mem::replace(self, Self::Batch(Vec::with_capacity(10)));
108                if let Self::Single(single) = old {
109                    self.push(single);
110                }
111                self.push(req);
112            }
113        }
114    }
115
116    /// Returns a all [`SerializedRequest`].
117    pub fn requests(&self) -> &[SerializedRequest] {
118        match self {
119            Self::Single(req) => std::slice::from_ref(req),
120            Self::Batch(req) => req.as_slice(),
121        }
122    }
123
124    /// Returns an iterator over the requests' method names
125    pub fn method_names(&self) -> impl Iterator<Item = &str> + '_ {
126        self.requests().iter().map(|req| req.method())
127    }
128}
129
/// A [`ResponsePacket`] is either a single [`Response`] or a batch of responses.
///
/// `Payload` and `ErrData` are passed through to [`Response`] as its type
/// parameters; both default to `Box<RawValue>`.
#[derive(Clone, Debug)]
pub enum ResponsePacket<Payload = Box<RawValue>, ErrData = Box<RawValue>> {
    /// A single response.
    Single(Response<Payload, ErrData>),
    /// A batch of responses.
    Batch(Vec<Response<Payload, ErrData>>),
}
138
139impl<Payload, ErrData> FromIterator<Response<Payload, ErrData>>
140    for ResponsePacket<Payload, ErrData>
141{
142    fn from_iter<T: IntoIterator<Item = Response<Payload, ErrData>>>(iter: T) -> Self {
143        let mut iter = iter.into_iter().peekable();
144        // return single if iter has exactly one element, else make a batch
145        if let Some(first) = iter.next() {
146            return if iter.peek().is_none() {
147                Self::Single(first)
148            } else {
149                let mut batch = Vec::new();
150                batch.push(first);
151                batch.extend(iter);
152                Self::Batch(batch)
153            };
154        }
155        Self::Batch(vec![])
156    }
157}
158
159impl<Payload, ErrData> From<Vec<Response<Payload, ErrData>>> for ResponsePacket<Payload, ErrData> {
160    fn from(value: Vec<Response<Payload, ErrData>>) -> Self {
161        if value.len() == 1 {
162            Self::Single(value.into_iter().next().unwrap())
163        } else {
164            Self::Batch(value)
165        }
166    }
167}
168
169impl<'de, Payload, ErrData> Deserialize<'de> for ResponsePacket<Payload, ErrData>
170where
171    Payload: Deserialize<'de>,
172    ErrData: Deserialize<'de>,
173{
174    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
175    where
176        D: Deserializer<'de>,
177    {
178        struct ResponsePacketVisitor<Payload, ErrData> {
179            marker: PhantomData<fn() -> ResponsePacket<Payload, ErrData>>,
180        }
181
182        impl<'de, Payload, ErrData> Visitor<'de> for ResponsePacketVisitor<Payload, ErrData>
183        where
184            Payload: Deserialize<'de>,
185            ErrData: Deserialize<'de>,
186        {
187            type Value = ResponsePacket<Payload, ErrData>;
188
189            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
190                formatter.write_str("a single response or a batch of responses")
191            }
192
193            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
194            where
195                A: SeqAccess<'de>,
196            {
197                let mut responses = Vec::new();
198
199                while let Some(response) = seq.next_element()? {
200                    responses.push(response);
201                }
202
203                Ok(ResponsePacket::Batch(responses))
204            }
205
206            fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
207            where
208                M: MapAccess<'de>,
209            {
210                let response =
211                    Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))?;
212                Ok(ResponsePacket::Single(response))
213            }
214        }
215
216        deserializer.deserialize_any(ResponsePacketVisitor { marker: PhantomData })
217    }
218}
219
220/// A [`BorrowedResponsePacket`] is a [`ResponsePacket`] that has been partially deserialized,
221/// borrowing its contents from the deserializer.
222///
223/// This is used primarily for intermediate deserialization. Most users will not require it.
224///
225/// See the [top-level docs] for more info.
226///
227/// [top-level docs]: crate
228pub type BorrowedResponsePacket<'a> = ResponsePacket<&'a RawValue, &'a RawValue>;
229
230impl BorrowedResponsePacket<'_> {
231    /// Convert this borrowed response packet into an owned packet by copying
232    /// the data from the deserializer (if necessary).
233    pub fn into_owned(self) -> ResponsePacket {
234        match self {
235            Self::Single(single) => ResponsePacket::Single(single.into_owned()),
236            Self::Batch(batch) => {
237                ResponsePacket::Batch(batch.into_iter().map(Response::into_owned).collect())
238            }
239        }
240    }
241}
242
243impl<Payload, ErrData> ResponsePacket<Payload, ErrData> {
244    /// Returns the [`Response`] if this packet is [`ResponsePacket::Single`].
245    pub const fn as_single(&self) -> Option<&Response<Payload, ErrData>> {
246        match self {
247            Self::Single(resp) => Some(resp),
248            Self::Batch(_) => None,
249        }
250    }
251
252    /// Returns the batch of [`Response`] if this packet is [`ResponsePacket::Batch`].
253    pub fn as_batch(&self) -> Option<&[Response<Payload, ErrData>]> {
254        match self {
255            Self::Batch(resp) => Some(resp.as_slice()),
256            Self::Single(_) => None,
257        }
258    }
259
260    /// Returns the [`ResponsePayload`] if this packet is [`ResponsePacket::Single`].
261    pub fn single_payload(&self) -> Option<&ResponsePayload<Payload, ErrData>> {
262        self.as_single().map(|resp| &resp.payload)
263    }
264
265    /// Returns `true` if the response payload is a success.
266    ///
267    /// For batch responses, this returns `true` if __all__ responses are successful.
268    pub fn is_success(&self) -> bool {
269        match self {
270            Self::Single(single) => single.is_success(),
271            Self::Batch(batch) => batch.iter().all(|res| res.is_success()),
272        }
273    }
274
275    /// Returns `true` if the response payload is an error.
276    ///
277    /// For batch responses, this returns `true` there's at least one error response.
278    pub fn is_error(&self) -> bool {
279        match self {
280            Self::Single(single) => single.is_error(),
281            Self::Batch(batch) => batch.iter().any(|res| res.is_error()),
282        }
283    }
284
285    /// Returns the [ErrorPayload] if the response is an error.
286    ///
287    /// For batch responses, this returns the first error response.
288    pub fn as_error(&self) -> Option<&ErrorPayload<ErrData>> {
289        self.iter_errors().next()
290    }
291
292    /// Returns an iterator over the [ErrorPayload]s in the response.
293    pub fn iter_errors(&self) -> impl Iterator<Item = &ErrorPayload<ErrData>> + '_ {
294        match self {
295            Self::Single(single) => ResponsePacketErrorsIter::Single(Some(single)),
296            Self::Batch(batch) => ResponsePacketErrorsIter::Batch(batch.iter()),
297        }
298    }
299
300    /// Returns the first error code in this packet if it contains any error responses.
301    pub fn first_error_code(&self) -> Option<i64> {
302        self.as_error().map(|error| error.code)
303    }
304
305    /// Returns the first error message in this packet if it contains any error responses.
306    pub fn first_error_message(&self) -> Option<&str> {
307        self.as_error().map(|error| error.message.as_ref())
308    }
309
310    /// Returns the first error data in this packet if it contains any error responses.
311    pub fn first_error_data(&self) -> Option<&ErrData> {
312        self.as_error().and_then(|error| error.data.as_ref())
313    }
314
315    /// Returns a all [`Response`].
316    pub fn responses(&self) -> &[Response<Payload, ErrData>] {
317        match self {
318            Self::Single(req) => std::slice::from_ref(req),
319            Self::Batch(req) => req.as_slice(),
320        }
321    }
322
323    /// Returns an iterator over the responses' payloads.
324    pub fn payloads(&self) -> impl Iterator<Item = &ResponsePayload<Payload, ErrData>> + '_ {
325        self.responses().iter().map(|resp| &resp.payload)
326    }
327
328    /// Returns the first [`ResponsePayload`] in this packet.
329    pub fn first_payload(&self) -> Option<&ResponsePayload<Payload, ErrData>> {
330        self.payloads().next()
331    }
332
333    /// Returns an iterator over the responses' identifiers.
334    pub fn response_ids(&self) -> impl Iterator<Item = &Id> + '_ {
335        self.responses().iter().map(|resp| &resp.id)
336    }
337
338    /// Find responses by a list of IDs.
339    ///
340    /// This is intended to be used in conjunction with
341    /// [`RequestPacket::subscription_request_ids`] to identify subscription
342    /// responses.
343    ///
344    /// # Note
345    ///
346    /// - Responses are not guaranteed to be in the same order.
347    /// - Responses are not guaranteed to be in the set.
348    /// - If the packet contains duplicate IDs, both will be found.
349    pub fn responses_by_ids(&self, ids: &HashSet<Id>) -> Vec<&Response<Payload, ErrData>> {
350        match self {
351            Self::Single(single) if ids.contains(&single.id) => vec![single],
352            Self::Batch(batch) => batch.iter().filter(|res| ids.contains(&res.id)).collect(),
353            _ => Vec::new(),
354        }
355    }
356}
357
/// An iterator over the [`ErrorPayload`]s in a [`ResponsePacket`].
///
/// Responses whose payload is not an error are skipped.
#[derive(Clone, Debug)]
enum ResponsePacketErrorsIter<'a, Payload, ErrData> {
    /// The response of a single packet; set to `None` once it has been visited.
    Single(Option<&'a Response<Payload, ErrData>>),
    /// The responses of a batch packet that have not been visited yet.
    Batch(std::slice::Iter<'a, Response<Payload, ErrData>>),
}
364
365impl<'a, Payload, ErrData> Iterator for ResponsePacketErrorsIter<'a, Payload, ErrData> {
366    type Item = &'a ErrorPayload<ErrData>;
367
368    fn next(&mut self) -> Option<Self::Item> {
369        match self {
370            ResponsePacketErrorsIter::Single(single) => single.take()?.payload.as_error(),
371            ResponsePacketErrorsIter::Batch(batch) => loop {
372                let res = batch.next()?;
373                if let Some(err) = res.payload.as_error() {
374                    return Some(err);
375                }
376            },
377        }
378    }
379}