linera_rpc/grpc/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeSet,
6    fmt,
7    future::Future,
8    iter,
9    sync::{
10        atomic::{AtomicU32, Ordering},
11        Arc,
12    },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17    crypto::CryptoHash,
18    data_types::{BlobContent, BlockHeight, NetworkDescription},
19    ensure,
20    identifiers::{BlobId, ChainId},
21    time::Duration,
22};
23use linera_chain::{
24    data_types::{self},
25    types::{
26        self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27        LiteCertificate, Timeout, ValidatedBlock,
28    },
29};
30use linera_core::{
31    data_types::{CertificatesByHeightRequest, ChainInfoResponse},
32    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
33    worker::Notification,
34};
35use linera_version::VersionInfo;
36use tonic::{Code, IntoRequest, Request, Status};
37use tracing::{debug, instrument, trace, warn, Level};
38
39use super::{
40    api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
41    transport, GRPC_MAX_MESSAGE_SIZE,
42};
43#[cfg(feature = "opentelemetry")]
44use crate::propagation::{get_context_with_traffic_type, inject_context};
45use crate::{
46    full_jitter_delay, grpc::api::RawCertificate, HandleConfirmedCertificateRequest,
47    HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
48};
49
/// A gRPC client for a single validator node, together with the settings used to retry
/// failed requests.
#[derive(Clone)]
pub struct GrpcClient {
    /// Address string for this client; returned by `address()` and recorded on tracing spans.
    address: String,
    /// The generated gRPC client through which all requests are sent.
    client: ValidatorNodeClient<transport::Channel>,
    /// Base delay handed to `full_jitter_delay` when computing the pause before a retry.
    retry_delay: Duration,
    /// Maximum number of retries attempted after a retryable error.
    max_retries: u32,
    /// Handed to `full_jitter_delay` as its last argument; presumably the cap on the computed
    /// delay — TODO confirm against `full_jitter_delay`.
    max_backoff: Duration,
}
58
59impl GrpcClient {
60    pub fn new(
61        address: String,
62        channel: transport::Channel,
63        retry_delay: Duration,
64        max_retries: u32,
65        max_backoff: Duration,
66    ) -> Self {
67        let client = ValidatorNodeClient::new(channel)
68            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
69            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
70        Self {
71            address,
72            client,
73            retry_delay,
74            max_retries,
75            max_backoff,
76        }
77    }
78
79    pub fn address(&self) -> &str {
80        &self.address
81    }
82
83    /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
84    /// Logs a warning on unexpected status codes.
85    fn is_retryable(status: &Status) -> bool {
86        match status.code() {
87            Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
88                trace!("gRPC request interrupted: {status:?}; retrying");
89                true
90            }
91            Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
92                trace!("Unexpected gRPC status: {status:?}; retrying");
93                true
94            }
95            Code::Internal if status.message().contains("h2 protocol error") => {
96                // HTTP/2 connection reset errors are transient network issues, not real
97                // internal errors. This happens when the server restarts and the
98                // connection is forcibly closed.
99                trace!("gRPC connection reset: {status:?}; retrying");
100                true
101            }
102            Code::NotFound => false, // This code is used if e.g. the validator is missing blobs.
103            Code::InvalidArgument
104            | Code::AlreadyExists
105            | Code::PermissionDenied
106            | Code::FailedPrecondition
107            | Code::OutOfRange
108            | Code::Unimplemented
109            | Code::Internal
110            | Code::DataLoss
111            | Code::Unauthenticated => {
112                trace!("Unexpected gRPC status: {status:?}");
113                false
114            }
115        }
116    }
117
118    async fn delegate<F, Fut, R, S>(
119        &self,
120        f: F,
121        request: impl TryInto<R> + fmt::Debug + Clone,
122        handler: &str,
123    ) -> Result<S, NodeError>
124    where
125        F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
126        Fut: Future<Output = Result<tonic::Response<S>, Status>>,
127        R: IntoRequest<R> + Clone,
128    {
129        let mut retry_count = 0;
130        let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
131            error: "could not convert request to proto".to_string(),
132        })?;
133        loop {
134            #[allow(unused_mut)]
135            let mut request = Request::new(request_inner.clone());
136            // Inject OpenTelemetry context (trace context + baggage) into gRPC metadata.
137            // This uses get_context_with_traffic_type() to also check the LINERA_TRAFFIC_TYPE
138            // environment variable, allowing benchmark tools to mark their traffic as synthetic.
139            #[cfg(feature = "opentelemetry")]
140            inject_context(&get_context_with_traffic_type(), request.metadata_mut());
141            match f(self.client.clone(), request).await {
142                Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
143                    let delay = full_jitter_delay(self.retry_delay, retry_count, self.max_backoff);
144                    retry_count += 1;
145                    linera_base::time::timer::sleep(delay).await;
146                    continue;
147                }
148                Err(s) => {
149                    return Err(NodeError::GrpcError {
150                        error: format!("remote request [{handler}] failed with status: {s:?}"),
151                    });
152                }
153                Ok(result) => return Ok(result.into_inner()),
154            };
155        }
156    }
157
158    fn try_into_chain_info(
159        result: api::ChainInfoResult,
160    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
161        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
162            error: "missing body from response".to_string(),
163        })?;
164        match inner {
165            api::chain_info_result::Inner::ChainInfoResponse(response) => {
166                Ok(response.try_into().map_err(|err| NodeError::GrpcError {
167                    error: format!("failed to unmarshal response: {}", err),
168                })?)
169            }
170            api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
171                .map_err(|err| NodeError::GrpcError {
172                    error: format!("failed to unmarshal error message: {}", err),
173                })?),
174        }
175    }
176}
177
178impl TryFrom<api::PendingBlobResult> for BlobContent {
179    type Error = NodeError;
180
181    fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
182        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
183            error: "missing body from response".to_string(),
184        })?;
185        match inner {
186            api::pending_blob_result::Inner::Blob(blob) => {
187                Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
188                    error: format!("failed to unmarshal response: {}", err),
189                })?)
190            }
191            api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
192                .map_err(|err| NodeError::GrpcError {
193                    error: format!("failed to unmarshal error message: {}", err),
194                })?),
195        }
196    }
197}
198
/// Sends the request bound to `$req` through `GrpcClient::delegate`, invoking the generated
/// client method named `$handler`, and awaits the outcome.
///
/// A debug event carrying the handler name and the request's `Debug` output is emitted before
/// the call. `$req` has to be a plain identifier; bind an expression to a local first.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        let handler_name = stringify!($handler);
        debug!(
            handler = handler_name,
            request = ?$req,
            "sending gRPC request"
        );
        // The closure is written inline so that its parameter types are inferred from the
        // `Fn` bound on `delegate`.
        $self
            .delegate(
                |mut node, grpc_request| async move { node.$handler(grpc_request).await },
                $req,
                handler_name,
            )
            .await
    }};
}
215
216impl ValidatorNode for GrpcClient {
217    type NotificationStream = NotificationStream;
218
219    fn address(&self) -> String {
220        self.address.clone()
221    }
222
223    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
224    async fn handle_block_proposal(
225        &self,
226        proposal: data_types::BlockProposal,
227    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
228        GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
229    }
230
231    #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
232    async fn handle_lite_certificate(
233        &self,
234        certificate: types::LiteCertificate<'_>,
235        delivery: CrossChainMessageDelivery,
236    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
237        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
238        let request = HandleLiteCertRequest {
239            certificate,
240            wait_for_outgoing_messages,
241        };
242        GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
243    }
244
245    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
246    async fn handle_confirmed_certificate(
247        &self,
248        certificate: GenericCertificate<ConfirmedBlock>,
249        delivery: CrossChainMessageDelivery,
250    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
251        let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
252        let request = HandleConfirmedCertificateRequest {
253            certificate,
254            wait_for_outgoing_messages,
255        };
256        GrpcClient::try_into_chain_info(client_delegate!(
257            self,
258            handle_confirmed_certificate,
259            request
260        )?)
261    }
262
263    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
264    async fn handle_validated_certificate(
265        &self,
266        certificate: GenericCertificate<ValidatedBlock>,
267    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
268        let request = HandleValidatedCertificateRequest { certificate };
269        GrpcClient::try_into_chain_info(client_delegate!(
270            self,
271            handle_validated_certificate,
272            request
273        )?)
274    }
275
276    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
277    async fn handle_timeout_certificate(
278        &self,
279        certificate: GenericCertificate<Timeout>,
280    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
281        let request = HandleTimeoutCertificateRequest { certificate };
282        GrpcClient::try_into_chain_info(client_delegate!(
283            self,
284            handle_timeout_certificate,
285            request
286        )?)
287    }
288
289    #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
290    async fn handle_chain_info_query(
291        &self,
292        query: linera_core::data_types::ChainInfoQuery,
293    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
294        GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
295    }
296
297    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
298    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
299        let retry_delay = self.retry_delay;
300        let max_retries = self.max_retries;
301        let max_backoff = self.max_backoff;
302        // Use shared atomic counter so unfold can reset it on successful reconnection.
303        let retry_count = Arc::new(AtomicU32::new(0));
304        let subscription_request = SubscriptionRequest {
305            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
306        };
307        let mut client = self.client.clone();
308
309        // Make the first connection attempt before returning from this method.
310        let mut stream = Some(
311            client
312                .subscribe(subscription_request.clone())
313                .await
314                .map_err(|status| NodeError::SubscriptionFailed {
315                    status: status.to_string(),
316                })?
317                .into_inner(),
318        );
319
320        // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
321        // `client.subscribe(request)` endlessly and without delay.
322        let retry_count_for_unfold = retry_count.clone();
323        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
324            let mut client = client.clone();
325            let subscription_request = subscription_request.clone();
326            let mut stream = stream.take();
327            let retry_count = retry_count_for_unfold.clone();
328            async move {
329                let stream = if let Some(stream) = stream.take() {
330                    future::Either::Right(stream)
331                } else {
332                    match client.subscribe(subscription_request.clone()).await {
333                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
334                        Ok(response) => {
335                            // Reset retry count on successful reconnection.
336                            retry_count.store(0, Ordering::Relaxed);
337                            trace!("Successfully reconnected subscription stream");
338                            future::Either::Right(response.into_inner())
339                        }
340                    }
341                };
342                Some((stream, ()))
343            }
344        })
345        .flatten();
346
347        let span = tracing::info_span!("notification stream");
348        // The stream of `Notification`s that inserts increasing delays after retriable errors, and
349        // terminates after unexpected or fatal errors.
350        let notification_stream = endlessly_retrying_notification_stream
351            .map(|result| {
352                Option::<Notification>::try_from(result?).map_err(|err| {
353                    let message = format!("Could not deserialize notification: {}", err);
354                    tonic::Status::new(Code::Internal, message)
355                })
356            })
357            .take_while(move |result| {
358                let Err(status) = result else {
359                    retry_count.store(0, Ordering::Relaxed);
360                    return future::Either::Left(future::ready(true));
361                };
362
363                let current_retry_count = retry_count.load(Ordering::Relaxed);
364                if !span.in_scope(|| Self::is_retryable(status))
365                    || current_retry_count >= max_retries
366                {
367                    return future::Either::Left(future::ready(false));
368                }
369                let delay = full_jitter_delay(retry_delay, current_retry_count, max_backoff);
370                retry_count.fetch_add(1, Ordering::Relaxed);
371                future::Either::Right(async move {
372                    linera_base::time::timer::sleep(delay).await;
373                    true
374                })
375            })
376            .filter_map(|result| {
377                future::ready(match result {
378                    Ok(notification @ Some(_)) => notification,
379                    Ok(None) => None,
380                    Err(err) => {
381                        warn!("{}", err);
382                        None
383                    }
384                })
385            });
386
387        Ok(Box::pin(notification_stream))
388    }
389
390    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
391    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
392        let req = ();
393        Ok(client_delegate!(self, get_version_info, req)?.into())
394    }
395
396    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
397    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
398        let req = ();
399        Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
400    }
401
402    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
403    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
404        Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
405    }
406
407    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
408    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
409        Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
410    }
411
412    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
413    async fn download_pending_blob(
414        &self,
415        chain_id: ChainId,
416        blob_id: BlobId,
417    ) -> Result<BlobContent, NodeError> {
418        let req = (chain_id, blob_id);
419        client_delegate!(self, download_pending_blob, req)?.try_into()
420    }
421
422    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
423    async fn handle_pending_blob(
424        &self,
425        chain_id: ChainId,
426        blob: BlobContent,
427    ) -> Result<ChainInfoResponse, NodeError> {
428        let req = (chain_id, blob);
429        GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
430    }
431
432    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
433    async fn download_certificate(
434        &self,
435        hash: CryptoHash,
436    ) -> Result<ConfirmedBlockCertificate, NodeError> {
437        ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
438            self,
439            download_certificate,
440            hash
441        )?)?)
442        .map_err(|_| NodeError::UnexpectedCertificateValue)
443    }
444
445    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
446    async fn download_certificates(
447        &self,
448        hashes: Vec<CryptoHash>,
449    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
450        let mut missing_hashes = hashes;
451        let mut certs_collected = Vec::with_capacity(missing_hashes.len());
452        while !missing_hashes.is_empty() {
453            // Macro doesn't compile if we pass `missing_hashes.clone()` directly to `client_delegate!`.
454            let missing = missing_hashes.clone();
455            let mut received: Vec<_> = Vec::<Certificate>::try_from(client_delegate!(
456                self,
457                download_certificates,
458                missing
459            )?)?
460            .into_iter()
461            .map(|cert| {
462                ConfirmedBlockCertificate::try_from(cert)
463                    .map_err(|_| NodeError::UnexpectedCertificateValue)
464            })
465            .collect::<Result<_, _>>()?;
466
467            // In the case of the server not returning any certificates, we break the loop.
468            if received.is_empty() {
469                break;
470            }
471
472            // Honest validator should return certificates in the same order as the requested hashes.
473            missing_hashes = missing_hashes[received.len()..].to_vec();
474            certs_collected.append(&mut received);
475        }
476        ensure!(
477            missing_hashes.is_empty(),
478            NodeError::MissingCertificates(missing_hashes)
479        );
480        Ok(certs_collected)
481    }
482
483    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
484    async fn download_certificates_by_heights(
485        &self,
486        chain_id: ChainId,
487        heights: Vec<BlockHeight>,
488    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
489        let mut missing = heights.into_iter().collect::<BTreeSet<_>>();
490        let mut certs_collected = vec![];
491        while !missing.is_empty() {
492            let request = CertificatesByHeightRequest {
493                chain_id,
494                heights: missing.iter().copied().collect(),
495            };
496            let mut received: Vec<_> =
497                client_delegate!(self, download_raw_certificates_by_heights, request)?
498                    .certificates
499                    .into_iter()
500                    .map(
501                        |RawCertificate {
502                             lite_certificate,
503                             confirmed_block,
504                         }| {
505                            let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
506                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
507
508                            let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
509                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
510
511                            cert.with_value(block)
512                                .ok_or(NodeError::UnexpectedCertificateValue)
513                        },
514                    )
515                    .collect::<Result<_, _>>()?;
516
517            if received.is_empty() {
518                break;
519            }
520
521            // Remove only the heights we actually received from missing set.
522            for cert in &received {
523                missing.remove(&cert.inner().height());
524            }
525            certs_collected.append(&mut received);
526        }
527        certs_collected.sort_by_key(|cert| cert.inner().height());
528        Ok(certs_collected)
529    }
530
531    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
532    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
533        Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
534    }
535
536    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
537    async fn blob_last_used_by_certificate(
538        &self,
539        blob_id: BlobId,
540    ) -> Result<ConfirmedBlockCertificate, NodeError> {
541        Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
542    }
543
544    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
545    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
546        Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
547    }
548
549    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
550    async fn get_shard_info(
551        &self,
552        chain_id: ChainId,
553    ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
554        let response = client_delegate!(self, get_shard_info, chain_id)?;
555        Ok(linera_core::data_types::ShardInfo {
556            shard_id: response.shard_id as usize,
557            total_shards: response.total_shards as usize,
558        })
559    }
560}