alloy_rpc_types_eth/
pubsub.rs

1//! Ethereum types for pub-sub
2
3use crate::{Filter, Header, Log, Transaction};
4use alloc::{boxed::Box, format};
5use alloy_primitives::B256;
6use alloy_serde::WithOtherFields;
7
8/// Subscription result.
9#[derive(Clone, Debug, PartialEq, Eq)]
10#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
11#[cfg_attr(feature = "serde", serde(untagged))]
12pub enum SubscriptionResult<T = Transaction> {
13    /// New block header.
14    Header(Box<WithOtherFields<Header>>),
15    /// Log
16    Log(Box<Log>),
17    /// Transaction hash
18    TransactionHash(B256),
19    /// Full Transaction
20    FullTransaction(Box<T>),
21    /// SyncStatus
22    SyncState(PubSubSyncStatus),
23}
24
25/// Response type for a SyncStatus subscription.
26#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
28#[cfg_attr(feature = "serde", serde(untagged))]
29pub enum PubSubSyncStatus {
30    /// If not currently syncing, this should always be `false`.
31    Simple(bool),
32    /// Syncing metadata.
33    Detailed(SyncStatusMetadata),
34}
35
36/// Sync status metadata.
37#[derive(Clone, Copy, Debug, PartialEq, Eq)]
38#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
39#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
40pub struct SyncStatusMetadata {
41    /// Whether the node is currently syncing.
42    pub syncing: bool,
43    /// The starting block.
44    pub starting_block: u64,
45    /// The current block.
46    pub current_block: u64,
47    /// The highest block.
48    #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
49    pub highest_block: Option<u64>,
50}
51
52#[cfg(feature = "serde")]
53impl<T> serde::Serialize for SubscriptionResult<T>
54where
55    T: serde::Serialize,
56{
57    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
58    where
59        S: serde::Serializer,
60    {
61        match *self {
62            Self::Header(ref header) => header.serialize(serializer),
63            Self::Log(ref log) => log.serialize(serializer),
64            Self::TransactionHash(ref hash) => hash.serialize(serializer),
65            Self::FullTransaction(ref tx) => tx.serialize(serializer),
66            Self::SyncState(ref sync) => sync.serialize(serializer),
67        }
68    }
69}
70
71/// Subscription kind.
72#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
73#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
74#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
75#[cfg_attr(feature = "serde", serde(deny_unknown_fields))]
76pub enum SubscriptionKind {
77    /// New block headers subscription.
78    ///
79    /// Fires a notification each time a new header is appended to the chain, including chain
80    /// reorganizations. In case of a chain reorganization the subscription will emit all new
81    /// headers for the new chain. Therefore the subscription can emit multiple headers on the same
82    /// height.
83    NewHeads,
84    /// Logs subscription.
85    ///
86    /// Returns logs that are included in new imported blocks and match the given filter criteria.
87    /// In case of a chain reorganization previous sent logs that are on the old chain will be
88    /// resent with the removed property set to true. Logs from transactions that ended up in the
89    /// new chain are emitted. Therefore, a subscription can emit logs for the same transaction
90    /// multiple times.
91    Logs,
92    /// New Pending Transactions subscription.
93    ///
94    /// Returns the hash or full tx for all transactions that are added to the pending state and
95    /// are signed with a key that is available in the node. When a transaction that was
96    /// previously part of the canonical chain isn't part of the new canonical chain after a
97    /// reorganization its again emitted.
98    NewPendingTransactions,
99    /// Node syncing status subscription.
100    ///
101    /// Indicates when the node starts or stops synchronizing. The result can either be a boolean
102    /// indicating that the synchronization has started (true), finished (false) or an object with
103    /// various progress indicators.
104    Syncing,
105}
106
107/// Any additional parameters for a subscription.
108#[derive(Clone, Debug, Default, PartialEq, Eq)]
109pub enum Params {
110    /// No parameters passed.
111    #[default]
112    None,
113    /// Log parameters.
114    Logs(Box<Filter>),
115    /// Boolean parameter for new pending transactions.
116    Bool(bool),
117}
118
119impl Params {
120    /// Returns true if it's a bool parameter.
121    #[inline]
122    pub const fn is_bool(&self) -> bool {
123        matches!(self, Self::Bool(_))
124    }
125
126    /// Returns true if it's a log parameter.
127    #[inline]
128    pub const fn is_logs(&self) -> bool {
129        matches!(self, Self::Logs(_))
130    }
131}
132
133impl From<Filter> for Params {
134    fn from(filter: Filter) -> Self {
135        Self::Logs(Box::new(filter))
136    }
137}
138
139impl From<bool> for Params {
140    fn from(value: bool) -> Self {
141        Self::Bool(value)
142    }
143}
144
145#[cfg(feature = "serde")]
146impl serde::Serialize for Params {
147    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
148    where
149        S: serde::Serializer,
150    {
151        match self {
152            Self::None => (&[] as &[serde_json::Value]).serialize(serializer),
153            Self::Logs(logs) => logs.serialize(serializer),
154            Self::Bool(full) => full.serialize(serializer),
155        }
156    }
157}
158
159#[cfg(feature = "serde")]
160impl<'a> serde::Deserialize<'a> for Params {
161    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
162    where
163        D: serde::Deserializer<'a>,
164    {
165        use serde::de::Error;
166
167        let v = serde_json::Value::deserialize(deserializer)?;
168
169        if v.is_null() {
170            return Ok(Self::None);
171        }
172
173        if let Some(val) = v.as_bool() {
174            return Ok(val.into());
175        }
176
177        serde_json::from_value::<Filter>(v)
178            .map(Into::into)
179            .map_err(|e| D::Error::custom(format!("Invalid Pub-Sub parameters: {e}")))
180    }
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186    use similar_asserts::assert_eq;
187
188    #[test]
189    #[cfg(feature = "serde")]
190    fn params_serde() {
191        // Test deserialization of boolean parameter
192        let s: Params = serde_json::from_str("true").unwrap();
193        assert_eq!(s, Params::Bool(true));
194
195        // Test deserialization of null (None) parameter
196        let s: Params = serde_json::from_str("null").unwrap();
197        assert_eq!(s, Params::None);
198
199        // Test deserialization of log parameters
200        let filter = Filter::default();
201        let s: Params = serde_json::from_str(&serde_json::to_string(&filter).unwrap()).unwrap();
202        assert_eq!(s, Params::Logs(Box::new(filter)));
203    }
204
205    #[test]
206    fn params_is_bool() {
207        // Check if the `is_bool` method correctly identifies boolean parameters
208        let param = Params::Bool(true);
209        assert!(param.is_bool());
210
211        let param = Params::None;
212        assert!(!param.is_bool());
213
214        let param = Params::Logs(Box::default());
215        assert!(!param.is_bool());
216    }
217
218    #[test]
219    fn params_is_logs() {
220        // Check if the `is_logs` method correctly identifies log parameters
221        let param = Params::Logs(Box::default());
222        assert!(param.is_logs());
223
224        let param = Params::None;
225        assert!(!param.is_logs());
226
227        let param = Params::Bool(true);
228        assert!(!param.is_logs());
229    }
230
231    #[test]
232    fn params_from_filter() {
233        let filter = Filter::default();
234        let param: Params = filter.clone().into();
235        assert_eq!(param, Params::Logs(Box::new(filter)));
236    }
237
238    #[test]
239    fn params_from_bool() {
240        let param: Params = true.into();
241        assert_eq!(param, Params::Bool(true));
242
243        let param: Params = false.into();
244        assert_eq!(param, Params::Bool(false));
245    }
246
247    #[test]
248    #[cfg(feature = "serde")]
249    fn params_serialize_none() {
250        let param = Params::None;
251        let serialized = serde_json::to_string(&param).unwrap();
252        assert_eq!(serialized, "[]");
253    }
254
255    #[test]
256    #[cfg(feature = "serde")]
257    fn params_serialize_bool() {
258        let param = Params::Bool(true);
259        let serialized = serde_json::to_string(&param).unwrap();
260        assert_eq!(serialized, "true");
261
262        let param = Params::Bool(false);
263        let serialized = serde_json::to_string(&param).unwrap();
264        assert_eq!(serialized, "false");
265    }
266
267    #[test]
268    #[cfg(feature = "serde")]
269    fn params_serialize_logs() {
270        let filter = Filter::default();
271        let param = Params::Logs(Box::new(filter.clone()));
272        let serialized = serde_json::to_string(&param).unwrap();
273        let expected = serde_json::to_string(&filter).unwrap();
274        assert_eq!(serialized, expected);
275    }
276}