alloy_provider/provider/
get_block.rs

1use std::{fmt::Debug, marker::PhantomData};
2
3use crate::{utils, ProviderCall};
4use alloy_eips::{BlockId, BlockNumberOrTag};
5use alloy_json_rpc::RpcRecv;
6use alloy_network::BlockResponse;
7use alloy_network_primitives::BlockTransactionsKind;
8use alloy_primitives::{Address, BlockHash, B256, B64};
9use alloy_rpc_client::{ClientRef, RpcCall};
10#[cfg(feature = "pubsub")]
11use alloy_rpc_types_eth::pubsub::SubscriptionKind;
12use alloy_transport::{TransportError, TransportResult};
13use either::Either;
14use futures::{Stream, StreamExt};
15use serde_json::Value;
16use std::time::Duration;
17
18use super::FilterPollerBuilder;
19
20/// The parameters for an `eth_getBlockBy{Hash, Number}` RPC request.
21///
22/// Default is "latest" block with transaction hashes.
23#[derive(Clone, Debug, Default)]
24pub struct EthGetBlockParams {
25    block: BlockId,
26    kind: BlockTransactionsKind,
27}
28
29impl serde::Serialize for EthGetBlockParams {
30    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
31    where
32        S: serde::Serializer,
33    {
34        use serde::ser::SerializeTuple;
35
36        let mut tuple = serializer.serialize_tuple(2)?;
37        match self.block {
38            BlockId::Hash(hash) => tuple.serialize_element(&hash.block_hash)?,
39            BlockId::Number(number) => tuple.serialize_element(&number)?,
40        }
41        if self.kind.is_hashes() {
42            tuple.serialize_element(&false)?;
43        } else {
44            tuple.serialize_element(&true)?
45        };
46        tuple.end()
47    }
48}
49
50impl EthGetBlockParams {
51    /// Instantiate [`EthGetBlockParams`] with the given block and kind.
52    pub const fn new(block: BlockId, kind: BlockTransactionsKind) -> Self {
53        Self { block, kind }
54    }
55}
56
57/// A builder for an `"eth_getBlockByHash"` request. This type is returned by the
58/// [`Provider::call`] method.
59///
60/// [`Provider::call`]: crate::Provider::call
61#[must_use = "EthGetBlockBy must be awaited to execute the request"]
62//#[derive(Clone, Debug)]
63pub struct EthGetBlock<BlockResp>
64where
65    BlockResp: alloy_network::BlockResponse + RpcRecv,
66{
67    inner: GetBlockInner<BlockResp>,
68    block: BlockId,
69    kind: BlockTransactionsKind,
70    _pd: std::marker::PhantomData<BlockResp>,
71}
72
73impl<BlockResp> EthGetBlock<BlockResp>
74where
75    BlockResp: alloy_network::BlockResponse + RpcRecv,
76{
77    /// Create a new [`EthGetBlock`] request to get the block by hash i.e call
78    /// `"eth_getBlockByHash"`.
79    pub fn by_hash(hash: BlockHash, client: ClientRef<'_>) -> Self {
80        let params = EthGetBlockParams::default();
81        let call = client.request("eth_getBlockByHash", params);
82        Self::new_rpc(hash.into(), call)
83    }
84
85    /// Create a new [`EthGetBlock`] request to get the block by number i.e call
86    /// `"eth_getBlockByNumber"`.
87    pub fn by_number(number: BlockNumberOrTag, client: ClientRef<'_>) -> Self {
88        let params = EthGetBlockParams::default();
89
90        if number.is_pending() {
91            return Self::new_pending_rpc(client.request("eth_getBlockByNumber", params));
92        }
93
94        Self::new_rpc(number.into(), client.request("eth_getBlockByNumber", params))
95    }
96}
97
98impl<BlockResp> EthGetBlock<BlockResp>
99where
100    BlockResp: alloy_network::BlockResponse + RpcRecv,
101{
102    /// Create a new [`EthGetBlock`] request with the given [`RpcCall`].
103    pub fn new_rpc(block: BlockId, inner: RpcCall<EthGetBlockParams, Option<BlockResp>>) -> Self {
104        Self {
105            block,
106            inner: GetBlockInner::RpcCall(inner),
107            kind: BlockTransactionsKind::Hashes,
108            _pd: PhantomData,
109        }
110    }
111
112    /// Create a new [`EthGetBlock`] request with the given [`RpcCall`] for pending block.
113    pub fn new_pending_rpc(inner: RpcCall<EthGetBlockParams, Value>) -> Self {
114        Self {
115            block: BlockId::pending(),
116            inner: GetBlockInner::PendingBlock(inner),
117            kind: BlockTransactionsKind::Hashes,
118            _pd: PhantomData,
119        }
120    }
121
122    /// Create a new [`EthGetBlock`] request with a closure that returns a [`ProviderCall`].
123    pub fn new_provider(block: BlockId, producer: ProviderCallProducer<BlockResp>) -> Self {
124        Self {
125            block,
126            inner: GetBlockInner::ProviderCall(producer),
127            kind: BlockTransactionsKind::Hashes,
128            _pd: PhantomData,
129        }
130    }
131
132    /// Set the [`BlockTransactionsKind`] for the request.
133    pub const fn kind(mut self, kind: BlockTransactionsKind) -> Self {
134        self.kind = kind;
135        self
136    }
137
138    /// Set the [`BlockTransactionsKind`] to [`BlockTransactionsKind::Full`].
139    pub const fn full(mut self) -> Self {
140        self.kind = BlockTransactionsKind::Full;
141        self
142    }
143
144    /// Set the [`BlockTransactionsKind`] to [`BlockTransactionsKind::Hashes`].
145    pub const fn hashes(mut self) -> Self {
146        self.kind = BlockTransactionsKind::Hashes;
147        self
148    }
149}
150
151impl<BlockResp> std::future::IntoFuture for EthGetBlock<BlockResp>
152where
153    BlockResp: alloy_network::BlockResponse + RpcRecv,
154{
155    type Output = TransportResult<Option<BlockResp>>;
156
157    type IntoFuture = ProviderCall<EthGetBlockParams, Option<BlockResp>>;
158
159    fn into_future(self) -> Self::IntoFuture {
160        match self.inner {
161            GetBlockInner::RpcCall(call) => {
162                let rpc_call =
163                    call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
164
165                let fut = async move {
166                    let resp = rpc_call.await?;
167                    let result =
168                        if self.kind.is_hashes() { utils::convert_to_hashes(resp) } else { resp };
169                    Ok(result)
170                };
171
172                ProviderCall::BoxedFuture(Box::pin(fut))
173            }
174            GetBlockInner::PendingBlock(call) => {
175                let rpc_call =
176                    call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
177
178                let map_fut = async move {
179                    let mut block = rpc_call.await?;
180
181                    if block.is_null() {
182                        return Ok(None);
183                    }
184
185                    // Ref: <https://github.com/alloy-rs/alloy/issues/2117>
186                    // Geth ref: <https://github.com/ethereum/go-ethereum/blob/ebff2f42c0fbb4ebee43b0e73e39b658305a8a9b/internal/ethapi/api.go#L470-L471>
187                    tracing::trace!(pending_block = ?block.to_string());
188                    if block.get("hash").is_none_or(|v| v.is_null()) {
189                        block["hash"] = Value::String(format!("{}", B256::ZERO));
190                    }
191
192                    if block.get("nonce").is_none_or(|v| v.is_null()) {
193                        block["nonce"] = Value::String(format!("{}", B64::ZERO));
194                    }
195
196                    if block.get("miner").is_none_or(|v| v.is_null())
197                        || block.get("beneficiary").is_none_or(|v| v.is_null())
198                    {
199                        block["miner"] = Value::String(format!("{}", Address::ZERO));
200                    }
201
202                    let block = serde_json::from_value(block.clone())
203                        .map_err(|e| TransportError::deser_err(e, block.to_string()))?;
204
205                    let block = if self.kind.is_hashes() {
206                        utils::convert_to_hashes(Some(block))
207                    } else {
208                        Some(block)
209                    };
210
211                    Ok(block)
212                };
213
214                ProviderCall::BoxedFuture(Box::pin(map_fut))
215            }
216            GetBlockInner::ProviderCall(producer) => producer(self.kind),
217        }
218    }
219}
220
221impl<BlockResp> core::fmt::Debug for EthGetBlock<BlockResp>
222where
223    BlockResp: BlockResponse + RpcRecv,
224{
225    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
226        f.debug_struct("EthGetBlock").field("block", &self.block).field("kind", &self.kind).finish()
227    }
228}
229
230type ProviderCallProducer<BlockResp> =
231    Box<dyn Fn(BlockTransactionsKind) -> ProviderCall<EthGetBlockParams, Option<BlockResp>> + Send>;
232
233enum GetBlockInner<BlockResp>
234where
235    BlockResp: BlockResponse + RpcRecv,
236{
237    /// [`RpcCall`] with params that get wrapped into [`EthGetBlockParams`] in the future.
238    RpcCall(RpcCall<EthGetBlockParams, Option<BlockResp>>),
239    /// Pending Block Call
240    ///
241    /// This has been made explicit to handle cases where fields such as `hash`, `nonce`, `miner`
242    /// are either missing or set to null causing deserilization issues. See: <https://github.com/alloy-rs/alloy/issues/2117>
243    ///
244    /// This is specifically true in case of the response is returned from a geth node. See: <https://github.com/ethereum/go-ethereum/blob/ebff2f42c0fbb4ebee43b0e73e39b658305a8a9b/internal/ethapi/api.go#L470-L471>
245    ///
246    /// In such case, we first deserialize to [`Value`] and then check if the fields are missing or
247    /// set to null. If so, we set them to default values.  
248    PendingBlock(RpcCall<EthGetBlockParams, Value>),
249    /// Closure that produces a [`ProviderCall`] given [`BlockTransactionsKind`].
250    ProviderCall(ProviderCallProducer<BlockResp>),
251}
252
253impl<BlockResp> core::fmt::Debug for GetBlockInner<BlockResp>
254where
255    BlockResp: BlockResponse + RpcRecv,
256{
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        match self {
259            Self::RpcCall(call) => f.debug_tuple("RpcCall").field(call).finish(),
260            Self::PendingBlock(call) => f.debug_tuple("PendingBlockCall").field(call).finish(),
261            Self::ProviderCall(_) => f.debug_struct("ProviderCall").finish(),
262        }
263    }
264}
265
266/// A builder type for polling new blocks using the [`FilterPollerBuilder`].
267///
268/// By default, this polls for blocks with [`BlockTransactionsKind::Hashes`].
269///
270/// [`WatchBlocks::full`] should be used to poll for blocks with
271/// [`BlockTransactionsKind::Full`].
272///
273/// The polling stream must be consumed by calling [`WatchBlocks::into_stream`].
274#[derive(Debug)]
275#[must_use = "this builder does nothing unless you call `.into_stream`"]
276pub struct WatchBlocks<BlockResp> {
277    /// [`PollerBuilder`] for polling new block hashes.
278    ///
279    /// On every poll it returns an array of block hashes [`Vec<B256>`] as `eth_getFilterChanges`
280    /// returns an array of logs. See <https://docs.alchemy.com/reference/eth-getfilterchanges-1>.
281    ///
282    /// [`PollerBuilder`]: alloy_rpc_client::PollerBuilder
283    poller: FilterPollerBuilder<B256>,
284    /// The [`BlockTransactionsKind`] to retrieve on each poll.
285    ///
286    /// Default is [`BlockTransactionsKind::Hashes`].
287    ///
288    /// [`WatchBlocks::full`] should be used to poll for blocks with
289    /// [`BlockTransactionsKind::Full`].
290    kind: BlockTransactionsKind,
291    _pd: std::marker::PhantomData<BlockResp>,
292}
293
294impl<BlockResp> WatchBlocks<BlockResp>
295where
296    BlockResp: BlockResponse + RpcRecv,
297{
298    /// Create a new [`WatchBlocks`] instance.
299    pub(crate) const fn new(poller: FilterPollerBuilder<B256>) -> Self {
300        Self { poller, kind: BlockTransactionsKind::Hashes, _pd: PhantomData }
301    }
302
303    /// Poll for blocks with full transactions i.e [`BlockTransactionsKind::Full`].
304    pub const fn full(mut self) -> Self {
305        self.kind = BlockTransactionsKind::Full;
306        self
307    }
308
309    /// Poll for blocks with just transactions hashes i.e [`BlockTransactionsKind::Hashes`].
310    pub const fn hashes(mut self) -> Self {
311        self.kind = BlockTransactionsKind::Hashes;
312        self
313    }
314
315    /// Sets the channel size for the poller task.
316    pub fn set_channel_size(&mut self, channel_size: usize) {
317        self.poller.set_channel_size(channel_size);
318    }
319
320    /// Sets a limit on the number of successful polls.
321    pub fn set_limit(&mut self, limit: Option<usize>) {
322        self.poller.set_limit(limit);
323    }
324
325    /// Sets the duration between polls.
326    pub fn set_poll_interval(&mut self, poll_interval: Duration) {
327        self.poller.set_poll_interval(poll_interval);
328    }
329
330    /// Consumes the stream of block hashes from the inner [`FilterPollerBuilder`] and maps it to a
331    /// stream of [`BlockResponse`].
332    pub fn into_stream(self) -> impl Stream<Item = TransportResult<BlockResp>> + Unpin {
333        let client = self.poller.client();
334        let kind = self.kind;
335        let stream = self
336            .poller
337            .into_stream()
338            .then(move |hashes| utils::hashes_to_blocks(hashes, client.clone(), kind.into()))
339            .flat_map(|res| {
340                futures::stream::iter(match res {
341                    Ok(blocks) => {
342                        // Ignore `None` responses.
343                        Either::Left(blocks.into_iter().filter_map(|block| block.map(Ok)))
344                    }
345                    Err(err) => Either::Right(std::iter::once(Err(err))),
346                })
347            });
348        Box::pin(stream)
349    }
350}
351
/// Builder for a subscription that yields whole blocks
/// ([`alloy_network_primitives::BlockResponse`]) rather than only headers
/// ([`alloy_network_primitives::HeaderResponse`]).
///
/// Blocks carry transaction hashes only unless [`SubFullBlocks::full`] is called, in which case
/// they carry full transactions.
#[cfg(feature = "pubsub")]
#[must_use = "this does nothing unless you call `.into_stream`"]
#[derive(Debug)]
pub struct SubFullBlocks<N: alloy_network::Network> {
    /// The header subscription that drives the block stream.
    sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
    /// Weak handle to the RPC client used to fetch the block for each header.
    client: alloy_rpc_client::WeakClient,
    /// Whether blocks are fetched with full transactions or only hashes.
    kind: BlockTransactionsKind,
}
365
#[cfg(feature = "pubsub")]
impl<N: alloy_network::Network> SubFullBlocks<N> {
    /// Create a new [`SubFullBlocks`] subscription with the given [`super::GetSubscription`].
    ///
    /// By default, this subscribes to block with tx hashes only. Use [`SubFullBlocks::full`] to
    /// subscribe to blocks with full transactions.
    pub const fn new(
        sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
        client: alloy_rpc_client::WeakClient,
    ) -> Self {
        Self { sub, client, kind: BlockTransactionsKind::Hashes }
    }

