1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8};
9
10use futures::{
11 channel::mpsc, future::BoxFuture, stream::FuturesUnordered, FutureExt as _, StreamExt as _,
12};
13#[cfg(with_metrics)]
14use linera_base::time::Instant;
15use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
16use linera_core::{
17 join_set_ext::JoinSet,
18 node::NodeError,
19 worker::{NetworkActions, Notification, Reason, WorkerState},
20 JoinSetExt as _, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::{broadcast::error::RecvError, oneshot};
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30 api::{
31 self,
32 notifier_service_client::NotifierServiceClient,
33 validator_worker_client::ValidatorWorkerClient,
34 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37 },
38 pool::GrpcConnectionPool,
39 GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41#[cfg(feature = "opentelemetry")]
42use crate::propagation::get_traffic_type_from_request;
43use crate::{
44 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
45 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
46 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
47};
48
/// Sender used to queue a cross-chain request, together with its target shard,
/// for the cross-chain forwarding task.
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
/// Broadcast sender used to fan notifications out to the per-proxy forwarding tasks.
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
51
/// Prometheus metrics recorded by the gRPC worker server.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Metric label identifying the kind of traffic a request belongs to.
    pub const TRAFFIC_TYPE_LABEL: &str = "traffic_type";

    /// Metric label carrying the gRPC method name.
    pub const METHOD_NAME_LABEL: &str = "method_name";

    /// Request latency histogram, labeled by method name and traffic type.
    /// Observed values are in milliseconds (see the middleware's `call`).
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    /// Total requests served, labeled by method name and traffic type.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_count",
            "Server request count",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Requests that completed successfully, labeled by method and traffic type.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Requests that failed, labeled by method and traffic type.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Times a cross-chain request was dropped because the forwarding channel was full.
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });

    /// Notifications lost because a broadcast receiver lagged behind the sender.
    pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_skipped_receiver_lag",
            "Number of notifications skipped because receiver lagged behind sender",
            &[],
        )
    });

    /// Notifications dropped because the broadcast channel had no subscribers.
    pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_dropped_no_receiver",
            "Number of notifications dropped because no receiver was available",
            &[],
        )
    });

    /// Size distribution of notification batches sent to the proxy.
    pub static NOTIFICATION_BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "notification_batch_size",
            "Number of notifications per batch sent to proxy",
            &[],
            exponential_bucket_interval(1.0, 250.0),
        )
    });

    /// Total notification batches sent, labeled by send outcome ("success"/"error").
    pub static NOTIFICATION_BATCHES_SENT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notification_batches_sent",
            "Total notification batches sent",
            &["status"],
        )
    });
}
142
/// Accumulates notifications and forwards them in batches to a proxy and,
/// for new-block notifications, to block exporters, bounding the number of
/// concurrent in-flight sends.
struct BatchForwarder {
    /// Validator nickname, used in log messages.
    nickname: String,
    /// Client for the proxy's notifier service.
    client: NotifierServiceClient<Channel>,
    /// Clients for the block exporters that should receive `NewBlock` notifications.
    exporter_clients: Vec<NotifierServiceClient<Channel>>,
    /// Notifications received but not yet handed to a send task.
    pending_notifications: Vec<Notification>,
    /// In-flight batch send tasks.
    futures: FuturesUnordered<BoxFuture<'static, ()>>,
    /// Maximum number of notifications per batch.
    batch_limit: usize,
    /// Maximum number of concurrent send tasks.
    max_tasks: usize,
}
153
154impl BatchForwarder {
155 fn spawn_batches(&mut self) {
157 while !self.pending_notifications.is_empty() && self.futures.len() < self.max_tasks {
158 let chunk_size = std::cmp::min(self.batch_limit, self.pending_notifications.len());
159 let batch: Vec<Notification> = self.pending_notifications.drain(..chunk_size).collect();
160
161 #[cfg(with_metrics)]
162 metrics::NOTIFICATION_BATCH_SIZE
163 .with_label_values(&[])
164 .observe(batch.len() as f64);
165
166 let client = self.client.clone();
167 let exporter_clients = self.exporter_clients.clone();
168 let nickname = self.nickname.clone();
169
170 self.futures.push(
171 async move {
172 Self::send_batch(nickname, client, exporter_clients, batch).await;
173 }
174 .boxed(),
175 );
176 }
177 }
178
179 fn is_fully_drained(&self) -> bool {
181 self.pending_notifications.is_empty() && self.futures.is_empty()
182 }
183
184 async fn send_batch(
186 nickname: String,
187 mut client: NotifierServiceClient<Channel>,
188 mut exporter_clients: Vec<NotifierServiceClient<Channel>>,
189 batch: Vec<Notification>,
190 ) {
191 let mut proto_notifications = Vec::with_capacity(batch.len());
193 for notification in &batch {
194 match notification.clone().try_into() {
195 Ok(proto) => proto_notifications.push(proto),
196 Err(error) => {
197 warn!(
198 %error,
199 nickname,
200 ?notification.chain_id,
201 ?notification.reason,
202 "could not deserialize notification"
203 );
204 }
205 }
206 }
207
208 let chain_ids: Vec<_> = batch.iter().map(|n| n.chain_id).collect();
210
211 let request = Request::new(api::NotificationBatch {
213 notifications: proto_notifications.clone(),
214 });
215 let result = client.notify_batch(request).await;
216
217 #[cfg(with_metrics)]
218 {
219 let status = if result.is_ok() { "success" } else { "error" };
220 metrics::NOTIFICATION_BATCHES_SENT
221 .with_label_values(&[status])
222 .inc();
223 }
224
225 if let Err(error) = result {
226 error!(
227 %error,
228 nickname,
229 batch_size = proto_notifications.len(),
230 ?chain_ids,
231 "proxy: could not send notification batch",
232 );
233 }
234
235 let new_block_notifications: Vec<_> = batch
237 .iter()
238 .filter(|n| matches!(n.reason, Reason::NewBlock { .. }))
239 .collect();
240
241 let exporter_notifications: Vec<api::Notification> = new_block_notifications
242 .iter()
243 .filter_map(|n| (*n).clone().try_into().ok())
244 .collect();
245
246 if !exporter_notifications.is_empty() {
247 let exporter_chain_ids: Vec<_> =
248 new_block_notifications.iter().map(|n| n.chain_id).collect();
249
250 for exporter_client in &mut exporter_clients {
251 let request = Request::new(api::NotificationBatch {
252 notifications: exporter_notifications.clone(),
253 });
254 if let Err(error) = exporter_client.notify_batch(request).await {
255 error!(
256 %error,
257 nickname,
258 batch_size = exporter_notifications.len(),
259 ?exporter_chain_ids,
260 "block exporter: could not send notification batch",
261 );
262 }
263 }
264 }
265 }
266}
267
/// gRPC server handling validator-worker requests for a single shard.
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    /// Worker state that executes chain operations against storage `S`.
    state: WorkerState<S>,
    /// The shard this server instance is responsible for.
    shard_id: ShardId,
    /// Configuration of the validator's internal network.
    network: ValidatorInternalNetworkConfig,
    /// Queues cross-chain requests for the forwarding task.
    cross_chain_sender: CrossChainSender,
    /// Publishes notifications to the per-proxy forwarding tasks.
    notification_sender: NotificationSender,
}
279
/// Handle to a spawned gRPC server task; await its completion with `join`.
pub struct GrpcServerHandle {
    // The task's result: `Ok(())` on clean shutdown, or the server error.
    handle: TaskHandle<Result<(), GrpcError>>,
}
283
284impl GrpcServerHandle {
285 pub async fn join(self) -> Result<(), GrpcError> {
286 self.handle.await?
287 }
288}
289
/// Tower layer that wraps services in Prometheus request-metrics middleware.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;
292
/// Tower service wrapper that records per-request Prometheus metrics.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    // The inner service being instrumented.
    service: T,
}
297
298impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
299 type Service = GrpcPrometheusMetricsMiddlewareService<S>;
300
301 fn layer(&self, service: S) -> Self::Service {
302 GrpcPrometheusMetricsMiddlewareService { service }
303 }
304}
305
impl<S, B> Service<http::Request<B>> for GrpcPrometheusMetricsMiddlewareService<S>
where
    S::Future: Send + 'static,
    S: Service<http::Request<B>> + std::marker::Send,
    B: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Readiness is delegated entirely to the wrapped service.
        self.service.poll_ready(cx)
    }

    /// Calls the inner service and, when metrics are enabled, records request
    /// latency (in milliseconds) and count, labeled by gRPC method name and
    /// traffic type.
    fn call(&mut self, request: http::Request<B>) -> Self::Future {
        #[cfg(with_metrics)]
        let start = Instant::now();

        // Extract the method name from a gRPC path of the form
        // "/package.Service/Method"; anything else is labeled "non_grpc".
        #[cfg(with_metrics)]
        let method_name = {
            let path = request.uri().path();
            let parts: Vec<&str> = path.splitn(3, '/').collect();
            if parts.len() == 3 && parts[1].contains('.') {
                parts[2].to_owned()
            } else {
                "non_grpc".to_owned()
            }
        };

        // Traffic type is derived from the request by the propagation helper
        // when OpenTelemetry is compiled in; otherwise it is always "unknown".
        #[cfg(all(with_metrics, feature = "opentelemetry"))]
        let traffic_type: &'static str = get_traffic_type_from_request(&request);
        #[cfg(all(with_metrics, not(feature = "opentelemetry")))]
        let traffic_type: &'static str = "unknown";

        let future = self.service.call(request);
        async move {
            let response = future.await?;
            #[cfg(with_metrics)]
            {
                // Latency is observed in milliseconds.
                metrics::SERVER_REQUEST_LATENCY
                    .with_label_values(&[&method_name, traffic_type])
                    .observe(start.elapsed().as_secs_f64() * 1000.0);
                metrics::SERVER_REQUEST_COUNT
                    .with_label_values(&[&method_name, traffic_type])
                    .inc();
            }
            Ok(response)
        }
        .boxed()
    }
}
364
365impl<S> GrpcServer<S>
366where
367 S: Storage + Clone + Send + Sync + 'static,
368{
    /// Spawns the gRPC server for one shard, along with its background tasks:
    /// a cross-chain request forwarder and one notification forwarder per proxy.
    ///
    /// Returns a `GrpcServerHandle` that resolves when the server task finishes.
    #[expect(clippy::too_many_arguments)]
    pub fn spawn(
        host: String,
        port: u16,
        state: WorkerState<S>,
        shard_id: ShardId,
        internal_network: ValidatorInternalNetworkConfig,
        cross_chain_config: &CrossChainConfig,
        notification_config: &NotificationConfig,
        shutdown_signal: CancellationToken,
        join_set: &mut JoinSet,
    ) -> GrpcServerHandle {
        info!(
            "spawning gRPC server on {}:{} for shard {}",
            host, port, shard_id
        );

        // Bounded channel feeding the cross-chain forwarding task below.
        let (cross_chain_sender, cross_chain_receiver) =
            mpsc::channel(cross_chain_config.queue_size);

        // Broadcast channel for notifications; each proxy task subscribes below,
        // so the initial receiver is discarded.
        let (notification_sender, _) =
            tokio::sync::broadcast::channel(notification_config.notification_queue_size);

        join_set.spawn_task({
            info!(
                nickname = state.nickname(),
                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
            );
            Self::forward_cross_chain_queries(
                state.nickname().to_string(),
                internal_network.clone(),
                cross_chain_config.max_retries,
                Duration::from_millis(cross_chain_config.retry_delay_ms),
                Duration::from_millis(cross_chain_config.max_backoff_ms),
                Duration::from_millis(cross_chain_config.sender_delay_ms),
                cross_chain_config.sender_failure_rate,
                shard_id,
                cross_chain_receiver,
            )
        });

        // One notification forwarding task per proxy. Only the first task also
        // forwards to the block exporters, so each notification reaches the
        // exporters at most once.
        let mut exporter_forwarded = false;
        for proxy in &internal_network.proxies {
            let receiver = notification_sender.subscribe();
            join_set.spawn_task({
                info!(
                    nickname = state.nickname(),
                    "spawning notifications thread on {} for shard {}", host, shard_id
                );
                let exporter_addresses = if exporter_forwarded {
                    vec![]
                } else {
                    exporter_forwarded = true;
                    internal_network.exporter_addresses()
                };
                Self::forward_notifications(
                    state.nickname().to_string(),
                    proxy.internal_address(&internal_network.protocol),
                    exporter_addresses,
                    receiver,
                    notification_config.clone(),
                )
            });
        }

        let (health_reporter, health_service) = tonic_health::server::health_reporter();

        let grpc_server = GrpcServer {
            state,
            shard_id,
            network: internal_network,
            cross_chain_sender,
            notification_sender,
        };

        let worker_node = ValidatorWorkerServer::new(grpc_server)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let handle = join_set.spawn_task(async move {
            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));

            // Expose the proto file descriptors for gRPC reflection clients.
            let reflection_service = tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
                .build_v1()?;

            health_reporter
                .set_serving::<ValidatorWorkerServer<Self>>()
                .await;

            // With OpenTelemetry, context extraction runs before the metrics
            // middleware so the traffic-type label can be derived.
            #[cfg(feature = "opentelemetry")]
            let mut server = tonic::transport::Server::builder().layer(
                ServiceBuilder::new()
                    .layer(crate::propagation::OtelContextLayer)
                    .layer(GrpcPrometheusMetricsMiddlewareLayer)
                    .into_inner(),
            );
            #[cfg(not(feature = "opentelemetry"))]
            let mut server = tonic::transport::Server::builder().layer(
                ServiceBuilder::new()
                    .layer(GrpcPrometheusMetricsMiddlewareLayer)
                    .into_inner(),
            );
            server
                .add_service(health_service)
                .add_service(reflection_service)
                .add_service(worker_node)
                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
                .await?;

            Ok(())
        });

        GrpcServerHandle { handle }
    }
