1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8};
9
10use futures::{
11 channel::mpsc, future::BoxFuture, stream::FuturesUnordered, FutureExt as _, StreamExt as _,
12};
13#[cfg(with_metrics)]
14use linera_base::time::Instant;
15use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
16use linera_core::{
17 join_set_ext::JoinSet,
18 node::NodeError,
19 worker::{NetworkActions, Notification, Reason, WorkerState},
20 JoinSetExt as _, ProcessConfirmedBlockMode, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::{broadcast::error::RecvError, oneshot};
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30 api::{
31 self,
32 notifier_service_client::NotifierServiceClient,
33 validator_worker_client::ValidatorWorkerClient,
34 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37 },
38 pool::GrpcConnectionPool,
39 GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41#[cfg(feature = "opentelemetry")]
42use crate::propagation::get_traffic_type_from_request;
43use crate::{
44 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
45 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
46 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
47};
48
/// Sending half of the queue of outbound cross-chain requests; each request is paired with
/// the shard it is routed to.
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
/// Broadcast sender used to publish [`Notification`]s to the notification-forwarding tasks.
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
51
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics for the gRPC server. Only compiled when `with_metrics` is set.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    use super::super::{ERROR_TYPE_LABEL, METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL};

    /// Request latency histogram, labelled by method name and traffic type. The middleware
    /// in this file observes values in milliseconds.
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
            linear_bucket_interval(1.0, 50.0, 5000.0),
        )
    });

    /// Counter of requests, labelled by method name and traffic type.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_count",
            "Server request count",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Counter of requests that the worker handled successfully.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Counter of requests that failed in the worker, additionally labelled by error type.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL, ERROR_TYPE_LABEL],
        )
    });

    /// Counter of requests whose handler future was dropped before it completed.
    pub static SERVER_REQUEST_CANCELLED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_cancelled",
            "Server requests whose handler future was dropped before completion (e.g. client-side timeout / disconnect)",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Counter of cross-chain requests dropped because the outbound queue was full.
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });

    /// Counter of notifications a forwarding task missed because its broadcast receiver lagged.
    pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_skipped_receiver_lag",
            "Number of notifications skipped because receiver lagged behind sender",
            &[],
        )
    });

    /// Counter of notifications that could not be published on the broadcast channel.
    pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_dropped_no_receiver",
            "Number of notifications dropped because no receiver was available",
            &[],
        )
    });

    /// Histogram of the number of notifications placed in each outgoing batch.
    pub static NOTIFICATION_BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "notification_batch_size",
            "Number of notifications per batch sent to proxy",
            &[],
            exponential_bucket_interval(1.0, 250.0),
        )
    });

    /// Counter of batches sent to the proxy; the `status` label is `"success"` or `"error"`.
    pub static NOTIFICATION_BATCHES_SENT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notification_batches_sent",
            "Total notification batches sent",
            &["status"],
        )
    });
}
146
/// State of one notification-forwarding task: buffers notifications and sends them in
/// batches, with a bounded number of sends in flight.
struct BatchForwarder {
    /// Validator nickname, attached to log messages.
    nickname: String,
    /// Client for the proxy's notifier service.
    client: NotifierServiceClient<Channel>,
    /// Clients for the block exporters; these only receive `NewBlock` notifications.
    exporter_clients: Vec<NotifierServiceClient<Channel>>,
    /// Notifications received but not yet assigned to a batch.
    pending_notifications: Vec<Notification>,
    /// Batch sends currently in flight.
    futures: FuturesUnordered<BoxFuture<'static, ()>>,
    /// Maximum number of notifications per batch.
    batch_limit: usize,
    /// Maximum number of batch sends allowed in flight at once.
    max_tasks: usize,
}
157
158impl BatchForwarder {
159 fn spawn_batches(&mut self) {
161 while !self.pending_notifications.is_empty() && self.futures.len() < self.max_tasks {
162 let chunk_size = std::cmp::min(self.batch_limit, self.pending_notifications.len());
163 let batch: Vec<Notification> = self.pending_notifications.drain(..chunk_size).collect();
164
165 #[cfg(with_metrics)]
166 metrics::NOTIFICATION_BATCH_SIZE
167 .with_label_values(&[])
168 .observe(batch.len() as f64);
169
170 let client = self.client.clone();
171 let exporter_clients = self.exporter_clients.clone();
172 let nickname = self.nickname.clone();
173
174 self.futures.push(
175 async move {
176 Self::send_batch(nickname, client, exporter_clients, batch).await;
177 }
178 .boxed(),
179 );
180 }
181 }
182
183 fn is_fully_drained(&self) -> bool {
185 self.pending_notifications.is_empty() && self.futures.is_empty()
186 }
187
188 async fn send_batch(
190 nickname: String,
191 mut client: NotifierServiceClient<Channel>,
192 mut exporter_clients: Vec<NotifierServiceClient<Channel>>,
193 batch: Vec<Notification>,
194 ) {
195 let mut proto_notifications = Vec::with_capacity(batch.len());
197 for notification in &batch {
198 match notification.clone().try_into() {
199 Ok(proto) => proto_notifications.push(proto),
200 Err(error) => {
201 warn!(
202 %error,
203 nickname,
204 ?notification.chain_id,
205 ?notification.reason,
206 "could not deserialize notification"
207 );
208 }
209 }
210 }
211
212 let chain_ids: Vec<_> = batch.iter().map(|n| n.chain_id).collect();
214
215 let request = Request::new(api::NotificationBatch {
217 notifications: proto_notifications.clone(),
218 });
219 let result = client.notify_batch(request).await;
220
221 #[cfg(with_metrics)]
222 {
223 let status = if result.is_ok() { "success" } else { "error" };
224 metrics::NOTIFICATION_BATCHES_SENT
225 .with_label_values(&[status])
226 .inc();
227 }
228
229 if let Err(error) = result {
230 error!(
231 %error,
232 nickname,
233 batch_size = proto_notifications.len(),
234 ?chain_ids,
235 "proxy: could not send notification batch",
236 );
237 }
238
239 let new_block_notifications: Vec<_> = batch
241 .iter()
242 .filter(|n| matches!(n.reason, Reason::NewBlock { .. }))
243 .collect();
244
245 let exporter_notifications: Vec<api::Notification> = new_block_notifications
246 .iter()
247 .filter_map(|n| (*n).clone().try_into().ok())
248 .collect();
249
250 if !exporter_notifications.is_empty() {
251 let exporter_chain_ids: Vec<_> =
252 new_block_notifications.iter().map(|n| n.chain_id).collect();
253
254 for exporter_client in &mut exporter_clients {
255 let request = Request::new(api::NotificationBatch {
256 notifications: exporter_notifications.clone(),
257 });
258 if let Err(error) = exporter_client.notify_batch(request).await {
259 error!(
260 %error,
261 nickname,
262 batch_size = exporter_notifications.len(),
263 ?exporter_chain_ids,
264 "block exporter: could not send notification batch",
265 );
266 }
267 }
268 }
269 }
270}
271
/// gRPC service implementation for one validator shard.
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    /// Worker state that executes the incoming requests.
    state: WorkerState<S>,
    /// ID of the shard this server runs for.
    shard_id: ShardId,
    /// Internal network configuration, used to map chain IDs to shards.
    network: ValidatorInternalNetworkConfig,
    /// Queue of outbound cross-chain requests.
    cross_chain_sender: CrossChainSender,
    /// Broadcast channel on which notifications are published.
    notification_sender: NotificationSender,
}
283
284pub struct GrpcServerHandle {
285 handle: TaskHandle<Result<(), GrpcError>>,
286}
287
288impl GrpcServerHandle {
289 pub async fn join(self) -> Result<(), GrpcError> {
290 self.handle.await?
291 }
292}
293
/// Drop guard that counts a request as cancelled unless it was marked `completed` first.
#[cfg(with_metrics)]
struct ServerRequestCancellationGuard {
    /// gRPC method name used as a metric label.
    method_name: String,
    /// Traffic type used as a metric label.
    traffic_type: &'static str,
    /// Set to `true` once the handler future has finished.
    completed: bool,
}

