Skip to main content

linera_rpc/grpc/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeSet,
6    fmt,
7    future::Future,
8    iter,
9    sync::{
10        atomic::{AtomicU32, Ordering},
11        Arc,
12    },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{BlobContent, BlockHeight, NetworkDescription},
19    ensure,
20    identifiers::{BlobId, ChainId, EventId},
21    time::{Duration, Instant},
22};
23use linera_chain::{
24    data_types::{self},
25    types::{
26        self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30#[cfg(with_metrics)]
31mod metrics {
32    use std::sync::LazyLock;
33
34    use linera_base::prometheus_util::register_int_counter_vec;
35    use prometheus::IntCounterVec;
36
37    pub static VALIDATOR_SUBSCRIPTION_ERRORS: LazyLock<IntCounterVec> = LazyLock::new(|| {
38        register_int_counter_vec(
39            "validator_subscription_errors",
40            "Number of notification subscription stream errors per validator",
41            &["address"],
42        )
43    });
44}
45
46use linera_core::{
47    data_types::{CertificatesByHeightRequest, ChainInfoResponse},
48    node::{BlobStream, CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
49    worker::Notification,
50};
51use linera_storage::Arc as CacheArc;
52use linera_version::VersionInfo;
53use tonic::{Code, IntoRequest, Request, Status};
54use tracing::{debug, instrument, trace, Level};
55
56use super::{
57    api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
58    transport, GRPC_MAX_MESSAGE_SIZE,
59};
60#[cfg(feature = "opentelemetry")]
61use crate::propagation::{get_context_with_traffic_type, inject_context};
62use crate::{
63    grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
64    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
65};
66
67#[derive(Clone)]
68pub struct GrpcClient {
69    address: String,
70    client: ValidatorNodeClient<transport::Channel>,
71    retry_delay: Duration,
72    max_retries: u32,
73    max_backoff: Duration,
74    /// Shared across all `GrpcClient` instances created by the same `GrpcNodeProvider`.
75    /// Tracks when each validator address last had a subscription failure, so that
76    /// other chains don't independently retry the same dead validator.
77    subscription_cooldowns: Arc<papaya::HashMap<String, Instant>>,
78}
79
80impl GrpcClient {
81    pub fn new(
82        address: String,
83        channel: transport::Channel,
84        retry_delay: Duration,
85        max_retries: u32,
86        max_backoff: Duration,
87        subscription_cooldowns: Arc<papaya::HashMap<String, Instant>>,
88    ) -> Self {
89        let client = ValidatorNodeClient::new(channel)
90            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
91            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
92        Self {
93            address,
94            client,
95            retry_delay,
96            max_retries,
97            max_backoff,
98            subscription_cooldowns,
99        }
100    }
101
102    pub fn address(&self) -> &str {
103        &self.address
104    }
105
106    /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
107    /// Logs a warning on unexpected status codes.
108    fn is_retryable(status: &Status) -> bool {
109        match status.code() {
110            Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
111                trace!("gRPC request interrupted: {status:?}; retrying");
112                true
113            }
114            Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
115                trace!("Unexpected gRPC status: {status:?}; retrying");
116                true
117            }
118            Code::Internal if status.message().contains("h2 protocol error") => {
119                // HTTP/2 connection reset errors are transient network issues, not real
120                // internal errors. This happens when the server restarts and the
121                // connection is forcibly closed.
122                trace!("gRPC connection reset: {status:?}; retrying");
123                true
124            }
125            Code::Internal if status.message().contains("502 Bad Gateway") => {
126                // When a proxy/ingress returns HTTP 502 (e.g. during rolling restarts),
127                // tonic's frame decoder fails on the non-gRPC response body before the
128                // HTTP-to-gRPC status mapping can run, producing Code::Internal instead
129                // of Code::Unavailable. Per the gRPC spec, HTTP 502 maps to UNAVAILABLE
130                // which is retryable. This works around tonic#2365.
131                trace!("gRPC proxy error (502): {status:?}; retrying");
132                true
133            }
134            Code::NotFound => false, // This code is used if e.g. the validator is missing blobs.
135            Code::InvalidArgument
136            | Code::AlreadyExists
137            | Code::PermissionDenied
138            | Code::FailedPrecondition
139            | Code::OutOfRange
140            | Code::Unimplemented
141            | Code::Internal
142            | Code::DataLoss
143            | Code::Unauthenticated => {
144                trace!("Unexpected gRPC status: {status:?}");
145                false
146            }
147        }
148    }
149
150    async fn delegate<F, Fut, R, S>(
151        &self,
152        f: F,
153        request: impl TryInto<R> + fmt::Debug + Clone,
154        handler: &str,
155    ) -> Result<S, NodeError>
156    where
157        F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
158        Fut: Future<Output = Result<tonic::Response<S>, Status>>,
159        R: IntoRequest<R> + Clone,
160    {
161        let mut retry_count = 0;
162        let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
163            error: "could not convert request to proto".to_string(),
164        })?;
165        loop {
166            #[allow(unused_mut)]
167            let mut request = Request::new(request_inner.clone());
168            // Inject OpenTelemetry context (trace context + baggage) into gRPC metadata.
169            // This uses get_context_with_traffic_type() to also check the LINERA_TRAFFIC_TYPE
170            // environment variable, allowing benchmark tools to mark their traffic as synthetic.
171            #[cfg(feature = "opentelemetry")]
172            inject_context(&get_context_with_traffic_type(), request.metadata_mut());
173            match f(self.client.clone(), request).await {
174                Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
175                    let delay = crate::jittered_backoff_delay(
176                        self.retry_delay,
177                        retry_count,
178                        self.max_backoff,
179                    );
180                    retry_count += 1;
181                    linera_base::time::timer::sleep(delay).await;
182                    continue;
183                }
184                Err(s) => {
185                    return Err(NodeError::GrpcError {
186                        error: format!("remote request [{handler}] failed with status: {s:?}"),
187                    });
188                }
189                Ok(result) => return Ok(result.into_inner()),
190            };
191        }
192    }
193
194    fn try_into_chain_info(
195        result: api::ChainInfoResult,
196    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
197        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
198            error: "missing body from response".to_string(),
199        })?;
200        match inner {
201            api::chain_info_result::Inner::ChainInfoResponse(response) => {
202                Ok(response.try_into().map_err(|err| NodeError::GrpcError {
203                    error: format!("failed to unmarshal response: {err}"),
204                })?)
205            }
206            api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
207                .map_err(|err| NodeError::GrpcError {
208                    error: format!("failed to unmarshal error message: {err}"),
209                })?),
210        }
211    }
212}
213
214impl TryFrom<api::PendingBlobResult> for BlobContent {
215    type Error = NodeError;
216
217    fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
218        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
219            error: "missing body from response".to_string(),
220        })?;
221        match inner {
222            api::pending_blob_result::Inner::Blob(blob) => {
223                Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
224                    error: format!("failed to unmarshal response: {err}"),
225                })?)
226            }
227            api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
228                .map_err(|err| NodeError::GrpcError {
229                    error: format!("failed to unmarshal error message: {err}"),
230                })?),
231        }
232    }
233}
234
235macro_rules! client_delegate {
236    ($self:ident, $handler:ident, $req:ident) => {{
237        debug!(
238            handler = stringify!($handler),
239            request = ?$req,
240            "sending gRPC request"
241        );
242        $self
243            .delegate(
244                |mut client, req| async move { client.$handler(req).await },
245                $req,
246                stringify!($handler),
247            )
248            .await
249    }};
250}
251
252impl ValidatorNode for GrpcClient {
253    type NotificationStream = NotificationStream;
254
255    fn address(&self) -> String {
256        self.address.clone()
257    }
258
259    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
260    async fn handle_block_proposal(
261        &self,
262        proposal: data_types::BlockProposal,
263    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
264        GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
265    }
266
267    #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
268    async fn handle_lite_certificate(
269        &self,
270        certificate: types::LiteCertificate<'_>,
271        delivery: CrossChainMessageDelivery,
272    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
273        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
274        let request = HandleLiteCertRequest {
275            certificate,
276            wait_for_outgoing_messages,
277        };
278        GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
279    }
280
281    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
282    async fn handle_confirmed_certificate(
283        &self,
284        certificate: CacheArc<GenericCertificate<ConfirmedBlock>>,
285        delivery: CrossChainMessageDelivery,
286    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
287        let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
288        let request = HandleConfirmedCertificateRequest {
289            certificate: CacheArc::unwrap_or_clone(certificate),
290            wait_for_outgoing_messages,
291        };
292        GrpcClient::try_into_chain_info(client_delegate!(
293            self,
294            handle_confirmed_certificate,
295            request
296        )?)
297    }
298
299    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
300    async fn handle_validated_certificate(
301        &self,
302        certificate: GenericCertificate<ValidatedBlock>,
303    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
304        let request = HandleValidatedCertificateRequest { certificate };
305        GrpcClient::try_into_chain_info(client_delegate!(
306            self,
307            handle_validated_certificate,
308            request
309        )?)
310    }
311
312    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
313    async fn handle_timeout_certificate(
314        &self,
315        certificate: GenericCertificate<Timeout>,
316    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
317        let request = HandleTimeoutCertificateRequest { certificate };
318        GrpcClient::try_into_chain_info(client_delegate!(
319            self,
320            handle_timeout_certificate,
321            request
322        )?)
323    }
324
325    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
326    async fn handle_chain_info_query(
327        &self,
328        query: linera_core::data_types::ChainInfoQuery,
329    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
330        GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
331    }
332
333    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
334    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
335        let retry_delay = self.retry_delay;
336        let max_retries = self.max_retries;
337        let max_backoff = self.max_backoff;
338        let address = self.address.clone();
339        let subscription_cooldowns = self.subscription_cooldowns.clone();
340
341        // Fast-fail if another subscription to this address recently failed.
342        // Prevents N chains from independently retrying the same dead validator.
343        {
344            let pinned = subscription_cooldowns.pin();
345            if let Some(&last_failure) = pinned.get(&address) {
346                if last_failure.elapsed() < max_backoff {
347                    return Err(NodeError::SubscriptionFailed {
348                        status: format!(
349                            "validator {address} on cooldown after recent subscription failure"
350                        ),
351                    });
352                }
353            }
354        }
355
356        // Use shared atomic counter so unfold can reset it on successful reconnection.
357        let retry_count = Arc::new(AtomicU32::new(0));
358        let subscription_request = SubscriptionRequest {
359            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
360        };
361        let mut client = self.client.clone();
362
363        // Make the first connection attempt before returning from this method.
364        let mut stream = Some(
365            client
366                .subscribe(subscription_request.clone())
367                .await
368                .map_err(|status| {
369                    subscription_cooldowns
370                        .pin()
371                        .insert(address.clone(), Instant::now());
372                    NodeError::SubscriptionFailed {
373                        status: status.to_string(),
374                    }
375                })?
376                .into_inner(),
377        );
378
379        // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
380        // `client.subscribe(request)` endlessly and without delay.
381        let retry_count_for_unfold = retry_count.clone();
382        let cooldowns_for_unfold = subscription_cooldowns.clone();
383        let address_for_unfold = address.clone();
384        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
385            let mut client = client.clone();
386            let subscription_request = subscription_request.clone();
387            let mut stream = stream.take();
388            let retry_count = retry_count_for_unfold.clone();
389            let cooldowns = cooldowns_for_unfold.clone();
390            let cooldown_address = address_for_unfold.clone();
391            async move {
392                let stream = if let Some(stream) = stream.take() {
393                    future::Either::Right(stream)
394                } else {
395                    match client.subscribe(subscription_request.clone()).await {
396                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
397                        Ok(response) => {
398                            // Reset retry count on successful reconnection.
399                            retry_count.store(0, Ordering::Relaxed);
400                            cooldowns.pin().remove(&cooldown_address);
401                            trace!("Successfully reconnected subscription stream");
402                            future::Either::Right(response.into_inner())
403                        }
404                    }
405                };
406                Some((stream, ()))
407            }
408        })
409        .flatten();
410
411        let span = tracing::info_span!("notification stream");
412        #[cfg(with_metrics)]
413        let address_for_metrics = self.address.clone();
414        let cooldowns_for_take_while = subscription_cooldowns;
415        let address_for_take_while = self.address.clone();
416        // The stream of `Notification`s that inserts increasing delays after retriable errors, and
417        // terminates after unexpected or fatal errors.
418        let notification_stream = endlessly_retrying_notification_stream
419            .map(|result| {
420                Option::<Notification>::try_from(result?).map_err(|err| {
421                    let message = format!("Could not deserialize notification: {err}");
422                    tonic::Status::new(Code::Internal, message)
423                })
424            })
425            .take_while(move |result| {
426                let Err(status) = result else {
427                    retry_count.store(0, Ordering::Relaxed);
428                    return future::Either::Left(future::ready(true));
429                };
430
431                #[cfg(with_metrics)]
432                metrics::VALIDATOR_SUBSCRIPTION_ERRORS
433                    .with_label_values(&[&address_for_metrics])
434                    .inc();
435
436                let current_retry_count = retry_count.load(Ordering::Relaxed);
437                if !span.in_scope(|| Self::is_retryable(status))
438                    || current_retry_count >= max_retries
439                {
440                    cooldowns_for_take_while
441                        .pin()
442                        .insert(address_for_take_while.clone(), Instant::now());
443                    return future::Either::Left(future::ready(false));
444                }
445                let delay =
446                    crate::jittered_backoff_delay(retry_delay, current_retry_count, max_backoff);
447                retry_count.fetch_add(1, Ordering::Relaxed);
448                future::Either::Right(async move {
449                    linera_base::time::timer::sleep(delay).await;
450                    true
451                })
452            })
453            .filter_map(move |result| {
454                future::ready(match result {
455                    Ok(notification @ Some(_)) => notification,
456                    Ok(None) => None,
457                    Err(err) => {
458                        debug!(%address, "{}", err);
459                        None
460                    }
461                })
462            });
463
464        Ok(Box::pin(notification_stream))
465    }
466
467    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
468    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
469        let req = ();
470        Ok(client_delegate!(self, get_version_info, req)?.into())
471    }
472
473    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
474    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
475        let req = ();
476        Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
477    }
478
479    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
480    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
481        Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
482    }
483
484    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
485    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
486        Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
487    }
488
489    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
490    async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError> {
491        debug!(
492            handler = "download_blobs",
493            num_blobs = blob_ids.len(),
494            "sending gRPC request"
495        );
496        let request = api::BlobIds::try_from(blob_ids)?;
497        let stream = self
498            .client
499            .clone()
500            .download_blobs(request)
501            .await
502            .map_err(|status| NodeError::GrpcError {
503                error: status.to_string(),
504            })?
505            .into_inner();
506        let blob_stream = stream.map(|result| match result {
507            Ok(proto_blob) => BlobContent::try_from(proto_blob).map_err(NodeError::from),
508            Err(status) => Err(NodeError::GrpcError {
509                error: status.to_string(),
510            }),
511        });
512        Ok(Box::pin(blob_stream))
513    }
514
515    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
516    async fn download_pending_blob(
517        &self,
518        chain_id: ChainId,
519        blob_id: BlobId,
520    ) -> Result<BlobContent, NodeError> {
521        let req = (chain_id, blob_id);
522        client_delegate!(self, download_pending_blob, req)?.try_into()
523    }
524
525    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
526    async fn handle_pending_blob(
527        &self,
528        chain_id: ChainId,
529        blob: BlobContent,
530    ) -> Result<ChainInfoResponse, NodeError> {
531        let req = (chain_id, blob);
532        GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
533    }
534
535    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
536    async fn download_certificate(
537        &self,
538        hash: CryptoHash,
539    ) -> Result<ConfirmedBlockCertificate, NodeError> {
540        ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
541            self,
542            download_certificate,
543            hash
544        )?)?)
545        .map_err(|_| NodeError::UnexpectedCertificateValue)
546    }
547
548    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
549    async fn download_certificates(
550        &self,
551        hashes: Vec<CryptoHash>,
552    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
553        let mut missing_hashes = hashes;
554        let mut certs_collected = Vec::with_capacity(missing_hashes.len());
555        while !missing_hashes.is_empty() {
556            // Macro doesn't compile if we pass `missing_hashes.clone()` directly to `client_delegate!`.
557            let missing = missing_hashes.clone();
558            let mut received: Vec<_> = Vec::<Certificate>::try_from(client_delegate!(
559                self,
560                download_certificates,
561                missing
562            )?)?
563            .into_iter()
564            .map(|cert| {
565                ConfirmedBlockCertificate::try_from(cert)
566                    .map_err(|_| NodeError::UnexpectedCertificateValue)
567            })
568            .collect::<Result<_, _>>()?;
569
570            // In the case of the server not returning any certificates, we break the loop.
571            if received.is_empty() {
572                break;
573            }
574
575            // Honest validator should return certificates in the same order as the requested hashes.
576            missing_hashes = missing_hashes[received.len()..].to_vec();
577            certs_collected.append(&mut received);
578        }
579        ensure!(
580            missing_hashes.is_empty(),
581            NodeError::MissingCertificates(missing_hashes)
582        );
583        Ok(certs_collected)
584    }
585
586    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
587    async fn download_certificates_by_heights(
588        &self,
589        chain_id: ChainId,
590        heights: Vec<BlockHeight>,
591    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
592        let mut missing = heights.into_iter().collect::<BTreeSet<_>>();
593        let mut certs_collected = vec![];
594        while !missing.is_empty() {
595            let request = CertificatesByHeightRequest {
596                chain_id,
597                heights: missing.iter().copied().collect(),
598            };
599            let mut received: Vec<_> =
600                client_delegate!(self, download_raw_certificates_by_heights, request)?
601                    .certificates
602                    .into_iter()
603                    .map(
604                        |RawCertificate {
605                             lite_certificate,
606                             confirmed_block,
607                         }| {
608                            let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
609                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
610
611                            let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
612                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
613
614                            cert.with_value(block)
615                                .ok_or(NodeError::UnexpectedCertificateValue)
616                        },
617                    )
618                    .collect::<Result<_, _>>()?;
619
620            if received.is_empty() {
621                break;
622            }
623
624            // Remove only the heights we actually received from missing set.
625            for cert in &received {
626                missing.remove(&cert.inner().height());
627            }
628            certs_collected.append(&mut received);
629        }
630        certs_collected.sort_by_key(|cert| cert.inner().height());
631        Ok(certs_collected)
632    }
633
634    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
635    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
636        Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
637    }
638
639    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
640    async fn blob_last_used_by_certificate(
641        &self,
642        blob_id: BlobId,
643    ) -> Result<ConfirmedBlockCertificate, NodeError> {
644        Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
645    }
646
647    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
648    async fn event_block_heights(
649        &self,
650        event_ids: Vec<EventId>,
651    ) -> Result<Vec<Option<BlockHeight>>, NodeError> {
652        let request = api::EventBlockHeightsRequest::from(event_ids);
653        Ok(client_delegate!(self, event_block_heights, request)?.try_into()?)
654    }
655
656    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
657    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
658        Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
659    }
660
661    #[expect(
662        clippy::cast_possible_truncation,
663        reason = "shard counts are bounded by validator config and fit in usize on supported targets"
664    )]
665    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
666    async fn get_shard_info(
667        &self,
668        chain_id: ChainId,
669    ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
670        let response = client_delegate!(self, get_shard_info, chain_id)?;
671        Ok(linera_core::data_types::ShardInfo {
672            shard_id: response.shard_id as usize,
673            total_shards: response.total_shards as usize,
674        })
675    }
676}