// linera_rpc/grpc/client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeSet,
6    fmt,
7    future::Future,
8    iter,
9    sync::{
10        atomic::{AtomicU32, Ordering},
11        Arc,
12    },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{BlobContent, BlockHeight, NetworkDescription},
19    ensure,
20    identifiers::{BlobId, ChainId, EventId},
21    time::{Duration, Instant},
22};
23use linera_chain::{
24    data_types::{self},
25    types::{
26        self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Counter of notification-subscription stream errors, labeled by validator
    /// address. Incremented in `GrpcClient::subscribe` whenever the notification
    /// stream yields an error. Registered lazily on first use.
    pub static VALIDATOR_SUBSCRIPTION_ERRORS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "validator_subscription_errors",
            "Number of notification subscription stream errors per validator",
            &["address"],
        )
    });
}
45
46use linera_core::{
47    data_types::{CertificatesByHeightRequest, ChainInfoResponse},
48    node::{BlobStream, CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
49    worker::Notification,
50};
51use linera_version::VersionInfo;
52use tonic::{Code, IntoRequest, Request, Status};
53use tracing::{debug, instrument, trace, Level};
54
55use super::{
56    api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
57    transport, GRPC_MAX_MESSAGE_SIZE,
58};
59#[cfg(feature = "opentelemetry")]
60use crate::propagation::{get_context_with_traffic_type, inject_context};
61use crate::{
62    grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
63    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
64};
65
/// A gRPC client for a single validator node. Requests that fail with a
/// retryable status are retried with jittered exponential backoff.
#[derive(Clone)]
pub struct GrpcClient {
    /// Network address of the validator this client talks to (used for logging,
    /// metrics labels, and as the key into `subscription_cooldowns`).
    address: String,
    /// The generated tonic client over the shared transport channel.
    client: ValidatorNodeClient<transport::Channel>,
    /// Base delay fed into `jittered_backoff_delay` between retries.
    retry_delay: Duration,
    /// Maximum number of retries before a request or subscription is given up on.
    max_retries: u32,
    /// Upper bound on the backoff delay; also used as the subscription cooldown window.
    max_backoff: Duration,
    /// Shared across all `GrpcClient` instances created by the same `GrpcNodeProvider`.
    /// Tracks when each validator address last had a subscription failure, so that
    /// other chains don't independently retry the same dead validator.
    subscription_cooldowns: papaya::HashMap<String, Instant>,
}
78
79impl GrpcClient {
80    pub fn new(
81        address: String,
82        channel: transport::Channel,
83        retry_delay: Duration,
84        max_retries: u32,
85        max_backoff: Duration,
86        subscription_cooldowns: papaya::HashMap<String, Instant>,
87    ) -> Self {
88        let client = ValidatorNodeClient::new(channel)
89            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
90            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
91        Self {
92            address,
93            client,
94            retry_delay,
95            max_retries,
96            max_backoff,
97            subscription_cooldowns,
98        }
99    }
100
101    pub fn address(&self) -> &str {
102        &self.address
103    }
104
105    /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
106    /// Logs a warning on unexpected status codes.
107    fn is_retryable(status: &Status) -> bool {
108        match status.code() {
109            Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
110                trace!("gRPC request interrupted: {status:?}; retrying");
111                true
112            }
113            Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
114                trace!("Unexpected gRPC status: {status:?}; retrying");
115                true
116            }
117            Code::Internal if status.message().contains("h2 protocol error") => {
118                // HTTP/2 connection reset errors are transient network issues, not real
119                // internal errors. This happens when the server restarts and the
120                // connection is forcibly closed.
121                trace!("gRPC connection reset: {status:?}; retrying");
122                true
123            }
124            Code::Internal if status.message().contains("502 Bad Gateway") => {
125                // When a proxy/ingress returns HTTP 502 (e.g. during rolling restarts),
126                // tonic's frame decoder fails on the non-gRPC response body before the
127                // HTTP-to-gRPC status mapping can run, producing Code::Internal instead
128                // of Code::Unavailable. Per the gRPC spec, HTTP 502 maps to UNAVAILABLE
129                // which is retryable. This works around tonic#2365.
130                trace!("gRPC proxy error (502): {status:?}; retrying");
131                true
132            }
133            Code::NotFound => false, // This code is used if e.g. the validator is missing blobs.
134            Code::InvalidArgument
135            | Code::AlreadyExists
136            | Code::PermissionDenied
137            | Code::FailedPrecondition
138            | Code::OutOfRange
139            | Code::Unimplemented
140            | Code::Internal
141            | Code::DataLoss
142            | Code::Unauthenticated => {
143                trace!("Unexpected gRPC status: {status:?}");
144                false
145            }
146        }
147    }
148
149    async fn delegate<F, Fut, R, S>(
150        &self,
151        f: F,
152        request: impl TryInto<R> + fmt::Debug + Clone,
153        handler: &str,
154    ) -> Result<S, NodeError>
155    where
156        F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
157        Fut: Future<Output = Result<tonic::Response<S>, Status>>,
158        R: IntoRequest<R> + Clone,
159    {
160        let mut retry_count = 0;
161        let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
162            error: "could not convert request to proto".to_string(),
163        })?;
164        loop {
165            #[allow(unused_mut)]
166            let mut request = Request::new(request_inner.clone());
167            // Inject OpenTelemetry context (trace context + baggage) into gRPC metadata.
168            // This uses get_context_with_traffic_type() to also check the LINERA_TRAFFIC_TYPE
169            // environment variable, allowing benchmark tools to mark their traffic as synthetic.
170            #[cfg(feature = "opentelemetry")]
171            inject_context(&get_context_with_traffic_type(), request.metadata_mut());
172            match f(self.client.clone(), request).await {
173                Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
174                    let delay = crate::jittered_backoff_delay(
175                        self.retry_delay,
176                        retry_count,
177                        self.max_backoff,
178                    );
179                    retry_count += 1;
180                    linera_base::time::timer::sleep(delay).await;
181                    continue;
182                }
183                Err(s) => {
184                    return Err(NodeError::GrpcError {
185                        error: format!("remote request [{handler}] failed with status: {s:?}"),
186                    });
187                }
188                Ok(result) => return Ok(result.into_inner()),
189            };
190        }
191    }
192
193    fn try_into_chain_info(
194        result: api::ChainInfoResult,
195    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
196        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
197            error: "missing body from response".to_string(),
198        })?;
199        match inner {
200            api::chain_info_result::Inner::ChainInfoResponse(response) => {
201                Ok(response.try_into().map_err(|err| NodeError::GrpcError {
202                    error: format!("failed to unmarshal response: {err}"),
203                })?)
204            }
205            api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
206                .map_err(|err| NodeError::GrpcError {
207                    error: format!("failed to unmarshal error message: {err}"),
208                })?),
209        }
210    }
211}
212
213impl TryFrom<api::PendingBlobResult> for BlobContent {
214    type Error = NodeError;
215
216    fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
217        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
218            error: "missing body from response".to_string(),
219        })?;
220        match inner {
221            api::pending_blob_result::Inner::Blob(blob) => {
222                Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
223                    error: format!("failed to unmarshal response: {err}"),
224                })?)
225            }
226            api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
227                .map_err(|err| NodeError::GrpcError {
228                    error: format!("failed to unmarshal error message: {err}"),
229                })?),
230        }
231    }
232}
233
/// Logs the outgoing request at debug level, then dispatches it through
/// [`GrpcClient::delegate`]. `$handler` is used three ways: as the log field,
/// as the method invoked on the generated tonic client, and (stringified) as
/// the label included in error messages on failure.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        debug!(
            handler = stringify!($handler),
            request = ?$req,
            "sending gRPC request"
        );
        $self
            .delegate(
                |mut client, req| async move { client.$handler(req).await },
                $req,
                stringify!($handler),
            )
            .await
    }};
}
250
/// The gRPC implementation of [`ValidatorNode`]. Most methods delegate to the
/// generated tonic client through `client_delegate!`, which retries transient
/// failures with jittered exponential backoff.
impl ValidatorNode for GrpcClient {
    type NotificationStream = NotificationStream;