#[cfg(with_metrics)]
impl Drop for ServerRequestCancellationGuard {
    fn drop(&mut self) {
        // A completed request is not a cancellation, so there is nothing to record.
        if self.completed {
            return;
        }
        metrics::SERVER_REQUEST_CANCELLED
            .with_label_values(&[&self.method_name, self.traffic_type])
            .inc();
    }
}
311
/// Tower layer that wraps a service in [`GrpcPrometheusMetricsMiddlewareService`].
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;

/// Service wrapper that records request metrics around the inner service.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    /// The wrapped service that actually handles the request.
    service: T,
}

impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
    type Service = GrpcPrometheusMetricsMiddlewareService<S>;

    /// Wraps `service` in the metrics middleware.
    fn layer(&self, service: S) -> Self::Service {
        GrpcPrometheusMetricsMiddlewareService { service }
    }
}
327
328impl<S, B> Service<http::Request<B>> for GrpcPrometheusMetricsMiddlewareService<S>
329where
330 S::Future: Send + 'static,
331 S: Service<http::Request<B>> + std::marker::Send,
332 B: Send + 'static,
333{
334 type Response = S::Response;
335 type Error = S::Error;
336 type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
337
338 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
339 self.service.poll_ready(cx)
340 }
341
342 fn call(&mut self, request: http::Request<B>) -> Self::Future {
343 #[cfg(with_metrics)]
344 let start = Instant::now();
345
346 #[cfg(with_metrics)]
347 let method_name = super::extract_grpc_method_name(request.uri().path()).to_owned();
348
349 #[cfg(all(with_metrics, feature = "opentelemetry"))]
353 let traffic_type: &'static str = get_traffic_type_from_request(&request);
354 #[cfg(all(with_metrics, not(feature = "opentelemetry")))]
355 let traffic_type: &'static str = "unknown";
356
357 let future = self.service.call(request);
358 async move {
359 #[cfg(with_metrics)]
360 let mut cancellation_guard = ServerRequestCancellationGuard {
361 method_name,
362 traffic_type,
363 completed: false,
364 };
365 let response = future.await?;
366 #[cfg(with_metrics)]
367 {
368 cancellation_guard.completed = true;
369 metrics::SERVER_REQUEST_LATENCY
370 .with_label_values(&[&cancellation_guard.method_name, traffic_type])
371 .observe(start.elapsed().as_secs_f64() * 1000.0);
372 metrics::SERVER_REQUEST_COUNT
373 .with_label_values(&[&cancellation_guard.method_name, traffic_type])
374 .inc();
375 }
376 Ok(response)
377 }
378 .boxed()
379 }
380}
381
382impl<S> GrpcServer<S>
383where
384 S: Storage + Clone + Send + Sync + 'static,
385{
386 #[expect(clippy::too_many_arguments)]
387 pub fn spawn(
388 host: String,
389 port: u16,
390 state: WorkerState<S>,
391 shard_id: ShardId,
392 internal_network: ValidatorInternalNetworkConfig,
393 cross_chain_config: &CrossChainConfig,
394 notification_config: &NotificationConfig,
395 shutdown_signal: CancellationToken,
396 join_set: &mut JoinSet,
397 ) -> GrpcServerHandle {
398 info!(
399 "spawning gRPC server on {}:{} for shard {}",
400 host, port, shard_id
401 );
402
403 let (cross_chain_sender, cross_chain_receiver) =
404 mpsc::channel(cross_chain_config.queue_size);
405
406 let state = {
410 let routing_network = internal_network.clone();
411 let routing_sender = cross_chain_sender.clone();
412 state.with_outbound_cross_chain_sender(std::sync::Arc::new(move |request| {
413 let shard_id = routing_network.get_shard_id(request.target_chain_id());
414 if let Err(error) = routing_sender.clone().try_send((request, shard_id)) {
415 error!(%error, "dropping cross-chain request");
416 }
417 }))
418 };
419
420 let (notification_sender, _) =
421 tokio::sync::broadcast::channel(notification_config.notification_queue_size);
422
423 join_set.spawn_task({
424 info!(
425 nickname = state.nickname(),
426 "spawning cross-chain queries thread on {} for shard {}", host, shard_id
427 );
428 Self::forward_cross_chain_queries(
429 state.nickname().to_string(),
430 internal_network.clone(),
431 cross_chain_config.max_retries,
432 Duration::from_millis(cross_chain_config.retry_delay_ms),
433 Duration::from_millis(cross_chain_config.max_backoff_ms),
434 Duration::from_millis(cross_chain_config.sender_delay_ms),
435 cross_chain_config.sender_failure_rate,
436 shard_id,
437 cross_chain_receiver,
438 )
439 });
440
441 let mut exporter_forwarded = false;
442 for proxy in &internal_network.proxies {
443 let receiver = notification_sender.subscribe();
444 join_set.spawn_task({
445 info!(
446 nickname = state.nickname(),
447 "spawning notifications thread on {} for shard {}", host, shard_id
448 );
449 let exporter_addresses = if exporter_forwarded {
450 vec![]
451 } else {
452 exporter_forwarded = true;
453 internal_network.exporter_addresses()
454 };
455 Self::forward_notifications(
456 state.nickname().to_string(),
457 proxy.internal_address(&internal_network.protocol),
458 exporter_addresses,
459 receiver,
460 notification_config.clone(),
461 )
462 });
463 }
464
465 let (health_reporter, health_service) = tonic_health::server::health_reporter();
466
467 let grpc_server = GrpcServer {
468 state,
469 shard_id,
470 network: internal_network,
471 cross_chain_sender,
472 notification_sender,
473 };
474
475 let worker_node = ValidatorWorkerServer::new(grpc_server)
476 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
477 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
478
479 let handle = join_set.spawn_task(async move {
480 let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
481
482 let reflection_service = tonic_reflection::server::Builder::configure()
483 .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
484 .build_v1()?;
485
486 health_reporter
487 .set_serving::<ValidatorWorkerServer<Self>>()
488 .await;
489
490 #[cfg(feature = "opentelemetry")]
491 let mut server = tonic::transport::Server::builder().layer(
492 ServiceBuilder::new()
493 .layer(crate::propagation::OtelContextLayer)
494 .layer(GrpcPrometheusMetricsMiddlewareLayer)
495 .into_inner(),
496 );
497 #[cfg(not(feature = "opentelemetry"))]
498 let mut server = tonic::transport::Server::builder().layer(
499 ServiceBuilder::new()
500 .layer(GrpcPrometheusMetricsMiddlewareLayer)
501 .into_inner(),
502 );
503 server
504 .add_service(health_service)
505 .add_service(reflection_service)
506 .add_service(worker_node)
507 .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
508 .await?;
509
510 Ok(())
511 });
512
513 GrpcServerHandle { handle }
514 }
515
516 #[instrument(skip(receiver, config))]
519 async fn forward_notifications(
520 nickname: String,
521 proxy_address: String,
522 exporter_addresses: Vec<String>,
523 mut receiver: tokio::sync::broadcast::Receiver<Notification>,
524 config: NotificationConfig,
525 ) {
526 let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
527 .expect("Proxy URI should be valid")
528 .connect_lazy();
529 let client = NotifierServiceClient::new(channel)
530 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
531 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
532
533 let exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
534 .iter()
535 .map(|address| {
536 let channel = tonic::transport::Channel::from_shared(address.clone())
537 .expect("Exporter URI should be valid")
538 .connect_lazy();
539 NotifierServiceClient::new(channel)
540 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
541 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
542 })
543 .collect::<Vec<_>>();
544
545 let mut forwarder = BatchForwarder {
546 nickname: nickname.clone(),
547 client,
548 exporter_clients,
549 pending_notifications: Vec::new(),
550 futures: FuturesUnordered::new(),
551 batch_limit: config.notification_batch_size,
552 max_tasks: config.notification_max_in_flight,
553 };
554
555 loop {
556 tokio::select! {
557 biased;
558
559 result = receiver.recv() => {
560 match result {
561 Ok(notification) => {
562 forwarder.pending_notifications.push(notification);
563
564 if forwarder.futures.is_empty()
565 || (forwarder.pending_notifications.len() >= forwarder.batch_limit
566 && forwarder.futures.len() < forwarder.max_tasks) {
567 forwarder.spawn_batches();
568 }
569 }
570 Err(RecvError::Lagged(skipped_count)) => {
571 warn!(
572 nickname,
573 skipped_count, "notification receiver lagged, messages were skipped"
574 );
575 #[cfg(with_metrics)]
576 metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
577 .with_label_values(&[])
578 .inc_by(skipped_count);
579 }
580 Err(RecvError::Closed) => {
581 warn!(
582 nickname,
583 "notification channel closed, draining pending notifications"
584 );
585 loop {
587 forwarder.spawn_batches();
588 if forwarder.is_fully_drained() {
589 break;
590 }
591 forwarder.futures.next().await;
592 }
593 break;
594 }
595 }
596 }
597
598 Some(()) = forwarder.futures.next() => {
599 forwarder.spawn_batches();
600 }
601 }
602 }
603 }
604
605 fn handle_network_actions(&self, actions: NetworkActions) {
606 let mut cross_chain_sender = self.cross_chain_sender.clone();
607 let notification_sender = self.notification_sender.clone();
608
609 for request in actions.cross_chain_requests {
610 let shard_id = self.network.get_shard_id(request.target_chain_id());
611 trace!(
612 source_shard_id = self.shard_id,
613 target_shard_id = shard_id,
614 "Scheduling cross-chain query",
615 );
616
617 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
618 error!(%error, "dropping cross-chain request");
619 #[cfg(with_metrics)]
620 if error.is_full() {
621 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
622 .with_label_values(&[])
623 .inc();
624 }
625 }
626 }
627
628 for notification in actions.notifications {
629 trace!("Scheduling notification query");
630 if let Err(error) = notification_sender.send(notification) {
631 error!(%error, "dropping notification");
632 #[cfg(with_metrics)]
633 metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
634 .with_label_values(&[])
635 .inc();
636 }
637 }
638 }
639
640 #[instrument(skip_all, fields(nickname, %this_shard))]
641 #[expect(clippy::too_many_arguments)]
642 async fn forward_cross_chain_queries(
643 nickname: String,
644 network: ValidatorInternalNetworkConfig,
645 cross_chain_max_retries: u32,
646 cross_chain_retry_delay: Duration,
647 cross_chain_max_backoff: Duration,
648 cross_chain_sender_delay: Duration,
649 cross_chain_sender_failure_rate: f32,
650 this_shard: ShardId,
651 receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
652 ) {
653 let pool = GrpcConnectionPool::default();
654 let handle_request =
655 move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
656 let channel_result = pool.channel(network.shard(shard_id).http_address());
657 async move {
658 let mut client = ValidatorWorkerClient::new(channel_result?)
659 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
660 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
661 client
662 .handle_cross_chain_request(Request::new(request.try_into()?))
663 .await?;
664 anyhow::Result::<_, anyhow::Error>::Ok(())
665 }
666 };
667 cross_chain_message_queue::forward_cross_chain_queries(
668 nickname,
669 cross_chain_max_retries,
670 cross_chain_retry_delay,
671 cross_chain_max_backoff,
672 cross_chain_sender_delay,
673 cross_chain_sender_failure_rate,
674 this_shard,
675 receiver,
676 handle_request,
677 )
678 .await;
679 }
680
681 fn log_request_success(method_name: &str, traffic_type: &str) {
682 #![cfg_attr(not(with_metrics), allow(unused_variables))]
683 #[cfg(with_metrics)]
684 metrics::SERVER_REQUEST_SUCCESS
685 .with_label_values(&[method_name, traffic_type])
686 .inc();
687 }
688
689 fn log_request_error(method_name: &str, traffic_type: &str, error_type: &str) {
690 #![cfg_attr(not(with_metrics), allow(unused_variables))]
691 #[cfg(with_metrics)]
692 metrics::SERVER_REQUEST_ERROR
693 .with_label_values(&[method_name, traffic_type, error_type])
694 .inc();
695 }
696
    /// Returns the traffic-type label for `request`, as determined by
    /// `get_traffic_type_from_request`.
    #[cfg(feature = "opentelemetry")]
    fn get_traffic_type<R>(request: &Request<R>) -> &'static str {
        get_traffic_type_from_request(request)
    }

    /// Returns the traffic-type label for a request. Without the `opentelemetry` feature
    /// this is always `"unknown"`.
    #[cfg(not(feature = "opentelemetry"))]
    fn get_traffic_type<R>(_request: &Request<R>) -> &'static str {
        "unknown"
    }
