linera_rpc/grpc/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, future::Future, iter};
5
6use futures::{future, stream, StreamExt};
7use linera_base::{
8    crypto::CryptoHash,
9    data_types::{BlobContent, NetworkDescription},
10    ensure,
11    identifiers::{BlobId, ChainId},
12    time::Duration,
13};
14use linera_chain::{
15    data_types::{self},
16    types::{
17        self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
18        ValidatedBlock,
19    },
20};
21use linera_core::{
22    data_types::ChainInfoResponse,
23    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
24    worker::Notification,
25};
26use linera_version::VersionInfo;
27use tonic::{Code, IntoRequest, Request, Status};
28use tracing::{debug, error, info, instrument, warn};
29
30use super::{
31    api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
32    transport, GRPC_MAX_MESSAGE_SIZE,
33};
34use crate::{
35    HandleConfirmedCertificateRequest, HandleLiteCertRequest, HandleTimeoutCertificateRequest,
36    HandleValidatedCertificateRequest,
37};
38
39#[derive(Clone)]
40pub struct GrpcClient {
41    address: String,
42    client: ValidatorNodeClient<transport::Channel>,
43    retry_delay: Duration,
44    max_retries: u32,
45}
46
47impl GrpcClient {
48    pub fn new(
49        address: String,
50        channel: transport::Channel,
51        retry_delay: Duration,
52        max_retries: u32,
53    ) -> Self {
54        let client = ValidatorNodeClient::new(channel)
55            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
56            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
57        Self {
58            address,
59            client,
60            retry_delay,
61            max_retries,
62        }
63    }
64
65    pub fn address(&self) -> &str {
66        &self.address
67    }
68
69    /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
70    /// Logs a warning on unexpected status codes.
71    fn is_retryable(status: &Status) -> bool {
72        match status.code() {
73            Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
74                info!("gRPC request interrupted: {}; retrying", status);
75                true
76            }
77            Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
78                error!("Unexpected gRPC status: {}; retrying", status);
79                true
80            }
81            Code::InvalidArgument
82            | Code::NotFound
83            | Code::AlreadyExists
84            | Code::PermissionDenied
85            | Code::FailedPrecondition
86            | Code::OutOfRange
87            | Code::Unimplemented
88            | Code::Internal
89            | Code::DataLoss
90            | Code::Unauthenticated => {
91                error!("Unexpected gRPC status: {}", status);
92                false
93            }
94        }
95    }
96
97    async fn delegate<F, Fut, R, S>(
98        &self,
99        f: F,
100        request: impl TryInto<R> + fmt::Debug + Clone,
101        handler: &str,
102    ) -> Result<S, NodeError>
103    where
104        F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
105        Fut: Future<Output = Result<tonic::Response<S>, Status>>,
106        R: IntoRequest<R> + Clone,
107    {
108        let mut retry_count = 0;
109        let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
110            error: "could not convert request to proto".to_string(),
111        })?;
112        loop {
113            match f(self.client.clone(), Request::new(request_inner.clone())).await {
114                Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
115                    let delay = self.retry_delay.saturating_mul(retry_count);
116                    retry_count += 1;
117                    linera_base::time::timer::sleep(delay).await;
118                    continue;
119                }
120                Err(s) => {
121                    return Err(NodeError::GrpcError {
122                        error: format!("remote request [{handler}] failed with status: {s:?}"),
123                    });
124                }
125                Ok(result) => return Ok(result.into_inner()),
126            };
127        }
128    }
129
130    fn try_into_chain_info(
131        result: api::ChainInfoResult,
132    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
133        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
134            error: "missing body from response".to_string(),
135        })?;
136        match inner {
137            api::chain_info_result::Inner::ChainInfoResponse(response) => {
138                Ok(response.try_into().map_err(|err| NodeError::GrpcError {
139                    error: format!("failed to unmarshal response: {}", err),
140                })?)
141            }
142            api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
143                .map_err(|err| NodeError::GrpcError {
144                    error: format!("failed to unmarshal error message: {}", err),
145                })?),
146        }
147    }
148}
149
150impl TryFrom<api::PendingBlobResult> for BlobContent {
151    type Error = NodeError;
152
153    fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
154        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
155            error: "missing body from response".to_string(),
156        })?;
157        match inner {
158            api::pending_blob_result::Inner::Blob(blob) => {
159                Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
160                    error: format!("failed to unmarshal response: {}", err),
161                })?)
162            }
163            api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
164                .map_err(|err| NodeError::GrpcError {
165                    error: format!("failed to unmarshal error message: {}", err),
166                })?),
167        }
168    }
169}
170
/// Logs an outgoing request at `debug` level, then forwards it to [`GrpcClient::delegate`],
/// which calls the stub method named `$handler` (retrying as configured on the client).
///
/// `$req` has to be a plain identifier (a local variable), since it is referenced twice:
/// once for logging and once as the request itself.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        let handler_name = stringify!($handler);
        debug!(handler = handler_name, request = ?$req, "sending gRPC request");
        $self
            .delegate(
                |mut grpc_client, grpc_request| async move {
                    grpc_client.$handler(grpc_request).await
                },
                $req,
                handler_name,
            )
            .await
    }};
}
187
188impl ValidatorNode for GrpcClient {
189    type NotificationStream = NotificationStream;
190
191    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
192    async fn handle_block_proposal(
193        &self,
194        proposal: data_types::BlockProposal,
195    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
196        GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
197    }
198
199    #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
200    async fn handle_lite_certificate(
201        &self,
202        certificate: types::LiteCertificate<'_>,
203        delivery: CrossChainMessageDelivery,
204    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
205        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
206        let request = HandleLiteCertRequest {
207            certificate,
208            wait_for_outgoing_messages,
209        };
210        GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
211    }
212
213    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
214    async fn handle_confirmed_certificate(
215        &self,
216        certificate: GenericCertificate<ConfirmedBlock>,
217        delivery: CrossChainMessageDelivery,
218    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
219        let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
220        let request = HandleConfirmedCertificateRequest {
221            certificate,
222            wait_for_outgoing_messages,
223        };
224        GrpcClient::try_into_chain_info(client_delegate!(
225            self,
226            handle_confirmed_certificate,
227            request
228        )?)
229    }
230
231    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
232    async fn handle_validated_certificate(
233        &self,
234        certificate: GenericCertificate<ValidatedBlock>,
235    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
236        let request = HandleValidatedCertificateRequest { certificate };
237        GrpcClient::try_into_chain_info(client_delegate!(
238            self,
239            handle_validated_certificate,
240            request
241        )?)
242    }
243
244    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
245    async fn handle_timeout_certificate(
246        &self,
247        certificate: GenericCertificate<Timeout>,
248    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
249        let request = HandleTimeoutCertificateRequest { certificate };
250        GrpcClient::try_into_chain_info(client_delegate!(
251            self,
252            handle_timeout_certificate,
253            request
254        )?)
255    }
256
257    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
258    async fn handle_chain_info_query(
259        &self,
260        query: linera_core::data_types::ChainInfoQuery,
261    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
262        GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
263    }
264
265    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
266    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
267        let retry_delay = self.retry_delay;
268        let max_retries = self.max_retries;
269        let mut retry_count = 0;
270        let subscription_request = SubscriptionRequest {
271            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
272        };
273        let mut client = self.client.clone();
274
275        // Make the first connection attempt before returning from this method.
276        let mut stream = Some(
277            client
278                .subscribe(subscription_request.clone())
279                .await
280                .map_err(|status| NodeError::SubscriptionFailed {
281                    status: status.to_string(),
282                })?
283                .into_inner(),
284        );
285
286        // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
287        // `client.subscribe(request)` endlessly and without delay.
288        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
289            let mut client = client.clone();
290            let subscription_request = subscription_request.clone();
291            let mut stream = stream.take();
292            async move {
293                let stream = if let Some(stream) = stream.take() {
294                    future::Either::Right(stream)
295                } else {
296                    match client.subscribe(subscription_request.clone()).await {
297                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
298                        Ok(response) => future::Either::Right(response.into_inner()),
299                    }
300                };
301                Some((stream, ()))
302            }
303        })
304        .flatten();
305
306        let span = tracing::info_span!("notification stream");
307        // The stream of `Notification`s that inserts increasing delays after retriable errors, and
308        // terminates after unexpected or fatal errors.
309        let notification_stream = endlessly_retrying_notification_stream
310            .map(|result| {
311                Option::<Notification>::try_from(result?).map_err(|err| {
312                    let message = format!("Could not deserialize notification: {}", err);
313                    tonic::Status::new(Code::Internal, message)
314                })
315            })
316            .take_while(move |result| {
317                let Err(status) = result else {
318                    retry_count = 0;
319                    return future::Either::Left(future::ready(true));
320                };
321
322                if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
323                    return future::Either::Left(future::ready(false));
324                }
325                let delay = retry_delay.saturating_mul(retry_count);
326                retry_count += 1;
327                future::Either::Right(async move {
328                    linera_base::time::timer::sleep(delay).await;
329                    true
330                })
331            })
332            .filter_map(|result| {
333                future::ready(match result {
334                    Ok(notification @ Some(_)) => notification,
335                    Ok(None) => None,
336                    Err(err) => {
337                        warn!("{}", err);
338                        None
339                    }
340                })
341            });
342
343        Ok(Box::pin(notification_stream))
344    }
345
346    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
347    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
348        let req = ();
349        Ok(client_delegate!(self, get_version_info, req)?.into())
350    }
351
352    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
353    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
354        let req = ();
355        Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
356    }
357
358    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
359    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
360        Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
361    }
362
363    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
364    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
365        Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
366    }
367
368    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
369    async fn download_pending_blob(
370        &self,
371        chain_id: ChainId,
372        blob_id: BlobId,
373    ) -> Result<BlobContent, NodeError> {
374        let req = (chain_id, blob_id);
375        client_delegate!(self, download_pending_blob, req)?.try_into()
376    }
377
378    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
379    async fn handle_pending_blob(
380        &self,
381        chain_id: ChainId,
382        blob: BlobContent,
383    ) -> Result<ChainInfoResponse, NodeError> {
384        let req = (chain_id, blob);
385        GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
386    }
387
388    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
389    async fn download_certificate(
390        &self,
391        hash: CryptoHash,
392    ) -> Result<ConfirmedBlockCertificate, NodeError> {
393        ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
394            self,
395            download_certificate,
396            hash
397        )?)?)
398        .map_err(|_| NodeError::UnexpectedCertificateValue)
399    }
400
401    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
402    async fn download_certificates(
403        &self,
404        hashes: Vec<CryptoHash>,
405    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
406        let mut missing_hashes = hashes;
407        let mut certs_collected = Vec::with_capacity(missing_hashes.len());
408        loop {
409            // Macro doesn't compile if we pass `missing_hashes.clone()` directly to `client_delegate!`.
410            let missing = missing_hashes.clone();
411            let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
412                client_delegate!(self, download_certificates, missing)?,
413            )?
414            .into_iter()
415            .map(|cert| {
416                ConfirmedBlockCertificate::try_from(cert)
417                    .map_err(|_| NodeError::UnexpectedCertificateValue)
418            })
419            .collect::<Result<_, _>>()?;
420
421            // In the case of the server not returning any certificates, we break the loop.
422            if received.is_empty() {
423                break;
424            }
425
426            // Honest validator should return certificates in the same order as the requested hashes.
427            missing_hashes = missing_hashes[received.len()..].to_vec();
428            certs_collected.append(&mut received);
429        }
430        ensure!(
431            missing_hashes.is_empty(),
432            NodeError::MissingCertificates(missing_hashes)
433        );
434        Ok(certs_collected)
435    }
436
437    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
438    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
439        Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
440    }
441
442    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
443    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
444        Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
445    }
446}