1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8};
9
10use futures::{
11 channel::mpsc, future::BoxFuture, stream::FuturesUnordered, FutureExt as _, StreamExt as _,
12};
13#[cfg(with_metrics)]
14use linera_base::time::Instant;
15use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
16use linera_core::{
17 join_set_ext::JoinSet,
18 node::NodeError,
19 worker::{NetworkActions, Notification, Reason, WorkerState},
20 JoinSetExt as _, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::{broadcast::error::RecvError, oneshot};
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30 api::{
31 self,
32 notifier_service_client::NotifierServiceClient,
33 validator_worker_client::ValidatorWorkerClient,
34 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37 },
38 pool::GrpcConnectionPool,
39 GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41#[cfg(feature = "opentelemetry")]
42use crate::propagation::get_traffic_type_from_request;
43use crate::{
44 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
45 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
46 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
47};
48
49type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
50type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
51
52#[cfg(with_metrics)]
53mod metrics {
54 use std::sync::LazyLock;
55
56 use linera_base::prometheus_util::{
57 exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
58 register_int_counter_vec,
59 };
60 use prometheus::{HistogramVec, IntCounterVec};
61
62 use super::super::{ERROR_TYPE_LABEL, METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL};
63
64 pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
65 register_histogram_vec(
66 "server_request_latency",
67 "Server request latency",
68 &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
69 linear_bucket_interval(1.0, 50.0, 5000.0),
70 )
71 });
72
73 pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
74 register_int_counter_vec(
75 "server_request_count",
76 "Server request count",
77 &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
78 )
79 });
80
81 pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
82 register_int_counter_vec(
83 "server_request_success",
84 "Server request success",
85 &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
86 )
87 });
88
89 pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
90 register_int_counter_vec(
91 "server_request_error",
92 "Server request error",
93 &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL, ERROR_TYPE_LABEL],
94 )
95 });
96
97 pub static SERVER_REQUEST_CANCELLED: LazyLock<IntCounterVec> = LazyLock::new(|| {
98 register_int_counter_vec(
99 "server_request_cancelled",
100 "Server requests whose handler future was dropped before completion (e.g. client-side timeout / disconnect)",
101 &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
102 )
103 });
104
105 pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
106 register_int_counter_vec(
107 "cross_chain_message_channel_full",
108 "Cross-chain message channel full",
109 &[],
110 )
111 });
112
113 pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
114 register_int_counter_vec(
115 "notifications_skipped_receiver_lag",
116 "Number of notifications skipped because receiver lagged behind sender",
117 &[],
118 )
119 });
120
121 pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
122 register_int_counter_vec(
123 "notifications_dropped_no_receiver",
124 "Number of notifications dropped because no receiver was available",
125 &[],
126 )
127 });
128
129 pub static NOTIFICATION_BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
130 register_histogram_vec(
131 "notification_batch_size",
132 "Number of notifications per batch sent to proxy",
133 &[],
134 exponential_bucket_interval(1.0, 250.0),
135 )
136 });
137
138 pub static NOTIFICATION_BATCHES_SENT: LazyLock<IntCounterVec> = LazyLock::new(|| {
139 register_int_counter_vec(
140 "notification_batches_sent",
141 "Total notification batches sent",
142 &["status"],
143 )
144 });
145}
146
147struct BatchForwarder {
149 nickname: String,
150 client: NotifierServiceClient<Channel>,
151 exporter_clients: Vec<NotifierServiceClient<Channel>>,
152 pending_notifications: Vec<Notification>,
153 futures: FuturesUnordered<BoxFuture<'static, ()>>,
154 batch_limit: usize,
155 max_tasks: usize,
156}
157
158impl BatchForwarder {
159 fn spawn_batches(&mut self) {
161 while !self.pending_notifications.is_empty() && self.futures.len() < self.max_tasks {
162 let chunk_size = std::cmp::min(self.batch_limit, self.pending_notifications.len());
163 let batch: Vec<Notification> = self.pending_notifications.drain(..chunk_size).collect();
164
165 #[cfg(with_metrics)]
166 metrics::NOTIFICATION_BATCH_SIZE
167 .with_label_values(&[])
168 .observe(batch.len() as f64);
169
170 let client = self.client.clone();
171 let exporter_clients = self.exporter_clients.clone();
172 let nickname = self.nickname.clone();
173
174 self.futures.push(
175 async move {
176 Self::send_batch(nickname, client, exporter_clients, batch).await;
177 }
178 .boxed(),
179 );
180 }
181 }
182
183 fn is_fully_drained(&self) -> bool {
185 self.pending_notifications.is_empty() && self.futures.is_empty()
186 }
187
188 async fn send_batch(
190 nickname: String,
191 mut client: NotifierServiceClient<Channel>,
192 mut exporter_clients: Vec<NotifierServiceClient<Channel>>,
193 batch: Vec<Notification>,
194 ) {
195 let mut proto_notifications = Vec::with_capacity(batch.len());
197 for notification in &batch {
198 match notification.clone().try_into() {
199 Ok(proto) => proto_notifications.push(proto),
200 Err(error) => {
201 warn!(
202 %error,
203 nickname,
204 ?notification.chain_id,
205 ?notification.reason,
206 "could not deserialize notification"
207 );
208 }
209 }
210 }
211
212 let chain_ids: Vec<_> = batch.iter().map(|n| n.chain_id).collect();
214
215 let request = Request::new(api::NotificationBatch {
217 notifications: proto_notifications.clone(),
218 });
219 let result = client.notify_batch(request).await;
220
221 #[cfg(with_metrics)]
222 {
223 let status = if result.is_ok() { "success" } else { "error" };
224 metrics::NOTIFICATION_BATCHES_SENT
225 .with_label_values(&[status])
226 .inc();
227 }
228
229 if let Err(error) = result {
230 error!(
231 %error,
232 nickname,
233 batch_size = proto_notifications.len(),
234 ?chain_ids,
235 "proxy: could not send notification batch",
236 );
237 }
238
239 let new_block_notifications: Vec<_> = batch
241 .iter()
242 .filter(|n| matches!(n.reason, Reason::NewBlock { .. }))
243 .collect();
244
245 let exporter_notifications: Vec<api::Notification> = new_block_notifications
246 .iter()
247 .filter_map(|n| (*n).clone().try_into().ok())
248 .collect();
249
250 if !exporter_notifications.is_empty() {
251 let exporter_chain_ids: Vec<_> =
252 new_block_notifications.iter().map(|n| n.chain_id).collect();
253
254 for exporter_client in &mut exporter_clients {
255 let request = Request::new(api::NotificationBatch {
256 notifications: exporter_notifications.clone(),
257 });
258 if let Err(error) = exporter_client.notify_batch(request).await {
259 error!(
260 %error,
261 nickname,
262 batch_size = exporter_notifications.len(),
263 ?exporter_chain_ids,
264 "block exporter: could not send notification batch",
265 );
266 }
267 }
268 }
269 }
270}
271
272#[derive(Clone)]
273pub struct GrpcServer<S>
274where
275 S: Storage,
276{
277 state: WorkerState<S>,
278 shard_id: ShardId,
279 network: ValidatorInternalNetworkConfig,
280 cross_chain_sender: CrossChainSender,
281 notification_sender: NotificationSender,
282}
283
284pub struct GrpcServerHandle {
285 handle: TaskHandle<Result<(), GrpcError>>,
286}
287
288impl GrpcServerHandle {
289 pub async fn join(self) -> Result<(), GrpcError> {
290 self.handle.await?
291 }
292}
293
294#[cfg(with_metrics)]
295struct ServerRequestCancellationGuard {
296 method_name: String,
297 traffic_type: &'static str,
298 completed: bool,
299}
300
301#[cfg(with_metrics)]
302impl Drop for ServerRequestCancellationGuard {
303 fn drop(&mut self) {
304 if !self.completed {
305 metrics::SERVER_REQUEST_CANCELLED
306 .with_label_values(&[&self.method_name, self.traffic_type])
307 .inc();
308 }
309 }
310}
311
312#[derive(Clone)]
313pub struct GrpcPrometheusMetricsMiddlewareLayer;
314
315#[derive(Clone)]
316pub struct GrpcPrometheusMetricsMiddlewareService<T> {
317 service: T,
318}
319
320impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
321 type Service = GrpcPrometheusMetricsMiddlewareService<S>;
322
323 fn layer(&self, service: S) -> Self::Service {
324 GrpcPrometheusMetricsMiddlewareService { service }
325 }
326}
327
328impl<S, B> Service<http::Request<B>> for GrpcPrometheusMetricsMiddlewareService<S>
329where
330 S::Future: Send + 'static,
331 S: Service<http::Request<B>> + std::marker::Send,
332 B: Send + 'static,
333{
334 type Response = S::Response;
335 type Error = S::Error;
336 type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
337
338 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
339 self.service.poll_ready(cx)
340 }
341
342 fn call(&mut self, request: http::Request<B>) -> Self::Future {
343 #[cfg(with_metrics)]
344 let start = Instant::now();
345
346 #[cfg(with_metrics)]
347 let method_name = super::extract_grpc_method_name(request.uri().path()).to_owned();
348
349 #[cfg(all(with_metrics, feature = "opentelemetry"))]
353 let traffic_type: &'static str = get_traffic_type_from_request(&request);
354 #[cfg(all(with_metrics, not(feature = "opentelemetry")))]
355 let traffic_type: &'static str = "unknown";
356
357 let future = self.service.call(request);
358 async move {
359 #[cfg(with_metrics)]
360 let mut cancellation_guard = ServerRequestCancellationGuard {
361 method_name,
362 traffic_type,
363 completed: false,
364 };
365 let response = future.await?;
366 #[cfg(with_metrics)]
367 {
368 cancellation_guard.completed = true;
369 metrics::SERVER_REQUEST_LATENCY
370 .with_label_values(&[&cancellation_guard.method_name, traffic_type])
371 .observe(start.elapsed().as_secs_f64() * 1000.0);
372 metrics::SERVER_REQUEST_COUNT
373 .with_label_values(&[&cancellation_guard.method_name, traffic_type])
374 .inc();
375 }
376 Ok(response)
377 }
378 .boxed()
379 }
380}
381
382impl<S> GrpcServer<S>
383where
384 S: Storage + Clone + Send + Sync + 'static,
385{
386 #[expect(clippy::too_many_arguments)]
387 pub fn spawn(
388 host: String,
389 port: u16,
390 state: WorkerState<S>,
391 shard_id: ShardId,
392 internal_network: ValidatorInternalNetworkConfig,
393 cross_chain_config: &CrossChainConfig,
394 notification_config: &NotificationConfig,
395 shutdown_signal: CancellationToken,
396 join_set: &mut JoinSet,
397 ) -> GrpcServerHandle {
398 info!(
399 "spawning gRPC server on {}:{} for shard {}",
400 host, port, shard_id
401 );
402
403 let (cross_chain_sender, cross_chain_receiver) =
404 mpsc::channel(cross_chain_config.queue_size);
405
406 let state = {
410 let routing_network = internal_network.clone();
411 let routing_sender = cross_chain_sender.clone();
412 state.with_outbound_cross_chain_sender(std::sync::Arc::new(move |request| {
413 let shard_id = routing_network.get_shard_id(request.target_chain_id());
414 if let Err(error) = routing_sender.clone().try_send((request, shard_id)) {
415 error!(%error, "dropping cross-chain request");
416 }
417 }))
418 };
419
420 let (notification_sender, _) =
421 tokio::sync::broadcast::channel(notification_config.notification_queue_size);
422
423 join_set.spawn_task({
424 info!(
425 nickname = state.nickname(),
426 "spawning cross-chain queries thread on {} for shard {}", host, shard_id
427 );
428 Self::forward_cross_chain_queries(
429 state.nickname().to_string(),
430 internal_network.clone(),
431 cross_chain_config.max_retries,
432 Duration::from_millis(cross_chain_config.retry_delay_ms),
433 Duration::from_millis(cross_chain_config.max_backoff_ms),
434 Duration::from_millis(cross_chain_config.sender_delay_ms),
435 cross_chain_config.sender_failure_rate,
436 shard_id,
437 cross_chain_receiver,
438 )
439 });
440
441 let mut exporter_forwarded = false;
442 for proxy in &internal_network.proxies {
443 let receiver = notification_sender.subscribe();
444 join_set.spawn_task({
445 info!(
446 nickname = state.nickname(),
447 "spawning notifications thread on {} for shard {}", host, shard_id
448 );
449 let exporter_addresses = if exporter_forwarded {
450 vec![]
451 } else {
452 exporter_forwarded = true;
453 internal_network.exporter_addresses()
454 };
455 Self::forward_notifications(
456 state.nickname().to_string(),
457 proxy.internal_address(&internal_network.protocol),
458 exporter_addresses,
459 receiver,
460 notification_config.clone(),
461 )
462 });
463 }
464
465 let (health_reporter, health_service) = tonic_health::server::health_reporter();
466
467 let grpc_server = GrpcServer {
468 state,
469 shard_id,
470 network: internal_network,
471 cross_chain_sender,
472 notification_sender,
473 };
474
475 let worker_node = ValidatorWorkerServer::new(grpc_server)
476 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
477 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
478
479 let handle = join_set.spawn_task(async move {
480 let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
481
482 let reflection_service = tonic_reflection::server::Builder::configure()
483 .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
484 .build_v1()?;
485
486 health_reporter
487 .set_serving::<ValidatorWorkerServer<Self>>()
488 .await;
489
490 #[cfg(feature = "opentelemetry")]
491 let mut server = tonic::transport::Server::builder().layer(
492 ServiceBuilder::new()
493 .layer(crate::propagation::OtelContextLayer)
494 .layer(GrpcPrometheusMetricsMiddlewareLayer)
495 .into_inner(),
496 );
497 #[cfg(not(feature = "opentelemetry"))]
498 let mut server = tonic::transport::Server::builder().layer(
499 ServiceBuilder::new()
500 .layer(GrpcPrometheusMetricsMiddlewareLayer)
501 .into_inner(),
502 );
503 server
504 .add_service(health_service)
505 .add_service(reflection_service)
506 .add_service(worker_node)
507 .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
508 .await?;
509
510 Ok(())
511 });
512
513 GrpcServerHandle { handle }
514 }
515
516 #[instrument(skip(receiver, config))]
519 async fn forward_notifications(
520 nickname: String,
521 proxy_address: String,
522 exporter_addresses: Vec<String>,
523 mut receiver: tokio::sync::broadcast::Receiver<Notification>,
524 config: NotificationConfig,
525 ) {
526 let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
527 .expect("Proxy URI should be valid")
528 .connect_lazy();
529 let client = NotifierServiceClient::new(channel)
530 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
531 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
532
533 let exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
534 .iter()
535 .map(|address| {
536 let channel = tonic::transport::Channel::from_shared(address.clone())
537 .expect("Exporter URI should be valid")
538 .connect_lazy();
539 NotifierServiceClient::new(channel)
540 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
541 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
542 })
543 .collect::<Vec<_>>();
544
545 let mut forwarder = BatchForwarder {
546 nickname: nickname.clone(),
547 client,
548 exporter_clients,
549 pending_notifications: Vec::new(),
550 futures: FuturesUnordered::new(),
551 batch_limit: config.notification_batch_size,
552 max_tasks: config.notification_max_in_flight,
553 };
554
555 loop {
556 tokio::select! {
557 biased;
558
559 result = receiver.recv() => {
560 match result {
561 Ok(notification) => {
562 forwarder.pending_notifications.push(notification);
563
564 if forwarder.futures.is_empty()
565 || (forwarder.pending_notifications.len() >= forwarder.batch_limit
566 && forwarder.futures.len() < forwarder.max_tasks) {
567 forwarder.spawn_batches();
568 }
569 }
570 Err(RecvError::Lagged(skipped_count)) => {
571 warn!(
572 nickname,
573 skipped_count, "notification receiver lagged, messages were skipped"
574 );
575 #[cfg(with_metrics)]
576 metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
577 .with_label_values(&[])
578 .inc_by(skipped_count);
579 }
580 Err(RecvError::Closed) => {
581 warn!(
582 nickname,
583 "notification channel closed, draining pending notifications"
584 );
585 loop {
587 forwarder.spawn_batches();
588 if forwarder.is_fully_drained() {
589 break;
590 }
591 forwarder.futures.next().await;
592 }
593 break;
594 }
595 }
596 }
597
598 Some(()) = forwarder.futures.next() => {
599 forwarder.spawn_batches();
600 }
601 }
602 }
603 }
604
605 fn handle_network_actions(&self, actions: NetworkActions) {
606 let mut cross_chain_sender = self.cross_chain_sender.clone();
607 let notification_sender = self.notification_sender.clone();
608
609 for request in actions.cross_chain_requests {
610 let shard_id = self.network.get_shard_id(request.target_chain_id());
611 trace!(
612 source_shard_id = self.shard_id,
613 target_shard_id = shard_id,
614 "Scheduling cross-chain query",
615 );
616
617 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
618 error!(%error, "dropping cross-chain request");
619 #[cfg(with_metrics)]
620 if error.is_full() {
621 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
622 .with_label_values(&[])
623 .inc();
624 }
625 }
626 }
627
628 for notification in actions.notifications {
629 trace!("Scheduling notification query");
630 if let Err(error) = notification_sender.send(notification) {
631 error!(%error, "dropping notification");
632 #[cfg(with_metrics)]
633 metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
634 .with_label_values(&[])
635 .inc();
636 }
637 }
638 }
639
640 #[instrument(skip_all, fields(nickname, %this_shard))]
641 #[expect(clippy::too_many_arguments)]
642 async fn forward_cross_chain_queries(
643 nickname: String,
644 network: ValidatorInternalNetworkConfig,
645 cross_chain_max_retries: u32,
646 cross_chain_retry_delay: Duration,
647 cross_chain_max_backoff: Duration,
648 cross_chain_sender_delay: Duration,
649 cross_chain_sender_failure_rate: f32,
650 this_shard: ShardId,
651 receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
652 ) {
653 let pool = GrpcConnectionPool::default();
654 let handle_request =
655 move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
656 let channel_result = pool.channel(network.shard(shard_id).http_address());
657 async move {
658 let mut client = ValidatorWorkerClient::new(channel_result?)
659 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
660 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
661 client
662 .handle_cross_chain_request(Request::new(request.try_into()?))
663 .await?;
664 anyhow::Result::<_, anyhow::Error>::Ok(())
665 }
666 };
667 cross_chain_message_queue::forward_cross_chain_queries(
668 nickname,
669 cross_chain_max_retries,
670 cross_chain_retry_delay,
671 cross_chain_max_backoff,
672 cross_chain_sender_delay,
673 cross_chain_sender_failure_rate,
674 this_shard,
675 receiver,
676 handle_request,
677 )
678 .await;
679 }
680
681 fn log_request_success(method_name: &str, traffic_type: &str) {
682 #![cfg_attr(not(with_metrics), allow(unused_variables))]
683 #[cfg(with_metrics)]
684 metrics::SERVER_REQUEST_SUCCESS
685 .with_label_values(&[method_name, traffic_type])
686 .inc();
687 }
688
689 fn log_request_error(method_name: &str, traffic_type: &str, error_type: &str) {
690 #![cfg_attr(not(with_metrics), allow(unused_variables))]
691 #[cfg(with_metrics)]
692 metrics::SERVER_REQUEST_ERROR
693 .with_label_values(&[method_name, traffic_type, error_type])
694 .inc();
695 }
696
697 #[cfg(feature = "opentelemetry")]
699 fn get_traffic_type<R>(request: &Request<R>) -> &'static str {
700 get_traffic_type_from_request(request)
701 }
702
703 #[cfg(not(feature = "opentelemetry"))]
705 fn get_traffic_type<R>(_request: &Request<R>) -> &'static str {
706 "unknown"
707 }
708
709 fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
710 let nickname = self.state.nickname();
711 if error.is_local() {
712 error!(nickname, %error, "{}", context);
713 } else {
714 debug!(nickname, %error, "{}", context);
715 }
716 }
717}
718
719#[tonic::async_trait]
720impl<S> ValidatorWorkerRpc for GrpcServer<S>
721where
722 S: Storage + Clone + Send + Sync + 'static,
723{
724 #[instrument(
725 target = "grpc_server",
726 skip_all,
727 err,
728 fields(
729 nickname = self.state.nickname(),
730 chain_id = ?request.get_ref().chain_id()
731 )
732 )]
733 async fn handle_block_proposal(
734 &self,
735 request: Request<BlockProposal>,
736 ) -> Result<Response<ChainInfoResult>, Status> {
737 let traffic_type = Self::get_traffic_type(&request);
738 let proposal = request.into_inner().try_into()?;
739 trace!(?proposal, "Handling block proposal");
740 Ok(Response::new(
741 match self.state.clone().handle_block_proposal(proposal).await {
742 Ok((info, actions)) => {
743 Self::log_request_success("handle_block_proposal", traffic_type);
744 self.handle_network_actions(actions);
745 info.try_into()?
746 }
747 Err(error) => {
748 Self::log_request_error(
749 "handle_block_proposal",
750 traffic_type,
751 &error.error_type(),
752 );
753 self.log_error(&error, "Failed to handle block proposal");
754 NodeError::from(error).try_into()?
755 }
756 },
757 ))
758 }
759
760 #[instrument(
761 target = "grpc_server",
762 skip_all,
763 err,
764 fields(
765 nickname = self.state.nickname(),
766 chain_id = ?request.get_ref().chain_id()
767 )
768 )]
769 async fn handle_lite_certificate(
770 &self,
771 request: Request<LiteCertificate>,
772 ) -> Result<Response<ChainInfoResult>, Status> {
773 let traffic_type = Self::get_traffic_type(&request);
774 let HandleLiteCertRequest {
775 certificate,
776 wait_for_outgoing_messages,
777 } = request.into_inner().try_into()?;
778 trace!(?certificate, "Handling lite certificate");
779 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
780 match Box::pin(
781 self.state
782 .clone()
783 .handle_lite_certificate(certificate, sender),
784 )
785 .await
786 {
787 Ok((info, actions)) => {
788 Self::log_request_success("handle_lite_certificate", traffic_type);
789 self.handle_network_actions(actions);
790 if let Some(receiver) = receiver {
791 if let Err(e) = receiver.await {
792 error!("Failed to wait for message delivery: {e}");
793 }
794 }
795 Ok(Response::new(info.try_into()?))
796 }
797 Err(error) => {
798 Self::log_request_error(
799 "handle_lite_certificate",
800 traffic_type,
801 &error.error_type(),
802 );
803 self.log_error(&error, "Failed to handle lite certificate");
804 Ok(Response::new(NodeError::from(error).try_into()?))
805 }
806 }
807 }
808
809 #[instrument(
810 target = "grpc_server",
811 skip_all,
812 err,
813 fields(
814 nickname = self.state.nickname(),
815 chain_id = ?request.get_ref().chain_id()
816 )
817 )]
818 async fn handle_confirmed_certificate(
819 &self,
820 request: Request<api::HandleConfirmedCertificateRequest>,
821 ) -> Result<Response<ChainInfoResult>, Status> {
822 let traffic_type = Self::get_traffic_type(&request);
823 let HandleConfirmedCertificateRequest {
824 certificate,
825 wait_for_outgoing_messages,
826 } = request.into_inner().try_into()?;
827 trace!(?certificate, "Handling certificate");
828 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
829 match self
830 .state
831 .clone()
832 .handle_confirmed_certificate(certificate, sender)
833 .await
834 {
835 Ok((info, actions)) => {
836 Self::log_request_success("handle_confirmed_certificate", traffic_type);
837 self.handle_network_actions(actions);
838 if let Some(receiver) = receiver {
839 if let Err(e) = receiver.await {
840 error!("Failed to wait for message delivery: {e}");
841 }
842 }
843 Ok(Response::new(info.try_into()?))
844 }
845 Err(error) => {
846 Self::log_request_error(
847 "handle_confirmed_certificate",
848 traffic_type,
849 &error.error_type(),
850 );
851 self.log_error(&error, "Failed to handle confirmed certificate");
852 Ok(Response::new(NodeError::from(error).try_into()?))
853 }
854 }
855 }
856
857 #[instrument(
858 target = "grpc_server",
859 skip_all,
860 err,
861 fields(
862 nickname = self.state.nickname(),
863 chain_id = ?request.get_ref().chain_id()
864 )
865 )]
866 async fn handle_validated_certificate(
867 &self,
868 request: Request<api::HandleValidatedCertificateRequest>,
869 ) -> Result<Response<ChainInfoResult>, Status> {
870 let traffic_type = Self::get_traffic_type(&request);
871 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
872 trace!(?certificate, "Handling certificate");
873 match self
874 .state
875 .clone()
876 .handle_validated_certificate(certificate)
877 .await
878 {
879 Ok((info, actions)) => {
880 Self::log_request_success("handle_validated_certificate", traffic_type);
881 self.handle_network_actions(actions);
882 Ok(Response::new(info.try_into()?))
883 }
884 Err(error) => {
885 Self::log_request_error(
886 "handle_validated_certificate",
887 traffic_type,
888 &error.error_type(),
889 );
890 self.log_error(&error, "Failed to handle validated certificate");
891 Ok(Response::new(NodeError::from(error).try_into()?))
892 }
893 }
894 }
895
896 #[instrument(
897 target = "grpc_server",
898 skip_all,
899 err,
900 fields(
901 nickname = self.state.nickname(),
902 chain_id = ?request.get_ref().chain_id()
903 )
904 )]
905 async fn handle_timeout_certificate(
906 &self,
907 request: Request<api::HandleTimeoutCertificateRequest>,
908 ) -> Result<Response<ChainInfoResult>, Status> {
909 let traffic_type = Self::get_traffic_type(&request);
910 let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
911 trace!(?certificate, "Handling Timeout certificate");
912 match self
913 .state
914 .clone()
915 .handle_timeout_certificate(certificate)
916 .await
917 {
918 Ok((info, _actions)) => {
919 Self::log_request_success("handle_timeout_certificate", traffic_type);
920 Ok(Response::new(info.try_into()?))
921 }
922 Err(error) => {
923 Self::log_request_error(
924 "handle_timeout_certificate",
925 traffic_type,
926 &error.error_type(),
927 );
928 self.log_error(&error, "Failed to handle timeout certificate");
929 Ok(Response::new(NodeError::from(error).try_into()?))
930 }
931 }
932 }
933
934 #[instrument(
935 target = "grpc_server",
936 skip_all,
937 err,
938 fields(
939 nickname = self.state.nickname(),
940 chain_id = ?request.get_ref().chain_id()
941 )
942 )]
943 async fn handle_chain_info_query(
944 &self,
945 request: Request<ChainInfoQuery>,
946 ) -> Result<Response<ChainInfoResult>, Status> {
947 let traffic_type = Self::get_traffic_type(&request);
948 let query = request.into_inner().try_into()?;
949 trace!(?query, "Handling chain info query");
950 match self.state.clone().handle_chain_info_query(query).await {
951 Ok(info) => {
952 Self::log_request_success("handle_chain_info_query", traffic_type);
953 Ok(Response::new(info.try_into()?))
954 }
955 Err(error) => {
956 Self::log_request_error(
957 "handle_chain_info_query",
958 traffic_type,
959 &error.error_type(),
960 );
961 self.log_error(&error, "Failed to handle chain info query");
962 Ok(Response::new(NodeError::from(error).try_into()?))
963 }
964 }
965 }
966
967 #[instrument(
968 target = "grpc_server",
969 skip_all,
970 err,
971 fields(
972 nickname = self.state.nickname(),
973 chain_id = ?request.get_ref().chain_id()
974 )
975 )]
976 async fn download_pending_blob(
977 &self,
978 request: Request<PendingBlobRequest>,
979 ) -> Result<Response<PendingBlobResult>, Status> {
980 let traffic_type = Self::get_traffic_type(&request);
981 let (chain_id, blob_id) = request.into_inner().try_into()?;
982 trace!(?blob_id, "Download pending blob");
983 match self
984 .state
985 .clone()
986 .download_pending_blob(chain_id, blob_id)
987 .await
988 {
989 Ok(blob) => {
990 Self::log_request_success("download_pending_blob", traffic_type);
991 Ok(Response::new(blob.content().clone().try_into()?))
992 }
993 Err(error) => {
994 Self::log_request_error("download_pending_blob", traffic_type, &error.error_type());
995 self.log_error(&error, "Failed to download pending blob");
996 Ok(Response::new(NodeError::from(error).try_into()?))
997 }
998 }
999 }
1000
1001 #[instrument(
1002 target = "grpc_server",
1003 skip_all,
1004 err,
1005 fields(
1006 nickname = self.state.nickname(),
1007 chain_id = ?request.get_ref().chain_id
1008 )
1009 )]
1010 async fn handle_pending_blob(
1011 &self,
1012 request: Request<HandlePendingBlobRequest>,
1013 ) -> Result<Response<ChainInfoResult>, Status> {
1014 let traffic_type = Self::get_traffic_type(&request);
1015 let (chain_id, blob_content) = request.into_inner().try_into()?;
1016 let blob = Blob::new(blob_content);
1017 let blob_id = blob.id();
1018 trace!(?blob_id, "Handle pending blob");
1019 match self.state.clone().handle_pending_blob(chain_id, blob).await {
1020 Ok(info) => {
1021 Self::log_request_success("handle_pending_blob", traffic_type);
1022 Ok(Response::new(info.try_into()?))
1023 }
1024 Err(error) => {
1025 Self::log_request_error("handle_pending_blob", traffic_type, &error.error_type());
1026 self.log_error(&error, "Failed to handle pending blob");
1027 Ok(Response::new(NodeError::from(error).try_into()?))
1028 }
1029 }
1030 }
1031
1032 #[instrument(
1033 target = "grpc_server",
1034 skip_all,
1035 err,
1036 fields(
1037 nickname = self.state.nickname(),
1038 chain_id = ?request.get_ref().chain_id()
1039 )
1040 )]
1041 async fn handle_cross_chain_request(
1042 &self,
1043 request: Request<CrossChainRequest>,
1044 ) -> Result<Response<()>, Status> {
1045 let traffic_type = Self::get_traffic_type(&request);
1046 let cross_chain_request = request.into_inner().try_into()?;
1047 trace!(?cross_chain_request, "Handling cross-chain request");
1048 match self
1049 .state
1050 .clone()
1051 .handle_cross_chain_request(cross_chain_request)
1052 .await
1053 {
1054 Ok(actions) => {
1055 Self::log_request_success("handle_cross_chain_request", traffic_type);
1056 self.handle_network_actions(actions)
1057 }
1058 Err(error) => {
1059 Self::log_request_error(
1060 "handle_cross_chain_request",
1061 traffic_type,
1062 &error.error_type(),
1063 );
1064 self.log_error(&error, "Failed to handle cross-chain request");
1065 }
1066 }
1067 Ok(Response::new(()))
1068 }
1069}
1070
1071pub trait GrpcProxyable {
1074 fn chain_id(&self) -> Option<ChainId>;
1075}
1076
1077impl GrpcProxyable for BlockProposal {
1078 fn chain_id(&self) -> Option<ChainId> {
1079 self.chain_id.clone()?.try_into().ok()
1080 }
1081}
1082
1083impl GrpcProxyable for LiteCertificate {
1084 fn chain_id(&self) -> Option<ChainId> {
1085 self.chain_id.clone()?.try_into().ok()
1086 }
1087}
1088
1089impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
1090 fn chain_id(&self) -> Option<ChainId> {
1091 self.chain_id.clone()?.try_into().ok()
1092 }
1093}
1094
1095impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
1096 fn chain_id(&self) -> Option<ChainId> {
1097 self.chain_id.clone()?.try_into().ok()
1098 }
1099}
1100
1101impl GrpcProxyable for api::HandleValidatedCertificateRequest {
1102 fn chain_id(&self) -> Option<ChainId> {
1103 self.chain_id.clone()?.try_into().ok()
1104 }
1105}
1106
1107impl GrpcProxyable for ChainInfoQuery {
1108 fn chain_id(&self) -> Option<ChainId> {
1109 self.chain_id.clone()?.try_into().ok()
1110 }
1111}
1112
1113impl GrpcProxyable for PendingBlobRequest {
1114 fn chain_id(&self) -> Option<ChainId> {
1115 self.chain_id.clone()?.try_into().ok()
1116 }
1117}
1118
1119impl GrpcProxyable for HandlePendingBlobRequest {
1120 fn chain_id(&self) -> Option<ChainId> {
1121 self.chain_id.clone()?.try_into().ok()
1122 }
1123}
1124
1125impl GrpcProxyable for CrossChainRequest {
1126 fn chain_id(&self) -> Option<ChainId> {
1127 use super::api::cross_chain_request::Inner;
1128
1129 match self.inner.as_ref()? {
1130 Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
1131 | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. })
1132 | Inner::RevertConfirm(api::RevertConfirm { recipient, .. }) => {
1133 recipient.clone()?.try_into().ok()
1134 }
1135 }
1136 }
1137}