1use std::{fmt::Debug, marker::PhantomData};
2
3use crate::{utils, ProviderCall};
4use alloy_eips::{BlockId, BlockNumberOrTag};
5use alloy_json_rpc::RpcRecv;
6use alloy_network::BlockResponse;
7use alloy_network_primitives::BlockTransactionsKind;
8use alloy_primitives::{Address, BlockHash, B256, B64};
9use alloy_rpc_client::{ClientRef, RpcCall};
10#[cfg(feature = "pubsub")]
11use alloy_rpc_types_eth::pubsub::SubscriptionKind;
12use alloy_transport::{TransportError, TransportResult};
13use either::Either;
14use futures::{Stream, StreamExt};
15use serde_json::Value;
16use std::time::Duration;
17
18use super::FilterPollerBuilder;
19
20#[derive(Clone, Debug, Default)]
24pub struct EthGetBlockParams {
25 block: BlockId,
26 kind: BlockTransactionsKind,
27}
28
29impl serde::Serialize for EthGetBlockParams {
30 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
31 where
32 S: serde::Serializer,
33 {
34 use serde::ser::SerializeTuple;
35
36 let mut tuple = serializer.serialize_tuple(2)?;
37 match self.block {
38 BlockId::Hash(hash) => tuple.serialize_element(&hash.block_hash)?,
39 BlockId::Number(number) => tuple.serialize_element(&number)?,
40 }
41 if self.kind.is_hashes() {
42 tuple.serialize_element(&false)?;
43 } else {
44 tuple.serialize_element(&true)?
45 };
46 tuple.end()
47 }
48}
49
50impl EthGetBlockParams {
51 pub const fn new(block: BlockId, kind: BlockTransactionsKind) -> Self {
53 Self { block, kind }
54 }
55}
56
57#[must_use = "EthGetBlockBy must be awaited to execute the request"]
62pub struct EthGetBlock<BlockResp>
64where
65 BlockResp: alloy_network::BlockResponse + RpcRecv,
66{
67 inner: GetBlockInner<BlockResp>,
68 block: BlockId,
69 kind: BlockTransactionsKind,
70 _pd: std::marker::PhantomData<BlockResp>,
71}
72
73impl<BlockResp> EthGetBlock<BlockResp>
74where
75 BlockResp: alloy_network::BlockResponse + RpcRecv,
76{
77 pub fn by_hash(hash: BlockHash, client: ClientRef<'_>) -> Self {
80 let params = EthGetBlockParams::default();
81 let call = client.request("eth_getBlockByHash", params);
82 Self::new_rpc(hash.into(), call)
83 }
84
85 pub fn by_number(number: BlockNumberOrTag, client: ClientRef<'_>) -> Self {
88 let params = EthGetBlockParams::default();
89
90 if number.is_pending() {
91 return Self::new_pending_rpc(client.request("eth_getBlockByNumber", params));
92 }
93
94 Self::new_rpc(number.into(), client.request("eth_getBlockByNumber", params))
95 }
96}
97
98impl<BlockResp> EthGetBlock<BlockResp>
99where
100 BlockResp: alloy_network::BlockResponse + RpcRecv,
101{
102 pub fn new_rpc(block: BlockId, inner: RpcCall<EthGetBlockParams, Option<BlockResp>>) -> Self {
104 Self {
105 block,
106 inner: GetBlockInner::RpcCall(inner),
107 kind: BlockTransactionsKind::Hashes,
108 _pd: PhantomData,
109 }
110 }
111
112 pub fn new_pending_rpc(inner: RpcCall<EthGetBlockParams, Value>) -> Self {
114 Self {
115 block: BlockId::pending(),
116 inner: GetBlockInner::PendingBlock(inner),
117 kind: BlockTransactionsKind::Hashes,
118 _pd: PhantomData,
119 }
120 }
121
122 pub fn new_provider(block: BlockId, producer: ProviderCallProducer<BlockResp>) -> Self {
124 Self {
125 block,
126 inner: GetBlockInner::ProviderCall(producer),
127 kind: BlockTransactionsKind::Hashes,
128 _pd: PhantomData,
129 }
130 }
131
132 pub const fn kind(mut self, kind: BlockTransactionsKind) -> Self {
134 self.kind = kind;
135 self
136 }
137
138 pub const fn full(mut self) -> Self {
140 self.kind = BlockTransactionsKind::Full;
141 self
142 }
143
144 pub const fn hashes(mut self) -> Self {
146 self.kind = BlockTransactionsKind::Hashes;
147 self
148 }
149}
150
151impl<BlockResp> std::future::IntoFuture for EthGetBlock<BlockResp>
152where
153 BlockResp: alloy_network::BlockResponse + RpcRecv,
154{
155 type Output = TransportResult<Option<BlockResp>>;
156
157 type IntoFuture = ProviderCall<EthGetBlockParams, Option<BlockResp>>;
158
159 fn into_future(self) -> Self::IntoFuture {
160 match self.inner {
161 GetBlockInner::RpcCall(call) => {
162 let rpc_call =
163 call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
164
165 let fut = async move {
166 let resp = rpc_call.await?;
167 let result =
168 if self.kind.is_hashes() { utils::convert_to_hashes(resp) } else { resp };
169 Ok(result)
170 };
171
172 ProviderCall::BoxedFuture(Box::pin(fut))
173 }
174 GetBlockInner::PendingBlock(call) => {
175 let rpc_call =
176 call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
177
178 let map_fut = async move {
179 let mut block = rpc_call.await?;
180
181 if block.is_null() {
182 return Ok(None);
183 }
184
185 tracing::trace!(pending_block = ?block.to_string());
188 if block.get("hash").is_none_or(|v| v.is_null()) {
189 block["hash"] = Value::String(format!("{}", B256::ZERO));
190 }
191
192 if block.get("nonce").is_none_or(|v| v.is_null()) {
193 block["nonce"] = Value::String(format!("{}", B64::ZERO));
194 }
195
196 if block.get("miner").is_none_or(|v| v.is_null())
197 || block.get("beneficiary").is_none_or(|v| v.is_null())
198 {
199 block["miner"] = Value::String(format!("{}", Address::ZERO));
200 }
201
202 let block = serde_json::from_value(block.clone())
203 .map_err(|e| TransportError::deser_err(e, block.to_string()))?;
204
205 let block = if self.kind.is_hashes() {
206 utils::convert_to_hashes(Some(block))
207 } else {
208 Some(block)
209 };
210
211 Ok(block)
212 };
213
214 ProviderCall::BoxedFuture(Box::pin(map_fut))
215 }
216 GetBlockInner::ProviderCall(producer) => producer(self.kind),
217 }
218 }
219}
220
221impl<BlockResp> core::fmt::Debug for EthGetBlock<BlockResp>
222where
223 BlockResp: BlockResponse + RpcRecv,
224{
225 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
226 f.debug_struct("EthGetBlock").field("block", &self.block).field("kind", &self.kind).finish()
227 }
228}
229
230type ProviderCallProducer<BlockResp> =
231 Box<dyn Fn(BlockTransactionsKind) -> ProviderCall<EthGetBlockParams, Option<BlockResp>> + Send>;
232
233enum GetBlockInner<BlockResp>
234where
235 BlockResp: BlockResponse + RpcRecv,
236{
237 RpcCall(RpcCall<EthGetBlockParams, Option<BlockResp>>),
239 PendingBlock(RpcCall<EthGetBlockParams, Value>),
249 ProviderCall(ProviderCallProducer<BlockResp>),
251}
252
253impl<BlockResp> core::fmt::Debug for GetBlockInner<BlockResp>
254where
255 BlockResp: BlockResponse + RpcRecv,
256{
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 match self {
259 Self::RpcCall(call) => f.debug_tuple("RpcCall").field(call).finish(),
260 Self::PendingBlock(call) => f.debug_tuple("PendingBlockCall").field(call).finish(),
261 Self::ProviderCall(_) => f.debug_struct("ProviderCall").finish(),
262 }
263 }
264}
265
266#[derive(Debug)]
275#[must_use = "this builder does nothing unless you call `.into_stream`"]
276pub struct WatchBlocks<BlockResp> {
277 poller: FilterPollerBuilder<B256>,
284 kind: BlockTransactionsKind,
291 _pd: std::marker::PhantomData<BlockResp>,
292}
293
294impl<BlockResp> WatchBlocks<BlockResp>
295where
296 BlockResp: BlockResponse + RpcRecv,
297{
298 pub(crate) const fn new(poller: FilterPollerBuilder<B256>) -> Self {
300 Self { poller, kind: BlockTransactionsKind::Hashes, _pd: PhantomData }
301 }
302
303 pub const fn full(mut self) -> Self {
305 self.kind = BlockTransactionsKind::Full;
306 self
307 }
308
309 pub const fn hashes(mut self) -> Self {
311 self.kind = BlockTransactionsKind::Hashes;
312 self
313 }
314
315 pub fn set_channel_size(&mut self, channel_size: usize) {
317 self.poller.set_channel_size(channel_size);
318 }
319
320 pub fn set_limit(&mut self, limit: Option<usize>) {
322 self.poller.set_limit(limit);
323 }
324
325 pub fn set_poll_interval(&mut self, poll_interval: Duration) {
327 self.poller.set_poll_interval(poll_interval);
328 }
329
330 pub fn into_stream(self) -> impl Stream<Item = TransportResult<BlockResp>> + Unpin {
333 let client = self.poller.client();
334 let kind = self.kind;
335 let stream = self
336 .poller
337 .into_stream()
338 .then(move |hashes| utils::hashes_to_blocks(hashes, client.clone(), kind.into()))
339 .flat_map(|res| {
340 futures::stream::iter(match res {
341 Ok(blocks) => {
342 Either::Left(blocks.into_iter().filter_map(|block| block.map(Ok)))
344 }
345 Err(err) => Either::Right(std::iter::once(Err(err))),
346 })
347 });
348 Box::pin(stream)
349 }
350}
351
/// Builder for a subscription-backed stream of blocks.
///
/// The subscription delivers headers; each header's hash is then used to
/// fetch the corresponding block through the client.
#[derive(Debug)]
#[must_use = "this does nothing unless you call `.into_stream`"]
#[cfg(feature = "pubsub")]
pub struct SubFullBlocks<N>
where
    N: alloy_network::Network,
{
    /// Pending header subscription.
    sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
    /// Weak handle to the client used to fetch blocks.
    client: alloy_rpc_client::WeakClient,
    /// Whether blocks are fetched with full transactions or hashes only.
    kind: BlockTransactionsKind,
}
365
#[cfg(feature = "pubsub")]
impl<N: alloy_network::Network> SubFullBlocks<N> {
    /// Creates the builder from a header subscription and a weak client
    /// handle. Blocks are fetched with transaction hashes only unless
    /// [`SubFullBlocks::full`] is called.
    pub const fn new(
        sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
        client: alloy_rpc_client::WeakClient,
    ) -> Self {
        Self { sub, client, kind: BlockTransactionsKind::Hashes }
    }

