1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8};
9
10use futures::{
11 channel::mpsc, future::BoxFuture, stream::FuturesUnordered, FutureExt as _, StreamExt as _,
12};
13use linera_base::{
14 data_types::Blob,
15 identifiers::ChainId,
16 time::{Duration, Instant},
17};
18use linera_core::{
19 join_set_ext::JoinSet,
20 node::NodeError,
21 worker::{NetworkActions, Notification, Reason, WorkerState},
22 JoinSetExt as _, TaskHandle,
23};
24use linera_storage::Storage;
25use tokio::sync::{broadcast::error::RecvError, oneshot};
26use tokio_util::sync::CancellationToken;
27use tonic::{transport::Channel, Request, Response, Status};
28use tower::{builder::ServiceBuilder, Layer, Service};
29use tracing::{debug, error, info, instrument, trace, warn};
30
31use super::{
32 api::{
33 self,
34 notifier_service_client::NotifierServiceClient,
35 validator_worker_client::ValidatorWorkerClient,
36 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
37 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
38 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
39 },
40 pool::GrpcConnectionPool,
41 GrpcError, GRPC_MAX_MESSAGE_SIZE,
42};
43use crate::{
44 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
45 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
46 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
47};
48
/// Sending half of the queue of outgoing cross-chain requests; each request is paired with
/// the shard it has to be delivered to.
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
/// Sending half of the broadcast channel on which worker notifications are published.
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
51
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics recorded by the gRPC server.
    //!
    //! The whole module is compiled only when the `with_metrics` cfg is set. Every metric is
    //! registered lazily on first access.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Latency of requests observed by the gRPC metrics middleware (the middleware records
    /// the value in milliseconds).
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            // NOTE(review): arguments are presumably (first bucket, bucket width, last bucket);
            // confirm against `linear_bucket_interval`.
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    /// Number of requests counted by the gRPC metrics middleware.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    /// Number of requests the worker handled successfully, labelled by `method_name`.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    /// Number of requests for which the worker returned an error, labelled by `method_name`.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    /// Per-method request latency (recorded in milliseconds), labelled by `method_name`.
    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    /// Number of cross-chain requests dropped because the cross-chain queue was full.
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });

    /// Number of notifications a forwarder missed because its broadcast receiver lagged.
    pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_skipped_receiver_lag",
            "Number of notifications skipped because receiver lagged behind sender",
            &[],
        )
    });

    /// Number of notifications dropped because publishing them on the broadcast channel failed.
    pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_dropped_no_receiver",
            "Number of notifications dropped because no receiver was available",
            &[],
        )
    });

    /// Distribution of the number of notifications in each batch handed to a sender task.
    pub static NOTIFICATION_BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "notification_batch_size",
            "Number of notifications per batch sent to proxy",
            &[],
            // NOTE(review): arguments are presumably (first bucket, last bucket); confirm
            // against `exponential_bucket_interval`.
            exponential_bucket_interval(1.0, 250.0),
        )
    });

    /// Number of notification batches sent to the proxy, labelled by `status`
    /// (`"success"` or `"error"`).
    pub static NOTIFICATION_BATCHES_SENT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notification_batches_sent",
            "Total notification batches sent",
            &["status"],
        )
    });
}
142
/// State of one notification-forwarding task: the queue of notifications waiting to be sent
/// and the batch-sending futures currently in flight.
struct BatchForwarder {
    /// Validator nickname, attached to log messages.
    nickname: String,
    /// Client for the proxy that receives every batch.
    client: NotifierServiceClient<Channel>,
    /// Clients for the block exporters, which only receive `NewBlock` notifications.
    exporter_clients: Vec<NotifierServiceClient<Channel>>,
    /// Notifications received but not yet assigned to a batch.
    pending_notifications: Vec<Notification>,
    /// Batch-sending futures that have been started and not yet completed.
    futures: FuturesUnordered<BoxFuture<'static, ()>>,
    /// Maximum number of notifications placed in a single batch.
    batch_limit: usize,
    /// Maximum number of batch-sending futures allowed in flight at once.
    max_tasks: usize,
}
153
154impl BatchForwarder {
155 fn spawn_batches(&mut self) {
157 while !self.pending_notifications.is_empty() && self.futures.len() < self.max_tasks {
158 let chunk_size = std::cmp::min(self.batch_limit, self.pending_notifications.len());
159 let batch: Vec<Notification> = self.pending_notifications.drain(..chunk_size).collect();
160
161 #[cfg(with_metrics)]
162 metrics::NOTIFICATION_BATCH_SIZE
163 .with_label_values(&[])
164 .observe(batch.len() as f64);
165
166 let client = self.client.clone();
167 let exporter_clients = self.exporter_clients.clone();
168 let nickname = self.nickname.clone();
169
170 self.futures.push(
171 async move {
172 Self::send_batch(nickname, client, exporter_clients, batch).await;
173 }
174 .boxed(),
175 );
176 }
177 }
178
179 fn is_fully_drained(&self) -> bool {
181 self.pending_notifications.is_empty() && self.futures.is_empty()
182 }
183
184 async fn send_batch(
186 nickname: String,
187 mut client: NotifierServiceClient<Channel>,
188 mut exporter_clients: Vec<NotifierServiceClient<Channel>>,
189 batch: Vec<Notification>,
190 ) {
191 let mut proto_notifications = Vec::with_capacity(batch.len());
193 for notification in &batch {
194 match notification.clone().try_into() {
195 Ok(proto) => proto_notifications.push(proto),
196 Err(error) => {
197 warn!(
198 %error,
199 nickname,
200 ?notification.chain_id,
201 ?notification.reason,
202 "could not deserialize notification"
203 );
204 }
205 }
206 }
207
208 let chain_ids: Vec<_> = batch.iter().map(|n| n.chain_id).collect();
210
211 let request = Request::new(api::NotificationBatch {
213 notifications: proto_notifications.clone(),
214 });
215 let result = client.notify_batch(request).await;
216
217 #[cfg(with_metrics)]
218 {
219 let status = if result.is_ok() { "success" } else { "error" };
220 metrics::NOTIFICATION_BATCHES_SENT
221 .with_label_values(&[status])
222 .inc();
223 }
224
225 if let Err(error) = result {
226 error!(
227 %error,
228 nickname,
229 batch_size = proto_notifications.len(),
230 ?chain_ids,
231 "proxy: could not send notification batch",
232 );
233 }
234
235 let new_block_notifications: Vec<_> = batch
237 .iter()
238 .filter(|n| matches!(n.reason, Reason::NewBlock { .. }))
239 .collect();
240
241 let exporter_notifications: Vec<api::Notification> = new_block_notifications
242 .iter()
243 .filter_map(|n| (*n).clone().try_into().ok())
244 .collect();
245
246 if !exporter_notifications.is_empty() {
247 let exporter_chain_ids: Vec<_> =
248 new_block_notifications.iter().map(|n| n.chain_id).collect();
249
250 for exporter_client in &mut exporter_clients {
251 let request = Request::new(api::NotificationBatch {
252 notifications: exporter_notifications.clone(),
253 });
254 if let Err(error) = exporter_client.notify_batch(request).await {
255 error!(
256 %error,
257 nickname,
258 batch_size = exporter_notifications.len(),
259 ?exporter_chain_ids,
260 "block exporter: could not send notification batch",
261 );
262 }
263 }
264 }
265 }
266}
267
/// gRPC front end of a validator worker shard.
///
/// Implements the `ValidatorWorker` RPC service by delegating to a [`WorkerState`] and
/// queueing the resulting cross-chain requests and notifications for delivery.
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    /// Worker state that actually processes the requests.
    state: WorkerState<S>,
    /// ID of the shard served by this instance.
    shard_id: ShardId,
    /// Internal network configuration, used to find the shard responsible for a chain.
    network: ValidatorInternalNetworkConfig,
    /// Queue feeding the cross-chain forwarding task.
    cross_chain_sender: CrossChainSender,
    /// Broadcast channel feeding the notification forwarding tasks.
    notification_sender: NotificationSender,
}
279
280pub struct GrpcServerHandle {
281 handle: TaskHandle<Result<(), GrpcError>>,
282}
283
284impl GrpcServerHandle {
285 pub async fn join(self) -> Result<(), GrpcError> {
286 self.handle.await?
287 }
288}
289
/// Tower layer that wraps a service in a [`GrpcPrometheusMetricsMiddlewareService`].
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;
292
/// Tower service that forwards every call to the wrapped service and, when metrics are
/// enabled, records request metrics around it.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    /// The wrapped service.
    service: T,
}
297
298impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
299 type Service = GrpcPrometheusMetricsMiddlewareService<S>;
300
301 fn layer(&self, service: S) -> Self::Service {
302 GrpcPrometheusMetricsMiddlewareService { service }
303 }
304}
305
306impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
307where
308 S::Future: Send + 'static,
309 S: Service<Req> + std::marker::Send,
310{
311 type Response = S::Response;
312 type Error = S::Error;
313 type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
314
315 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
316 self.service.poll_ready(cx)
317 }
318
319 fn call(&mut self, request: Req) -> Self::Future {
320 #[cfg(with_metrics)]
321 let start = Instant::now();
322 let future = self.service.call(request);
323 async move {
324 let response = future.await?;
325 #[cfg(with_metrics)]
326 {
327 metrics::SERVER_REQUEST_LATENCY
328 .with_label_values(&[])
329 .observe(start.elapsed().as_secs_f64() * 1000.0);
330 metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
331 }
332 Ok(response)
333 }
334 .boxed()
335 }
336}
337
338impl<S> GrpcServer<S>
339where
340 S: Storage + Clone + Send + Sync + 'static,
341{
342 #[expect(clippy::too_many_arguments)]
343 pub fn spawn(
344 host: String,
345 port: u16,
346 state: WorkerState<S>,
347 shard_id: ShardId,
348 internal_network: ValidatorInternalNetworkConfig,
349 cross_chain_config: CrossChainConfig,
350 notification_config: NotificationConfig,
351 shutdown_signal: CancellationToken,
352 join_set: &mut JoinSet,
353 ) -> GrpcServerHandle {
354 info!(
355 "spawning gRPC server on {}:{} for shard {}",
356 host, port, shard_id
357 );
358
359 let (cross_chain_sender, cross_chain_receiver) =
360 mpsc::channel(cross_chain_config.queue_size);
361
362 let (notification_sender, _) =
363 tokio::sync::broadcast::channel(notification_config.notification_queue_size);
364
365 join_set.spawn_task({
366 info!(
367 nickname = state.nickname(),
368 "spawning cross-chain queries thread on {} for shard {}", host, shard_id
369 );
370 Self::forward_cross_chain_queries(
371 state.nickname().to_string(),
372 internal_network.clone(),
373 cross_chain_config.max_retries,
374 Duration::from_millis(cross_chain_config.retry_delay_ms),
375 Duration::from_millis(cross_chain_config.sender_delay_ms),
376 cross_chain_config.sender_failure_rate,
377 shard_id,
378 cross_chain_receiver,
379 )
380 });
381
382 let mut exporter_forwarded = false;
383 for proxy in &internal_network.proxies {
384 let receiver = notification_sender.subscribe();
385 join_set.spawn_task({
386 info!(
387 nickname = state.nickname(),
388 "spawning notifications thread on {} for shard {}", host, shard_id
389 );
390 let exporter_addresses = if exporter_forwarded {
391 vec![]
392 } else {
393 exporter_forwarded = true;
394 internal_network.exporter_addresses()
395 };
396 Self::forward_notifications(
397 state.nickname().to_string(),
398 proxy.internal_address(&internal_network.protocol),
399 exporter_addresses,
400 receiver,
401 notification_config.clone(),
402 )
403 });
404 }
405
406 let (health_reporter, health_service) = tonic_health::server::health_reporter();
407
408 let grpc_server = GrpcServer {
409 state,
410 shard_id,
411 network: internal_network,
412 cross_chain_sender,
413 notification_sender,
414 };
415
416 let worker_node = ValidatorWorkerServer::new(grpc_server)
417 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
418 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
419
420 let handle = join_set.spawn_task(async move {
421 let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
422
423 let reflection_service = tonic_reflection::server::Builder::configure()
424 .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
425 .build_v1()?;
426
427 health_reporter
428 .set_serving::<ValidatorWorkerServer<Self>>()
429 .await;
430
431 tonic::transport::Server::builder()
432 .layer(
433 ServiceBuilder::new()
434 .layer(GrpcPrometheusMetricsMiddlewareLayer)
435 .into_inner(),
436 )
437 .add_service(health_service)
438 .add_service(reflection_service)
439 .add_service(worker_node)
440 .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
441 .await?;
442
443 Ok(())
444 });
445
446 GrpcServerHandle { handle }
447 }
448
449 #[instrument(skip(receiver, config))]
452 async fn forward_notifications(
453 nickname: String,
454 proxy_address: String,
455 exporter_addresses: Vec<String>,
456 mut receiver: tokio::sync::broadcast::Receiver<Notification>,
457 config: NotificationConfig,
458 ) {
459 let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
460 .expect("Proxy URI should be valid")
461 .connect_lazy();
462 let client = NotifierServiceClient::new(channel)
463 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
464 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
465
466 let exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
467 .iter()
468 .map(|address| {
469 let channel = tonic::transport::Channel::from_shared(address.clone())
470 .expect("Exporter URI should be valid")
471 .connect_lazy();
472 NotifierServiceClient::new(channel)
473 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
474 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
475 })
476 .collect::<Vec<_>>();
477
478 let mut forwarder = BatchForwarder {
479 nickname: nickname.clone(),
480 client,
481 exporter_clients,
482 pending_notifications: Vec::new(),
483 futures: FuturesUnordered::new(),
484 batch_limit: config.notification_batch_size,
485 max_tasks: config.notification_max_in_flight,
486 };
487
488 loop {
489 tokio::select! {
490 biased;
491
492 result = receiver.recv() => {
493 match result {
494 Ok(notification) => {
495 forwarder.pending_notifications.push(notification);
496
497 if forwarder.futures.is_empty()
498 || (forwarder.pending_notifications.len() >= forwarder.batch_limit
499 && forwarder.futures.len() < forwarder.max_tasks) {
500 forwarder.spawn_batches();
501 }
502 }
503 Err(RecvError::Lagged(skipped_count)) => {
504 warn!(
505 nickname,
506 skipped_count, "notification receiver lagged, messages were skipped"
507 );
508 #[cfg(with_metrics)]
509 metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
510 .with_label_values(&[])
511 .inc_by(skipped_count);
512 }
513 Err(RecvError::Closed) => {
514 warn!(
515 nickname,
516 "notification channel closed, draining pending notifications"
517 );
518 loop {
520 forwarder.spawn_batches();
521 if forwarder.is_fully_drained() {
522 break;
523 }
524 forwarder.futures.next().await;
525 }
526 break;
527 }
528 }
529 }
530
531 Some(()) = forwarder.futures.next() => {
532 forwarder.spawn_batches();
533 }
534 }
535 }
536 }
537
538 fn handle_network_actions(&self, actions: NetworkActions) {
539 let mut cross_chain_sender = self.cross_chain_sender.clone();
540 let notification_sender = self.notification_sender.clone();
541
542 for request in actions.cross_chain_requests {
543 let shard_id = self.network.get_shard_id(request.target_chain_id());
544 trace!(
545 source_shard_id = self.shard_id,
546 target_shard_id = shard_id,
547 "Scheduling cross-chain query",
548 );
549
550 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
551 error!(%error, "dropping cross-chain request");
552 #[cfg(with_metrics)]
553 if error.is_full() {
554 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
555 .with_label_values(&[])
556 .inc();
557 }
558 }
559 }
560
561 for notification in actions.notifications {
562 trace!("Scheduling notification query");
563 if let Err(error) = notification_sender.send(notification) {
564 error!(%error, "dropping notification");
565 #[cfg(with_metrics)]
566 metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
567 .with_label_values(&[])
568 .inc();
569 }
570 }
571 }
572
573 #[instrument(skip_all, fields(nickname, %this_shard))]
574 #[expect(clippy::too_many_arguments)]
575 async fn forward_cross_chain_queries(
576 nickname: String,
577 network: ValidatorInternalNetworkConfig,
578 cross_chain_max_retries: u32,
579 cross_chain_retry_delay: Duration,
580 cross_chain_sender_delay: Duration,
581 cross_chain_sender_failure_rate: f32,
582 this_shard: ShardId,
583 receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
584 ) {
585 let pool = GrpcConnectionPool::default();
586 let handle_request =
587 move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
588 let channel_result = pool.channel(network.shard(shard_id).http_address());
589 async move {
590 let mut client = ValidatorWorkerClient::new(channel_result?)
591 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
592 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
593 client
594 .handle_cross_chain_request(Request::new(request.try_into()?))
595 .await?;
596 anyhow::Result::<_, anyhow::Error>::Ok(())
597 }
598 };
599 cross_chain_message_queue::forward_cross_chain_queries(
600 nickname,
601 cross_chain_max_retries,
602 cross_chain_retry_delay,
603 cross_chain_sender_delay,
604 cross_chain_sender_failure_rate,
605 this_shard,
606 receiver,
607 handle_request,
608 )
609 .await;
610 }
611
612 fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
613 #![cfg_attr(not(with_metrics), allow(unused_variables))]
614 #[cfg(with_metrics)]
615 {
616 metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
617 .with_label_values(&[method_name])
618 .observe(start.elapsed().as_secs_f64() * 1000.0);
619 if success {
620 metrics::SERVER_REQUEST_SUCCESS
621 .with_label_values(&[method_name])
622 .inc();
623 } else {
624 metrics::SERVER_REQUEST_ERROR
625 .with_label_values(&[method_name])
626 .inc();
627 }
628 }
629 }
630
631 fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
632 let nickname = self.state.nickname();
633 if error.is_local() {
634 error!(nickname, %error, "{}", context);
635 } else {
636 debug!(nickname, %error, "{}", context);
637 }
638 }
639}
640
641#[tonic::async_trait]
642impl<S> ValidatorWorkerRpc for GrpcServer<S>
643where
644 S: Storage + Clone + Send + Sync + 'static,
645{
646 #[instrument(
647 target = "grpc_server",
648 skip_all,
649 err,
650 fields(
651 nickname = self.state.nickname(),
652 chain_id = ?request.get_ref().chain_id()
653 )
654 )]
655 async fn handle_block_proposal(
656 &self,
657 request: Request<BlockProposal>,
658 ) -> Result<Response<ChainInfoResult>, Status> {
659 let start = Instant::now();
660 let proposal = request.into_inner().try_into()?;
661 trace!(?proposal, "Handling block proposal");
662 Ok(Response::new(
663 match self.state.clone().handle_block_proposal(proposal).await {
664 Ok((info, actions)) => {
665 Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
666 self.handle_network_actions(actions);
667 info.try_into()?
668 }
669 Err(error) => {
670 Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
671 self.log_error(&error, "Failed to handle block proposal");
672 NodeError::from(error).try_into()?
673 }
674 },
675 ))
676 }
677
678 #[instrument(
679 target = "grpc_server",
680 skip_all,
681 err,
682 fields(
683 nickname = self.state.nickname(),
684 chain_id = ?request.get_ref().chain_id()
685 )
686 )]
687 async fn handle_lite_certificate(
688 &self,
689 request: Request<LiteCertificate>,
690 ) -> Result<Response<ChainInfoResult>, Status> {
691 let start = Instant::now();
692 let HandleLiteCertRequest {
693 certificate,
694 wait_for_outgoing_messages,
695 } = request.into_inner().try_into()?;
696 trace!(?certificate, "Handling lite certificate");
697 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
698 match Box::pin(
699 self.state
700 .clone()
701 .handle_lite_certificate(certificate, sender),
702 )
703 .await
704 {
705 Ok((info, actions)) => {
706 Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
707 self.handle_network_actions(actions);
708 if let Some(receiver) = receiver {
709 if let Err(e) = receiver.await {
710 error!("Failed to wait for message delivery: {e}");
711 }
712 }
713 Ok(Response::new(info.try_into()?))
714 }
715 Err(error) => {
716 Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
717 self.log_error(&error, "Failed to handle lite certificate");
718 Ok(Response::new(NodeError::from(error).try_into()?))
719 }
720 }
721 }
722
723 #[instrument(
724 target = "grpc_server",
725 skip_all,
726 err,
727 fields(
728 nickname = self.state.nickname(),
729 chain_id = ?request.get_ref().chain_id()
730 )
731 )]
732 async fn handle_confirmed_certificate(
733 &self,
734 request: Request<api::HandleConfirmedCertificateRequest>,
735 ) -> Result<Response<ChainInfoResult>, Status> {
736 let start = Instant::now();
737 let HandleConfirmedCertificateRequest {
738 certificate,
739 wait_for_outgoing_messages,
740 } = request.into_inner().try_into()?;
741 trace!(?certificate, "Handling certificate");
742 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
743 match self
744 .state
745 .clone()
746 .handle_confirmed_certificate(certificate, sender)
747 .await
748 {
749 Ok((info, actions)) => {
750 Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
751 self.handle_network_actions(actions);
752 if let Some(receiver) = receiver {
753 if let Err(e) = receiver.await {
754 error!("Failed to wait for message delivery: {e}");
755 }
756 }
757 Ok(Response::new(info.try_into()?))
758 }
759 Err(error) => {
760 Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
761 self.log_error(&error, "Failed to handle confirmed certificate");
762 Ok(Response::new(NodeError::from(error).try_into()?))
763 }
764 }
765 }
766
767 #[instrument(
768 target = "grpc_server",
769 skip_all,
770 err,
771 fields(
772 nickname = self.state.nickname(),
773 chain_id = ?request.get_ref().chain_id()
774 )
775 )]
776 async fn handle_validated_certificate(
777 &self,
778 request: Request<api::HandleValidatedCertificateRequest>,
779 ) -> Result<Response<ChainInfoResult>, Status> {
780 let start = Instant::now();
781 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
782 trace!(?certificate, "Handling certificate");
783 match self
784 .state
785 .clone()
786 .handle_validated_certificate(certificate)
787 .await
788 {
789 Ok((info, actions)) => {
790 Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
791 self.handle_network_actions(actions);
792 Ok(Response::new(info.try_into()?))
793 }
794 Err(error) => {
795 Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
796 self.log_error(&error, "Failed to handle validated certificate");
797 Ok(Response::new(NodeError::from(error).try_into()?))
798 }
799 }
800 }
801
802 #[instrument(
803 target = "grpc_server",
804 skip_all,
805 err,
806 fields(
807 nickname = self.state.nickname(),
808 chain_id = ?request.get_ref().chain_id()
809 )
810 )]
811 async fn handle_timeout_certificate(
812 &self,
813 request: Request<api::HandleTimeoutCertificateRequest>,
814 ) -> Result<Response<ChainInfoResult>, Status> {
815 let start = Instant::now();
816 let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
817 trace!(?certificate, "Handling Timeout certificate");
818 match self
819 .state
820 .clone()
821 .handle_timeout_certificate(certificate)
822 .await
823 {
824 Ok((info, _actions)) => {
825 Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
826 Ok(Response::new(info.try_into()?))
827 }
828 Err(error) => {
829 Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
830 self.log_error(&error, "Failed to handle timeout certificate");
831 Ok(Response::new(NodeError::from(error).try_into()?))
832 }
833 }
834 }
835
836 #[instrument(
837 target = "grpc_server",
838 skip_all,
839 err,
840 fields(
841 nickname = self.state.nickname(),
842 chain_id = ?request.get_ref().chain_id()
843 )
844 )]
845 async fn handle_chain_info_query(
846 &self,
847 request: Request<ChainInfoQuery>,
848 ) -> Result<Response<ChainInfoResult>, Status> {
849 let start = Instant::now();
850 let query = request.into_inner().try_into()?;
851 trace!(?query, "Handling chain info query");
852 match self.state.clone().handle_chain_info_query(query).await {
853 Ok((info, actions)) => {
854 Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
855 self.handle_network_actions(actions);
856 Ok(Response::new(info.try_into()?))
857 }
858 Err(error) => {
859 Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
860 self.log_error(&error, "Failed to handle chain info query");
861 Ok(Response::new(NodeError::from(error).try_into()?))
862 }
863 }
864 }
865
866 #[instrument(
867 target = "grpc_server",
868 skip_all,
869 err,
870 fields(
871 nickname = self.state.nickname(),
872 chain_id = ?request.get_ref().chain_id()
873 )
874 )]
875 async fn download_pending_blob(
876 &self,
877 request: Request<PendingBlobRequest>,
878 ) -> Result<Response<PendingBlobResult>, Status> {
879 let start = Instant::now();
880 let (chain_id, blob_id) = request.into_inner().try_into()?;
881 trace!(?chain_id, ?blob_id, "Download pending blob");
882 match self
883 .state
884 .clone()
885 .download_pending_blob(chain_id, blob_id)
886 .await
887 {
888 Ok(blob) => {
889 Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
890 Ok(Response::new(blob.into_content().try_into()?))
891 }
892 Err(error) => {
893 Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
894 self.log_error(&error, "Failed to download pending blob");
895 Ok(Response::new(NodeError::from(error).try_into()?))
896 }
897 }
898 }
899
900 #[instrument(
901 target = "grpc_server",
902 skip_all,
903 err,
904 fields(
905 nickname = self.state.nickname(),
906 chain_id = ?request.get_ref().chain_id
907 )
908 )]
909 async fn handle_pending_blob(
910 &self,
911 request: Request<HandlePendingBlobRequest>,
912 ) -> Result<Response<ChainInfoResult>, Status> {
913 let start = Instant::now();
914 let (chain_id, blob_content) = request.into_inner().try_into()?;
915 let blob = Blob::new(blob_content);
916 let blob_id = blob.id();
917 trace!(?chain_id, ?blob_id, "Handle pending blob");
918 match self.state.clone().handle_pending_blob(chain_id, blob).await {
919 Ok(info) => {
920 Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
921 Ok(Response::new(info.try_into()?))
922 }
923 Err(error) => {
924 Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
925 self.log_error(&error, "Failed to handle pending blob");
926 Ok(Response::new(NodeError::from(error).try_into()?))
927 }
928 }
929 }
930
931 #[instrument(
932 target = "grpc_server",
933 skip_all,
934 err,
935 fields(
936 nickname = self.state.nickname(),
937 chain_id = ?request.get_ref().chain_id()
938 )
939 )]
940 async fn handle_cross_chain_request(
941 &self,
942 request: Request<CrossChainRequest>,
943 ) -> Result<Response<()>, Status> {
944 let start = Instant::now();
945 let request = request.into_inner().try_into()?;
946 trace!(?request, "Handling cross-chain request");
947 match self.state.clone().handle_cross_chain_request(request).await {
948 Ok(actions) => {
949 Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
950 self.handle_network_actions(actions)
951 }
952 Err(error) => {
953 Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
954 let nickname = self.state.nickname();
955 error!(nickname, %error, "Failed to handle cross-chain request");
956 }
957 }
958 Ok(Response::new(()))
959 }
960}
961
/// A gRPC message from which the ID of the chain it concerns can be extracted.
pub trait GrpcProxyable {
    /// Returns the chain ID carried by the message, or `None` if it is absent or cannot
    /// be converted to a [`ChainId`].
    fn chain_id(&self) -> Option<ChainId>;
}
967
968impl GrpcProxyable for BlockProposal {
969 fn chain_id(&self) -> Option<ChainId> {
970 self.chain_id.clone()?.try_into().ok()
971 }
972}
973
974impl GrpcProxyable for LiteCertificate {
975 fn chain_id(&self) -> Option<ChainId> {
976 self.chain_id.clone()?.try_into().ok()
977 }
978}
979
980impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
981 fn chain_id(&self) -> Option<ChainId> {
982 self.chain_id.clone()?.try_into().ok()
983 }
984}
985
986impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
987 fn chain_id(&self) -> Option<ChainId> {
988 self.chain_id.clone()?.try_into().ok()
989 }
990}
991
992impl GrpcProxyable for api::HandleValidatedCertificateRequest {
993 fn chain_id(&self) -> Option<ChainId> {
994 self.chain_id.clone()?.try_into().ok()
995 }
996}
997
998impl GrpcProxyable for ChainInfoQuery {
999 fn chain_id(&self) -> Option<ChainId> {
1000 self.chain_id.clone()?.try_into().ok()
1001 }
1002}
1003
1004impl GrpcProxyable for PendingBlobRequest {
1005 fn chain_id(&self) -> Option<ChainId> {
1006 self.chain_id.clone()?.try_into().ok()
1007 }
1008}
1009
1010impl GrpcProxyable for HandlePendingBlobRequest {
1011 fn chain_id(&self) -> Option<ChainId> {
1012 self.chain_id.clone()?.try_into().ok()
1013 }
1014}
1015
1016impl GrpcProxyable for CrossChainRequest {
1017 fn chain_id(&self) -> Option<ChainId> {
1018 use super::api::cross_chain_request::Inner;
1019
1020 match self.inner.as_ref()? {
1021 Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
1022 | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
1023 recipient.clone()?.try_into().ok()
1024 }
1025 }
1026 }
1027}