708
709 fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
710 let nickname = self.state.nickname();
711 if error.is_local() {
712 error!(nickname, %error, "{}", context);
713 } else {
714 debug!(nickname, %error, "{}", context);
715 }
716 }
717}
718
719#[tonic::async_trait]
720impl<S> ValidatorWorkerRpc for GrpcServer<S>
721where
722 S: Storage + Clone + Send + Sync + 'static,
723{
724 #[instrument(
725 target = "grpc_server",
726 skip_all,
727 err,
728 fields(
729 nickname = self.state.nickname(),
730 chain_id = ?request.get_ref().chain_id()
731 )
732 )]
733 async fn handle_block_proposal(
734 &self,
735 request: Request<BlockProposal>,
736 ) -> Result<Response<ChainInfoResult>, Status> {
737 let traffic_type = Self::get_traffic_type(&request);
738 let proposal = request.into_inner().try_into()?;
739 trace!(?proposal, "Handling block proposal");
740 let (result, actions) = self.state.clone().handle_block_proposal(proposal).await;
741 self.handle_network_actions(actions);
746 Ok(Response::new(match result {
747 Ok(info) => {
748 Self::log_request_success("handle_block_proposal", traffic_type);
749 info.try_into()?
750 }
751 Err(error) => {
752 Self::log_request_error("handle_block_proposal", traffic_type, &error.error_type());
753 self.log_error(&error, "Failed to handle block proposal");
754 NodeError::from(error).try_into()?
755 }
756 }))
757 }
758
759 #[instrument(
760 target = "grpc_server",
761 skip_all,
762 err,
763 fields(
764 nickname = self.state.nickname(),
765 chain_id = ?request.get_ref().chain_id()
766 )
767 )]
768 async fn handle_lite_certificate(
769 &self,
770 request: Request<LiteCertificate>,
771 ) -> Result<Response<ChainInfoResult>, Status> {
772 let traffic_type = Self::get_traffic_type(&request);
773 let HandleLiteCertRequest {
774 certificate,
775 wait_for_outgoing_messages,
776 } = request.into_inner().try_into()?;
777 trace!(?certificate, "Handling lite certificate");
778 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
779 match Box::pin(
780 self.state
781 .clone()
782 .handle_lite_certificate(certificate, sender),
783 )
784 .await
785 {
786 Ok((info, actions)) => {
787 Self::log_request_success("handle_lite_certificate", traffic_type);
788 self.handle_network_actions(actions);
789 if let Some(receiver) = receiver {
790 if let Err(e) = receiver.await {
791 error!("Failed to wait for message delivery: {e}");
792 }
793 }
794 Ok(Response::new(info.try_into()?))
795 }
796 Err(error) => {
797 Self::log_request_error(
798 "handle_lite_certificate",
799 traffic_type,
800 &error.error_type(),
801 );
802 self.log_error(&error, "Failed to handle lite certificate");
803 Ok(Response::new(NodeError::from(error).try_into()?))
804 }
805 }
806 }
807
808 #[instrument(
809 target = "grpc_server",
810 skip_all,
811 err,
812 fields(
813 nickname = self.state.nickname(),
814 chain_id = ?request.get_ref().chain_id()
815 )
816 )]
817 async fn handle_confirmed_certificate(
818 &self,
819 request: Request<api::HandleConfirmedCertificateRequest>,
820 ) -> Result<Response<ChainInfoResult>, Status> {
821 let traffic_type = Self::get_traffic_type(&request);
822 let HandleConfirmedCertificateRequest {
823 certificate,
824 wait_for_outgoing_messages,
825 } = request.into_inner().try_into()?;
826 trace!(?certificate, "Handling certificate");
827 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
828 match self
829 .state
830 .clone()
831 .handle_confirmed_certificate(certificate, ProcessConfirmedBlockMode::Auto, sender)
832 .await
833 {
834 Ok((info, actions)) => {
835 Self::log_request_success("handle_confirmed_certificate", traffic_type);
836 self.handle_network_actions(actions);
837 if let Some(receiver) = receiver {
838 if let Err(e) = receiver.await {
839 error!("Failed to wait for message delivery: {e}");
840 }
841 }
842 Ok(Response::new(info.try_into()?))
843 }
844 Err(error) => {
845 Self::log_request_error(
846 "handle_confirmed_certificate",
847 traffic_type,
848 &error.error_type(),
849 );
850 self.log_error(&error, "Failed to handle confirmed certificate");
851 Ok(Response::new(NodeError::from(error).try_into()?))
852 }
853 }
854 }
855
856 #[instrument(
857 target = "grpc_server",
858 skip_all,
859 err,
860 fields(
861 nickname = self.state.nickname(),
862 chain_id = ?request.get_ref().chain_id()
863 )
864 )]
865 async fn handle_validated_certificate(
866 &self,
867 request: Request<api::HandleValidatedCertificateRequest>,
868 ) -> Result<Response<ChainInfoResult>, Status> {
869 let traffic_type = Self::get_traffic_type(&request);
870 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
871 trace!(?certificate, "Handling certificate");
872 match self
873 .state
874 .clone()
875 .handle_validated_certificate(certificate)
876 .await
877 {
878 Ok((info, actions)) => {
879 Self::log_request_success("handle_validated_certificate", traffic_type);
880 self.handle_network_actions(actions);
881 Ok(Response::new(info.try_into()?))
882 }
883 Err(error) => {
884 Self::log_request_error(
885 "handle_validated_certificate",
886 traffic_type,
887 &error.error_type(),
888 );
889 self.log_error(&error, "Failed to handle validated certificate");
890 Ok(Response::new(NodeError::from(error).try_into()?))
891 }
892 }
893 }
894
895 #[instrument(
896 target = "grpc_server",
897 skip_all,
898 err,
899 fields(
900 nickname = self.state.nickname(),
901 chain_id = ?request.get_ref().chain_id()
902 )
903 )]
904 async fn handle_timeout_certificate(
905 &self,
906 request: Request<api::HandleTimeoutCertificateRequest>,
907 ) -> Result<Response<ChainInfoResult>, Status> {
908 let traffic_type = Self::get_traffic_type(&request);
909 let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
910 trace!(?certificate, "Handling Timeout certificate");
911 match self
912 .state
913 .clone()
914 .handle_timeout_certificate(certificate)
915 .await
916 {
917 Ok((info, _actions)) => {
918 Self::log_request_success("handle_timeout_certificate", traffic_type);
919 Ok(Response::new(info.try_into()?))
920 }
921 Err(error) => {
922 Self::log_request_error(
923 "handle_timeout_certificate",
924 traffic_type,
925 &error.error_type(),
926 );
927 self.log_error(&error, "Failed to handle timeout certificate");
928 Ok(Response::new(NodeError::from(error).try_into()?))
929 }
930 }
931 }
932
933 #[instrument(
934 target = "grpc_server",
935 skip_all,
936 err,
937 fields(
938 nickname = self.state.nickname(),
939 chain_id = ?request.get_ref().chain_id()
940 )
941 )]
942 async fn handle_chain_info_query(
943 &self,
944 request: Request<ChainInfoQuery>,
945 ) -> Result<Response<ChainInfoResult>, Status> {
946 let traffic_type = Self::get_traffic_type(&request);
947 let query = request.into_inner().try_into()?;
948 trace!(?query, "Handling chain info query");
949 match self.state.clone().handle_chain_info_query(query).await {
950 Ok(info) => {
951 Self::log_request_success("handle_chain_info_query", traffic_type);
952 Ok(Response::new(info.try_into()?))
953 }
954 Err(error) => {
955 Self::log_request_error(
956 "handle_chain_info_query",
957 traffic_type,
958 &error.error_type(),
959 );
960 self.log_error(&error, "Failed to handle chain info query");
961 Ok(Response::new(NodeError::from(error).try_into()?))
962 }
963 }
964 }
965
966 #[instrument(
967 target = "grpc_server",
968 skip_all,
969 err,
970 fields(
971 nickname = self.state.nickname(),
972 chain_id = ?request.get_ref().chain_id()
973 )
974 )]
975 async fn download_pending_blob(
976 &self,
977 request: Request<PendingBlobRequest>,
978 ) -> Result<Response<PendingBlobResult>, Status> {
979 let traffic_type = Self::get_traffic_type(&request);
980 let (chain_id, blob_id) = request.into_inner().try_into()?;
981 trace!(?blob_id, "Download pending blob");
982 match self
983 .state
984 .clone()
985 .download_pending_blob(chain_id, blob_id)
986 .await
987 {
988 Ok(blob) => {
989 Self::log_request_success("download_pending_blob", traffic_type);
990 Ok(Response::new(blob.content().clone().try_into()?))
991 }
992 Err(error) => {
993 Self::log_request_error("download_pending_blob", traffic_type, &error.error_type());
994 self.log_error(&error, "Failed to download pending blob");
995 Ok(Response::new(NodeError::from(error).try_into()?))
996 }
997 }
998 }
999
1000 #[instrument(
1001 target = "grpc_server",
1002 skip_all,
1003 err,
1004 fields(
1005 nickname = self.state.nickname(),
1006 chain_id = ?request.get_ref().chain_id()
1007 )
1008 )]
1009 async fn handle_pending_blob(
1010 &self,
1011 request: Request<HandlePendingBlobRequest>,
1012 ) -> Result<Response<ChainInfoResult>, Status> {
1013 let traffic_type = Self::get_traffic_type(&request);
1014 let (chain_id, blob_content) = request.into_inner().try_into()?;
1015 let blob = Blob::new(blob_content);
1016 let blob_id = blob.id();
1017 trace!(?blob_id, "Handle pending blob");
1018 match self.state.clone().handle_pending_blob(chain_id, blob).await {
1019 Ok(info) => {
1020 Self::log_request_success("handle_pending_blob", traffic_type);
1021 Ok(Response::new(info.try_into()?))
1022 }
1023 Err(error) => {
1024 Self::log_request_error("handle_pending_blob", traffic_type, &error.error_type());
1025 self.log_error(&error, "Failed to handle pending blob");
1026 Ok(Response::new(NodeError::from(error).try_into()?))
1027 }
1028 }
1029 }
1030
1031 #[instrument(
1032 target = "grpc_server",
1033 skip_all,
1034 err,
1035 fields(
1036 nickname = self.state.nickname(),
1037 chain_id = ?request.get_ref().chain_id()
1038 )
1039 )]
1040 async fn handle_cross_chain_request(
1041 &self,
1042 request: Request<CrossChainRequest>,
1043 ) -> Result<Response<()>, Status> {
1044 let traffic_type = Self::get_traffic_type(&request);
1045 let cross_chain_request = request.into_inner().try_into()?;
1046 trace!(?cross_chain_request, "Handling cross-chain request");
1047 match self
1048 .state
1049 .clone()
1050 .handle_cross_chain_request(cross_chain_request)
1051 .await
1052 {
1053 Ok(actions) => {
1054 Self::log_request_success("handle_cross_chain_request", traffic_type);
1055 self.handle_network_actions(actions)
1056 }
1057 Err(error) => {
1058 Self::log_request_error(
1059 "handle_cross_chain_request",
1060 traffic_type,
1061 &error.error_type(),
1062 );
1063 self.log_error(&error, "Failed to handle cross-chain request");
1064 }
1065 }
1066 Ok(Response::new(()))
1067 }
1068}
1069
/// Implemented by gRPC request types from which a chain ID can be extracted.
pub trait GrpcProxyable {
    /// Returns the chain ID carried by the request, or `None` if it is absent or cannot be
    /// converted.
    fn chain_id(&self) -> Option<ChainId>;
}
1075
1076impl GrpcProxyable for BlockProposal {
1077 fn chain_id(&self) -> Option<ChainId> {
1078 self.chain_id.clone()?.try_into().ok()
1079 }
1080}
1081
1082impl GrpcProxyable for LiteCertificate {
1083 fn chain_id(&self) -> Option<ChainId> {
1084 self.chain_id.clone()?.try_into().ok()
1085 }
1086}
1087
1088impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
1089 fn chain_id(&self) -> Option<ChainId> {
1090 self.chain_id.clone()?.try_into().ok()
1091 }
1092}
1093
1094impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
1095 fn chain_id(&self) -> Option<ChainId> {
1096 self.chain_id.clone()?.try_into().ok()
1097 }
1098}
1099
1100impl GrpcProxyable for api::HandleValidatedCertificateRequest {
1101 fn chain_id(&self) -> Option<ChainId> {
1102 self.chain_id.clone()?.try_into().ok()
1103 }
1104}
1105
1106impl GrpcProxyable for ChainInfoQuery {
1107 fn chain_id(&self) -> Option<ChainId> {
1108 self.chain_id.clone()?.try_into().ok()
1109 }
1110}
1111
1112impl GrpcProxyable for PendingBlobRequest {
1113 fn chain_id(&self) -> Option<ChainId> {
1114 self.chain_id.clone()?.try_into().ok()
1115 }
1116}
1117
1118impl GrpcProxyable for HandlePendingBlobRequest {
1119 fn chain_id(&self) -> Option<ChainId> {
1120 self.chain_id.clone()?.try_into().ok()
1121 }
1122}
1123
1124impl GrpcProxyable for CrossChainRequest {
1125 fn chain_id(&self) -> Option<ChainId> {
1126 use super::api::cross_chain_request::Inner;
1127
1128 match self.inner.as_ref()? {
1129 Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
1130 | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. })
1131 | Inner::RevertConfirm(api::RevertConfirm { recipient, .. }) => {
1132 recipient.clone()?.try_into().ok()
1133 }
1134 }
1135 }
1136}