484
    /// Forwards notifications from the broadcast channel to one proxy, batching
    /// them and bounding the number of concurrent batch sends. `NewBlock`
    /// notifications are additionally forwarded to the given block exporters.
    /// Runs until the broadcast channel is closed, then drains everything
    /// still pending before returning.
    #[instrument(skip(receiver, config))]
    async fn forward_notifications(
        nickname: String,
        proxy_address: String,
        exporter_addresses: Vec<String>,
        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
        config: NotificationConfig,
    ) {
        // Lazy connection: the transport is only established on first use.
        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
            .expect("Proxy URI should be valid")
            .connect_lazy();
        let client = NotifierServiceClient::new(channel)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
            .iter()
            .map(|address| {
                let channel = tonic::transport::Channel::from_shared(address.clone())
                    .expect("Exporter URI should be valid")
                    .connect_lazy();
                NotifierServiceClient::new(channel)
                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            })
            .collect::<Vec<_>>();

        let mut forwarder = BatchForwarder {
            nickname: nickname.clone(),
            client,
            exporter_clients,
            pending_notifications: Vec::new(),
            futures: FuturesUnordered::new(),
            batch_limit: config.notification_batch_size,
            max_tasks: config.notification_max_in_flight,
        };

        loop {
            tokio::select! {
                // `biased` polls the receive arm first, so incoming notifications
                // are buffered before completed sends are reaped.
                biased;

                result = receiver.recv() => {
                    match result {
                        Ok(notification) => {
                            forwarder.pending_notifications.push(notification);

                            // Send immediately when idle; otherwise wait for a full
                            // batch and a free task slot.
                            if forwarder.futures.is_empty()
                                || (forwarder.pending_notifications.len() >= forwarder.batch_limit
                                    && forwarder.futures.len() < forwarder.max_tasks) {
                                forwarder.spawn_batches();
                            }
                        }
                        Err(RecvError::Lagged(skipped_count)) => {
                            // The broadcast buffer overflowed: notifications were lost.
                            warn!(
                                nickname,
                                skipped_count, "notification receiver lagged, messages were skipped"
                            );
                            #[cfg(with_metrics)]
                            metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
                                .with_label_values(&[])
                                .inc_by(skipped_count);
                        }
                        Err(RecvError::Closed) => {
                            warn!(
                                nickname,
                                "notification channel closed, draining pending notifications"
                            );
                            // Flush everything still buffered or in flight, then stop.
                            loop {
                                forwarder.spawn_batches();
                                if forwarder.is_fully_drained() {
                                    break;
                                }
                                forwarder.futures.next().await;
                            }
                            break;
                        }
                    }
                }

                Some(()) = forwarder.futures.next() => {
                    // A batch finished, so a task slot freed up: try to schedule more.
                    forwarder.spawn_batches();
                }
            }
        }
    }
