alloy_json_rpc/
notification.rs

1use crate::{Response, ResponsePayload};
2use alloy_primitives::U256;
3use serde::{
4    de::{MapAccess, Visitor},
5    Deserialize, Serialize,
6};
7
8/// A subscription ID.
9#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
10#[serde(untagged)]
11pub enum SubId {
12    /// A number.
13    Number(U256),
14    /// A string.
15    String(String),
16}
17
18impl From<U256> for SubId {
19    fn from(value: U256) -> Self {
20        Self::Number(value)
21    }
22}
23
24impl From<String> for SubId {
25    fn from(value: String) -> Self {
26        Self::String(value)
27    }
28}
29
30/// An ethereum-style notification, not to be confused with a JSON-RPC
31/// notification.
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct EthNotification<T = Box<serde_json::value::RawValue>> {
34    /// The subscription ID.
35    pub subscription: SubId,
36    /// The notification payload.
37    pub result: T,
38}
39
40/// An item received over an Ethereum pubsub transport.
41///
42/// Ethereum pubsub uses a non-standard JSON-RPC notification format. An item received over a pubsub
43/// transport may be a JSON-RPC response or an Ethereum-style notification.
44#[derive(Clone, Debug)]
45pub enum PubSubItem {
46    /// A [`Response`] to a JSON-RPC request.
47    Response(Response),
48    /// An Ethereum-style notification.
49    Notification(EthNotification),
50}
51
52impl From<Response> for PubSubItem {
53    fn from(response: Response) -> Self {
54        Self::Response(response)
55    }
56}
57
58impl From<EthNotification> for PubSubItem {
59    fn from(notification: EthNotification) -> Self {
60        Self::Notification(notification)
61    }
62}
63
64impl<'de> Deserialize<'de> for PubSubItem {
65    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
66    where
67        D: serde::Deserializer<'de>,
68    {
69        struct PubSubItemVisitor;
70
71        impl<'de> Visitor<'de> for PubSubItemVisitor {
72            type Value = PubSubItem;
73
74            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75                formatter.write_str("a JSON-RPC response or an Ethereum-style notification")
76            }
77
78            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
79            where
80                A: MapAccess<'de>,
81            {
82                let mut id = None;
83                let mut result = None;
84                let mut params = None;
85                let mut error = None;
86
87                // Drain the map into the appropriate fields.
88                while let Ok(Some(key)) = map.next_key() {
89                    match key {
90                        "id" => {
91                            if id.is_some() {
92                                return Err(serde::de::Error::duplicate_field("id"));
93                            }
94                            id = Some(map.next_value()?);
95                        }
96                        "result" => {
97                            if result.is_some() {
98                                return Err(serde::de::Error::duplicate_field("result"));
99                            }
100                            result = Some(map.next_value()?);
101                        }
102                        "params" => {
103                            if params.is_some() {
104                                return Err(serde::de::Error::duplicate_field("params"));
105                            }
106                            params = Some(map.next_value()?);
107                        }
108                        "error" => {
109                            if error.is_some() {
110                                return Err(serde::de::Error::duplicate_field("error"));
111                            }
112                            error = Some(map.next_value()?);
113                        }
114                        // Discard unknown fields.
115                        _ => {
116                            let _ = map.next_value::<serde_json::Value>()?;
117                        }
118                    }
119                }
120
121                // If it has an ID, it is a response.
122                if let Some(id) = id {
123                    let payload = error
124                        .map(ResponsePayload::Failure)
125                        .or_else(|| result.map(ResponsePayload::Success))
126                        .ok_or_else(|| {
127                            serde::de::Error::custom(
128                                "missing `result` or `error` field in response",
129                            )
130                        })?;
131
132                    Ok(Response { id, payload }.into())
133                } else {
134                    // Notifications cannot have an error.
135                    if error.is_some() {
136                        return Err(serde::de::Error::custom(
137                            "unexpected `error` field in subscription notification",
138                        ));
139                    }
140                    params
141                        .map(PubSubItem::Notification)
142                        .ok_or_else(|| serde::de::Error::missing_field("params"))
143                }
144            }
145        }
146
147        deserializer.deserialize_any(PubSubItemVisitor)
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EthNotification, PubSubItem, SubId};
    use serde_json::json;

    /// Parses `raw` JSON text as a [`PubSubItem`].
    fn parse_item(raw: &str) -> serde_json::Result<PubSubItem> {
        serde_json::from_str::<PubSubItem>(raw)
    }

    /// A notification taken from the geth pubsub docs deserializes into a notification item.
    #[test]
    fn deserializer_test() {
        // https://geth.ethereum.org/docs/interacting-with-geth/rpc/pubsub
        let raw = r#"{ "jsonrpc": "2.0", "method": "eth_subscription", "params": {"subscription": "0xcd0c3e8af590364c09d0fa6a1210faf5", "result": {"difficulty": "0xd9263f42a87", "uncles": []}} }
        "#;

        let item = parse_item(raw).unwrap();

        let PubSubItem::Notification(EthNotification { subscription, result }) = item else {
            panic!("unexpected deserialization result");
        };

        let expected_id = SubId::Number("0xcd0c3e8af590364c09d0fa6a1210faf5".parse().unwrap());
        assert_eq!(subscription, expected_id);
        assert_eq!(result.get(), r#"{"difficulty": "0xd9263f42a87", "uncles": []}"#);
    }

    /// Converting a `U256` yields the numeric variant.
    #[test]
    fn subid_number() {
        let raw_id = U256::from(123456u64);
        let converted: SubId = raw_id.into();
        assert_eq!(converted, SubId::Number(raw_id));
    }

    /// Converting a `String` yields the string variant.
    #[test]
    fn subid_string() {
        let raw_id = "subscription_id".to_string();
        let converted: SubId = raw_id.clone().into();
        assert_eq!(converted, SubId::String(raw_id));
    }

    /// An `EthNotification` can be deserialized directly from a JSON value.
    #[test]
    fn eth_notification_header() {
        let body = json!({
            "subscription": "0x123",
            "result": {
                "difficulty": "0xabc",
                "uncles": []
            }
        });

        let parsed: EthNotification = serde_json::from_value(body).unwrap();
        assert_eq!(parsed.subscription, SubId::Number(U256::from(0x123)));
        assert_eq!(parsed.result.get(), r#"{"difficulty":"0xabc","uncles":[]}"#);
    }

    /// A valid JSON-RPC response with a result deserializes into a success payload.
    #[test]
    fn deserializer_test_valid_response() {
        let raw = r#"
            {
                "jsonrpc": "2.0",
                "id": 1,
                "result": "0x123456"
            }"#;

        let item = parse_item(raw).unwrap();

        let PubSubItem::Response(Response { id, payload }) = item else {
            panic!("unexpected deserialization result");
        };
        assert_eq!(id, 1.into());

        let ResponsePayload::Success(result) = payload else {
            panic!("unexpected payload");
        };
        assert_eq!(result.get(), r#""0x123456""#);
    }

    /// A JSON-RPC response with an error deserializes into a failure payload.
    #[test]
    fn deserializer_test_error_response() {
        let raw = r#"
            {
                "jsonrpc": "2.0",
                "id": 2,
                "error": {
                    "code": -32601,
                    "message": "Method not found"
                }
            }"#;

        let item = parse_item(raw).unwrap();

        let PubSubItem::Response(Response { id, payload }) = item else {
            panic!("unexpected deserialization result");
        };
        assert_eq!(id, 2.into());

        let ResponsePayload::Failure(error) = payload else {
            panic!("unexpected payload");
        };
        assert_eq!(error.code, -32601);
        assert_eq!(error.message, "Method not found");
    }

    /// A notification with an empty result object still deserializes.
    #[test]
    fn deserializer_test_empty_notification() {
        let raw = r#"
            {
                "jsonrpc": "2.0",
                "method": "eth_subscription",
                "params": {
                    "subscription": "0x0",
                    "result": {}
                }
            }"#;

        let item = parse_item(raw).unwrap();

        let PubSubItem::Notification(EthNotification { subscription, result }) = item else {
            panic!("unexpected deserialization result");
        };
        assert_eq!(subscription, SubId::Number(U256::from(0u64)));
        assert_eq!(result.get(), r#"{}"#);
    }

    /// A message with neither `id` nor `params` must fail deserialization.
    #[test]
    fn deserializer_test_invalid_structure() {
        let raw = r#"
           {
               "jsonrpc": "2.0",
               "method": "eth_subscription"
           }"#;

        assert!(parse_item(raw).is_err());
    }

    /// A notification whose `params` lacks the required fields must fail deserialization.
    #[test]
    fn deserializer_test_missing_fields() {
        let raw = r#"
           {
               "jsonrpc": "2.0",
               "method": "eth_subscription",
               "params": {}
           }"#;

        assert!(parse_item(raw).is_err());
    }
}