1use std::{fmt, future::Future, iter};
5
6use futures::{future, stream, StreamExt};
7use linera_base::{
8 crypto::CryptoHash,
9 data_types::{BlobContent, BlockHeight, NetworkDescription},
10 ensure,
11 identifiers::{BlobId, ChainId},
12 time::Duration,
13};
14use linera_chain::{
15 data_types::{self},
16 types::{
17 self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
18 LiteCertificate, Timeout, ValidatedBlock,
19 },
20};
21use linera_core::{
22 data_types::{CertificatesByHeightRequest, ChainInfoResponse},
23 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
24 worker::Notification,
25};
26use linera_version::VersionInfo;
27use tonic::{Code, IntoRequest, Request, Status};
28use tracing::{debug, instrument, trace, warn, Level};
29
30use super::{
31 api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
32 transport, GRPC_MAX_MESSAGE_SIZE,
33};
34use crate::{
35 grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
36 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
37};
38
/// A gRPC client talking to a single validator node.
///
/// Bundles the generated `ValidatorNodeClient` stub with the validator's address and the
/// retry settings that are applied when a request fails with a retryable status.
#[derive(Clone)]
pub struct GrpcClient {
    /// Address of the validator; returned by `address()` and recorded in tracing spans.
    address: String,
    /// Generated gRPC stub over the transport channel; cloned for every request attempt.
    client: ValidatorNodeClient<transport::Channel>,
    /// Base delay between retries; it is multiplied by the number of retries already made.
    retry_delay: Duration,
    /// Upper bound on the number of retries after a retryable failure.
    max_retries: u32,
}
46
47impl GrpcClient {
48 pub fn new(
49 address: String,
50 channel: transport::Channel,
51 retry_delay: Duration,
52 max_retries: u32,
53 ) -> Self {
54 let client = ValidatorNodeClient::new(channel)
55 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
56 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
57 Self {
58 address,
59 client,
60 retry_delay,
61 max_retries,
62 }
63 }
64
65 pub fn address(&self) -> &str {
66 &self.address
67 }
68
69 fn is_retryable(status: &Status) -> bool {
72 match status.code() {
73 Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
74 trace!("gRPC request interrupted: {status:?}; retrying");
75 true
76 }
77 Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
78 trace!("Unexpected gRPC status: {status:?}; retrying");
79 true
80 }
81 Code::NotFound => false, Code::InvalidArgument
83 | Code::AlreadyExists
84 | Code::PermissionDenied
85 | Code::FailedPrecondition
86 | Code::OutOfRange
87 | Code::Unimplemented
88 | Code::Internal
89 | Code::DataLoss
90 | Code::Unauthenticated => {
91 trace!("Unexpected gRPC status: {status:?}");
92 false
93 }
94 }
95 }
96
97 async fn delegate<F, Fut, R, S>(
98 &self,
99 f: F,
100 request: impl TryInto<R> + fmt::Debug + Clone,
101 handler: &str,
102 ) -> Result<S, NodeError>
103 where
104 F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
105 Fut: Future<Output = Result<tonic::Response<S>, Status>>,
106 R: IntoRequest<R> + Clone,
107 {
108 let mut retry_count = 0;
109 let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
110 error: "could not convert request to proto".to_string(),
111 })?;
112 loop {
113 match f(self.client.clone(), Request::new(request_inner.clone())).await {
114 Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
115 let delay = self.retry_delay.saturating_mul(retry_count);
116 retry_count += 1;
117 linera_base::time::timer::sleep(delay).await;
118 continue;
119 }
120 Err(s) => {
121 return Err(NodeError::GrpcError {
122 error: format!("remote request [{handler}] failed with status: {s:?}"),
123 });
124 }
125 Ok(result) => return Ok(result.into_inner()),
126 };
127 }
128 }
129
130 fn try_into_chain_info(
131 result: api::ChainInfoResult,
132 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
133 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
134 error: "missing body from response".to_string(),
135 })?;
136 match inner {
137 api::chain_info_result::Inner::ChainInfoResponse(response) => {
138 Ok(response.try_into().map_err(|err| NodeError::GrpcError {
139 error: format!("failed to unmarshal response: {}", err),
140 })?)
141 }
142 api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
143 .map_err(|err| NodeError::GrpcError {
144 error: format!("failed to unmarshal error message: {}", err),
145 })?),
146 }
147 }
148}
149
150impl TryFrom<api::PendingBlobResult> for BlobContent {
151 type Error = NodeError;
152
153 fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
154 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
155 error: "missing body from response".to_string(),
156 })?;
157 match inner {
158 api::pending_blob_result::Inner::Blob(blob) => {
159 Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
160 error: format!("failed to unmarshal response: {}", err),
161 })?)
162 }
163 api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
164 .map_err(|err| NodeError::GrpcError {
165 error: format!("failed to unmarshal error message: {}", err),
166 })?),
167 }
168 }
169}
170
/// Sends `$req` to the validator through the stub method named `$handler`.
///
/// Logs the handler name and the request (via `Debug`) at `debug` level, then calls
/// `$self.delegate(..)` with a closure that invokes `client.$handler(req)`, and awaits it.
/// The handler name is also passed along as a string for error reporting. The expansion
/// contains `.await`, so the macro can only be used in an async context.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        debug!(
            handler = stringify!($handler),
            request = ?$req,
            "sending gRPC request"
        );
        $self
            .delegate(
                // One attempt: `delegate` hands in a fresh stub clone and proto request.
                |mut client, req| async move { client.$handler(req).await },
                $req,
                stringify!($handler),
            )
            .await
    }};
}
187
188impl ValidatorNode for GrpcClient {
189 type NotificationStream = NotificationStream;
190
191 fn address(&self) -> String {
192 self.address.clone()
193 }
194
195 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
196 async fn handle_block_proposal(
197 &self,
198 proposal: data_types::BlockProposal,
199 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
200 GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
201 }
202
203 #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
204 async fn handle_lite_certificate(
205 &self,
206 certificate: types::LiteCertificate<'_>,
207 delivery: CrossChainMessageDelivery,
208 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
209 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
210 let request = HandleLiteCertRequest {
211 certificate,
212 wait_for_outgoing_messages,
213 };
214 GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
215 }
216
217 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
218 async fn handle_confirmed_certificate(
219 &self,
220 certificate: GenericCertificate<ConfirmedBlock>,
221 delivery: CrossChainMessageDelivery,
222 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
223 let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
224 let request = HandleConfirmedCertificateRequest {
225 certificate,
226 wait_for_outgoing_messages,
227 };
228 GrpcClient::try_into_chain_info(client_delegate!(
229 self,
230 handle_confirmed_certificate,
231 request
232 )?)
233 }
234
235 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
236 async fn handle_validated_certificate(
237 &self,
238 certificate: GenericCertificate<ValidatedBlock>,
239 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
240 let request = HandleValidatedCertificateRequest { certificate };
241 GrpcClient::try_into_chain_info(client_delegate!(
242 self,
243 handle_validated_certificate,
244 request
245 )?)
246 }
247
248 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
249 async fn handle_timeout_certificate(
250 &self,
251 certificate: GenericCertificate<Timeout>,
252 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
253 let request = HandleTimeoutCertificateRequest { certificate };
254 GrpcClient::try_into_chain_info(client_delegate!(
255 self,
256 handle_timeout_certificate,
257 request
258 )?)
259 }
260
261 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
262 async fn handle_chain_info_query(
263 &self,
264 query: linera_core::data_types::ChainInfoQuery,
265 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
266 GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
267 }
268
    /// Subscribes to notifications for `chains` and returns them as a boxed stream.
    ///
    /// The first subscription is made eagerly; if it fails, the method returns
    /// `NodeError::SubscriptionFailed`. Afterwards the returned stream re-subscribes whenever
    /// the current server stream ends, sleeps for a growing delay after each retryable error,
    /// and terminates on a non-retryable error or once `max_retries` consecutive errors have
    /// been seen. Errors are logged and never yielded to the consumer.
    #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
    async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
        // Copy the retry settings so that the stream closures below do not borrow `self`.
        let retry_delay = self.retry_delay;
        let max_retries = self.max_retries;
        let mut retry_count = 0;
        let subscription_request = SubscriptionRequest {
            chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
        };
        let mut client = self.client.clone();

        // Make the first connection attempt right away, so that a failure is reported to the
        // caller instead of being hidden inside the stream.
        let mut stream = Some(
            client
                .subscribe(subscription_request.clone())
                .await
                .map_err(|status| NodeError::SubscriptionFailed {
                    status: status.to_string(),
                })?
                .into_inner(),
        );

        // A stream of streams, flattened: the first item is the stream opened above, every
        // following item comes from a fresh `client.subscribe(..)` call. A failed call is
        // turned into a one-element stream carrying the error. No delay is inserted here.
        let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
            let mut client = client.clone();
            let subscription_request = subscription_request.clone();
            // Moves the initial stream out of the captured `Option`; it is `None` on every
            // later invocation of this closure.
            let mut stream = stream.take();
            async move {
                let stream = if let Some(stream) = stream.take() {
                    future::Either::Right(stream)
                } else {
                    match client.subscribe(subscription_request.clone()).await {
                        Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
                        Ok(response) => future::Either::Right(response.into_inner()),
                    }
                };
                Some((stream, ()))
            }
        })
        .flatten();

        let span = tracing::info_span!("notification stream");
        let notification_stream = endlessly_retrying_notification_stream
            // Convert each proto notification; a conversion failure becomes an `Internal`
            // status, which `is_retryable` classifies as non-retryable.
            .map(|result| {
                Option::<Notification>::try_from(result?).map_err(|err| {
                    let message = format!("Could not deserialize notification: {}", err);
                    tonic::Status::new(Code::Internal, message)
                })
            })
            // Decide after every item whether the stream goes on.
            .take_while(move |result| {
                // Any successfully received item resets the retry budget.
                let Err(status) = result else {
                    retry_count = 0;
                    return future::Either::Left(future::ready(true));
                };

                // Stop on a non-retryable status or when the retry budget is used up.
                if !span.in_scope(|| Self::is_retryable(status)) || retry_count >= max_retries {
                    return future::Either::Left(future::ready(false));
                }
                // The delay grows linearly with the number of consecutive errors; it is zero
                // for the first one.
                let delay = retry_delay.saturating_mul(retry_count);
                retry_count += 1;
                future::Either::Right(async move {
                    linera_base::time::timer::sleep(delay).await;
                    true
                })
            })
            // Yield only actual notifications: `Ok(None)` is dropped silently, errors are
            // logged as warnings and dropped.
            .filter_map(|result| {
                future::ready(match result {
                    Ok(notification @ Some(_)) => notification,
                    Ok(None) => None,
                    Err(err) => {
                        warn!("{}", err);
                        None
                    }
                })
            });

        Ok(Box::pin(notification_stream))
    }
