linera_rpc/grpc/
client.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{fmt, future::Future, iter};
5
6use futures::{future, stream, StreamExt};
7use linera_base::{
8    crypto::CryptoHash,
9    data_types::{BlobContent, NetworkDescription},
10    ensure,
11    identifiers::{BlobId, ChainId},
12    time::Duration,
13};
14use linera_chain::{
15    data_types::{self},
16    types::{
17        self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
18        ValidatedBlock,
19    },
20};
21use linera_core::{
22    data_types::ChainInfoResponse,
23    node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
24    worker::Notification,
25};
26use linera_version::VersionInfo;
27use tonic::{Code, IntoRequest, Request, Status};
28use tracing::{debug, error, info, instrument, warn};
29
30use super::{
31    api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
32    transport, GRPC_MAX_MESSAGE_SIZE,
33};
34use crate::{
35    HandleConfirmedCertificateRequest, HandleLiteCertRequest, HandleTimeoutCertificateRequest,
36    HandleValidatedCertificateRequest,
37};
38
39#[derive(Clone)]
40pub struct GrpcClient {
41    address: String,
42    client: ValidatorNodeClient<transport::Channel>,
43    retry_delay: Duration,
44    max_retries: u32,
45}
46
47impl GrpcClient {
48    pub fn new(
49        address: String,
50        channel: transport::Channel,
51        retry_delay: Duration,
52        max_retries: u32,
53    ) -> Self {
54        let client = ValidatorNodeClient::new(channel)
55            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
56            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
57        Self {
58            address,
59            client,
60            retry_delay,
61            max_retries,
62        }
63    }
64
65    /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
66    /// Logs a warning on unexpected status codes.
67    fn is_retryable(status: &Status) -> bool {
68        match status.code() {
69            Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
70                info!("gRPC request interrupted: {}; retrying", status);
71                true
72            }
73            Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
74                error!("Unexpected gRPC status: {}; retrying", status);
75                true
76            }
77            Code::InvalidArgument
78            | Code::NotFound
79            | Code::AlreadyExists
80            | Code::PermissionDenied
81            | Code::FailedPrecondition
82            | Code::OutOfRange
83            | Code::Unimplemented
84            | Code::Internal
85            | Code::DataLoss
86            | Code::Unauthenticated => {
87                error!("Unexpected gRPC status: {}", status);
88                false
89            }
90        }
91    }
92
93    async fn delegate<F, Fut, R, S>(
94        &self,
95        f: F,
96        request: impl TryInto<R> + fmt::Debug + Clone,
97        handler: &str,
98    ) -> Result<S, NodeError>
99    where
100        F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
101        Fut: Future<Output = Result<tonic::Response<S>, Status>>,
102        R: IntoRequest<R> + Clone,
103    {
104        let mut retry_count = 0;
105        let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
106            error: "could not convert request to proto".to_string(),
107        })?;
108        loop {
109            match f(self.client.clone(), Request::new(request_inner.clone())).await {
110                Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
111                    let delay = self.retry_delay.saturating_mul(retry_count);
112                    retry_count += 1;
113                    linera_base::time::timer::sleep(delay).await;
114                    continue;
115                }
116                Err(s) => {
117                    return Err(NodeError::GrpcError {
118                        error: format!("remote request [{handler}] failed with status: {s:?}"),
119                    });
120                }
121                Ok(result) => return Ok(result.into_inner()),
122            };
123        }
124    }
125
126    fn try_into_chain_info(
127        result: api::ChainInfoResult,
128    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
129        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
130            error: "missing body from response".to_string(),
131        })?;
132        match inner {
133            api::chain_info_result::Inner::ChainInfoResponse(response) => {
134                Ok(response.try_into().map_err(|err| NodeError::GrpcError {
135                    error: format!("failed to unmarshal response: {}", err),
136                })?)
137            }
138            api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
139                .map_err(|err| NodeError::GrpcError {
140                    error: format!("failed to unmarshal error message: {}", err),
141                })?),
142        }
143    }
144}
145
146impl TryFrom<api::PendingBlobResult> for BlobContent {
147    type Error = NodeError;
148
149    fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
150        let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
151            error: "missing body from response".to_string(),
152        })?;
153        match inner {
154            api::pending_blob_result::Inner::Blob(blob) => {
155                Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
156                    error: format!("failed to unmarshal response: {}", err),
157                })?)
158            }
159            api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
160                .map_err(|err| NodeError::GrpcError {
161                    error: format!("failed to unmarshal error message: {}", err),
162                })?),
163        }
164    }
165}
166
/// Logs an outgoing request at `debug` level, then sends it through `GrpcClient::delegate`.
///
/// - `$this`: the `GrpcClient` (normally `self`).
/// - `$method`: the name of the stub method to call. Its name is also the handler label in
///   logs and errors.
/// - `$request`: must be a plain identifier (a local variable holding the request value).
macro_rules! client_delegate {
    ($this:ident, $method:ident, $request:ident) => {{
        let rpc_name = stringify!($method);
        debug!(
            handler = stringify!($method),
            request = ?$request,
            "sending gRPC request"
        );
        $this
            .delegate(
                |mut stub, grpc_request| async move { stub.$method(grpc_request).await },
                $request,
                rpc_name,
            )
            .await
    }};
}
183
184impl ValidatorNode for GrpcClient {
185    type NotificationStream = NotificationStream;
186
187    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
188    async fn handle_block_proposal(
189        &self,
190        proposal: data_types::BlockProposal,
191    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
192        GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
193    }
194
195    #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
196    async fn handle_lite_certificate(
197        &self,
198        certificate: types::LiteCertificate<'_>,
199        delivery: CrossChainMessageDelivery,
200    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
201        let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
202        let request = HandleLiteCertRequest {
203            certificate,
204            wait_for_outgoing_messages,
205        };
206        GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
207    }
208
209    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
210    async fn handle_confirmed_certificate(
211        &self,
212        certificate: GenericCertificate<ConfirmedBlock>,
213        delivery: CrossChainMessageDelivery,
214    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
215        let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
216        let request = HandleConfirmedCertificateRequest {
217            certificate,
218            wait_for_outgoing_messages,
219        };
220        GrpcClient::try_into_chain_info(client_delegate!(
221            self,
222            handle_confirmed_certificate,
223            request
224        )?)
225    }
226
227    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
228    async fn handle_validated_certificate(
229        &self,
230        certificate: GenericCertificate<ValidatedBlock>,
231    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
232        let request = HandleValidatedCertificateRequest { certificate };
233        GrpcClient::try_into_chain_info(client_delegate!(
234            self,
235            handle_validated_certificate,
236            request
237        )?)
238    }
239
240    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
241    async fn handle_timeout_certificate(
242        &self,
243        certificate: GenericCertificate<Timeout>,
244    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
245        let request = HandleTimeoutCertificateRequest { certificate };
246        GrpcClient::try_into_chain_info(client_delegate!(
247            self,
248            handle_timeout_certificate,
249            request
250        )?)
251    }
252
253    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
254    async fn handle_chain_info_query(
255        &self,
256        query: linera_core::data_types::ChainInfoQuery,
257    ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
258        GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
259    }
260
261    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
262    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
263        let retry_delay = self.retry_delay;
264        let max_retries = self.max_retries;
265        let mut retry_count = 0;
266        let subscription_request = SubscriptionRequest {
267            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
268        };
269        let mut client = self.client.clone();
270
271        // Make the first connection attempt before returning from this method.
272        let mut stream = Some(
273            client
274                .subscribe(subscription_request.clone())
275                .await
276                .map_err(|status| NodeError::SubscriptionFailed {
277                    status: status.to_string(),
278                })?
279                .into_inner(),
280        );
281
282        // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
283        // `client.subscribe(request)` endlessly and without delay.
284        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
285            let mut client = client.clone();
286            let subscription_request = subscription_request.clone();
287            let mut stream = stream.take();
288            async move {
289                let stream = if let Some(stream) = stream.take() {
290                    future::Either::Right(stream)
291                } else {
292                    match client.subscribe(subscription_request.clone()).await {
293                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
294                        Ok(response) => future::Either::Right(response.into_inner()),
295                    }
296                };
297                Some((stream, ()))
298            }
299        })
300        .flatten();
301
302        let span = tracing::info_span!("notification stream");
303        // The stream of `Notification`s that inserts increasing delays after retriable errors, and
304        // terminates after unexpected or fatal errors.
305        let notification_stream = endlessly_retrying_notification_stream
306            .map(|result| {
307                Option::<Notification>::try_from(result?).map_err(|err| {
308                    let message = format!("Could not deserialize notification: {}", err);
309                    tonic::Status::new(Code::Internal, message)
310                })
311            })
312            .take_while(move |result| {
313                let Err(status) = result else {
314                    retry_count = 0;
315                    return future::Either::Left(future::ready(true));
316                };
317
318                if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
319                    return future::Either::Left(future::ready(false));
320                }
321                let delay = retry_delay.saturating_mul(retry_count);
322                retry_count += 1;
323                future::Either::Right(async move {
324                    linera_base::time::timer::sleep(delay).await;
325                    true
326                })
327            })
328            .filter_map(|result| {
329                future::ready(match result {
330                    Ok(notification @ Some(_)) => notification,
331                    Ok(None) => None,
332                    Err(err) => {
333                        warn!("{}", err);
334                        None
335                    }
336                })
337            });
338
339        Ok(Box::pin(notification_stream))
340    }
341
342    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
343    async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
344        let req = ();
345        Ok(client_delegate!(self, get_version_info, req)?.into())
346    }
347
348    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
349    async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
350        let req = ();
351        Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
352    }
353
354    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
355    async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
356        Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
357    }
358
359    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
360    async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
361        Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
362    }
363
364    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
365    async fn download_pending_blob(
366        &self,
367        chain_id: ChainId,
368        blob_id: BlobId,
369    ) -> Result<BlobContent, NodeError> {
370        let req = (chain_id, blob_id);
371        client_delegate!(self, download_pending_blob, req)?.try_into()
372    }
373
374    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
375    async fn handle_pending_blob(
376        &self,
377        chain_id: ChainId,
378        blob: BlobContent,
379    ) -> Result<ChainInfoResponse, NodeError> {
380        let req = (chain_id, blob);
381        GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
382    }
383
384    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
385    async fn download_certificate(
386        &self,
387        hash: CryptoHash,
388    ) -> Result<ConfirmedBlockCertificate, NodeError> {
389        ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
390            self,
391            download_certificate,
392            hash
393        )?)?)
394        .map_err(|_| NodeError::UnexpectedCertificateValue)
395    }
396
397    #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
398    async fn download_certificates(
399        &self,
400        hashes: Vec<CryptoHash>,
401    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
402        let mut missing_hashes = hashes;
403        let mut certs_collected = Vec::with_capacity(missing_hashes.len());
404        loop {
405            // Macro doesn't compile if we pass `missing_hashes.clone()` directly to `client_delegate!`.
406            let missing = missing_hashes.clone();
407            let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
408                client_delegate!(self, download_certificates, missing)?,
409            )?
410            .into_iter()
411            .map(|cert| {
412                ConfirmedBlockCertificate::try_from(cert)
413                    .map_err(|_| NodeError::UnexpectedCertificateValue)
414            })
415            .collect::<Result<_, _>>()?;
416
417            // In the case of the server not returning any certificates, we break the loop.
418            if received.is_empty() {
419                break;
420            }
421
422            // Honest validator should return certificates in the same order as the requested hashes.
423            missing_hashes = missing_hashes[received.len()..].to_vec();
424            certs_collected.append(&mut received);
425        }
426        ensure!(
427            missing_hashes.is_empty(),
428            NodeError::MissingCertificates(missing_hashes)
429        );
430        Ok(certs_collected)
431    }
432
433    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
434    async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
435        Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
436    }
437
438    #[instrument(target = "grpc_client", skip(self), err, fields(address = self.address))]
439    async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
440        Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
441    }
442}