573
574 fn handle_network_actions(&self, actions: NetworkActions) {
575 let mut cross_chain_sender = self.cross_chain_sender.clone();
576 let notification_sender = self.notification_sender.clone();
577
578 for request in actions.cross_chain_requests {
579 let shard_id = self.network.get_shard_id(request.target_chain_id());
580 trace!(
581 source_shard_id = self.shard_id,
582 target_shard_id = shard_id,
583 "Scheduling cross-chain query",
584 );
585
586 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
587 error!(%error, "dropping cross-chain request");
588 #[cfg(with_metrics)]
589 if error.is_full() {
590 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
591 .with_label_values(&[])
592 .inc();
593 }
594 }
595 }
596
597 for notification in actions.notifications {
598 trace!("Scheduling notification query");
599 if let Err(error) = notification_sender.send(notification) {
600 error!(%error, "dropping notification");
601 #[cfg(with_metrics)]
602 metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
603 .with_label_values(&[])
604 .inc();
605 }
606 }
607 }
608
    /// Forwards cross-chain requests received on `receiver` to the responsible
    /// shard's worker over gRPC. Retry, backoff and delay behavior is delegated
    /// to `cross_chain_message_queue::forward_cross_chain_queries`.
    #[instrument(skip_all, fields(nickname, %this_shard))]
    #[expect(clippy::too_many_arguments)]
    async fn forward_cross_chain_queries(
        nickname: String,
        network: ValidatorInternalNetworkConfig,
        cross_chain_max_retries: u32,
        cross_chain_retry_delay: Duration,
        cross_chain_max_backoff: Duration,
        cross_chain_sender_delay: Duration,
        cross_chain_sender_failure_rate: f32,
        this_shard: ShardId,
        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
    ) {
        // Connection pool so channels to the same shard address are reused.
        let pool = GrpcConnectionPool::default();
        let handle_request =
            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
                // Resolve the channel outside the async block so the future is 'static.
                let channel_result = pool.channel(network.shard(shard_id).http_address());
                async move {
                    let mut client = ValidatorWorkerClient::new(channel_result?)
                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
                    client
                        .handle_cross_chain_request(Request::new(request.try_into()?))
                        .await?;
                    anyhow::Result::<_, anyhow::Error>::Ok(())
                }
            };
        cross_chain_message_queue::forward_cross_chain_queries(
            nickname,
            cross_chain_max_retries,
            cross_chain_retry_delay,
            cross_chain_max_backoff,
            cross_chain_sender_delay,
            cross_chain_sender_failure_rate,
            this_shard,
            receiver,
            handle_request,
        )
        .await;
    }