349
350 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
351 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
352 let req = ();
353 Ok(client_delegate!(self, get_version_info, req)?.into())
354 }
355
356 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
357 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
358 let req = ();
359 Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
360 }
361
362 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
363 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
364 Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
365 }
366
367 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
368 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
369 Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
370 }
371
372 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
373 async fn download_pending_blob(
374 &self,
375 chain_id: ChainId,
376 blob_id: BlobId,
377 ) -> Result<BlobContent, NodeError> {
378 let req = (chain_id, blob_id);
379 client_delegate!(self, download_pending_blob, req)?.try_into()
380 }
381
382 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
383 async fn handle_pending_blob(
384 &self,
385 chain_id: ChainId,
386 blob: BlobContent,
387 ) -> Result<ChainInfoResponse, NodeError> {
388 let req = (chain_id, blob);
389 GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
390 }
391
392 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
393 async fn download_certificate(
394 &self,
395 hash: CryptoHash,
396 ) -> Result<ConfirmedBlockCertificate, NodeError> {
397 ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
398 self,
399 download_certificate,
400 hash
401 )?)?)
402 .map_err(|_| NodeError::UnexpectedCertificateValue)
403 }
404
405 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
406 async fn download_certificates(
407 &self,
408 hashes: Vec<CryptoHash>,
409 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
410 let mut missing_hashes = hashes;
411 let mut certs_collected = Vec::with_capacity(missing_hashes.len());
412 while !missing_hashes.is_empty() {
413 let missing = missing_hashes.clone();
415 let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
416 client_delegate!(self, download_certificates, missing)?,
417 )?
418 .into_iter()
419 .map(|cert| {
420 ConfirmedBlockCertificate::try_from(cert)
421 .map_err(|_| NodeError::UnexpectedCertificateValue)
422 })
423 .collect::<Result<_, _>>()?;
424
425 if received.is_empty() {
427 break;
428 }
429
430 missing_hashes = missing_hashes[received.len()..].to_vec();
432 certs_collected.append(&mut received);
433 }
434 ensure!(
435 missing_hashes.is_empty(),
436 NodeError::MissingCertificates(missing_hashes)
437 );
438 Ok(certs_collected)
439 }
440
441 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
442 async fn download_certificates_by_heights(
443 &self,
444 chain_id: ChainId,
445 heights: Vec<BlockHeight>,
446 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
447 let mut missing = heights;
448 let mut certs_collected = vec![];
449 while !missing.is_empty() {
450 let request = CertificatesByHeightRequest {
451 chain_id,
452 heights: missing.clone(),
453 };
454 let mut received: Vec<ConfirmedBlockCertificate> =
455 client_delegate!(self, download_raw_certificates_by_heights, request)?
456 .certificates
457 .into_iter()
458 .map(
459 |RawCertificate {
460 lite_certificate,
461 confirmed_block,
462 }| {
463 let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
464 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
465
466 let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
467 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
468
469 cert.with_value(block)
470 .ok_or(NodeError::UnexpectedCertificateValue)
471 },
472 )
473 .collect::<Result<_, _>>()?;
474
475 if received.is_empty() {
476 break;
477 }
478
479 missing = missing[received.len()..].to_vec();
481 certs_collected.append(&mut received);
482 }
483
484 Ok(certs_collected)
485 }
486
487 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
488 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
489 Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
490 }
491
492 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
493 async fn blob_last_used_by_certificate(
494 &self,
495 blob_id: BlobId,
496 ) -> Result<ConfirmedBlockCertificate, NodeError> {
497 Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
498 }
499
500 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
501 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
502 Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
503 }
504
505 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
506 async fn get_shard_info(
507 &self,
508 chain_id: ChainId,
509 ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
510 let response = client_delegate!(self, get_shard_info, chain_id)?;
511 Ok(linera_core::data_types::ShardInfo {
512 shard_id: response.shard_id as usize,
513 total_shards: response.total_shards as usize,
514 })
515 }
516}