    /// Fetch blocks with full transaction objects.
    pub const fn full(mut self) -> Self {
        self.kind = BlockTransactionsKind::Full;
        self
    }

    /// Fetch blocks with transaction hashes only.
    pub const fn hashes(mut self) -> Self {
        self.kind = BlockTransactionsKind::Hashes;
        self
    }

    /// Sets the channel size of the underlying subscription.
    pub fn channel_size(mut self, size: usize) -> Self {
        self.sub = self.sub.channel_size(size);
        self
    }

    /// Establishes the subscription and returns a stream of blocks.
    ///
    /// For every header received, the block is requested by hash via
    /// `"eth_getBlockByHash"`. Headers whose block lookup returns nothing are
    /// skipped; request failures are yielded as `Err` items.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscription itself cannot be established.
    /// Individual stream items are `Err` when the client has been dropped or
    /// the block request fails.
    pub async fn into_stream(
        self,
    ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>> + Unpin> {
        use alloy_network_primitives::HeaderResponse;
        use futures::StreamExt;

        let Self { sub, client, kind } = self;
        let sub = sub.await?;

        let stream = sub
            .into_stream()
            .then(move |header| {
                let hash = header.hash();
                let client_weak = client.clone();

                async move {
                    // Build the error lazily: the upgrade succeeds on the
                    // common path, so the error should not be constructed
                    // for every header.
                    let client = client_weak
                        .upgrade()
                        .ok_or_else(|| TransportError::local_usage_str("Client dropped"))?;

                    let call = client.request("eth_getBlockByHash", (hash, kind.is_full()));
                    let resp = call.await?;

                    if kind.is_hashes() {
                        Ok(utils::convert_to_hashes(resp))
                    } else {
                        Ok(resp)
                    }
                }
            })
            // `Ok(None)` (no block found) is dropped from the stream.
            .filter_map(|result| futures::future::ready(result.transpose()));

        #[cfg(not(target_family = "wasm"))]
        {
            Ok(stream.boxed())
        }

        #[cfg(target_family = "wasm")]
        {
            Ok(stream.boxed_local())
        }
    }
}
441
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Provider, ProviderBuilder};

    /// Fetches the pending block (with full transactions) from a public
    /// endpoint and checks that the response deserializes. A "no response"
    /// error from the endpoint is tolerated and the test is skipped.
    #[tokio::test]
    async fn test_pending_block_deser() {
        let endpoint = "https://binance.llamarpc.com".parse().unwrap();
        let provider = ProviderBuilder::new().connect_http(endpoint);

        let outcome = provider.get_block_by_number(BlockNumberOrTag::Pending).full().await;
        match outcome {
            Err(err) if err.to_string().contains("no response") => {
                eprintln!("skipping flaky response: {err:?}");
            }
            other => {
                let _block = other.unwrap();
            }
        }
    }
}