alloy_contract/
event.rs

1use crate::Error;
2use alloy_network::Ethereum;
3use alloy_primitives::{Address, LogData, B256};
4use alloy_provider::{FilterPollerBuilder, Network, Provider};
5use alloy_rpc_types_eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log, Topic, ValueOrArray};
6use alloy_sol_types::SolEvent;
7use alloy_transport::TransportResult;
8use futures::Stream;
9use futures_util::StreamExt;
10use std::{fmt, marker::PhantomData};
11
12/// Helper for managing the event filter before querying or streaming its logs
13#[must_use = "event filters do nothing unless you `query`, `watch`, or `stream` them"]
14pub struct Event<P, E, N = Ethereum> {
15    /// The provider to use for querying or streaming logs.
16    pub provider: P,
17    /// The filter to use for querying or streaming logs.
18    pub filter: Filter,
19    _phantom: PhantomData<(E, N)>,
20}
21
22impl<P: fmt::Debug, E, N> fmt::Debug for Event<P, E, N> {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("Event")
25            .field("provider", &self.provider)
26            .field("filter", &self.filter)
27            .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
28            .finish()
29    }
30}
31
32#[doc(hidden)]
33impl<'a, P: Provider<N>, E: SolEvent, N: Network> Event<&'a P, E, N> {
34    // `sol!` macro constructor, see `#[sol(rpc)]`. Not public API.
35    // NOTE: please avoid changing this function due to its use in the `sol!` macro.
36    pub fn new_sol(provider: &'a P, address: &Address) -> Self {
37        // keccak256 hash of the event signature needed for the filter to actually filter by event
38        // check that the event is not anonymous to include the event signature in the filter
39        if E::ANONYMOUS {
40            Self::new(provider, Filter::new().address(*address))
41        } else {
42            Self::new(provider, Filter::new().address(*address).event_signature(E::SIGNATURE_HASH))
43        }
44    }
45}
46
47impl<P: Provider<N>, E: SolEvent, N: Network> Event<P, E, N> {
48    /// Creates a new event with the provided provider and filter.
49    pub const fn new(provider: P, filter: Filter) -> Self {
50        Self { provider, filter, _phantom: PhantomData }
51    }
52
53    /// Queries the blockchain for the selected filter and returns a vector of matching event logs.
54    pub async fn query(&self) -> Result<Vec<(E, Log)>, Error> {
55        let logs = self.query_raw().await?;
56        logs.into_iter().map(|log| Ok((decode_log(&log)?, log))).collect()
57    }
58
59    /// Queries the blockchain for the selected filter and returns a vector of matching event logs,
60    /// without decoding them.
61    pub async fn query_raw(&self) -> TransportResult<Vec<Log>> {
62        self.provider.get_logs(&self.filter).await
63    }
64
65    /// Watches for events that match the filter.
66    ///
67    /// Returns a stream of decoded events and raw logs.
68    #[doc(alias = "stream")]
69    #[doc(alias = "stream_with_meta")]
70    pub async fn watch(&self) -> TransportResult<EventPoller<E>> {
71        let poller = self.provider.watch_logs(&self.filter).await?;
72        Ok(poller.into())
73    }
74
75    /// Subscribes to the stream of events that match the filter.
76    ///
77    /// Returns a stream of decoded events and raw logs.
78    #[cfg(feature = "pubsub")]
79    pub async fn subscribe(&self) -> TransportResult<subscription::EventSubscription<E>> {
80        let sub = self.provider.subscribe_logs(&self.filter).await?;
81        Ok(sub.into())
82    }
83
84    /// Sets the inner filter object
85    ///
86    /// See [`Filter::select`].
87    pub fn select(mut self, filter: impl Into<FilterBlockOption>) -> Self {
88        self.filter.block_option = filter.into();
89        self
90    }
91
92    /// Sets the from block number
93    pub fn from_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
94        self.filter.block_option = self.filter.block_option.with_from_block(block.into());
95        self
96    }
97
98    /// Sets the to block number
99    pub fn to_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
100        self.filter.block_option = self.filter.block_option.with_to_block(block.into());
101        self
102    }
103
104    /// Return `true` if filter configured to match pending block.
105    ///
106    /// This means that both `from_block` and `to_block` are set to the pending
107    /// tag.
108    pub fn is_pending_block_filter(&self) -> bool {
109        self.filter.block_option.get_from_block().is_some_and(BlockNumberOrTag::is_pending)
110            && self.filter.block_option.get_to_block().is_some_and(BlockNumberOrTag::is_pending)
111    }
112
113    /// Pins the block hash for the filter
114    pub fn at_block_hash<A: Into<B256>>(mut self, hash: A) -> Self {
115        self.filter.block_option = self.filter.block_option.with_block_hash(hash.into());
116        self
117    }
118
119    /// Sets the address to query with this filter.
120    ///
121    /// See [`Filter::address`].
122    pub fn address<A: Into<ValueOrArray<Address>>>(mut self, address: A) -> Self {
123        self.filter.address = address.into().into();
124        self
125    }
126
127    /// Given the event signature in string form, it hashes it and adds it to the topics to monitor
128    pub fn event(mut self, event_name: &str) -> Self {
129        self.filter = self.filter.event(event_name);
130        self
131    }
132
133    /// Hashes all event signatures and sets them as array to event_signature(topic0)
134    pub fn events(mut self, events: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
135        self.filter = self.filter.events(events);
136        self
137    }
138
139    /// Sets event_signature(topic0) (the event name for non-anonymous events)
140    pub fn event_signature<TO: Into<Topic>>(mut self, topic: TO) -> Self {
141        self.filter.topics[0] = topic.into();
142        self
143    }
144
145    /// Sets the 1st indexed topic
146    pub fn topic1<TO: Into<Topic>>(mut self, topic: TO) -> Self {
147        self.filter.topics[1] = topic.into();
148        self
149    }
150
151    /// Sets the 2nd indexed topic
152    pub fn topic2<TO: Into<Topic>>(mut self, topic: TO) -> Self {
153        self.filter.topics[2] = topic.into();
154        self
155    }
156
157    /// Sets the 3rd indexed topic
158    pub fn topic3<TO: Into<Topic>>(mut self, topic: TO) -> Self {
159        self.filter.topics[3] = topic.into();
160        self
161    }
162}
163
164impl<P: Clone, E, N> Event<&P, E, N> {
165    /// Clones the provider and returns a new event with the cloned provider.
166    pub fn with_cloned_provider(self) -> Event<P, E, N> {
167        Event { provider: self.provider.clone(), filter: self.filter, _phantom: PhantomData }
168    }
169}
170
171/// An event poller.
172///
173/// Polling configuration is available through the [`poller`](Self::poller) field.
174pub struct EventPoller<E> {
175    /// The inner poller.
176    pub poller: FilterPollerBuilder<Log>,
177    _phantom: PhantomData<E>,
178}
179
180impl<E> AsRef<FilterPollerBuilder<Log>> for EventPoller<E> {
181    #[inline]
182    fn as_ref(&self) -> &FilterPollerBuilder<Log> {
183        &self.poller
184    }
185}
186
187impl<E> AsMut<FilterPollerBuilder<Log>> for EventPoller<E> {
188    #[inline]
189    fn as_mut(&mut self) -> &mut FilterPollerBuilder<Log> {
190        &mut self.poller
191    }
192}
193
194impl<E> fmt::Debug for EventPoller<E> {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        f.debug_struct("EventPoller")
197            .field("poller", &self.poller)
198            .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
199            .finish()
200    }
201}
202
203impl<E> From<FilterPollerBuilder<Log>> for EventPoller<E> {
204    fn from(poller: FilterPollerBuilder<Log>) -> Self {
205        Self { poller, _phantom: PhantomData }
206    }
207}
208
209impl<E: SolEvent> EventPoller<E> {
210    /// Starts the poller and returns a stream that yields the decoded event and the raw log.
211    ///
212    /// Note that this stream will not return `None` until the provider is dropped.
213    pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
214        self.poller
215            .into_stream()
216            .flat_map(futures_util::stream::iter)
217            .map(|log| decode_log(&log).map(|e| (e, log)))
218    }
219}
220
221fn decode_log<E: SolEvent>(log: &Log) -> alloy_sol_types::Result<E> {
222    let log_data: &LogData = log.as_ref();
223
224    E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data)
225}
226
227#[cfg(feature = "pubsub")]
228pub(crate) mod subscription {
229    use super::*;
230    use alloy_pubsub::Subscription;
231
232    /// An event subscription.
233    ///
234    /// Underlying subscription is available through the [`sub`](Self::sub) field.
235    pub struct EventSubscription<E> {
236        /// The inner poller.
237        pub sub: Subscription<Log>,
238        _phantom: PhantomData<E>,
239    }
240
241    impl<E> AsRef<Subscription<Log>> for EventSubscription<E> {
242        #[inline]
243        fn as_ref(&self) -> &Subscription<Log> {
244            &self.sub
245        }
246    }
247
248    impl<E> AsMut<Subscription<Log>> for EventSubscription<E> {
249        #[inline]
250        fn as_mut(&mut self) -> &mut Subscription<Log> {
251            &mut self.sub
252        }
253    }
254
255    impl<E> fmt::Debug for EventSubscription<E> {
256        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257            f.debug_struct("EventSubscription")
258                .field("sub", &self.sub)
259                .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
260                .finish()
261        }
262    }
263
264    impl<E> From<Subscription<Log>> for EventSubscription<E> {
265        fn from(sub: Subscription<Log>) -> Self {
266            Self { sub, _phantom: PhantomData }
267        }
268    }
269
270    impl<E: SolEvent> EventSubscription<E> {
271        /// Converts the subscription into a stream.
272        pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
273            self.sub.into_stream().map(|log| decode_log(&log).map(|e| (e, log)))
274        }
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281    use alloy_network::EthereumWallet;
282    use alloy_primitives::U256;
283    use alloy_signer_local::PrivateKeySigner;
284    use alloy_sol_types::sol;
285
286    sol! {
287        // solc v0.8.24; solc a.sol --via-ir --optimize --bin
288        #[sol(rpc, bytecode = "60808060405234601557610147908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c908163299d8665146100a7575063ffdf4f1b14610032575f80fd5b346100a3575f3660031901126100a357602a7f6d10b8446ff0ac11bb95d154e7b10a73042fb9fc3bca0c92de5397b2fe78496c6040518061009e819060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a0840193600160208201520152565b0390a2005b5f80fd5b346100a3575f3660031901126100a3577f4e4cd44610926680098f1b54e2bdd1fb952659144c471173bbb9cf966af3a988818061009e602a949060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a084019360016020820152015256fea26469706673582212202e640cd14a7310d4165f902d2721ef5b4640a08f5ae38e9ae5c315a9f9f4435864736f6c63430008190033")]
289        #[allow(dead_code)]
290        contract MyContract {
291            #[derive(Debug, PartialEq, Eq)]
292            event MyEvent(uint64 indexed, string, bool, bytes32);
293
294            #[derive(Debug, PartialEq, Eq)]
295            event WrongEvent(uint64 indexed, string, bool, bytes32);
296
297            function doEmit() external {
298                emit MyEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
299            }
300
301            function doEmitWrongEvent() external {
302                emit WrongEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
303            }
304        }
305    }
306
307    #[tokio::test]
308    async fn event_filters() {
309        let _ = tracing_subscriber::fmt::try_init();
310
311        let anvil = alloy_node_bindings::Anvil::new().spawn();
312
313        let pk: PrivateKeySigner =
314            "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
315        let wallet = EthereumWallet::from(pk);
316        let provider = alloy_provider::ProviderBuilder::new()
317            .wallet(wallet.clone())
318            .connect_http(anvil.endpoint_url());
319
320        // let from = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
321        let contract = MyContract::deploy(&provider).await.unwrap();
322
323        let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new());
324        let all = event.query().await.unwrap();
325        assert_eq!(all.len(), 0);
326
327        // Same as above, but generated by `sol!`.
328        let event = contract.MyEvent_filter();
329
330        let poller = event.watch().await.unwrap();
331
332        let _receipt =
333            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
334
335        let expected_event = MyContract::MyEvent {
336            _0: 42,
337            _1: "hello".to_string(),
338            _2: true,
339            _3: U256::from(0xdeadbeefu64).into(),
340        };
341
342        let mut stream = poller.into_stream();
343        let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
344        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); // add check that the received event signature is the same as the one we expect
345        assert_eq!(stream_event, expected_event);
346        assert_eq!(stream_log.inner.address, *contract.address());
347        assert_eq!(stream_log.block_number, Some(2));
348
349        // This is not going to return `None`
350        // assert!(stream.next().await.is_none());
351
352        let all = event.query().await.unwrap();
353        assert_eq!(all.len(), 1);
354        assert_eq!(all[0].0, expected_event);
355        assert_eq!(all[0].1, stream_log);
356
357        // send the wrong event and make sure it is NOT picked up by the event filter
358        let _wrong_receipt = contract
359            .doEmitWrongEvent()
360            .send()
361            .await
362            .unwrap()
363            .get_receipt()
364            .await
365            .expect("no receipt");
366
367        // we sent the wrong event
368        // so no events should be returned when querying event.query() (MyEvent)
369        let all = event.query().await.unwrap();
370        assert_eq!(all.len(), 0);
371
372        #[cfg(feature = "pubsub")]
373        {
374            let provider = alloy_provider::ProviderBuilder::new()
375                .wallet(wallet)
376                .connect(&anvil.ws_endpoint())
377                .await
378                .unwrap();
379
380            let contract = MyContract::new(*contract.address(), provider);
381            let event = contract.MyEvent_filter();
382
383            let sub = event.subscribe().await.unwrap();
384
385            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
386
387            let mut stream = sub.into_stream();
388
389            let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
390            assert_eq!(
391                MyContract::MyEvent::SIGNATURE_HASH.0,
392                stream_log.topics().first().unwrap().0
393            );
394            assert_eq!(stream_event, expected_event);
395            assert_eq!(stream_log.address(), *contract.address());
396            assert_eq!(stream_log.block_number, Some(4));
397
398            // send the request to emit the wrong event
399            contract
400                .doEmitWrongEvent()
401                .send()
402                .await
403                .unwrap()
404                .get_receipt()
405                .await
406                .expect("no receipt");
407
408            // we sent the wrong event
409            // so no events should be returned when querying event.query() (MyEvent)
410            let all = event.query().await.unwrap();
411            assert_eq!(all.len(), 0);
412        }
413    }
414
415    /// Same test as above, but using builder methods.
416    #[tokio::test]
417    async fn event_builder_filters() {
418        let _ = tracing_subscriber::fmt::try_init();
419
420        let anvil = alloy_node_bindings::Anvil::new().spawn();
421        let pk: PrivateKeySigner =
422            "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
423        let wallet = EthereumWallet::from(pk);
424        let provider = alloy_provider::ProviderBuilder::new()
425            .wallet(wallet.clone())
426            .connect_http(anvil.endpoint_url());
427
428        let contract = MyContract::deploy(&provider).await.unwrap();
429
430        let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
431            .address(*contract.address())
432            .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
433        let all = event.query().await.unwrap();
434        assert_eq!(all.len(), 0);
435
436        let poller = event.watch().await.unwrap();
437
438        let _receipt =
439            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
440
441        let expected_event = MyContract::MyEvent {
442            _0: 42,
443            _1: "hello".to_string(),
444            _2: true,
445            _3: U256::from(0xdeadbeefu64).into(),
446        };
447
448        let mut stream = poller.into_stream();
449        let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
450        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); // add check that the received event signature is the same as the one we expect
451        assert_eq!(stream_event, expected_event);
452        assert_eq!(stream_log.inner.address, *contract.address());
453        assert_eq!(stream_log.block_number, Some(2));
454
455        // This is not going to return `None`
456        // assert!(stream.next().await.is_none());
457
458        let all = event.query().await.unwrap();
459        assert_eq!(all.len(), 1);
460        assert_eq!(all[0].0, expected_event);
461        assert_eq!(all[0].1, stream_log);
462
463        // send the wrong event and make sure it is NOT picked up by the event filter
464        let _wrong_receipt = contract
465            .doEmitWrongEvent()
466            .send()
467            .await
468            .unwrap()
469            .get_receipt()
470            .await
471            .expect("no receipt");
472
473        // we sent the wrong event
474        // so no events should be returned when querying event.query() (MyEvent)
475        let all = event.query().await.unwrap();
476        assert_eq!(all.len(), 0);
477
478        #[cfg(feature = "pubsub")]
479        {
480            let provider = alloy_provider::ProviderBuilder::new()
481                .wallet(wallet)
482                .connect(&anvil.ws_endpoint())
483                .await
484                .unwrap();
485
486            let contract = MyContract::new(*contract.address(), &provider);
487            let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
488                .address(*contract.address())
489                .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
490
491            let sub = event.subscribe().await.unwrap();
492
493            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
494
495            let mut stream = sub.into_stream();
496
497            let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
498            assert_eq!(
499                MyContract::MyEvent::SIGNATURE_HASH.0,
500                stream_log.topics().first().unwrap().0
501            );
502            assert_eq!(stream_event, expected_event);
503            assert_eq!(stream_log.address(), *contract.address());
504            assert_eq!(stream_log.block_number, Some(4));
505
506            // send the request to emit the wrong event
507            contract
508                .doEmitWrongEvent()
509                .send()
510                .await
511                .unwrap()
512                .get_receipt()
513                .await
514                .expect("no receipt");
515
516            // we sent the wrong event
517            // so no events should be returned when querying event.query() (MyEvent)
518            let all = event.query().await.unwrap();
519            assert_eq!(all.len(), 0);
520        }
521    }
522}