1use std::{
5 collections::BTreeSet,
6 fmt,
7 future::Future,
8 iter,
9 sync::{
10 atomic::{AtomicU32, Ordering},
11 Arc,
12 },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17 crypto::CryptoHash,
18 data_types::{BlobContent, BlockHeight, NetworkDescription},
19 ensure,
20 identifiers::{BlobId, ChainId},
21 time::Duration,
22};
23use linera_chain::{
24 data_types::{self},
25 types::{
26 self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
30use linera_core::{
31 data_types::{CertificatesByHeightRequest, ChainInfoResponse},
32 node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
33 worker::Notification,
34};
35use linera_version::VersionInfo;
36use tonic::{Code, IntoRequest, Request, Status};
37use tracing::{debug, instrument, trace, warn, Level};
38
39use super::{
40 api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
41 transport, GRPC_MAX_MESSAGE_SIZE,
42};
43#[cfg(feature = "opentelemetry")]
44use crate::propagation::{get_context_with_traffic_type, inject_context};
45use crate::{
46 full_jitter_delay, grpc::api::RawCertificate, HandleConfirmedCertificateRequest,
47 HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
48};
49
50#[derive(Clone)]
51pub struct GrpcClient {
52 address: String,
53 client: ValidatorNodeClient<transport::Channel>,
54 retry_delay: Duration,
55 max_retries: u32,
56 max_backoff: Duration,
57}
58
59impl GrpcClient {
60 pub fn new(
61 address: String,
62 channel: transport::Channel,
63 retry_delay: Duration,
64 max_retries: u32,
65 max_backoff: Duration,
66 ) -> Self {
67 let client = ValidatorNodeClient::new(channel)
68 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
69 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
70 Self {
71 address,
72 client,
73 retry_delay,
74 max_retries,
75 max_backoff,
76 }
77 }
78
79 pub fn address(&self) -> &str {
80 &self.address
81 }
82
83 fn is_retryable(status: &Status) -> bool {
86 match status.code() {
87 Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
88 trace!("gRPC request interrupted: {status:?}; retrying");
89 true
90 }
91 Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
92 trace!("Unexpected gRPC status: {status:?}; retrying");
93 true
94 }
95 Code::Internal if status.message().contains("h2 protocol error") => {
96 trace!("gRPC connection reset: {status:?}; retrying");
100 true
101 }
102 Code::NotFound => false, Code::InvalidArgument
104 | Code::AlreadyExists
105 | Code::PermissionDenied
106 | Code::FailedPrecondition
107 | Code::OutOfRange
108 | Code::Unimplemented
109 | Code::Internal
110 | Code::DataLoss
111 | Code::Unauthenticated => {
112 trace!("Unexpected gRPC status: {status:?}");
113 false
114 }
115 }
116 }
117
118 async fn delegate<F, Fut, R, S>(
119 &self,
120 f: F,
121 request: impl TryInto<R> + fmt::Debug + Clone,
122 handler: &str,
123 ) -> Result<S, NodeError>
124 where
125 F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
126 Fut: Future<Output = Result<tonic::Response<S>, Status>>,
127 R: IntoRequest<R> + Clone,
128 {
129 let mut retry_count = 0;
130 let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
131 error: "could not convert request to proto".to_string(),
132 })?;
133 loop {
134 #[allow(unused_mut)]
135 let mut request = Request::new(request_inner.clone());
136 #[cfg(feature = "opentelemetry")]
140 inject_context(&get_context_with_traffic_type(), request.metadata_mut());
141 match f(self.client.clone(), request).await {
142 Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
143 let delay = full_jitter_delay(self.retry_delay, retry_count, self.max_backoff);
144 retry_count += 1;
145 linera_base::time::timer::sleep(delay).await;
146 continue;
147 }
148 Err(s) => {
149 return Err(NodeError::GrpcError {
150 error: format!("remote request [{handler}] failed with status: {s:?}"),
151 });
152 }
153 Ok(result) => return Ok(result.into_inner()),
154 };
155 }
156 }
157
158 fn try_into_chain_info(
159 result: api::ChainInfoResult,
160 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
161 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
162 error: "missing body from response".to_string(),
163 })?;
164 match inner {
165 api::chain_info_result::Inner::ChainInfoResponse(response) => {
166 Ok(response.try_into().map_err(|err| NodeError::GrpcError {
167 error: format!("failed to unmarshal response: {}", err),
168 })?)
169 }
170 api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
171 .map_err(|err| NodeError::GrpcError {
172 error: format!("failed to unmarshal error message: {}", err),
173 })?),
174 }
175 }
176}
177
178impl TryFrom<api::PendingBlobResult> for BlobContent {
179 type Error = NodeError;
180
181 fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
182 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
183 error: "missing body from response".to_string(),
184 })?;
185 match inner {
186 api::pending_blob_result::Inner::Blob(blob) => {
187 Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
188 error: format!("failed to unmarshal response: {}", err),
189 })?)
190 }
191 api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
192 .map_err(|err| NodeError::GrpcError {
193 error: format!("failed to unmarshal error message: {}", err),
194 })?),
195 }
196 }
197}
198
/// Logs the outgoing request at debug level, then forwards `$req` to the `$handler`
/// method of the generated gRPC stub through `$self.delegate`, awaiting the result.
///
/// `$req` must be an identifier (not an arbitrary expression) because it is both
/// logged and moved into `delegate`.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        let handler_name = stringify!($handler);
        debug!(
            handler = handler_name,
            request = ?$req,
            "sending gRPC request"
        );
        // The closure stays inline so its parameter types are inferred from the
        // signature that `delegate` expects.
        $self
            .delegate(
                |mut client, req| async move { client.$handler(req).await },
                $req,
                handler_name,
            )
            .await
    }};
}
215
216impl ValidatorNode for GrpcClient {
217 type NotificationStream = NotificationStream;
218
219 fn address(&self) -> String {
220 self.address.clone()
221 }
222
223 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
224 async fn handle_block_proposal(
225 &self,
226 proposal: data_types::BlockProposal,
227 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
228 GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
229 }
230
231 #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
232 async fn handle_lite_certificate(
233 &self,
234 certificate: types::LiteCertificate<'_>,
235 delivery: CrossChainMessageDelivery,
236 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
237 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
238 let request = HandleLiteCertRequest {
239 certificate,
240 wait_for_outgoing_messages,
241 };
242 GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
243 }
244
245 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
246 async fn handle_confirmed_certificate(
247 &self,
248 certificate: GenericCertificate<ConfirmedBlock>,
249 delivery: CrossChainMessageDelivery,
250 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
251 let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
252 let request = HandleConfirmedCertificateRequest {
253 certificate,
254 wait_for_outgoing_messages,
255 };
256 GrpcClient::try_into_chain_info(client_delegate!(
257 self,
258 handle_confirmed_certificate,
259 request
260 )?)
261 }
262
263 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
264 async fn handle_validated_certificate(
265 &self,
266 certificate: GenericCertificate<ValidatedBlock>,
267 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
268 let request = HandleValidatedCertificateRequest { certificate };
269 GrpcClient::try_into_chain_info(client_delegate!(
270 self,
271 handle_validated_certificate,
272 request
273 )?)
274 }
275
276 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
277 async fn handle_timeout_certificate(
278 &self,
279 certificate: GenericCertificate<Timeout>,
280 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
281 let request = HandleTimeoutCertificateRequest { certificate };
282 GrpcClient::try_into_chain_info(client_delegate!(
283 self,
284 handle_timeout_certificate,
285 request
286 )?)
287 }
288
289 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
290 async fn handle_chain_info_query(
291 &self,
292 query: linera_core::data_types::ChainInfoQuery,
293 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
294 GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
295 }
296
297 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
298 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
299 let retry_delay = self.retry_delay;
300 let max_retries = self.max_retries;
301 let max_backoff = self.max_backoff;
302 let retry_count = Arc::new(AtomicU32::new(0));
304 let subscription_request = SubscriptionRequest {
305 chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
306 };
307 let mut client = self.client.clone();
308
309 let mut stream = Some(
311 client
312 .subscribe(subscription_request.clone())
313 .await
314 .map_err(|status| NodeError::SubscriptionFailed {
315 status: status.to_string(),
316 })?
317 .into_inner(),
318 );
319
320 let retry_count_for_unfold = retry_count.clone();
323 let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
324 let mut client = client.clone();
325 let subscription_request = subscription_request.clone();
326 let mut stream = stream.take();
327 let retry_count = retry_count_for_unfold.clone();
328 async move {
329 let stream = if let Some(stream) = stream.take() {
330 future::Either::Right(stream)
331 } else {
332 match client.subscribe(subscription_request.clone()).await {
333 Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
334 Ok(response) => {
335 retry_count.store(0, Ordering::Relaxed);
337 trace!("Successfully reconnected subscription stream");
338 future::Either::Right(response.into_inner())
339 }
340 }
341 };
342 Some((stream, ()))
343 }
344 })
345 .flatten();
346
347 let span = tracing::info_span!("notification stream");
348 let notification_stream = endlessly_retrying_notification_stream
351 .map(|result| {
352 Option::<Notification>::try_from(result?).map_err(|err| {
353 let message = format!("Could not deserialize notification: {}", err);
354 tonic::Status::new(Code::Internal, message)
355 })
356 })
357 .take_while(move |result| {
358 let Err(status) = result else {
359 retry_count.store(0, Ordering::Relaxed);
360 return future::Either::Left(future::ready(true));
361 };
362
363 let current_retry_count = retry_count.load(Ordering::Relaxed);
364 if !span.in_scope(|| Self::is_retryable(status))
365 || current_retry_count >= max_retries
366 {
367 return future::Either::Left(future::ready(false));
368 }
369 let delay = full_jitter_delay(retry_delay, current_retry_count, max_backoff);
370 retry_count.fetch_add(1, Ordering::Relaxed);
371 future::Either::Right(async move {
372 linera_base::time::timer::sleep(delay).await;
373 true
374 })
375 })
376 .filter_map(|result| {
377 future::ready(match result {
378 Ok(notification @ Some(_)) => notification,
379 Ok(None) => None,
380 Err(err) => {
381 warn!("{}", err);
382 None
383 }
384 })
385 });
386
387 Ok(Box::pin(notification_stream))
388 }
389
390 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
391 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
392 let req = ();
393 Ok(client_delegate!(self, get_version_info, req)?.into())
394 }
395
396 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
397 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
398 let req = ();
399 Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
400 }
401
402 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
403 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
404 Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
405 }
406
407 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
408 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
409 Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
410 }
411
412 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
413 async fn download_pending_blob(
414 &self,
415 chain_id: ChainId,
416 blob_id: BlobId,
417 ) -> Result<BlobContent, NodeError> {
418 let req = (chain_id, blob_id);
419 client_delegate!(self, download_pending_blob, req)?.try_into()
420 }
421
422 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
423 async fn handle_pending_blob(
424 &self,
425 chain_id: ChainId,
426 blob: BlobContent,
427 ) -> Result<ChainInfoResponse, NodeError> {
428 let req = (chain_id, blob);
429 GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
430 }
431
432 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
433 async fn download_certificate(
434 &self,
435 hash: CryptoHash,
436 ) -> Result<ConfirmedBlockCertificate, NodeError> {
437 ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
438 self,
439 download_certificate,
440 hash
441 )?)?)
442 .map_err(|_| NodeError::UnexpectedCertificateValue)
443 }
444
445 #[instrument(target = "grpc_client", skip_all, err(level = Level::WARN), fields(address = self.address))]
446 async fn download_certificates(
447 &self,
448 hashes: Vec<CryptoHash>,
449 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
450 let mut missing_hashes = hashes;
451 let mut certs_collected = Vec::with_capacity(missing_hashes.len());
452 while !missing_hashes.is_empty() {
453 let missing = missing_hashes.clone();
455 let mut received: Vec<_> = Vec::<Certificate>::try_from(client_delegate!(
456 self,
457 download_certificates,
458 missing
459 )?)?
460 .into_iter()
461 .map(|cert| {
462 ConfirmedBlockCertificate::try_from(cert)
463 .map_err(|_| NodeError::UnexpectedCertificateValue)
464 })
465 .collect::<Result<_, _>>()?;
466
467 if received.is_empty() {
469 break;
470 }
471
472 missing_hashes = missing_hashes[received.len()..].to_vec();
474 certs_collected.append(&mut received);
475 }
476 ensure!(
477 missing_hashes.is_empty(),
478 NodeError::MissingCertificates(missing_hashes)
479 );
480 Ok(certs_collected)
481 }
482
483 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
484 async fn download_certificates_by_heights(
485 &self,
486 chain_id: ChainId,
487 heights: Vec<BlockHeight>,
488 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
489 let mut missing = heights.into_iter().collect::<BTreeSet<_>>();
490 let mut certs_collected = vec![];
491 while !missing.is_empty() {
492 let request = CertificatesByHeightRequest {
493 chain_id,
494 heights: missing.iter().copied().collect(),
495 };
496 let mut received: Vec<_> =
497 client_delegate!(self, download_raw_certificates_by_heights, request)?
498 .certificates
499 .into_iter()
500 .map(
501 |RawCertificate {
502 lite_certificate,
503 confirmed_block,
504 }| {
505 let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
506 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
507
508 let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
509 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
510
511 cert.with_value(block)
512 .ok_or(NodeError::UnexpectedCertificateValue)
513 },
514 )
515 .collect::<Result<_, _>>()?;
516
517 if received.is_empty() {
518 break;
519 }
520
521 for cert in &received {
523 missing.remove(&cert.inner().height());
524 }
525 certs_collected.append(&mut received);
526 }
527 certs_collected.sort_by_key(|cert| cert.inner().height());
528 Ok(certs_collected)
529 }
530
531 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
532 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
533 Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
534 }
535
536 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
537 async fn blob_last_used_by_certificate(
538 &self,
539 blob_id: BlobId,
540 ) -> Result<ConfirmedBlockCertificate, NodeError> {
541 Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
542 }
543
544 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
545 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
546 Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
547 }
548
549 #[instrument(target = "grpc_client", skip(self), err(level = Level::WARN), fields(address = self.address))]
550 async fn get_shard_info(
551 &self,
552 chain_id: ChainId,
553 ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
554 let response = client_delegate!(self, get_shard_info, chain_id)?;
555 Ok(linera_core::data_types::ShardInfo {
556 shard_id: response.shard_id as usize,
557 total_shards: response.total_shards as usize,
558 })
559 }
560}