1use crate::{ErrorPayload, Id, Response, ResponsePayload, SerializedRequest};
2use alloy_primitives::map::HashSet;
3use http::HeaderMap;
4use serde::{
5 de::{self, Deserializer, MapAccess, SeqAccess, Visitor},
6 Deserialize, Serialize,
7};
8use serde_json::value::RawValue;
9use std::{fmt, marker::PhantomData};
10
11#[derive(Clone, Debug)]
14pub enum RequestPacket {
15 Single(SerializedRequest),
17 Batch(Vec<SerializedRequest>),
19}
20
21impl FromIterator<SerializedRequest> for RequestPacket {
22 fn from_iter<T: IntoIterator<Item = SerializedRequest>>(iter: T) -> Self {
23 Self::Batch(iter.into_iter().collect())
24 }
25}
26
27impl From<SerializedRequest> for RequestPacket {
28 fn from(req: SerializedRequest) -> Self {
29 Self::Single(req)
30 }
31}
32
33impl Serialize for RequestPacket {
34 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
35 where
36 S: serde::Serializer,
37 {
38 match self {
39 Self::Single(single) => single.serialize(serializer),
40 Self::Batch(batch) => batch.serialize(serializer),
41 }
42 }
43}
44
45impl RequestPacket {
46 pub fn with_capacity(capacity: usize) -> Self {
48 Self::Batch(Vec::with_capacity(capacity))
49 }
50
51 pub const fn as_single(&self) -> Option<&SerializedRequest> {
53 match self {
54 Self::Single(req) => Some(req),
55 Self::Batch(_) => None,
56 }
57 }
58
59 pub fn as_batch(&self) -> Option<&[SerializedRequest]> {
61 match self {
62 Self::Batch(req) => Some(req.as_slice()),
63 Self::Single(_) => None,
64 }
65 }
66
67 pub fn serialize(self) -> serde_json::Result<Box<RawValue>> {
69 match self {
70 Self::Single(single) => Ok(single.take_request()),
71 Self::Batch(batch) => serde_json::value::to_raw_value(&batch),
72 }
73 }
74
75 pub fn subscription_request_ids(&self) -> HashSet<&Id> {
77 match self {
78 Self::Single(single) => {
79 let id = (single.method() == "eth_subscribe").then(|| single.id());
80 HashSet::from_iter(id)
81 }
82 Self::Batch(batch) => batch
83 .iter()
84 .filter(|req| req.method() == "eth_subscribe")
85 .map(|req| req.id())
86 .collect(),
87 }
88 }
89
90 pub fn len(&self) -> usize {
92 match self {
93 Self::Single(_) => 1,
94 Self::Batch(batch) => batch.len(),
95 }
96 }
97
98 pub fn is_empty(&self) -> bool {
100 self.len() == 0
101 }
102
103 pub fn push(&mut self, req: SerializedRequest) {
105 match self {
106 Self::Batch(batch) => batch.push(req),
107 Self::Single(_) => {
108 let old = std::mem::replace(self, Self::Batch(Vec::with_capacity(10)));
109 if let Self::Single(single) = old {
110 self.push(single);
111 }
112 self.push(req);
113 }
114 }
115 }
116
117 pub fn requests(&self) -> &[SerializedRequest] {
119 match self {
120 Self::Single(req) => std::slice::from_ref(req),
121 Self::Batch(req) => req.as_slice(),
122 }
123 }
124
125 pub fn requests_mut(&mut self) -> &mut [SerializedRequest] {
127 match self {
128 Self::Single(req) => std::slice::from_mut(req),
129 Self::Batch(req) => req.as_mut_slice(),
130 }
131 }
132
133 pub fn method_names(&self) -> impl Iterator<Item = &str> + '_ {
135 self.requests().iter().map(|req| req.method())
136 }
137
138 pub fn headers(&self) -> HeaderMap {
141 let Some(single_req) = self.as_single() else {
143 return HeaderMap::new();
144 };
145 if let Some(http_header_extension) = single_req.meta().extensions().get::<HeaderMap>() {
147 return http_header_extension.clone();
148 };
149
150 HeaderMap::new()
151 }
152}
153
154#[derive(Clone, Debug)]
156pub enum ResponsePacket<Payload = Box<RawValue>, ErrData = Box<RawValue>> {
157 Single(Response<Payload, ErrData>),
159 Batch(Vec<Response<Payload, ErrData>>),
161}
162
163impl<Payload, ErrData> FromIterator<Response<Payload, ErrData>>
164 for ResponsePacket<Payload, ErrData>
165{
166 fn from_iter<T: IntoIterator<Item = Response<Payload, ErrData>>>(iter: T) -> Self {
167 let mut iter = iter.into_iter().peekable();
168 if let Some(first) = iter.next() {
170 return if iter.peek().is_none() {
171 Self::Single(first)
172 } else {
173 let mut batch = Vec::new();
174 batch.push(first);
175 batch.extend(iter);
176 Self::Batch(batch)
177 };
178 }
179 Self::Batch(vec![])
180 }
181}
182
183impl<Payload, ErrData> From<Vec<Response<Payload, ErrData>>> for ResponsePacket<Payload, ErrData> {
184 fn from(value: Vec<Response<Payload, ErrData>>) -> Self {
185 if value.len() == 1 {
186 Self::Single(value.into_iter().next().unwrap())
187 } else {
188 Self::Batch(value)
189 }
190 }
191}
192
193impl<'de, Payload, ErrData> Deserialize<'de> for ResponsePacket<Payload, ErrData>
194where
195 Payload: Deserialize<'de>,
196 ErrData: Deserialize<'de>,
197{
198 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
199 where
200 D: Deserializer<'de>,
201 {
202 struct ResponsePacketVisitor<Payload, ErrData> {
203 marker: PhantomData<fn() -> ResponsePacket<Payload, ErrData>>,
204 }
205
206 impl<'de, Payload, ErrData> Visitor<'de> for ResponsePacketVisitor<Payload, ErrData>
207 where
208 Payload: Deserialize<'de>,
209 ErrData: Deserialize<'de>,
210 {
211 type Value = ResponsePacket<Payload, ErrData>;
212
213 fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
214 formatter.write_str("a single response or a batch of responses")
215 }
216
217 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
218 where
219 A: SeqAccess<'de>,
220 {
221 let mut responses = Vec::new();
222
223 while let Some(response) = seq.next_element()? {
224 responses.push(response);
225 }
226
227 Ok(ResponsePacket::Batch(responses))
228 }
229
230 fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
231 where
232 M: MapAccess<'de>,
233 {
234 let response =
235 Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))?;
236 Ok(ResponsePacket::Single(response))
237 }
238 }
239
240 deserializer.deserialize_any(ResponsePacketVisitor { marker: PhantomData })
241 }
242}
243
244pub type BorrowedResponsePacket<'a> = ResponsePacket<&'a RawValue, &'a RawValue>;
253
254impl BorrowedResponsePacket<'_> {
255 pub fn into_owned(self) -> ResponsePacket {
258 match self {
259 Self::Single(single) => ResponsePacket::Single(single.into_owned()),
260 Self::Batch(batch) => {
261 ResponsePacket::Batch(batch.into_iter().map(Response::into_owned).collect())
262 }
263 }
264 }
265}
266
267impl<Payload, ErrData> ResponsePacket<Payload, ErrData> {
268 pub const fn as_single(&self) -> Option<&Response<Payload, ErrData>> {
270 match self {
271 Self::Single(resp) => Some(resp),
272 Self::Batch(_) => None,
273 }
274 }
275
276 pub fn as_batch(&self) -> Option<&[Response<Payload, ErrData>]> {
278 match self {
279 Self::Batch(resp) => Some(resp.as_slice()),
280 Self::Single(_) => None,
281 }
282 }
283
284 pub fn single_payload(&self) -> Option<&ResponsePayload<Payload, ErrData>> {
286 self.as_single().map(|resp| &resp.payload)
287 }
288
289 pub fn is_success(&self) -> bool {
293 match self {
294 Self::Single(single) => single.is_success(),
295 Self::Batch(batch) => batch.iter().all(|res| res.is_success()),
296 }
297 }
298
299 pub fn is_error(&self) -> bool {
303 match self {
304 Self::Single(single) => single.is_error(),
305 Self::Batch(batch) => batch.iter().any(|res| res.is_error()),
306 }
307 }
308
309 pub fn as_error(&self) -> Option<&ErrorPayload<ErrData>> {
313 self.iter_errors().next()
314 }
315
316 pub fn iter_errors(&self) -> impl Iterator<Item = &ErrorPayload<ErrData>> + '_ {
318 match self {
319 Self::Single(single) => ResponsePacketErrorsIter::Single(Some(single)),
320 Self::Batch(batch) => ResponsePacketErrorsIter::Batch(batch.iter()),
321 }
322 }
323
324 pub fn first_error_code(&self) -> Option<i64> {
326 self.as_error().map(|error| error.code)
327 }
328
329 pub fn first_error_message(&self) -> Option<&str> {
331 self.as_error().map(|error| error.message.as_ref())
332 }
333
334 pub fn first_error_data(&self) -> Option<&ErrData> {
336 self.as_error().and_then(|error| error.data.as_ref())
337 }
338
339 pub fn responses(&self) -> &[Response<Payload, ErrData>] {
341 match self {
342 Self::Single(req) => std::slice::from_ref(req),
343 Self::Batch(req) => req.as_slice(),
344 }
345 }
346
347 pub fn payloads(&self) -> impl Iterator<Item = &ResponsePayload<Payload, ErrData>> + '_ {
349 self.responses().iter().map(|resp| &resp.payload)
350 }
351
352 pub fn first_payload(&self) -> Option<&ResponsePayload<Payload, ErrData>> {
354 self.payloads().next()
355 }
356
357 pub fn response_ids(&self) -> impl Iterator<Item = &Id> + '_ {
359 self.responses().iter().map(|resp| &resp.id)
360 }
361
362 pub fn responses_by_ids(&self, ids: &HashSet<Id>) -> Vec<&Response<Payload, ErrData>> {
374 match self {
375 Self::Single(single) if ids.contains(&single.id) => vec![single],
376 Self::Batch(batch) => batch.iter().filter(|res| ids.contains(&res.id)).collect(),
377 _ => Vec::new(),
378 }
379 }
380}
381
382#[derive(Clone, Debug)]
384enum ResponsePacketErrorsIter<'a, Payload, ErrData> {
385 Single(Option<&'a Response<Payload, ErrData>>),
386 Batch(std::slice::Iter<'a, Response<Payload, ErrData>>),
387}
388
389impl<'a, Payload, ErrData> Iterator for ResponsePacketErrorsIter<'a, Payload, ErrData> {
390 type Item = &'a ErrorPayload<ErrData>;
391
392 fn next(&mut self) -> Option<Self::Item> {
393 match self {
394 ResponsePacketErrorsIter::Single(single) => single.take()?.payload.as_error(),
395 ResponsePacketErrorsIter::Batch(batch) => loop {
396 let res = batch.next()?;
397 if let Some(err) = res.payload.as_error() {
398 return Some(err);
399 }
400 },
401 }
402 }
403}