1use std::{
5 collections::BTreeSet,
6 fmt,
7 future::Future,
8 iter,
9 sync::{
10 atomic::{AtomicU32, Ordering},
11 Arc,
12 },
13};
14
15use futures::{future, stream, StreamExt};
16use linera_base::{
17 crypto::CryptoHash,
18 data_types::{BlobContent, BlockHeight, NetworkDescription},
19 ensure,
20 identifiers::{BlobId, ChainId, EventId},
21 time::{Duration, Instant},
22};
23use linera_chain::{
24 data_types::{self},
25 types::{
26 self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
27 LiteCertificate, Timeout, ValidatedBlock,
28 },
29};
/// Prometheus metrics for the gRPC client; only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::register_int_counter_vec;
    use prometheus::IntCounterVec;

    /// Counter with a single `address` label, registered lazily on first access.
    ///
    /// `GrpcClient::subscribe` increments it once for every error item observed on a
    /// validator's notification stream.
    pub static VALIDATOR_SUBSCRIPTION_ERRORS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "validator_subscription_errors",
            "Number of notification subscription stream errors per validator",
            &["address"],
        )
    });
}
45
46use linera_core::{
47 data_types::{CertificatesByHeightRequest, ChainInfoResponse},
48 node::{BlobStream, CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode},
49 worker::Notification,
50};
51use linera_version::VersionInfo;
52use tonic::{Code, IntoRequest, Request, Status};
53use tracing::{debug, instrument, trace, Level};
54
55use super::{
56 api::{self, validator_node_client::ValidatorNodeClient, SubscriptionRequest},
57 transport, GRPC_MAX_MESSAGE_SIZE,
58};
59#[cfg(feature = "opentelemetry")]
60use crate::propagation::{get_context_with_traffic_type, inject_context};
61use crate::{
62 grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
63 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
64};
65
/// A gRPC client talking to a single validator node.
///
/// Unary requests go through `delegate`, which retries retryable failures with a jittered
/// backoff. `subscribe` builds a notification stream that re-subscribes after failures.
#[derive(Clone)]
pub struct GrpcClient {
    /// Address of the validator; also used as the key into `subscription_cooldowns` and as
    /// the metrics label.
    address: String,
    /// Generated tonic client; cloned for every outgoing request.
    client: ValidatorNodeClient<transport::Channel>,
    /// Base delay handed to `jittered_backoff_delay` between retries.
    retry_delay: Duration,
    /// Maximum number of retries before a failure is reported to the caller.
    max_retries: u32,
    /// Upper bound handed to `jittered_backoff_delay`; `subscribe` also uses it as the
    /// length of the cooldown window after a subscription failure.
    max_backoff: Duration,
    /// Maps a validator address to the instant of its most recent subscription failure.
    // NOTE(review): the map is stored by value, `GrpcClient` derives `Clone`, and `subscribe`
    // works on `self.subscription_cooldowns.clone()`. If `papaya::HashMap::clone` copies the
    // entries instead of sharing them, cooldowns recorded by `subscribe` never reach this
    // field or other clients — confirm, and consider storing an `Arc<papaya::HashMap<..>>`.
    subscription_cooldowns: papaya::HashMap<String, Instant>,
}
78
79impl GrpcClient {
80 pub fn new(
81 address: String,
82 channel: transport::Channel,
83 retry_delay: Duration,
84 max_retries: u32,
85 max_backoff: Duration,
86 subscription_cooldowns: papaya::HashMap<String, Instant>,
87 ) -> Self {
88 let client = ValidatorNodeClient::new(channel)
89 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
90 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
91 Self {
92 address,
93 client,
94 retry_delay,
95 max_retries,
96 max_backoff,
97 subscription_cooldowns,
98 }
99 }
100
101 pub fn address(&self) -> &str {
102 &self.address
103 }
104
105 fn is_retryable(status: &Status) -> bool {
108 match status.code() {
109 Code::DeadlineExceeded | Code::Aborted | Code::Unavailable | Code::Unknown => {
110 trace!("gRPC request interrupted: {status:?}; retrying");
111 true
112 }
113 Code::Ok | Code::Cancelled | Code::ResourceExhausted => {
114 trace!("Unexpected gRPC status: {status:?}; retrying");
115 true
116 }
117 Code::Internal if status.message().contains("h2 protocol error") => {
118 trace!("gRPC connection reset: {status:?}; retrying");
122 true
123 }
124 Code::Internal if status.message().contains("502 Bad Gateway") => {
125 trace!("gRPC proxy error (502): {status:?}; retrying");
131 true
132 }
133 Code::NotFound => false, Code::InvalidArgument
135 | Code::AlreadyExists
136 | Code::PermissionDenied
137 | Code::FailedPrecondition
138 | Code::OutOfRange
139 | Code::Unimplemented
140 | Code::Internal
141 | Code::DataLoss
142 | Code::Unauthenticated => {
143 trace!("Unexpected gRPC status: {status:?}");
144 false
145 }
146 }
147 }
148
149 async fn delegate<F, Fut, R, S>(
150 &self,
151 f: F,
152 request: impl TryInto<R> + fmt::Debug + Clone,
153 handler: &str,
154 ) -> Result<S, NodeError>
155 where
156 F: Fn(ValidatorNodeClient<transport::Channel>, Request<R>) -> Fut,
157 Fut: Future<Output = Result<tonic::Response<S>, Status>>,
158 R: IntoRequest<R> + Clone,
159 {
160 let mut retry_count = 0;
161 let request_inner = request.try_into().map_err(|_| NodeError::GrpcError {
162 error: "could not convert request to proto".to_string(),
163 })?;
164 loop {
165 #[allow(unused_mut)]
166 let mut request = Request::new(request_inner.clone());
167 #[cfg(feature = "opentelemetry")]
171 inject_context(&get_context_with_traffic_type(), request.metadata_mut());
172 match f(self.client.clone(), request).await {
173 Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
174 let delay = crate::jittered_backoff_delay(
175 self.retry_delay,
176 retry_count,
177 self.max_backoff,
178 );
179 retry_count += 1;
180 linera_base::time::timer::sleep(delay).await;
181 continue;
182 }
183 Err(s) => {
184 return Err(NodeError::GrpcError {
185 error: format!("remote request [{handler}] failed with status: {s:?}"),
186 });
187 }
188 Ok(result) => return Ok(result.into_inner()),
189 };
190 }
191 }
192
193 fn try_into_chain_info(
194 result: api::ChainInfoResult,
195 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
196 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
197 error: "missing body from response".to_string(),
198 })?;
199 match inner {
200 api::chain_info_result::Inner::ChainInfoResponse(response) => {
201 Ok(response.try_into().map_err(|err| NodeError::GrpcError {
202 error: format!("failed to unmarshal response: {err}"),
203 })?)
204 }
205 api::chain_info_result::Inner::Error(error) => Err(bincode::deserialize(&error)
206 .map_err(|err| NodeError::GrpcError {
207 error: format!("failed to unmarshal error message: {err}"),
208 })?),
209 }
210 }
211}
212
213impl TryFrom<api::PendingBlobResult> for BlobContent {
214 type Error = NodeError;
215
216 fn try_from(result: api::PendingBlobResult) -> Result<Self, Self::Error> {
217 let inner = result.inner.ok_or_else(|| NodeError::GrpcError {
218 error: "missing body from response".to_string(),
219 })?;
220 match inner {
221 api::pending_blob_result::Inner::Blob(blob) => {
222 Ok(blob.try_into().map_err(|err| NodeError::GrpcError {
223 error: format!("failed to unmarshal response: {err}"),
224 })?)
225 }
226 api::pending_blob_result::Inner::Error(error) => Err(bincode::deserialize(&error)
227 .map_err(|err| NodeError::GrpcError {
228 error: format!("failed to unmarshal error message: {err}"),
229 })?),
230 }
231 }
232}
233
// Sends `$req` to the validator through the generated client method `$handler`.
//
// Logs the handler name and the request at debug level, then awaits `$self.delegate(..)`,
// so the expansion evaluates to a `Result<_, NodeError>`. Because it contains `.await`, it
// can only be used inside an async context.
//
// All three arguments must be plain identifiers: bind the request to a local variable
// before invoking the macro.
macro_rules! client_delegate {
    ($self:ident, $handler:ident, $req:ident) => {{
        debug!(
            handler = stringify!($handler),
            request = ?$req,
            "sending gRPC request"
        );
        $self
            .delegate(
                |mut client, req| async move { client.$handler(req).await },
                $req,
                stringify!($handler),
            )
            .await
    }};
}
250
251impl ValidatorNode for GrpcClient {
252 type NotificationStream = NotificationStream;
253
254 fn address(&self) -> String {
255 self.address.clone()
256 }
257
258 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
259 async fn handle_block_proposal(
260 &self,
261 proposal: data_types::BlockProposal,
262 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
263 GrpcClient::try_into_chain_info(client_delegate!(self, handle_block_proposal, proposal)?)
264 }
265
266 #[instrument(target = "grpc_client", skip_all, fields(address = self.address))]
267 async fn handle_lite_certificate(
268 &self,
269 certificate: types::LiteCertificate<'_>,
270 delivery: CrossChainMessageDelivery,
271 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
272 let wait_for_outgoing_messages = delivery.wait_for_outgoing_messages();
273 let request = HandleLiteCertRequest {
274 certificate,
275 wait_for_outgoing_messages,
276 };
277 GrpcClient::try_into_chain_info(client_delegate!(self, handle_lite_certificate, request)?)
278 }
279
280 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
281 async fn handle_confirmed_certificate(
282 &self,
283 certificate: Arc<GenericCertificate<ConfirmedBlock>>,
284 delivery: CrossChainMessageDelivery,
285 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
286 let wait_for_outgoing_messages: bool = delivery.wait_for_outgoing_messages();
287 let request = HandleConfirmedCertificateRequest {
288 certificate: Arc::unwrap_or_clone(certificate),
289 wait_for_outgoing_messages,
290 };
291 GrpcClient::try_into_chain_info(client_delegate!(
292 self,
293 handle_confirmed_certificate,
294 request
295 )?)
296 }
297
298 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
299 async fn handle_validated_certificate(
300 &self,
301 certificate: GenericCertificate<ValidatedBlock>,
302 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
303 let request = HandleValidatedCertificateRequest { certificate };
304 GrpcClient::try_into_chain_info(client_delegate!(
305 self,
306 handle_validated_certificate,
307 request
308 )?)
309 }
310
311 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
312 async fn handle_timeout_certificate(
313 &self,
314 certificate: GenericCertificate<Timeout>,
315 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
316 let request = HandleTimeoutCertificateRequest { certificate };
317 GrpcClient::try_into_chain_info(client_delegate!(
318 self,
319 handle_timeout_certificate,
320 request
321 )?)
322 }
323
324 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
325 async fn handle_chain_info_query(
326 &self,
327 query: linera_core::data_types::ChainInfoQuery,
328 ) -> Result<linera_core::data_types::ChainInfoResponse, NodeError> {
329 GrpcClient::try_into_chain_info(client_delegate!(self, handle_chain_info_query, query)?)
330 }
331
/// Subscribes to notifications about `chains` and returns a stream that re-subscribes on
/// its own after failures.
///
/// The first subscription request is awaited before this method returns. If it fails, the
/// failure time is recorded in the cooldown map and `NodeError::SubscriptionFailed` is
/// returned. While a recorded failure for this address is younger than `max_backoff`, the
/// method fails immediately without contacting the validator.
///
/// The returned stream yields the decoded notifications. An error item is logged at debug
/// level and dropped; if it is retryable and fewer than `max_retries` errors occurred
/// since the last success, the stream sleeps for a jittered backoff delay and carries on.
/// Otherwise the failure time is recorded in the cooldown map and the stream ends.
// NOTE(review): every cooldown read and write below goes through a clone of
// `self.subscription_cooldowns`. If `papaya::HashMap::clone` copies entries instead of
// sharing them, recorded cooldowns are never seen by later `subscribe` calls — confirm.
#[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
    // Copy everything the returned stream needs, since it cannot borrow from `self`.
    let retry_delay = self.retry_delay;
    let max_retries = self.max_retries;
    let max_backoff = self.max_backoff;
    let address = self.address.clone();
    let subscription_cooldowns = self.subscription_cooldowns.clone();

    // Refuse to subscribe while a recent failure for this address is still on cooldown.
    // The block scope drops the pinned map guard before the first `.await` below.
    {
        let pinned = subscription_cooldowns.pin();
        if let Some(&last_failure) = pinned.get(&address) {
            if last_failure.elapsed() < max_backoff {
                return Err(NodeError::SubscriptionFailed {
                    status: format!(
                        "validator {address} on cooldown after recent subscription failure"
                    ),
                });
            }
        }
    }

    // Shared counter: reset on reconnection inside `unfold`, and read, reset and
    // incremented inside `take_while`.
    let retry_count = Arc::new(AtomicU32::new(0));
    let subscription_request = SubscriptionRequest {
        chain_ids: chains.into_iter().map(|chain| chain.into()).collect(),
    };
    let mut client = self.client.clone();

    // Make the first connection attempt before returning, so that an initial failure is
    // reported to the caller (and recorded as a cooldown) instead of surfacing later.
    let mut stream = Some(
        client
            .subscribe(subscription_request.clone())
            .await
            .map_err(|status| {
                subscription_cooldowns
                    .pin()
                    .insert(address.clone(), Instant::now());
                NodeError::SubscriptionFailed {
                    status: status.to_string(),
                }
            })?
            .into_inner(),
    );

    // A stream of streams, flattened: the first inner stream is the connection opened
    // above; once an inner stream ends, the next one comes from a new `subscribe` call.
    // No delay is applied here; the backoff happens in `take_while` below.
    let retry_count_for_unfold = retry_count.clone();
    let cooldowns_for_unfold = subscription_cooldowns.clone();
    let address_for_unfold = address.clone();
    let endlessly_retrying_notification_stream = stream::unfold((), move |()| {
        let mut client = client.clone();
        let subscription_request = subscription_request.clone();
        // Takes the already-open stream on the first call; `None` on every later call.
        let mut stream = stream.take();
        let retry_count = retry_count_for_unfold.clone();
        let cooldowns = cooldowns_for_unfold.clone();
        let cooldown_address = address_for_unfold.clone();
        async move {
            let stream = if let Some(stream) = stream.take() {
                future::Either::Right(stream)
            } else {
                match client.subscribe(subscription_request.clone()).await {
                    // A failed reconnection becomes a one-item stream carrying the error,
                    // so that `take_while` can decide whether to retry.
                    Err(err) => future::Either::Left(stream::iter(iter::once(Err(err)))),
                    Ok(response) => {
                        // Reconnected: reset the retry budget and clear any cooldown.
                        retry_count.store(0, Ordering::Relaxed);
                        cooldowns.pin().remove(&cooldown_address);
                        trace!("Successfully reconnected subscription stream");
                        future::Either::Right(response.into_inner())
                    }
                }
            };
            Some((stream, ()))
        }
    })
    .flatten();

    let span = tracing::info_span!("notification stream");
    #[cfg(with_metrics)]
    let address_for_metrics = self.address.clone();
    let cooldowns_for_take_while = subscription_cooldowns;
    let address_for_take_while = self.address.clone();
    let notification_stream = endlessly_retrying_notification_stream
        // Decode each proto notification; a decoding failure becomes an `Internal` status.
        .map(|result| {
            Option::<Notification>::try_from(result?).map_err(|err| {
                let message = format!("Could not deserialize notification: {err}");
                tonic::Status::new(Code::Internal, message)
            })
        })
        // Decide after every item whether the stream continues.
        .take_while(move |result| {
            let Err(status) = result else {
                // Any successfully decoded item resets the retry budget.
                retry_count.store(0, Ordering::Relaxed);
                return future::Either::Left(future::ready(true));
            };

            #[cfg(with_metrics)]
            metrics::VALIDATOR_SUBSCRIPTION_ERRORS
                .with_label_values(&[&address_for_metrics])
                .inc();

            let current_retry_count = retry_count.load(Ordering::Relaxed);
            // Give up on a non-retryable status or an exhausted retry budget: record the
            // failure time as a cooldown and end the stream.
            if !span.in_scope(|| Self::is_retryable(status))
                || current_retry_count >= max_retries
            {
                cooldowns_for_take_while
                    .pin()
                    .insert(address_for_take_while.clone(), Instant::now());
                return future::Either::Left(future::ready(false));
            }
            // Otherwise back off before the next item (and so the next reconnection
            // attempt) is polled.
            let delay =
                crate::jittered_backoff_delay(retry_delay, current_retry_count, max_backoff);
            retry_count.fetch_add(1, Ordering::Relaxed);
            future::Either::Right(async move {
                linera_base::time::timer::sleep(delay).await;
                true
            })
        })
        // Keep only real notifications; errors that got this far are logged and dropped.
        .filter_map(move |result| {
            future::ready(match result {
                Ok(notification @ Some(_)) => notification,
                Ok(None) => None,
                Err(err) => {
                    debug!(%address, "{}", err);
                    None
                }
            })
        });

    Ok(Box::pin(notification_stream))
}
465
466 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
467 async fn get_version_info(&self) -> Result<VersionInfo, NodeError> {
468 let req = ();
469 Ok(client_delegate!(self, get_version_info, req)?.into())
470 }
471
472 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
473 async fn get_network_description(&self) -> Result<NetworkDescription, NodeError> {
474 let req = ();
475 Ok(client_delegate!(self, get_network_description, req)?.try_into()?)
476 }
477
478 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
479 async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
480 Ok(client_delegate!(self, upload_blob, content)?.try_into()?)
481 }
482
483 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
484 async fn download_blob(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
485 Ok(client_delegate!(self, download_blob, blob_id)?.try_into()?)
486 }
487
488 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
489 async fn download_blobs(&self, blob_ids: Vec<BlobId>) -> Result<BlobStream, NodeError> {
490 debug!(
491 handler = "download_blobs",
492 num_blobs = blob_ids.len(),
493 "sending gRPC request"
494 );
495 let request = api::BlobIds::try_from(blob_ids)?;
496 let stream = self
497 .client
498 .clone()
499 .download_blobs(request)
500 .await
501 .map_err(|status| NodeError::GrpcError {
502 error: status.to_string(),
503 })?
504 .into_inner();
505 let blob_stream = stream.map(|result| match result {
506 Ok(proto_blob) => BlobContent::try_from(proto_blob).map_err(NodeError::from),
507 Err(status) => Err(NodeError::GrpcError {
508 error: status.to_string(),
509 }),
510 });
511 Ok(Box::pin(blob_stream))
512 }
513
514 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
515 async fn download_pending_blob(
516 &self,
517 chain_id: ChainId,
518 blob_id: BlobId,
519 ) -> Result<BlobContent, NodeError> {
520 let req = (chain_id, blob_id);
521 client_delegate!(self, download_pending_blob, req)?.try_into()
522 }
523
524 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
525 async fn handle_pending_blob(
526 &self,
527 chain_id: ChainId,
528 blob: BlobContent,
529 ) -> Result<ChainInfoResponse, NodeError> {
530 let req = (chain_id, blob);
531 GrpcClient::try_into_chain_info(client_delegate!(self, handle_pending_blob, req)?)
532 }
533
534 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
535 async fn download_certificate(
536 &self,
537 hash: CryptoHash,
538 ) -> Result<ConfirmedBlockCertificate, NodeError> {
539 ConfirmedBlockCertificate::try_from(Certificate::try_from(client_delegate!(
540 self,
541 download_certificate,
542 hash
543 )?)?)
544 .map_err(|_| NodeError::UnexpectedCertificateValue)
545 }
546
547 #[instrument(target = "grpc_client", skip_all, err(level = Level::DEBUG), fields(address = self.address))]
548 async fn download_certificates(
549 &self,
550 hashes: Vec<CryptoHash>,
551 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
552 let mut missing_hashes = hashes;
553 let mut certs_collected = Vec::with_capacity(missing_hashes.len());
554 while !missing_hashes.is_empty() {
555 let missing = missing_hashes.clone();
557 let mut received: Vec<_> = Vec::<Certificate>::try_from(client_delegate!(
558 self,
559 download_certificates,
560 missing
561 )?)?
562 .into_iter()
563 .map(|cert| {
564 ConfirmedBlockCertificate::try_from(cert)
565 .map_err(|_| NodeError::UnexpectedCertificateValue)
566 })
567 .collect::<Result<_, _>>()?;
568
569 if received.is_empty() {
571 break;
572 }
573
574 missing_hashes = missing_hashes[received.len()..].to_vec();
576 certs_collected.append(&mut received);
577 }
578 ensure!(
579 missing_hashes.is_empty(),
580 NodeError::MissingCertificates(missing_hashes)
581 );
582 Ok(certs_collected)
583 }
584
585 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
586 async fn download_certificates_by_heights(
587 &self,
588 chain_id: ChainId,
589 heights: Vec<BlockHeight>,
590 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
591 let mut missing = heights.into_iter().collect::<BTreeSet<_>>();
592 let mut certs_collected = vec![];
593 while !missing.is_empty() {
594 let request = CertificatesByHeightRequest {
595 chain_id,
596 heights: missing.iter().copied().collect(),
597 };
598 let mut received: Vec<_> =
599 client_delegate!(self, download_raw_certificates_by_heights, request)?
600 .certificates
601 .into_iter()
602 .map(
603 |RawCertificate {
604 lite_certificate,
605 confirmed_block,
606 }| {
607 let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
608 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
609
610 let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
611 .map_err(|_| NodeError::UnexpectedCertificateValue)?;
612
613 cert.with_value(block)
614 .ok_or(NodeError::UnexpectedCertificateValue)
615 },
616 )
617 .collect::<Result<_, _>>()?;
618
619 if received.is_empty() {
620 break;
621 }
622
623 for cert in &received {
625 missing.remove(&cert.inner().height());
626 }
627 certs_collected.append(&mut received);
628 }
629 certs_collected.sort_by_key(|cert| cert.inner().height());
630 Ok(certs_collected)
631 }
632
633 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
634 async fn blob_last_used_by(&self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
635 Ok(client_delegate!(self, blob_last_used_by, blob_id)?.try_into()?)
636 }
637
638 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
639 async fn blob_last_used_by_certificate(
640 &self,
641 blob_id: BlobId,
642 ) -> Result<ConfirmedBlockCertificate, NodeError> {
643 Ok(client_delegate!(self, blob_last_used_by_certificate, blob_id)?.try_into()?)
644 }
645
646 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
647 async fn event_block_heights(
648 &self,
649 event_ids: Vec<EventId>,
650 ) -> Result<Vec<Option<BlockHeight>>, NodeError> {
651 let request = api::EventBlockHeightsRequest::from(event_ids);
652 Ok(client_delegate!(self, event_block_heights, request)?.try_into()?)
653 }
654
655 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
656 async fn missing_blob_ids(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, NodeError> {
657 Ok(client_delegate!(self, missing_blob_ids, blob_ids)?.try_into()?)
658 }
659
660 #[instrument(target = "grpc_client", skip(self), err(level = Level::DEBUG), fields(address = self.address))]
661 async fn get_shard_info(
662 &self,
663 chain_id: ChainId,
664 ) -> Result<linera_core::data_types::ShardInfo, NodeError> {
665 let response = client_delegate!(self, get_shard_info, chain_id)?;
666 Ok(linera_core::data_types::ShardInfo {
667 shard_id: response.shard_id as usize,
668 total_shards: response.total_shards as usize,
669 })
670 }
671}