1use std::{
5 collections::BTreeSet,
6 fmt,
7 future::Future,
8 iter,
9 sync::{
10 atomic::{AtomicU32, Ordering},
11 Arc,
12 },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17 crypto::CryptoHash,
18 data_types::{BlobContent, BlockHeight, NetworkDescription},
19 ensure,
20 identifiers::{BlobId, ChainId, EventId},
21 time::{Duration, Instant},
22};
23use linera_chain::{
24 data_types::{self},
25 types::{
26 self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics emitted by the gRPC validator client.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Number of notification-stream errors, labelled by validator address.
    pub static VALIDATOR_SUBSCRIPTION_ERRORS: LazyLock<IntCounterVec> =
        LazyLock::new(register_validator_subscription_errors);

    /// Registers the `validator_subscription_errors` counter vector on first use.
    fn register_validator_subscription_errors() -> IntCounterVec {
        register_int_counter_vec(
            "validator_subscription_errors",
            "Number of notification subscription stream errors per validator",
            &["address"],
        )
    }
}
45
46use linera_core::{
47 data_types::{CertificatesByHeightRequest, ChainInfoResponse},
48 node::{BlobStream, CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
49 worker::Notification,
50};
51use linera_storage::Arc as CacheArc;
52use linera_version::VersionInfo;
53use tonic::{Code, IntoRequest, Request, Status};
54use tracing::{debug, instrument, trace, Level};
55
56use super::{
57 api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
58 transport, GRPC_MAX_MESSAGE_SIZE,
59};
60#[cfg(feature = "opentelemetry")]
61use crate::propagation::{get_context_with_traffic_type, inject_context};
62use crate::{
63 grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
64 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
65};
66
67#[derive(Clone)]
68pub struct GrpcClient {
69 address: String,
70 client: ValidatorNodeClient<transport::Channel>,
71 retry_delay: Duration,
72 max_retries: u32,
73 max_backoff: Duration,
74 subscription_cooldowns: Arc<papaya::HashMap<String, Instant>>,
78}
79
80impl GrpcClient {
81 pub fn new(
82 address: String,
83 channel: transport::Channel,
84 retry_delay: Duration,
85 max_retries: u32,
86 max_backoff: Duration,
87 subscription_cooldowns: Arc<papaya::HashMap<String, Instant>>,
88 ) -> Self {
89 let client = ValidatorNodeClient::new(channel)
90 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
91 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
92 Self {
93 address,
94 client,
95 retry_delay,
96 max_retries,
97 max_backoff,
98 subscription_cooldowns,
99 }
100 }
101
102 pub fn address(&self) -> &str {
103 &self.address
104 }
105
106 fn is_retryable(status: &Status) -> bool {
109 match status.code() {
110 Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
111 trace!("gRPC request interrupted: {status:?}; retrying");
112 true
113 }
114 Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
115 trace!("Unexpected gRPC status: {status:?}; retrying");
116 true
117 }
118 Code::Internal if status.message().contains("h2 protocol error") => {
119 trace!("gRPC connection reset: {status:?}; retrying");
123 true
124 }
125 Code::Internal if status.message().contains("502 Bad Gateway") => {
126 trace!("gRPC proxy error (502): {status:?}; retrying");
132 true
133 }
134 Code::NotFound => false, Code::InvalidArgument
136 | Code::AlreadyExists
137 | Code::PermissionDenied
138 | Code::FailedPrecondition
139 | Code::OutOfRange
140 | Code::Unimplemented
141 | Code::Internal
142 | Code::DataLoss
143 | Code::Unauthenticated => {
144 trace!("Unexpected gRPC status: {status:?}");
145 false
146 }
147 }
148 }
149
150 async fn delegate<F, Fut, R, S>(
151 &self,
152 f: F,
153 request: impl TryInto<R> + fmt::Debug + Clone,
154 handler: &str,
155 ) -> Result<S, NodeError>
156 where
157 F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
158 Fut: Future<Output = Result<tonic::Response<S>, Status>>,
159 R: IntoRequest<R> + Clone,
160 {
161 let mut retry_count = 0;
162 let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
163 error: "could not convert request to proto".to_string(),
164 })?;
165 loop {
166 #[allow(unused_mut)]
167 let mut request = Request::new(request_inner.clone());
168 #[cfg(feature = "opentelemetry")]
172 inject_context(&get_context_with_traffic_type(), request.metadata_mut());
173 match f(self.client.clone(), request).await {
174 Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
175 let delay = crate::jittered_backoff_delay(
176 self.retry_delay,
177 retry_count,
178 self.max_backoff,
179 );
180 retry_count += 1;
181 linera_base::time::timer::sleep(delay).await;
182 continue;
183 }
184 Err(s) => {
185 return Err(NodeError::GrpcError {
186 error: format!("remote request [{handler}] failed with status: {s:?}"),
187 });
188 }
189 Ok(result) => return Ok(result.into_inner()),
190 };
191 }
192 }
193
194 fn try_into_chain_info(
195 result: api::ChainInfoResult,
196 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
197 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
198 error: "missing body from response".to_string(),
199 })?;
200 match inner {
201 api::chain_info_result::Inner::ChainInfoResponse(response) => {
202 Ok(response.try_into().map_err(|err| NodeError::GrpcError {
203 error: format!("failed to unmarshal response: {err}"),
204 })?)
205 }
206 api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
207 .map_err(|err| NodeError::GrpcError {
208 error: format!("failed to unmarshal error message: {err}"),
209 })?),
210 }
211 }
212}
213
214impl TryFrom<api::PendingBlobResult> for BlobContent {
215 type Error = NodeError;
216
217 fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
218 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
219 error: "missing body from response".to_string(),
220 })?;
221 match inner {
222 api::pending_blob_result::Inner::Blob(blob) => {
223 Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
224 error: format!("failed to unmarshal response: {err}"),
225 })?)
226 }
227 api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
228 .map_err(|err| NodeError::GrpcError {
229 error: format!("failed to unmarshal error message: {err}"),
230 })?),
231 }
232 }
233}
234
/// Logs a gRPC request at `debug` level, then sends it through
/// [`GrpcClient::delegate`] by calling the tonic client method named `$method`.
///
/// `$request` must be a plain identifier: it is first logged by reference (via
/// `Debug`) and then moved into `delegate`.
macro_rules! client_delegate {
    ($this:ident, $method:ident, $request:ident) => {{
        debug!(
            handler = stringify!($method),
            request = ?$request,
            "sending gRPC request"
        );
        $this
            .delegate(
                |mut grpc_client, grpc_request| async move {
                    grpc_client.$method(grpc_request).await
                },
                $request,
                stringify!($method),
            )
            .await
    }};
}
251
252impl ValidatorNode for GrpcClient {
253 type NotificationStream = NotificationStream;
254
255 fn address(&self) -> String {
256 self.address.clone()
257 }
258
259 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
260 async fn handle_block_proposal(
261 &self,
262 proposal: data_types::BlockProposal,
263 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
264 GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
265 }
266
267 #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
268 async fn handle_lite_certificate(
269 &self,
270 certificate: types::LiteCertificate<'_>,
271 delivery: CrossChainMessageDelivery,
272 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
273 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
274 let request = HandleLiteCertRequest {
275 certificate,
276 wait_for_outgoing_messages,
277 };
278 GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
279 }
280
281 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
282 async fn handle_confirmed_certificate(
283 &self,
284 certificate: CacheArc<GenericCertificate<ConfirmedBlock>>,
285 delivery: CrossChainMessageDelivery,
286 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
287 let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
288 let request = HandleConfirmedCertificateRequest {
289 certificate: CacheArc::unwrap_or_clone(certificate),
290 wait_for_outgoing_messages,
291 };
292 GrpcClient::try_into_chain_info(client_delegate!(
293 self,
294 handle_confirmed_certificate,
295 request
296 )?)
297 }
298
299 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
300 async fn handle_validated_certificate(
301 &self,
302 certificate: GenericCertificate<ValidatedBlock>,
303 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
304 let request = HandleValidatedCertificateRequest { certificate };
305 GrpcClient::try_into_chain_info(client_delegate!(
306 self,
307 handle_validated_certificate,
308 request
309 )?)
310 }
311
312 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
313 async fn handle_timeout_certificate(
314 &self,
315 certificate: GenericCertificate<Timeout>,
316 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
317 let request = HandleTimeoutCertificateRequest { certificate };
318 GrpcClient::try_into_chain_info(client_delegate!(
319 self,
320 handle_timeout_certificate,
321 request
322 )?)
323 }
324
325 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
326 async fn handle_chain_info_query(
327 &self,
328 query: linera_core::data_types::ChainInfoQuery,
329 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
330 GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
331 }
332
333 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
334 async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
335 let retry_delay = self.retry_delay;
336 let max_retries = self.max_retries;
337 let max_backoff = self.max_backoff;
338 let address = self.address.clone();
339 let subscription_cooldowns = self.subscription_cooldowns.clone();
340
341 {
344 let pinned = subscription_cooldowns.pin();
345 if let Some(&last_failure) = pinned.get(&address) {
346 if last_failure.elapsed() < max_backoff {
347 return Err(NodeError::SubscriptionFailed {
348 status: format!(
349 "validator {address} on cooldown after recent subscription failure"
350 ),
351 });
352 }
353 }
354 }
355
356 let retry_count = Arc::new(AtomicU32::new(0));
358 let subscription_request = SubscriptionRequest {
359 chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
360 };
361 let mut client = self.client.clone();
362
363 let mut stream = Some(
365 client
366 .subscribe(subscription_request.clone())
367 .await
368 .map_err(|status| {
369 subscription_cooldowns
370 .pin()
371 .insert(address.clone(), Instant::now());
372 NodeError::SubscriptionFailed {
373 status: status.to_string(),
374 }
375 })?
376 .into_inner(),
377 );
378
379 let retry_count_for_unfold = retry_count.clone();
382 let cooldowns_for_unfold = subscription_cooldowns.clone();
383 let address_for_unfold = address.clone();
384 let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
385 let mut client = client.clone();
386 let subscription_request = subscription_request.clone();
387 let mut stream = stream.take();
388 let retry_count = retry_count_for_unfold.clone();
389 let cooldowns = cooldowns_for_unfold.clone();
390 let cooldown_address = address_for_unfold.clone();
391 async move {
392 let stream = if let Some(stream) = stream.take() {
393 future::Either::Right(stream)
394 } else {
395 match client.subscribe(subscription_request.clone()).await {
396 Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
397 Ok(response) => {
398 retry_count.store(0, Ordering::Relaxed);
400 cooldowns.pin().remove(&cooldown_address);
401 trace!("Successfully reconnected subscription stream");
402 future::Either::Right(response.into_inner())
403 }
404 }
405 };
406 Some((stream, ()))
407 }
408 })
409 .flatten();
410
411 let span = tracing::info_span!("notification stream");
412 #[cfg(with_metrics)]
413 let address_for_metrics = self.address.clone();
414 let cooldowns_for_take_while = subscription_cooldowns;
415 let address_for_take_while = self.address.clone();
416 let notification_stream = endlessly_retrying_notification_stream
419 .map(|result| {
420 Option::<Notification>::try_from(result?).map_err(|err| {
421 let message = format!("Could not deserialize notification: {err}");
422 tonic::Status::new(Code::Internal, message)
423 })
424 })
425 .take_while(move |result| {
426 let Err(status) = result else {
427 retry_count.store(0, Ordering::Relaxed);
428 return future::Either::Left(future::ready(true));
429 };
430
431 #[cfg(with_metrics)]
432 metrics::VALIDATOR_SUBSCRIPTION_ERRORS
433 .with_label_values(&[&address_for_metrics])
434 .inc();
435
436 let current_retry_count = retry_count.load(Ordering::Relaxed);
437 if !span.in_scope(|| Self::is_retryable(status))
438 || current_retry_count >= max_retries
439 {
440 cooldowns_for_take_while
441 .pin()
442 .insert(address_for_take_while.clone(), Instant::now());
443 return future::Either::Left(future::ready(false));
444 }
445 let delay =
446 crate::jittered_backoff_delay(retry_delay, current_retry_count, max_backoff);
447 retry_count.fetch_add(1, Ordering::Relaxed);
448 future::Either::Right(async move {
449 linera_base::time::timer::sleep(delay).await;
450 true
451 })
452 })
453 .filter_map(move |result| {
454 future::ready(match result {
455 Ok(notification @ Some(_)) => notification,
456 Ok(None) => None,
457 Err(err) => {
458 debug!(%address, "{}", err);
459 None
460 }
461 })
462 });
463
464 Ok(Box::pin(notification_stream))
465 }
466
467 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
468 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
469 let req = ();
470 Ok(client_delegate!(self, get_version_info, req)?.into())
471 }
472
473 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
474 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
475 let req = ();
476 Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
477 }
478
479 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
480 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
481 Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
482 }
483
484 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
485 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
486 Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
487 }
488
489 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
490 async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError> {
491 debug!(
492 handler = "download_blobs",
493 num_blobs = blob_ids.len(),
494 "sending gRPC request"
495 );
496 let request = api::BlobIds::try_from(blob_ids)?;
497 let stream = self
498 .client
499 .clone()
500 .download_blobs(request)
501 .await
502 .map_err(|status| NodeError::GrpcError {
503 error: status.to_string(),
504 })?
505 .into_inner();
506 let blob_stream = stream.map(|result| match result {
507 Ok(proto_blob) => BlobContent::try_from(proto_blob).map_err(NodeError::from),
508 Err(status) => Err(NodeError::GrpcError {
509 error: status.to_string(),
510 }),
511 });
512 Ok(Box::pin(blob_stream))
513 }
514
515 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
516 async fn download_pending_blob(
517 &self,
518 chain_id: ChainId,
519 blob_id: BlobId,
520 ) -> Result<BlobContent, NodeError> {
521 let req = (chain_id, blob_id);
522 client_delegate!(self, download_pending_blob, req)?.try_into()
523 }
524
525 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
526 async fn handle_pending_blob(
527 &self,
528 chain_id: ChainId,
529 blob: BlobContent,
530 ) -> Result<ChainInfoResponse, NodeError> {
531 let req = (chain_id, blob);
532 GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
533 }
534
535 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
536 async fn download_certificate(
537 &self,
538 hash: CryptoHash,
539 ) -> Result<ConfirmedBlockCertificate, NodeError> {
540 ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
541 self,
542 download_certificate,
543 hash
544 )?)?)
545 .map_err(|_| NodeError::UnexpectedCertificateValue)
546 }
547
548 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
549 async fn download_certificates(
550 &self,
551 hashes: Vec<CryptoHash>,
552 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
553 let mut missing_hashes = hashes;
554 let mut certs_collected = Vec::with_capacity(missing_hashes.len());
555 while !missing_hashes.is_empty() {
556 let missing = missing_hashes.clone();
558 let mut received: Vec<_> = Vec::<Certificate>::try_from(client_delegate!(
559 self,
560 download_certificates,
561 missing
562 )?)?
563 .into_iter()
564 .map(|cert| {
565 ConfirmedBlockCertificate::try_from(cert)
566 .map_err(|_| NodeError::UnexpectedCertificateValue)
567 })
568 .collect::<Result<_, _>>()?;
569
570 if received.is_empty() {
572 break;
573 }
574
575 missing_hashes = missing_hashes[received.len()..].to_vec();
577 certs_collected.append(&mut received);
578 }
579 ensure!(
580 missing_hashes.is_empty(),
581 NodeError::MissingCertificates(missing_hashes)
582 );
583 Ok(certs_collected)
584 }
585
586 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
587 async fn download_certificates_by_heights(
588 &self,
589 chain_id: ChainId,
590 heights: Vec<BlockHeight>,
591 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
592 let mut missing = heights.into_iter().collect::<BTreeSet<_>>();
593 let mut certs_collected = vec![];
594 while !missing.is_empty() {
595 let request = CertificatesByHeightRequest {
596 chain_id,
597 heights: missing.iter().copied().collect(),
598 };
599 let mut received: Vec<_> =
600 client_delegate!(self, download_raw_certificates_by_heights, request)?
601 .certificates
602 .into_iter()
603 .map(
604 |RawCertificate {
605 lite_certificate,
606 confirmed_block,
607 }| {
608 let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
609 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
610
611 let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
612 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
613
614 cert.with_value(block)
615 .ok_or(NodeError::UnexpectedCertificateValue)
616 },
617 )
618 .collect::<Result<_, _>>()?;
619
620 if received.is_empty() {
621 break;
622 }
623
624 for cert in &received {
626 missing.remove(&cert.inner().height());
627 }
628 certs_collected.append(&mut received);
629 }
630 certs_collected.sort_by_key(|cert| cert.inner().height());
631 Ok(certs_collected)
632 }
633
634 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
635 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
636 Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
637 }
638
639 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
640 async fn blob_last_used_by_certificate(
641 &self,
642 blob_id: BlobId,
643 ) -> Result<ConfirmedBlockCertificate, NodeError> {
644 Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
645 }
646
647 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
648 async fn event_block_heights(
649 &self,
650 event_ids: Vec<EventId>,
651 ) -> Result<Vec<Option<BlockHeight>>, NodeError> {
652 let request = api::EventBlockHeightsRequest::from(event_ids);
653 Ok(client_delegate!(self, event_block_heights, request)?.try_into()?)
654 }
655
656 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
657 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
658 Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
659 }
660
661 #[expect(
662 clippy::cast_possible_truncation,
663 reason = "shard counts are bounded by validator config and fit in usize on supported targets"
664 )]
665 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
666 async fn get_shard_info(
667 &self,
668 chain_id: ChainId,
669 ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
670 let response = client_delegate!(self, get_shard_info, chain_id)?;
671 Ok(linera_core::data_types::ShardInfo {
672 shard_id: response.shard_id as usize,
673 total_shards: response.total_shards as usize,
674 })
675 }
676}