alloy_json_rpc/
notification.rs1use crate::{Response, ResponsePayload};
2use alloy_primitives::U256;
3use serde::{
4 de::{MapAccess, Visitor},
5 Deserialize, Serialize,
6};
7
8#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
10#[serde(untagged)]
11pub enum SubId {
12 Number(U256),
14 String(String),
16}
17
18impl From<U256> for SubId {
19 fn from(value: U256) -> Self {
20 Self::Number(value)
21 }
22}
23
24impl From<String> for SubId {
25 fn from(value: String) -> Self {
26 Self::String(value)
27 }
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct EthNotification<T = Box<serde_json::value::RawValue>> {
34 pub subscription: SubId,
36 pub result: T,
38}
39
40#[derive(Clone, Debug)]
45pub enum PubSubItem {
46 Response(Response),
48 Notification(EthNotification),
50}
51
52impl From<Response> for PubSubItem {
53 fn from(response: Response) -> Self {
54 Self::Response(response)
55 }
56}
57
58impl From<EthNotification> for PubSubItem {
59 fn from(notification: EthNotification) -> Self {
60 Self::Notification(notification)
61 }
62}
63
64impl<'de> Deserialize<'de> for PubSubItem {
65 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
66 where
67 D: serde::Deserializer<'de>,
68 {
69 struct PubSubItemVisitor;
70
71 impl<'de> Visitor<'de> for PubSubItemVisitor {
72 type Value = PubSubItem;
73
74 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 formatter.write_str("a JSON-RPC response or an Ethereum-style notification")
76 }
77
78 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
79 where
80 A: MapAccess<'de>,
81 {
82 let mut id = None;
83 let mut result = None;
84 let mut params = None;
85 let mut error = None;
86
87 while let Ok(Some(key)) = map.next_key() {
89 match key {
90 "id" => {
91 if id.is_some() {
92 return Err(serde::de::Error::duplicate_field("id"));
93 }
94 id = Some(map.next_value()?);
95 }
96 "result" => {
97 if result.is_some() {
98 return Err(serde::de::Error::duplicate_field("result"));
99 }
100 result = Some(map.next_value()?);
101 }
102 "params" => {
103 if params.is_some() {
104 return Err(serde::de::Error::duplicate_field("params"));
105 }
106 params = Some(map.next_value()?);
107 }
108 "error" => {
109 if error.is_some() {
110 return Err(serde::de::Error::duplicate_field("error"));
111 }
112 error = Some(map.next_value()?);
113 }
114 _ => {
116 let _ = map.next_value::<serde_json::Value>()?;
117 }
118 }
119 }
120
121 if let Some(id) = id {
123 let payload = error
124 .map(ResponsePayload::Failure)
125 .or_else(|| result.map(ResponsePayload::Success))
126 .ok_or_else(|| {
127 serde::de::Error::custom(
128 "missing `result` or `error` field in response",
129 )
130 })?;
131
132 Ok(Response { id, payload }.into())
133 } else {
134 if error.is_some() {
136 return Err(serde::de::Error::custom(
137 "unexpected `error` field in subscription notification",
138 ));
139 }
140 params
141 .map(PubSubItem::Notification)
142 .ok_or_else(|| serde::de::Error::missing_field("params"))
143 }
144 }
145 }
146
147 deserializer.deserialize_any(PubSubItemVisitor)
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EthNotification, PubSubItem, SubId};
    use serde_json::json;

    // A complete `eth_subscription` message parses as a notification; the hex
    // subscription string becomes `SubId::Number` and the raw result text is
    // preserved verbatim.
    #[test]
    fn deserializer_test() {
        let notification = r#"{ "jsonrpc": "2.0", "method": "eth_subscription", "params": {"subscription": "0xcd0c3e8af590364c09d0fa6a1210faf5", "result": {"difficulty": "0xd9263f42a87", "uncles": []}} }
        "#;

        let item: PubSubItem = serde_json::from_str(notification).unwrap();

        let PubSubItem::Notification(EthNotification { subscription, result }) = item else {
            panic!("unexpected deserialization result");
        };
        assert_eq!(
            subscription,
            SubId::Number("0xcd0c3e8af590364c09d0fa6a1210faf5".parse().unwrap())
        );
        assert_eq!(result.get(), r#"{"difficulty": "0xd9263f42a87", "uncles": []}"#);
    }

    // `From<U256>` yields the `Number` variant.
    #[test]
    fn subid_number() {
        let number = U256::from(123456u64);
        let converted = SubId::from(number);
        assert_eq!(converted, SubId::Number(number));
    }

    // `From<String>` yields the `String` variant.
    #[test]
    fn subid_string() {
        let string = "subscription_id".to_string();
        let converted = SubId::from(string.clone());
        assert_eq!(converted, SubId::String(string));
    }

    // `EthNotification` on its own deserializes from a `serde_json::Value`.
    #[test]
    fn eth_notification_header() {
        let header = json!({
            "subscription": "0x123",
            "result": {
                "difficulty": "0xabc",
                "uncles": []
            }
        });

        let notification: EthNotification = serde_json::from_value(header).unwrap();
        assert_eq!(notification.subscription, SubId::Number(U256::from(0x123)));
        assert_eq!(notification.result.get(), r#"{"difficulty":"0xabc","uncles":[]}"#);
    }

    // A message with `id` and `result` parses as a successful response.
    #[test]
    fn deserializer_test_valid_response() {
        let response = r#"
        {
            "jsonrpc": "2.0",
            "id": 1,
            "result": "0x123456"
        }"#;

        let item: PubSubItem = serde_json::from_str(response).unwrap();

        let PubSubItem::Response(Response { id, payload }) = item else {
            panic!("unexpected deserialization result");
        };
        assert_eq!(id, 1.into());

        let ResponsePayload::Success(result) = payload else {
            panic!("unexpected payload");
        };
        assert_eq!(result.get(), r#""0x123456""#);
    }

    // A message with `id` and `error` parses as a failed response.
    #[test]
    fn deserializer_test_error_response() {
        let response = r#"
        {
            "jsonrpc": "2.0",
            "id": 2,
            "error": {
                "code": -32601,
                "message": "Method not found"
            }
        }"#;

        let item: PubSubItem = serde_json::from_str(response).unwrap();

        let PubSubItem::Response(Response { id, payload }) = item else {
            panic!("unexpected deserialization result");
        };
        assert_eq!(id, 2.into());

        let ResponsePayload::Failure(error) = payload else {
            panic!("unexpected payload");
        };
        assert_eq!(error.code, -32601);
        assert_eq!(error.message, "Method not found");
    }

    // An empty-object result is still a valid notification payload.
    #[test]
    fn deserializer_test_empty_notification() {
        let notification = r#"
        {
            "jsonrpc": "2.0",
            "method": "eth_subscription",
            "params": {
                "subscription": "0x0",
                "result": {}
            }
        }"#;

        let item: PubSubItem = serde_json::from_str(notification).unwrap();

        let PubSubItem::Notification(EthNotification { subscription, result }) = item else {
            panic!("unexpected deserialization result");
        };
        assert_eq!(subscription, SubId::Number(U256::from(0u64)));
        assert_eq!(result.get(), r#"{}"#);
    }

    // Neither `id` nor `params`: the message is rejected.
    #[test]
    fn deserializer_test_invalid_structure() {
        let invalid_notification = r#"
        {
            "jsonrpc": "2.0",
            "method": "eth_subscription"
        }"#;

        let outcome = serde_json::from_str::<PubSubItem>(invalid_notification);
        assert!(outcome.is_err());
    }

    // `params` present but lacking `subscription`/`result`: the message is rejected.
    #[test]
    fn deserializer_test_missing_fields() {
        let missing_fields = r#"
        {
            "jsonrpc": "2.0",
            "method": "eth_subscription",
            "params": {}
        }"#;

        let outcome = serde_json::from_str::<PubSubItem>(missing_fields);
        assert!(outcome.is_err());
    }
}