    fn address(&self) -> String {
        self.address.clone()
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn handle_block_proposal(
        &self,
        proposal: data_types::BlockProposal,
    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
        GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
    }

    // NOTE(review): unlike the other handlers, this `instrument` omits
    // `err(level = Level::DEBUG)` — confirm whether that is intentional.
    #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
    async fn handle_lite_certificate(
        &self,
        certificate: types::LiteCertificate<'_>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
        let request = HandleLiteCertRequest {
            certificate,
            wait_for_outgoing_messages,
        };
        GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn handle_confirmed_certificate(
        &self,
        certificate: Arc<GenericCertificate<ConfirmedBlock>>,
        delivery: CrossChainMessageDelivery,
    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
        let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
        let request = HandleConfirmedCertificateRequest {
            // Avoids cloning the certificate when this is the last `Arc` reference.
            certificate: Arc::unwrap_or_clone(certificate),
            wait_for_outgoing_messages,
        };
        GrpcClient::try_into_chain_info(client_delegate!(
            self,
            handle_confirmed_certificate,
            request
        )?)
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn handle_validated_certificate(
        &self,
        certificate: GenericCertificate<ValidatedBlock>,
    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
        let request = HandleValidatedCertificateRequest { certificate };
        GrpcClient::try_into_chain_info(client_delegate!(
            self,
            handle_validated_certificate,
            request
        )?)
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn handle_timeout_certificate(
        &self,
        certificate: GenericCertificate<Timeout>,
    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
        let request = HandleTimeoutCertificateRequest { certificate };
        GrpcClient::try_into_chain_info(client_delegate!(
            self,
            handle_timeout_certificate,
            request
        )?)
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn handle_chain_info_query(
        &self,
        query: linera_core::data_types::ChainInfoQuery,
    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
        GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
    }

    /// Subscribes to notifications for `chains`, returning a stream that
    /// transparently reconnects on retryable errors (with jittered backoff) and
    /// terminates on fatal ones. Subscription failures put this validator's
    /// address on a shared cooldown so sibling clients fail fast instead of
    /// hammering the same dead validator.
    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
        // Copy the retry policy out of `self` so the returned stream is 'static.
        let retry_delay = self.retry_delay;
        let max_retries = self.max_retries;
        let max_backoff = self.max_backoff;
        let address = self.address.clone();
        let subscription_cooldowns = self.subscription_cooldowns.clone();

        // Fast-fail if another subscription to this address recently failed.
        // Prevents N chains from independently retrying the same dead validator.
        {
            let pinned = subscription_cooldowns.pin();
            if let Some(&last_failure) = pinned.get(&address) {
                // The cooldown window is the same duration as the maximum backoff.
                if last_failure.elapsed() < max_backoff {
                    return Err(NodeError::SubscriptionFailed {
                        status: format!(
                            "validator {address} on cooldown after recent subscription failure"
                        ),
                    });
                }
            }
        }

        // Use shared atomic counter so unfold can reset it on successful reconnection.
        let retry_count = Arc::new(AtomicU32::new(0));
        let subscription_request = SubscriptionRequest {
            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
        };
        let mut client = self.client.clone();

        // Make the first connection attempt before returning from this method.
        let mut stream = Some(
            client
                .subscribe(subscription_request.clone())
                .await
                .map_err(|status| {
                    // Record the failure time so sibling clients hit the
                    // fast-fail path above.
                    subscription_cooldowns
                        .pin()
                        .insert(address.clone(), Instant::now());
                    NodeError::SubscriptionFailed {
                        status: status.to_string(),
                    }
                })?
                .into_inner(),
        );

        // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
        // `client.subscribe(request)` endlessly and without delay.
        let retry_count_for_unfold = retry_count.clone();
        let cooldowns_for_unfold = subscription_cooldowns.clone();
        let address_for_unfold = address.clone();
        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
            let mut client = client.clone();
            let subscription_request = subscription_request.clone();
            // `stream` holds the already-open first connection; it is consumed by
            // the first unfold step, after which every step reconnects.
            let mut stream = stream.take();
            let retry_count = retry_count_for_unfold.clone();
            let cooldowns = cooldowns_for_unfold.clone();
            let cooldown_address = address_for_unfold.clone();
            async move {
                let stream = if let Some(stream) = stream.take() {
                    future::Either::Right(stream)
                } else {
                    match client.subscribe(subscription_request.clone()).await {
                        // A failed reconnect yields a one-item error stream; the
                        // `take_while` below decides whether to retry or give up.
                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
                        Ok(response) => {
                            // Reset retry count on successful reconnection.
                            retry_count.store(0, Ordering::Relaxed);
                            // Clear the cooldown: the validator is reachable again.
                            cooldowns.pin().remove(&cooldown_address);
                            trace!("Successfully reconnected subscription stream");
                            future::Either::Right(response.into_inner())
                        }
                    }
                };
                Some((stream, ()))
            }
        })
        .flatten();

        let span = tracing::info_span!("notification stream");
        #[cfg(with_metrics)]
        let address_for_metrics = self.address.clone();
        let cooldowns_for_take_while = subscription_cooldowns;
        let address_for_take_while = self.address.clone();
        // The stream of `Notification`s that inserts increasing delays after retriable errors, and
        // terminates after unexpected or fatal errors.
        let notification_stream = endlessly_retrying_notification_stream
            .map(|result| {
                // Deserialization failures are surfaced as `Internal` statuses, which
                // `is_retryable` treats as non-retryable.
                Option::<Notification>::try_from(result?).map_err(|err| {
                    let message = format!("Could not deserialize notification: {err}");
                    tonic::Status::new(Code::Internal, message)
                })
            })
            .take_while(move |result| {
                let Err(status) = result else {
                    // Any successful item resets the backoff counter.
                    retry_count.store(0, Ordering::Relaxed);
                    return future::Either::Left(future::ready(true));
                };

                #[cfg(with_metrics)]
                metrics::VALIDATOR_SUBSCRIPTION_ERRORS
                    .with_label_values(&[&address_for_metrics])
                    .inc();

                let current_retry_count = retry_count.load(Ordering::Relaxed);
                if !span.in_scope(|| Self::is_retryable(status))
                    || current_retry_count >= max_retries
                {
                    // Fatal error or retries exhausted: start the cooldown and end
                    // the stream.
                    cooldowns_for_take_while
                        .pin()
                        .insert(address_for_take_while.clone(), Instant::now());
                    return future::Either::Left(future::ready(false));
                }
                // Retryable: sleep before letting the unfold above reconnect.
                let delay =
                    crate::jittered_backoff_delay(retry_delay, current_retry_count, max_backoff);
                retry_count.fetch_add(1, Ordering::Relaxed);
                future::Either::Right(async move {
                    linera_base::time::timer::sleep(delay).await;
                    true
                })
            })
            .filter_map(move |result| {
                // Drop empty notifications; log and drop (already-counted) errors.
                future::ready(match result {
                    Ok(notification @ Some(_)) => notification,
                    Ok(None) => None,
                    Err(err) => {
                        debug!(%address, "{}", err);
                        None
                    }
                })
            });

        Ok(Box::pin(notification_stream))
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
        let req = ();
        Ok(client_delegate!(self, get_version_info, req)?.into())
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
        let req = ();
        Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
        Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
        Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
    }

    /// Requests a server stream of blobs. Unlike the unary handlers, this calls the
    /// tonic client directly (no `client_delegate!` retries); errors are mapped
    /// per-item inside the returned stream.
    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError> {
        debug!(
            handler = "download_blobs",
            num_blobs = blob_ids.len(),
            "sending gRPC request"
        );
        let request = api::BlobIds::try_from(blob_ids)?;
        let stream = self
            .client
            .clone()
            .download_blobs(request)
            .await
            .map_err(|status| NodeError::GrpcError {
                error: status.to_string(),
            })?
            .into_inner();
        let blob_stream = stream.map(|result| match result {
            Ok(proto_blob) => BlobContent::try_from(proto_blob).map_err(NodeError::from),
            Err(status) => Err(NodeError::GrpcError {
                error: status.to_string(),
            }),
        });
        Ok(Box::pin(blob_stream))
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn download_pending_blob(
        &self,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError> {
        let req = (chain_id, blob_id);
        client_delegate!(self, download_pending_blob, req)?.try_into()
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn handle_pending_blob(
        &self,
        chain_id: ChainId,
        blob: BlobContent,
    ) -> Result<ChainInfoResponse, NodeError> {
        let req = (chain_id, blob);
        GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
    }

    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn download_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<ConfirmedBlockCertificate, NodeError> {
        // A certificate for anything other than a confirmed block is rejected.
        ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
            self,
            download_certificate,
            hash
        )?)?)
        .map_err(|_| NodeError::UnexpectedCertificateValue)
    }

    /// Downloads the certificates for `hashes`, re-requesting the remainder until
    /// the validator stops returning new ones; fails with `MissingCertificates`
    /// if any remain.
    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
    async fn download_certificates(
        &self,
        hashes: Vec<CryptoHash>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        let mut missing_hashes = hashes;
        let mut certs_collected = Vec::with_capacity(missing_hashes.len());
        while !missing_hashes.is_empty() {
            // Macro doesn't compile if we pass `missing_hashes.clone()` directly to `client_delegate!`.
            let missing = missing_hashes.clone();
            let mut received: Vec<_> = Vec::<Certificate>::try_from(client_delegate!(
                self,
                download_certificates,
                missing
            )?)?
            .into_iter()
            .map(|cert| {
                ConfirmedBlockCertificate::try_from(cert)
                    .map_err(|_| NodeError::UnexpectedCertificateValue)
            })
            .collect::<Result<_, _>>()?;

            // In the case of the server not returning any certificates, we break the loop.
            if received.is_empty() {
                break;
            }

            // Honest validator should return certificates in the same order as the requested hashes.
            // NOTE(review): this slice assumes `received.len() <= missing_hashes.len()`;
            // a validator returning more certificates than requested would panic here —
            // confirm upstream bounds or guard the index.
            missing_hashes = missing_hashes[received.len()..].to_vec();
            certs_collected.append(&mut received);
        }
        ensure!(
            missing_hashes.is_empty(),
            NodeError::MissingCertificates(missing_hashes)
        );
        Ok(certs_collected)
    }

    /// Downloads confirmed-block certificates for the given `heights` on `chain_id`,
    /// re-requesting heights that were not returned until the validator returns
    /// nothing new. Returns the collected certificates sorted by block height;
    /// heights the validator never returned are silently absent from the result.
    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn download_certificates_by_heights(
        &self,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        let mut missing = heights.into_iter().collect::<BTreeSet<_>>();
        let mut certs_collected = vec![];
        while !missing.is_empty() {
            let request = CertificatesByHeightRequest {
                chain_id,
                heights: missing.iter().copied().collect(),
            };
            let mut received: Vec<_> =
                client_delegate!(self, download_raw_certificates_by_heights, request)?
                    .certificates
                    .into_iter()
                    .map(
                        |RawCertificate {
                             lite_certificate,
                             confirmed_block,
                         }| {
                            // Each raw certificate carries the signatures (lite
                            // certificate) and the block value as separate BCS blobs;
                            // `with_value` checks that they match.
                            let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;

                            let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;

                            cert.with_value(block)
                                .ok_or(NodeError::UnexpectedCertificateValue)
                        },
                    )
                    .collect::<Result<_, _>>()?;

            if received.is_empty() {
                break;
            }

            // Remove only the heights we actually received from missing set.
            for cert in &received {
                missing.remove(&cert.inner().height());
            }
            certs_collected.append(&mut received);
        }
        certs_collected.sort_by_key(|cert| cert.inner().height());
        Ok(certs_collected)
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
        Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn blob_last_used_by_certificate(
        &self,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError> {
        Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn event_block_heights(
        &self,
        event_ids: Vec<EventId>,
    ) -> Result<Vec<Option<BlockHeight>>, NodeError> {
        let request = api::EventBlockHeightsRequest::from(event_ids);
        Ok(client_delegate!(self, event_block_heights, request)?.try_into()?)
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
        Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
    }

    #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
    async fn get_shard_info(
        &self,
        chain_id: ChainId,
    ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
        let response = client_delegate!(self, get_shard_info, chain_id)?;
        Ok(linera_core::data_types::ShardInfo {
            // Proto carries fixed-width integers; widen to usize for the core type.
            shard_id: response.shard_id as usize,
            total_shards: response.total_shards as usize,
        })
    }
}