649
650 fn log_request_outcome(success: bool, method_name: &str, traffic_type: &str) {
651 #![cfg_attr(not(with_metrics), allow(unused_variables))]
652 #[cfg(with_metrics)]
653 {
654 if success {
655 metrics::SERVER_REQUEST_SUCCESS
656 .with_label_values(&[method_name, traffic_type])
657 .inc();
658 } else {
659 metrics::SERVER_REQUEST_ERROR
660 .with_label_values(&[method_name, traffic_type])
661 .inc();
662 }
663 }
664 }
665
    /// Returns the traffic-type label for a request, derived via the
    /// OpenTelemetry propagation helper.
    #[cfg(feature = "opentelemetry")]
    fn get_traffic_type<R>(request: &Request<R>) -> &'static str {
        get_traffic_type_from_request(request)
    }
671
    /// Returns the traffic-type label for a request. Without OpenTelemetry the
    /// origin cannot be determined, so this is always `"unknown"`.
    #[cfg(not(feature = "opentelemetry"))]
    fn get_traffic_type<R>(_request: &Request<R>) -> &'static str {
        "unknown"
    }
677
678 fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
679 let nickname = self.state.nickname();
680 if error.is_local() {
681 error!(nickname, %error, "{}", context);
682 } else {
683 debug!(nickname, %error, "{}", context);
684 }
685 }
686}
687
688#[tonic::async_trait]
689impl<S> ValidatorWorkerRpc for GrpcServer<S>
690where
691 S: Storage + Clone + Send + Sync + 'static,
692{
693 #[instrument(
694 target = "grpc_server",
695 skip_all,
696 err,
697 fields(
698 nickname = self.state.nickname(),
699 chain_id = ?request.get_ref().chain_id()
700 )
701 )]
702 async fn handle_block_proposal(
703 &self,
704 request: Request<BlockProposal>,
705 ) -> Result<Response<ChainInfoResult>, Status> {
706 let traffic_type = Self::get_traffic_type(&request);
707 let proposal = request.into_inner().try_into()?;
708 trace!(?proposal, "Handling block proposal");
709 Ok(Response::new(
710 match self.state.clone().handle_block_proposal(proposal).await {
711 Ok((info, actions)) => {
712 Self::log_request_outcome(true, "handle_block_proposal", traffic_type);
713 self.handle_network_actions(actions);
714 info.try_into()?
715 }
716 Err(error) => {
717 Self::log_request_outcome(false, "handle_block_proposal", traffic_type);
718 self.log_error(&error, "Failed to handle block proposal");
719 NodeError::from(error).try_into()?
720 }
721 },
722 ))
723 }
724
    /// Handles a lite certificate: applies it to the worker state and dispatches
    /// the resulting network actions. If the client asked to wait for outgoing
    /// messages, waits on the delivery channel before responding. Worker errors
    /// are returned inside the `ChainInfoResult`, not as a gRPC error status.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_lite_certificate(
        &self,
        request: Request<LiteCertificate>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let HandleLiteCertRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling lite certificate");
        // Only create the delivery-confirmation channel when the client asked to wait.
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match Box::pin(
            self.state
                .clone()
                .handle_lite_certificate(certificate, sender),
        )
        .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome(true, "handle_lite_certificate", traffic_type);
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome(false, "handle_lite_certificate", traffic_type);
                self.log_error(&error, "Failed to handle lite certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }
769
    /// Handles a confirmed certificate: applies it to the worker state and
    /// dispatches the resulting network actions. If the client asked to wait
    /// for outgoing messages, waits on the delivery channel before responding.
    /// Worker errors are returned inside the `ChainInfoResult`.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_confirmed_certificate(
        &self,
        request: Request<api::HandleConfirmedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let HandleConfirmedCertificateRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling certificate");
        // Only create the delivery-confirmation channel when the client asked to wait.
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match self
            .state
            .clone()
            .handle_confirmed_certificate(certificate, sender)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome(true, "handle_confirmed_certificate", traffic_type);
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome(false, "handle_confirmed_certificate", traffic_type);
                self.log_error(&error, "Failed to handle confirmed certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }
813
814 #[instrument(
815 target = "grpc_server",
816 skip_all,
817 err,
818 fields(
819 nickname = self.state.nickname(),
820 chain_id = ?request.get_ref().chain_id()
821 )
822 )]
823 async fn handle_validated_certificate(
824 &self,
825 request: Request<api::HandleValidatedCertificateRequest>,
826 ) -> Result<Response<ChainInfoResult>, Status> {
827 let traffic_type = Self::get_traffic_type(&request);
828 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
829 trace!(?certificate, "Handling certificate");
830 match self
831 .state
832 .clone()
833 .handle_validated_certificate(certificate)
834 .await
835 {
836 Ok((info, actions)) => {
837 Self::log_request_outcome(true, "handle_validated_certificate", traffic_type);
838 self.handle_network_actions(actions);
839 Ok(Response::new(info.try_into()?))
840 }
841 Err(error) => {
842 Self::log_request_outcome(false, "handle_validated_certificate", traffic_type);
843 self.log_error(&error, "Failed to handle validated certificate");
844 Ok(Response::new(NodeError::from(error).try_into()?))
845 }
846 }
847 }
848
    /// Handles a timeout certificate against the worker state. The resulting
    /// actions are deliberately discarded (`_actions`) — no network actions are
    /// dispatched here. Worker errors are returned inside the `ChainInfoResult`.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_timeout_certificate(
        &self,
        request: Request<api::HandleTimeoutCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling Timeout certificate");
        match self
            .state
            .clone()
            .handle_timeout_certificate(certificate)
            .await
        {
            Ok((info, _actions)) => {
                Self::log_request_outcome(true, "handle_timeout_certificate", traffic_type);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome(false, "handle_timeout_certificate", traffic_type);
                self.log_error(&error, "Failed to handle timeout certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }
882
883 #[instrument(
884 target = "grpc_server",
885 skip_all,
886 err,
887 fields(
888 nickname = self.state.nickname(),
889 chain_id = ?request.get_ref().chain_id()
890 )
891 )]
892 async fn handle_chain_info_query(
893 &self,
894 request: Request<ChainInfoQuery>,
895 ) -> Result<Response<ChainInfoResult>, Status> {
896 let traffic_type = Self::get_traffic_type(&request);
897 let query = request.into_inner().try_into()?;
898 trace!(?query, "Handling chain info query");
899 match self.state.clone().handle_chain_info_query(query).await {
900 Ok((info, actions)) => {
901 Self::log_request_outcome(true, "handle_chain_info_query", traffic_type);
902 self.handle_network_actions(actions);
903 Ok(Response::new(info.try_into()?))
904 }
905 Err(error) => {
906 Self::log_request_outcome(false, "handle_chain_info_query", traffic_type);
907 self.log_error(&error, "Failed to handle chain info query");
908 Ok(Response::new(NodeError::from(error).try_into()?))
909 }
910 }
911 }
912
    /// Serves a pending blob for the given chain. On success the blob's content
    /// is returned; worker errors are returned inside the `PendingBlobResult`.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn download_pending_blob(
        &self,
        request: Request<PendingBlobRequest>,
    ) -> Result<Response<PendingBlobResult>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let (chain_id, blob_id) = request.into_inner().try_into()?;
        trace!(?chain_id, ?blob_id, "Download pending blob");
        match self
            .state
            .clone()
            .download_pending_blob(chain_id, blob_id)
            .await
        {
            Ok(blob) => {
                Self::log_request_outcome(true, "download_pending_blob", traffic_type);
                Ok(Response::new(blob.into_content().try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome(false, "download_pending_blob", traffic_type);
                self.log_error(&error, "Failed to download pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }
946
    /// Accepts a pending blob for the given chain: rebuilds the `Blob` from the
    /// uploaded content and hands it to the worker state. Worker errors are
    /// returned inside the `ChainInfoResult`.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id
        )
    )]
    async fn handle_pending_blob(
        &self,
        request: Request<HandlePendingBlobRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let (chain_id, blob_content) = request.into_inner().try_into()?;
        // The blob ID is derived from the content, not taken from the request.
        let blob = Blob::new(blob_content);
        let blob_id = blob.id();
        trace!(?chain_id, ?blob_id, "Handle pending blob");
        match self.state.clone().handle_pending_blob(chain_id, blob).await {
            Ok(info) => {
                Self::log_request_outcome(true, "handle_pending_blob", traffic_type);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome(false, "handle_pending_blob", traffic_type);
                self.log_error(&error, "Failed to handle pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }
977
    /// Handles a cross-chain request from another shard and dispatches any
    /// resulting network actions. Always responds with `Ok(())`: failures are
    /// logged here, and retrying is left to the sending shard's forwarder.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_cross_chain_request(
        &self,
        request: Request<CrossChainRequest>,
    ) -> Result<Response<()>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let cross_chain_request = request.into_inner().try_into()?;
        trace!(?cross_chain_request, "Handling cross-chain request");
        match self
            .state
            .clone()
            .handle_cross_chain_request(cross_chain_request)
            .await
        {
            Ok(actions) => {
                Self::log_request_outcome(true, "handle_cross_chain_request", traffic_type);
                self.handle_network_actions(actions)
            }
            Err(error) => {
                Self::log_request_outcome(false, "handle_cross_chain_request", traffic_type);
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle cross-chain request");
            }
        }
        Ok(Response::new(()))
    }
1012}
1013
/// Messages that the proxy can route by chain.
pub trait GrpcProxyable {
    /// Returns the `ChainId` this message refers to, or `None` if it is absent
    /// or cannot be decoded.
    fn chain_id(&self) -> Option<ChainId>;
}
1019
1020impl GrpcProxyable for BlockProposal {
1021 fn chain_id(&self) -> Option<ChainId> {
1022 self.chain_id.clone()?.try_into().ok()
1023 }
1024}
1025
1026impl GrpcProxyable for LiteCertificate {
1027 fn chain_id(&self) -> Option<ChainId> {
1028 self.chain_id.clone()?.try_into().ok()
1029 }
1030}
1031
1032impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
1033 fn chain_id(&self) -> Option<ChainId> {
1034 self.chain_id.clone()?.try_into().ok()
1035 }
1036}
1037
1038impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
1039 fn chain_id(&self) -> Option<ChainId> {
1040 self.chain_id.clone()?.try_into().ok()
1041 }
1042}
1043
1044impl GrpcProxyable for api::HandleValidatedCertificateRequest {
1045 fn chain_id(&self) -> Option<ChainId> {
1046 self.chain_id.clone()?.try_into().ok()
1047 }
1048}
1049
1050impl GrpcProxyable for ChainInfoQuery {
1051 fn chain_id(&self) -> Option<ChainId> {
1052 self.chain_id.clone()?.try_into().ok()
1053 }
1054}
1055
1056impl GrpcProxyable for PendingBlobRequest {
1057 fn chain_id(&self) -> Option<ChainId> {
1058 self.chain_id.clone()?.try_into().ok()
1059 }
1060}
1061
1062impl GrpcProxyable for HandlePendingBlobRequest {
1063 fn chain_id(&self) -> Option<ChainId> {
1064 self.chain_id.clone()?.try_into().ok()
1065 }
1066}
1067
impl GrpcProxyable for CrossChainRequest {
    fn chain_id(&self) -> Option<ChainId> {
        use super::api::cross_chain_request::Inner;

        // Both request variants are routed by their `recipient` chain.
        match self.inner.as_ref()? {
            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
                recipient.clone()?.try_into().ok()
            }
        }
    }
}