1use crate::Error;
2use alloy_network::Ethereum;
3use alloy_primitives::{Address, LogData, B256};
4use alloy_provider::{FilterPollerBuilder, Network, Provider};
5use alloy_rpc_types_eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log, Topic, ValueOrArray};
6use alloy_sol_types::SolEvent;
7use alloy_transport::TransportResult;
8use futures::Stream;
9use futures_util::StreamExt;
10use std::{fmt, marker::PhantomData};
11
12#[must_use = "event filters do nothing unless you `query`, `watch`, or `stream` them"]
14pub struct Event<P, E, N = Ethereum> {
15 pub provider: P,
17 pub filter: Filter,
19 _phantom: PhantomData<(E, N)>,
20}
21
22impl<P: fmt::Debug, E, N> fmt::Debug for Event<P, E, N> {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("Event")
25 .field("provider", &self.provider)
26 .field("filter", &self.filter)
27 .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
28 .finish()
29 }
30}
31
32#[doc(hidden)]
33impl<'a, P: Provider<N>, E: SolEvent, N: Network> Event<&'a P, E, N> {
34 pub fn new_sol(provider: &'a P, address: &Address) -> Self {
37 if E::ANONYMOUS {
40 Self::new(provider, Filter::new().address(*address))
41 } else {
42 Self::new(provider, Filter::new().address(*address).event_signature(E::SIGNATURE_HASH))
43 }
44 }
45}
46
47impl<P: Provider<N>, E: SolEvent, N: Network> Event<P, E, N> {
48 pub const fn new(provider: P, filter: Filter) -> Self {
50 Self { provider, filter, _phantom: PhantomData }
51 }
52
53 pub async fn query(&self) -> Result<Vec<(E, Log)>, Error> {
55 let logs = self.query_raw().await?;
56 logs.into_iter().map(|log| Ok((decode_log(&log)?, log))).collect()
57 }
58
59 pub async fn query_raw(&self) -> TransportResult<Vec<Log>> {
62 self.provider.get_logs(&self.filter).await
63 }
64
65 #[doc(alias = "stream")]
69 #[doc(alias = "stream_with_meta")]
70 pub async fn watch(&self) -> TransportResult<EventPoller<E>> {
71 let poller = self.provider.watch_logs(&self.filter).await?;
72 Ok(poller.into())
73 }
74
75 #[cfg(feature = "pubsub")]
79 pub async fn subscribe(&self) -> TransportResult<subscription::EventSubscription<E>> {
80 let sub = self.provider.subscribe_logs(&self.filter).await?;
81 Ok(sub.into())
82 }
83
84 pub fn select(mut self, filter: impl Into<FilterBlockOption>) -> Self {
88 self.filter.block_option = filter.into();
89 self
90 }
91
92 pub fn from_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
94 self.filter.block_option = self.filter.block_option.with_from_block(block.into());
95 self
96 }
97
98 pub fn to_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
100 self.filter.block_option = self.filter.block_option.with_to_block(block.into());
101 self
102 }
103
104 pub fn is_pending_block_filter(&self) -> bool {
109 self.filter.block_option.get_from_block().is_some_and(BlockNumberOrTag::is_pending)
110 && self.filter.block_option.get_to_block().is_some_and(BlockNumberOrTag::is_pending)
111 }
112
113 pub fn at_block_hash<A: Into<B256>>(mut self, hash: A) -> Self {
115 self.filter.block_option = self.filter.block_option.with_block_hash(hash.into());
116 self
117 }
118
119 pub fn address<A: Into<ValueOrArray<Address>>>(mut self, address: A) -> Self {
123 self.filter.address = address.into().into();
124 self
125 }
126
127 pub fn event(mut self, event_name: &str) -> Self {
129 self.filter = self.filter.event(event_name);
130 self
131 }
132
133 pub fn events(mut self, events: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
135 self.filter = self.filter.events(events);
136 self
137 }
138
139 pub fn event_signature<TO: Into<Topic>>(mut self, topic: TO) -> Self {
141 self.filter.topics[0] = topic.into();
142 self
143 }
144
145 pub fn topic1<TO: Into<Topic>>(mut self, topic: TO) -> Self {
147 self.filter.topics[1] = topic.into();
148 self
149 }
150
151 pub fn topic2<TO: Into<Topic>>(mut self, topic: TO) -> Self {
153 self.filter.topics[2] = topic.into();
154 self
155 }
156
157 pub fn topic3<TO: Into<Topic>>(mut self, topic: TO) -> Self {
159 self.filter.topics[3] = topic.into();
160 self
161 }
162}
163
164impl<P: Clone, E, N> Event<&P, E, N> {
165 pub fn with_cloned_provider(self) -> Event<P, E, N> {
167 Event { provider: self.provider.clone(), filter: self.filter, _phantom: PhantomData }
168 }
169}
170
171pub struct EventPoller<E> {
175 pub poller: FilterPollerBuilder<Log>,
177 _phantom: PhantomData<E>,
178}
179
180impl<E> AsRef<FilterPollerBuilder<Log>> for EventPoller<E> {
181 #[inline]
182 fn as_ref(&self) -> &FilterPollerBuilder<Log> {
183 &self.poller
184 }
185}
186
187impl<E> AsMut<FilterPollerBuilder<Log>> for EventPoller<E> {
188 #[inline]
189 fn as_mut(&mut self) -> &mut FilterPollerBuilder<Log> {
190 &mut self.poller
191 }
192}
193
194impl<E> fmt::Debug for EventPoller<E> {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 f.debug_struct("EventPoller")
197 .field("poller", &self.poller)
198 .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
199 .finish()
200 }
201}
202
203impl<E> From<FilterPollerBuilder<Log>> for EventPoller<E> {
204 fn from(poller: FilterPollerBuilder<Log>) -> Self {
205 Self { poller, _phantom: PhantomData }
206 }
207}
208
209impl<E: SolEvent> EventPoller<E> {
210 pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
214 self.poller
215 .into_stream()
216 .flat_map(futures_util::stream::iter)
217 .map(|log| decode_log(&log).map(|e| (e, log)))
218 }
219}
220
221fn decode_log<E: SolEvent>(log: &Log) -> alloy_sol_types::Result<E> {
222 let log_data: &LogData = log.as_ref();
223
224 E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data)
225}
226
#[cfg(feature = "pubsub")]
pub(crate) mod subscription {
    use super::*;
    use alloy_pubsub::Subscription;