    /// Subscribe to blocks with full transactions.
    pub const fn full(mut self) -> Self {
        self.kind = BlockTransactionsKind::Full;
        self
    }

    /// Subscribe to blocks with transaction hashes only.
    pub const fn hashes(mut self) -> Self {
        self.kind = BlockTransactionsKind::Hashes;
        self
    }

    /// Set the channel size of the underlying subscription.
    pub fn channel_size(mut self, size: usize) -> Self {
        self.sub = self.sub.channel_size(size);
        self
    }

    /// Subscribe to the inner stream of headers and map them to block responses.
    ///
    /// For every header received, the block with that header's hash is requested through
    /// `"eth_getBlockByHash"`. Headers whose block lookup returns nothing are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if establishing the subscription fails. Individual stream items are
    /// errors when the block request fails, or when the client has already been dropped (the
    /// builder only holds a weak reference to it).
    pub async fn into_stream(
        self,
    ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>> + Unpin> {
        use alloy_network_primitives::HeaderResponse;

        let Self { sub, client, kind } = self;
        let sub = sub.await?;

        let stream = sub
            .into_stream()
            .then(move |header| {
                let hash = header.hash();
                let client_weak = client.clone();

                async move {
                    // Build the error lazily: it is only needed once the client is gone.
                    let client = client_weak
                        .upgrade()
                        .ok_or_else(|| TransportError::local_usage_str("Client dropped"))?;

                    let call = client.request("eth_getBlockByHash", (hash, kind.is_full()));
                    let resp = call.await?;

                    if kind.is_hashes() {
                        Ok(utils::convert_to_hashes(resp))
                    } else {
                        Ok(resp)
                    }
                }
            })
            // `Ok(None)` (no block returned) is dropped; `Ok(Some(_))` and `Err(_)` pass through.
            .filter_map(|result| futures::future::ready(result.transpose()));

        #[cfg(not(target_family = "wasm"))]
        {
            Ok(stream.boxed())
        }

        #[cfg(target_family = "wasm")]
        {
            Ok(stream.boxed_local())
        }
    }
}
441
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Provider, ProviderBuilder};

    /// Fetching the pending block with full transactions from a live endpoint must
    /// deserialize. See <https://github.com/alloy-rs/alloy/issues/2117>.
    #[tokio::test]
    async fn test_pending_block_deser() {
        let url = "https://binance.llamarpc.com".parse().unwrap();
        let provider = ProviderBuilder::new().connect_http(url);

        match provider.get_block_by_number(BlockNumberOrTag::Pending).full().await {
            Err(err) if err.to_string().contains("no response") => {
                // response can be flaky
                eprintln!("skipping flaky response: {err:?}");
            }
            res => {
                let _block = res.unwrap();
            }
        }
    }
}