linera_rpc/grpc/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, future::Future, iter};
5
6use futures::{future, stream, StreamExt};
7use linera_base::{
8    crypto::CryptoHash,
9    data_types::{BlobContent, BlockHeight, NetworkDescription},
10    ensure,
11    identifiers::{BlobId, ChainId},
12    time::Duration,
13};
14use linera_chain::{
15    data_types::{self},
16    types::{
17        self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
18        LiteCertificate, Timeout, ValidatedBlock,
19    },
20};
21use linera_core::{
22    data_types::{CertificatesByHeightRequest, ChainInfoResponse},
23    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
24    worker::Notification,
25};
26use linera_version::VersionInfo;
27use tonic::{Code, IntoRequest, Request, Status};
28use tracing::{debug, instrument, trace, warn, Level};
29
30use super::{
31    api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
32    transport, GRPC_MAX_MESSAGE_SIZE,
33};
34use crate::{
35    grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
36    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
37};
38
39#[derive(Clone)]
40pub struct GrpcClient {
41    address: String,
42    client: ValidatorNodeClient<transport::Channel>,
43    retry_delay: Duration,
44    max_retries: u32,
45}
46
47impl GrpcClient {
48    pub fn new(
49        address: String,
50        channel: transport::Channel,
51        retry_delay: Duration,
52        max_retries: u32,
53    ) -> Self {
54        let client = ValidatorNodeClient::new(channel)
55            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
56            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
57        Self {
58            address,
59            client,
60            retry_delay,
61            max_retries,
62        }
63    }
64
65    pub fn address(&self) -> &str {
66        &self.address
67    }
68
69    /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
70    /// Logs a warning on unexpected status codes.
71    fn is_retryable(status: &Status) -> bool {
72        match status.code() {
73            Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
74                trace!("gRPC request interrupted: {status:?}; retrying");
75                true
76            }
77            Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
78                trace!("Unexpected gRPC status: {status:?}; retrying");
79                true
80            }
81            Code::NotFound => false, // This code is used if e.g. the validator is missing blobs.
82            Code::InvalidArgument
83            | Code::AlreadyExists
84            | Code::PermissionDenied
85            | Code::FailedPrecondition
86            | Code::OutOfRange
87            | Code::Unimplemented
88            | Code::Internal
89            | Code::DataLoss
90            | Code::Unauthenticated => {
91                trace!("Unexpected gRPC status: {status:?}");
92                false
93            }
94        }
95    }
96
97    async fn delegate<F, Fut, R, S>(
98        &self,
99        f: F,
100        request: impl TryInto<R> + fmt::Debug + Clone,
101        handler: &str,
102    ) -> Result<S, NodeError>
103    where
104        F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
105        Fut: Future<Output = Result<tonic::Response<S>, Status>>,
106        R: IntoRequest<R> + Clone,
107    {
108        let mut retry_count = 0;
109        let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
110            error: "could not convert request to proto".to_string(),
111        })?;
112        loop {
113            match f(self.client.clone(), Request::new(request_inner.clone())).await {
114                Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
115                    let delay = self.retry_delay.saturating_mul(retry_count);
116                    retry_count += 1;
117                    linera_base::time::timer::sleep(delay).await;
118                    continue;
119                }
120                Err(s) => {
121                    return Err(NodeError::GrpcError {
122                        error: format!("remote request [{handler}] failed with status: {s:?}"),
123                    });
124                }
125                Ok(result) => return Ok(result.into_inner()),
126            };
127        }
128    }
129
130    fn try_into_chain_info(
131        result: api::ChainInfoResult,
132    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
133        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
134            error: "missing body from response".to_string(),
135        })?;
136        match inner {
137            api::chain_info_result::Inner::ChainInfoResponse(response) => {
138                Ok(response.try_into().map_err(|err| NodeError::GrpcError {
139                    error: format!("failed to unmarshal response: {}", err),
140                })?)
141            }
142            api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
143                .map_err(|err| NodeError::GrpcError {
144                    error: format!("failed to unmarshal error message: {}", err),
145                })?),
146        }
147    }
148}
149
150impl TryFrom<api::PendingBlobResult> for BlobContent {
151    type Error = NodeError;
152
153    fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
154        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
155            error: "missing body from response".to_string(),
156        })?;
157        match inner {
158            api::pending_blob_result::Inner::Blob(blob) => {
159                Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
160                    error: format!("failed to unmarshal response: {}", err),
161                })?)
162            }
163            api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
164                .map_err(|err| NodeError::GrpcError {
165                    error: format!("failed to unmarshal error message: {}", err),
166                })?),
167        }
168    }
169}
170
/// Logs and sends `$req` through the gRPC method `$handler`, using [`GrpcClient::delegate`]
/// for conversion and retries. `$self` must be a `GrpcClient`, and the expansion must be
/// awaited in an async context.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        let handler_name = stringify!($handler);
        debug!(handler = handler_name, request = ?$req, "sending gRPC request");
        $self
            .delegate(
                |mut client, req| async move { client.$handler(req).await },
                $req,
                handler_name,
            )
            .await
    }};
}
187
188impl ValidatorNode for GrpcClient {
189    type NotificationStream = NotificationStream;
190
191    fn address(&self) -> String {
192        self.address.clone()
193    }
194
195    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
196    async fn handle_block_proposal(
197        &self,
198        proposal: data_types::BlockProposal,
199    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
200        GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
201    }
202
203    #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
204    async fn handle_lite_certificate(
205        &self,
206        certificate: types::LiteCertificate<'_>,
207        delivery: CrossChainMessageDelivery,
208    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
209        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
210        let request = HandleLiteCertRequest {
211            certificate,
212            wait_for_outgoing_messages,
213        };
214        GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
215    }
216
217    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
218    async fn handle_confirmed_certificate(
219        &self,
220        certificate: GenericCertificate<ConfirmedBlock>,
221        delivery: CrossChainMessageDelivery,
222    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
223        let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
224        let request = HandleConfirmedCertificateRequest {
225            certificate,
226            wait_for_outgoing_messages,
227        };
228        GrpcClient::try_into_chain_info(client_delegate!(
229            self,
230            handle_confirmed_certificate,
231            request
232        )?)
233    }
234
235    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
236    async fn handle_validated_certificate(
237        &self,
238        certificate: GenericCertificate<ValidatedBlock>,
239    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
240        let request = HandleValidatedCertificateRequest { certificate };
241        GrpcClient::try_into_chain_info(client_delegate!(
242            self,
243            handle_validated_certificate,
244            request
245        )?)
246    }
247
248    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
249    async fn handle_timeout_certificate(
250        &self,
251        certificate: GenericCertificate<Timeout>,
252    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
253        let request = HandleTimeoutCertificateRequest { certificate };
254        GrpcClient::try_into_chain_info(client_delegate!(
255            self,
256            handle_timeout_certificate,
257            request
258        )?)
259    }
260
261    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
262    async fn handle_chain_info_query(
263        &self,
264        query: linera_core::data_types::ChainInfoQuery,
265    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
266        GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
267    }
268
269    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
270    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
271        let retry_delay = self.retry_delay;
272        let max_retries = self.max_retries;
273        let mut retry_count = 0;
274        let subscription_request = SubscriptionRequest {
275            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
276        };
277        let mut client = self.client.clone();
278
279        // Make the first connection attempt before returning from this method.
280        let mut stream = Some(
281            client
282                .subscribe(subscription_request.clone())
283                .await
284                .map_err(|status| NodeError::SubscriptionFailed {
285                    status: status.to_string(),
286                })?
287                .into_inner(),
288        );
289
290        // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
291        // `client.subscribe(request)` endlessly and without delay.
292        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
293            let mut client = client.clone();
294            let subscription_request = subscription_request.clone();
295            let mut stream = stream.take();
296            async move {
297                let stream = if let Some(stream) = stream.take() {
298                    future::Either::Right(stream)
299                } else {
300                    match client.subscribe(subscription_request.clone()).await {
301                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
302                        Ok(response) => future::Either::Right(response.into_inner()),
303                    }
304                };
305                Some((stream, ()))
306            }
307        })
308        .flatten();
309
310        let span = tracing::info_span!("notification stream");
311        // The stream of `Notification`s that inserts increasing delays after retriable errors, and
312        // terminates after unexpected or fatal errors.
313        let notification_stream = endlessly_retrying_notification_stream
314            .map(|result| {
315                Option::<Notification>::try_from(result?).map_err(|err| {
316                    let message = format!("Could not deserialize notification: {}", err);
317                    tonic::Status::new(Code::Internal, message)
318                })
319            })
320            .take_while(move |result| {
321                let Err(status) = result else {
322                    retry_count = 0;
323                    return future::Either::Left(future::ready(true));
324                };
325
326                if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
327                    return future::Either::Left(future::ready(false));
328                }
329                let delay = retry_delay.saturating_mul(retry_count);
330                retry_count += 1;
331                future::Either::Right(async move {
332                    linera_base::time::timer::sleep(delay).await;
333                    true
334                })
335            })
336            .filter_map(|result| {
337                future::ready(match result {
338                    Ok(notification @ Some(_)) => notification,
339                    Ok(None) => None,
340                    Err(err) => {
341                        warn!("{}", err);
342                        None
343                    }
344                })
345            });
346
347        Ok(Box::pin(notification_stream))
348    }
349
350    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
351    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
352        let req = ();
353        Ok(client_delegate!(self, get_version_info, req)?.into())
354    }
355
356    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
357    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
358        let req = ();
359        Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
360    }
361
362    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
363    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
364        Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
365    }
366
367    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
368    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
369        Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
370    }
371
372    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
373    async fn download_pending_blob(
374        &self,
375        chain_id: ChainId,
376        blob_id: BlobId,
377    ) -> Result<BlobContent, NodeError> {
378        let req = (chain_id, blob_id);
379        client_delegate!(self, download_pending_blob, req)?.try_into()
380    }
381
382    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
383    async fn handle_pending_blob(
384        &self,
385        chain_id: ChainId,
386        blob: BlobContent,
387    ) -> Result<ChainInfoResponse, NodeError> {
388        let req = (chain_id, blob);
389        GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
390    }
391
392    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
393    async fn download_certificate(
394        &self,
395        hash: CryptoHash,
396    ) -> Result<ConfirmedBlockCertificate, NodeError> {
397        ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
398            self,
399            download_certificate,
400            hash
401        )?)?)
402        .map_err(|_| NodeError::UnexpectedCertificateValue)
403    }
404
405    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
406    async fn download_certificates(
407        &self,
408        hashes: Vec<CryptoHash>,
409    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
410        let mut missing_hashes = hashes;
411        let mut certs_collected = Vec::with_capacity(missing_hashes.len());
412        while !missing_hashes.is_empty() {
413            // Macro doesn't compile if we pass `missing_hashes.clone()` directly to `client_delegate!`.
414            let missing = missing_hashes.clone();
415            let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
416                client_delegate!(self, download_certificates, missing)?,
417            )?
418            .into_iter()
419            .map(|cert| {
420                ConfirmedBlockCertificate::try_from(cert)
421                    .map_err(|_| NodeError::UnexpectedCertificateValue)
422            })
423            .collect::<Result<_, _>>()?;
424
425            // In the case of the server not returning any certificates, we break the loop.
426            if received.is_empty() {
427                break;
428            }
429
430            // Honest validator should return certificates in the same order as the requested hashes.
431            missing_hashes = missing_hashes[received.len()..].to_vec();
432            certs_collected.append(&mut received);
433        }
434        ensure!(
435            missing_hashes.is_empty(),
436            NodeError::MissingCertificates(missing_hashes)
437        );
438        Ok(certs_collected)
439    }
440
441    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
442    async fn download_certificates_by_heights(
443        &self,
444        chain_id: ChainId,
445        heights: Vec<BlockHeight>,
446    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
447        let mut missing = heights;
448        let mut certs_collected = vec![];
449        while !missing.is_empty() {
450            let request = CertificatesByHeightRequest {
451                chain_id,
452                heights: missing.clone(),
453            };
454            let mut received: Vec<ConfirmedBlockCertificate> =
455                client_delegate!(self, download_raw_certificates_by_heights, request)?
456                    .certificates
457                    .into_iter()
458                    .map(
459                        |RawCertificate {
460                             lite_certificate,
461                             confirmed_block,
462                         }| {
463                            let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
464                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
465
466                            let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
467                                .map_err(|_| NodeError::UnexpectedCertificateValue)?;
468
469                            cert.with_value(block)
470                                .ok_or(NodeError::UnexpectedCertificateValue)
471                        },
472                    )
473                    .collect::<Result<_, _>>()?;
474
475            if received.is_empty() {
476                break;
477            }
478
479            // Honest validator should return certificates in the same order as the requested hashes.
480            missing = missing[received.len()..].to_vec();
481            certs_collected.append(&mut received);
482        }
483
484        Ok(certs_collected)
485    }
486
487    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
488    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
489        Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
490    }
491
492    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
493    async fn blob_last_used_by_certificate(
494        &self,
495        blob_id: BlobId,
496    ) -> Result<ConfirmedBlockCertificate, NodeError> {
497        Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
498    }
499
500    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
501    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
502        Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
503    }
504
505    #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
506    async fn get_shard_info(
507        &self,
508        chain_id: ChainId,
509    ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
510        let response = client_delegate!(self, get_shard_info, chain_id)?;
511        Ok(linera_core::data_types::ShardInfo {
512            shard_id: response.shard_id as usize,
513            total_shards: response.total_shards as usize,
514        })
515    }
516}