    /// A [`Subscription`] over raw logs, tagged with the event type `E`.
    pub struct EventSubscription<E> {
        /// The wrapped log subscription.
        pub sub: Subscription<Log>,
        /// Zero-sized marker carrying the event type parameter.
        _phantom: PhantomData<E>,
    }

    impl<E> AsRef<Subscription<Log>> for EventSubscription<E> {
        /// Borrows the wrapped subscription.
        #[inline]
        fn as_ref(&self) -> &Subscription<Log> {
            let Self { sub, .. } = self;
            sub
        }
    }

    impl<E> AsMut<Subscription<Log>> for EventSubscription<E> {
        /// Mutably borrows the wrapped subscription.
        #[inline]
        fn as_mut(&mut self) -> &mut Subscription<Log> {
            let Self { sub, .. } = self;
            sub
        }
    }

    impl<E> fmt::Debug for EventSubscription<E> {
        /// Formats the wrapped subscription and the type name of `E` (shown as `event_type`).
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let event_type = std::any::type_name::<E>();

            let mut out = f.debug_struct("EventSubscription");
            out.field("sub", &self.sub);
            // `format_args!` prints the type name without surrounding quotes.
            out.field("event_type", &format_args!("{}", event_type));
            out.finish()
        }
    }

    impl<E> From<Subscription<Log>> for EventSubscription<E> {
        /// Wraps a raw log subscription, tagging it with the event type `E`.
        fn from(sub: Subscription<Log>) -> Self {
            EventSubscription { sub, _phantom: PhantomData }
        }
    }

    impl<E: SolEvent> EventSubscription<E> {
        /// Consumes the subscription and returns a stream of decoded events.
        ///
        /// Each item is either the decoded event paired with its raw log, or
        /// the error produced while decoding that log as `E`.
        pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
            let logs = self.sub.into_stream();

            logs.map(|log| match decode_log::<E>(&log) {
                Ok(event) => Ok((event, log)),
                Err(err) => Err(err),
            })
        }
    }
}
277
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_network::EthereumWallet;
    use alloy_primitives::U256;
    use alloy_signer_local::PrivateKeySigner;
    use alloy_sol_types::sol;

    sol! {
        #[sol(rpc, bytecode = "60808060405234601557610147908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c908163299d8665146100a7575063ffdf4f1b14610032575f80fd5b346100a3575f3660031901126100a357602a7f6d10b8446ff0ac11bb95d154e7b10a73042fb9fc3bca0c92de5397b2fe78496c6040518061009e819060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a0840193600160208201520152565b0390a2005b5f80fd5b346100a3575f3660031901126100a3577f4e4cd44610926680098f1b54e2bdd1fb952659144c471173bbb9cf966af3a988818061009e602a949060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a084019360016020820152015256fea26469706673582212202e640cd14a7310d4165f902d2721ef5b4640a08f5ae38e9ae5c315a9f9f4435864736f6c63430008190033")]
        #[allow(dead_code)]
        contract MyContract {
            #[derive(Debug, PartialEq, Eq)]
            event MyEvent(uint64 indexed, string, bool, bytes32);

            #[derive(Debug, PartialEq, Eq)]
            event WrongEvent(uint64 indexed, string, bool, bytes32);

            function doEmit() external {
                emit MyEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
            }

            function doEmitWrongEvent() external {
                emit WrongEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
            }
        }
    }

    /// Private key used to sign every transaction sent by these tests.
    const SIGNER_KEY: &str = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";

    /// Builds the wallet shared by the HTTP and WebSocket providers.
    fn signer_wallet() -> EthereumWallet {
        let signer: PrivateKeySigner = SIGNER_KEY.parse().unwrap();
        EthereumWallet::from(signer)
    }

    /// The `MyEvent` value that `doEmit` is expected to emit.
    fn emitted_event() -> MyContract::MyEvent {
        MyContract::MyEvent {
            _0: 42,
            _1: "hello".to_string(),
            _2: true,
            _3: U256::from(0xdeadbeefu64).into(),
        }
    }

    /// Exercises `query`, `watch` and (with `pubsub`) `subscribe` on the filter
    /// generated by `sol!`.
    #[tokio::test]
    async fn event_filters() {
        let _ = tracing_subscriber::fmt::try_init();

        let anvil = alloy_node_bindings::Anvil::new().spawn();

        let wallet = signer_wallet();
        let provider = alloy_provider::ProviderBuilder::new()
            .wallet(wallet.clone())
            .connect_http(anvil.endpoint_url());

        let contract = MyContract::deploy(&provider).await.unwrap();

        // Nothing has been emitted yet, so a bare filter finds no `MyEvent`.
        let bare: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new());
        let found = bare.query().await.unwrap();
        assert_eq!(found.len(), 0);

        // From here on, use the filter generated by `sol!`.
        let event = contract.MyEvent_filter();

        let poller = event.watch().await.unwrap();

        let pending = contract.doEmit().send().await.unwrap();
        let _receipt = pending.get_receipt().await.expect("no receipt");

        let expected = emitted_event();

        let mut stream = poller.into_stream();
        let (decoded, raw_log) = stream.next().await.unwrap().unwrap();
        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, raw_log.topics().first().unwrap().0);
        assert_eq!(decoded, expected);
        assert_eq!(raw_log.inner.address, *contract.address());
        assert_eq!(raw_log.block_number, Some(2));

        // The query must return exactly the log the poller delivered.
        let found = event.query().await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].0, expected);
        assert_eq!(found[0].1, raw_log);

        // Emit the other event; the `MyEvent` filter must not pick it up.
        let pending = contract.doEmitWrongEvent().send().await.unwrap();
        let _wrong_receipt = pending.get_receipt().await.expect("no receipt");

        // Presumably empty because the default block range only covers the
        // latest block, which now holds `WrongEvent` only — TODO confirm.
        let found = event.query().await.unwrap();
        assert_eq!(found.len(), 0);

        #[cfg(feature = "pubsub")]
        {
            let provider = alloy_provider::ProviderBuilder::new()
                .wallet(wallet)
                .connect(&anvil.ws_endpoint())
                .await
                .unwrap();

            let contract = MyContract::new(*contract.address(), provider);
            let event = contract.MyEvent_filter();

            let sub = event.subscribe().await.unwrap();

            let pending = contract.doEmit().send().await.unwrap();
            pending.get_receipt().await.expect("no receipt");

            let mut stream = sub.into_stream();

            let (decoded, raw_log) = stream.next().await.unwrap().unwrap();
            assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, raw_log.topics().first().unwrap().0);
            assert_eq!(decoded, expected);
            assert_eq!(raw_log.address(), *contract.address());
            assert_eq!(raw_log.block_number, Some(4));

            // Emit the other event; the `MyEvent` filter must not pick it up.
            let pending = contract.doEmitWrongEvent().send().await.unwrap();
            pending.get_receipt().await.expect("no receipt");

            let found = event.query().await.unwrap();
            assert_eq!(found.len(), 0);
        }
    }

    /// Same scenario as `event_filters`, but with the filter assembled by hand
    /// through the `Event` builder methods.
    #[tokio::test]
    async fn event_builder_filters() {
        let _ = tracing_subscriber::fmt::try_init();

        let anvil = alloy_node_bindings::Anvil::new().spawn();
        let wallet = signer_wallet();
        let provider = alloy_provider::ProviderBuilder::new()
            .wallet(wallet.clone())
            .connect_http(anvil.endpoint_url());

        let contract = MyContract::deploy(&provider).await.unwrap();

        let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
            .address(*contract.address())
            .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
        let found = event.query().await.unwrap();
        assert_eq!(found.len(), 0);

        let poller = event.watch().await.unwrap();

        let pending = contract.doEmit().send().await.unwrap();
        let _receipt = pending.get_receipt().await.expect("no receipt");

        let expected = emitted_event();

        let mut stream = poller.into_stream();
        let (decoded, raw_log) = stream.next().await.unwrap().unwrap();
        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, raw_log.topics().first().unwrap().0);
        assert_eq!(decoded, expected);
        assert_eq!(raw_log.inner.address, *contract.address());
        assert_eq!(raw_log.block_number, Some(2));

        // The query must return exactly the log the poller delivered.
        let found = event.query().await.unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].0, expected);
        assert_eq!(found[0].1, raw_log);

        // Emit the other event; the `MyEvent` filter must not pick it up.
        let pending = contract.doEmitWrongEvent().send().await.unwrap();
        let _wrong_receipt = pending.get_receipt().await.expect("no receipt");

        // Presumably empty because the default block range only covers the
        // latest block, which now holds `WrongEvent` only — TODO confirm.
        let found = event.query().await.unwrap();
        assert_eq!(found.len(), 0);

        #[cfg(feature = "pubsub")]
        {
            let provider = alloy_provider::ProviderBuilder::new()
                .wallet(wallet)
                .connect(&anvil.ws_endpoint())
                .await
                .unwrap();

            let contract = MyContract::new(*contract.address(), &provider);
            let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
                .address(*contract.address())
                .event_signature(MyContract::MyEvent::SIGNATURE_HASH);

            let sub = event.subscribe().await.unwrap();

            let pending = contract.doEmit().send().await.unwrap();
            pending.get_receipt().await.expect("no receipt");

            let mut stream = sub.into_stream();

            let (decoded, raw_log) = stream.next().await.unwrap().unwrap();
            assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, raw_log.topics().first().unwrap().0);
            assert_eq!(decoded, expected);
            assert_eq!(raw_log.address(), *contract.address());
            assert_eq!(raw_log.block_number, Some(4));

            // Emit the other event; the `MyEvent` filter must not pick it up.
            let pending = contract.doEmitWrongEvent().send().await.unwrap();
            pending.get_receipt().await.expect("no receipt");

            let found = event.query().await.unwrap();
            assert_eq!(found.len(), 0);
        }
    }
}