linera_rpc/grpc/
server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    net::{IpAddr, SocketAddr},
6    str::FromStr,
7    task::{Context, Poll},
8};
9
10use futures::{
11    channel::mpsc, future::BoxFuture, stream::FuturesUnordered, FutureExt as _, StreamExt as _,
12};
13#[cfg(with_metrics)]
14use linera_base::time::Instant;
15use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
16use linera_core::{
17    join_set_ext::JoinSet,
18    node::NodeError,
19    worker::{NetworkActions, Notification, Reason, WorkerState},
20    JoinSetExt as _, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::{broadcast::error::RecvError, oneshot};
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30    api::{
31        self,
32        notifier_service_client::NotifierServiceClient,
33        validator_worker_client::ValidatorWorkerClient,
34        validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35        BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36        HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37    },
38    pool::GrpcConnectionPool,
39    GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41#[cfg(feature = "opentelemetry")]
42use crate::propagation::get_traffic_type_from_request;
43use crate::{
44    config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
45    cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
46    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
47};
48
49type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
50type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
51
52#[cfg(with_metrics)]
53mod metrics {
54    use std::sync::LazyLock;
55
56    use linera_base::prometheus_util::{
57        exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
58        register_int_counter_vec,
59    };
60    use prometheus::{HistogramVec, IntCounterVec};
61
62    /// Label for distinguishing organic vs synthetic (benchmark) traffic.
63    pub const TRAFFIC_TYPE_LABEL: &str = "traffic_type";
64
65    /// Label for the gRPC method name.
66    pub const METHOD_NAME_LABEL: &str = "method_name";
67
68    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
69        register_histogram_vec(
70            "server_request_latency",
71            "Server request latency",
72            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
73            linear_bucket_interval(1.0, 25.0, 2000.0),
74        )
75    });
76
77    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
78        register_int_counter_vec(
79            "server_request_count",
80            "Server request count",
81            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
82        )
83    });
84
85    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
86        register_int_counter_vec(
87            "server_request_success",
88            "Server request success",
89            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
90        )
91    });
92
93    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
94        register_int_counter_vec(
95            "server_request_error",
96            "Server request error",
97            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
98        )
99    });
100
101    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
102        register_int_counter_vec(
103            "cross_chain_message_channel_full",
104            "Cross-chain message channel full",
105            &[],
106        )
107    });
108
109    pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
110        register_int_counter_vec(
111            "notifications_skipped_receiver_lag",
112            "Number of notifications skipped because receiver lagged behind sender",
113            &[],
114        )
115    });
116
117    pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
118        register_int_counter_vec(
119            "notifications_dropped_no_receiver",
120            "Number of notifications dropped because no receiver was available",
121            &[],
122        )
123    });
124
125    pub static NOTIFICATION_BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
126        register_histogram_vec(
127            "notification_batch_size",
128            "Number of notifications per batch sent to proxy",
129            &[],
130            exponential_bucket_interval(1.0, 250.0),
131        )
132    });
133
134    pub static NOTIFICATION_BATCHES_SENT: LazyLock<IntCounterVec> = LazyLock::new(|| {
135        register_int_counter_vec(
136            "notification_batches_sent",
137            "Total notification batches sent",
138            &["status"],
139        )
140    });
141}
142
143/// Handles batched forwarding of notifications to proxy and exporters.
144struct BatchForwarder {
145    nickname: String,
146    client: NotifierServiceClient<Channel>,
147    exporter_clients: Vec<NotifierServiceClient<Channel>>,
148    pending_notifications: Vec<Notification>,
149    futures: FuturesUnordered<BoxFuture<'static, ()>>,
150    batch_limit: usize,
151    max_tasks: usize,
152}
153
154impl BatchForwarder {
155    /// Spawns batch send tasks up to max_tasks limit.
156    fn spawn_batches(&mut self) {
157        while !self.pending_notifications.is_empty() && self.futures.len() < self.max_tasks {
158            let chunk_size = std::cmp::min(self.batch_limit, self.pending_notifications.len());
159            let batch: Vec<Notification> = self.pending_notifications.drain(..chunk_size).collect();
160
161            #[cfg(with_metrics)]
162            metrics::NOTIFICATION_BATCH_SIZE
163                .with_label_values(&[])
164                .observe(batch.len() as f64);
165
166            let client = self.client.clone();
167            let exporter_clients = self.exporter_clients.clone();
168            let nickname = self.nickname.clone();
169
170            self.futures.push(
171                async move {
172                    Self::send_batch(nickname, client, exporter_clients, batch).await;
173                }
174                .boxed(),
175            );
176        }
177    }
178
179    /// Returns true if there are no pending notifications and no in-flight tasks.
180    fn is_fully_drained(&self) -> bool {
181        self.pending_notifications.is_empty() && self.futures.is_empty()
182    }
183
184    /// Sends a batch of notifications to the proxy and exporters.
185    async fn send_batch(
186        nickname: String,
187        mut client: NotifierServiceClient<Channel>,
188        mut exporter_clients: Vec<NotifierServiceClient<Channel>>,
189        batch: Vec<Notification>,
190    ) {
191        // Convert to proto notifications, logging any deserialization errors
192        let mut proto_notifications = Vec::with_capacity(batch.len());
193        for notification in &batch {
194            match notification.clone().try_into() {
195                Ok(proto) => proto_notifications.push(proto),
196                Err(error) => {
197                    warn!(
198                        %error,
199                        nickname,
200                        ?notification.chain_id,
201                        ?notification.reason,
202                        "could not deserialize notification"
203                    );
204                }
205            }
206        }
207
208        // Collect chain_ids for error logging
209        let chain_ids: Vec<_> = batch.iter().map(|n| n.chain_id).collect();
210
211        // Send batch to proxy
212        let request = Request::new(api::NotificationBatch {
213            notifications: proto_notifications.clone(),
214        });
215        let result = client.notify_batch(request).await;
216
217        #[cfg(with_metrics)]
218        {
219            let status = if result.is_ok() { "success" } else { "error" };
220            metrics::NOTIFICATION_BATCHES_SENT
221                .with_label_values(&[status])
222                .inc();
223        }
224
225        if let Err(error) = result {
226            error!(
227                %error,
228                nickname,
229                batch_size = proto_notifications.len(),
230                ?chain_ids,
231                "proxy: could not send notification batch",
232            );
233        }
234
235        // Send NewBlock notifications to exporters
236        let new_block_notifications: Vec<_> = batch
237            .iter()
238            .filter(|n| matches!(n.reason, Reason::NewBlock { .. }))
239            .collect();
240
241        let exporter_notifications: Vec<api::Notification> = new_block_notifications
242            .iter()
243            .filter_map(|n| (*n).clone().try_into().ok())
244            .collect();
245
246        if !exporter_notifications.is_empty() {
247            let exporter_chain_ids: Vec<_> =
248                new_block_notifications.iter().map(|n| n.chain_id).collect();
249
250            for exporter_client in &mut exporter_clients {
251                let request = Request::new(api::NotificationBatch {
252                    notifications: exporter_notifications.clone(),
253                });
254                if let Err(error) = exporter_client.notify_batch(request).await {
255                    error!(
256                        %error,
257                        nickname,
258                        batch_size = exporter_notifications.len(),
259                        ?exporter_chain_ids,
260                        "block exporter: could not send notification batch",
261                    );
262                }
263            }
264        }
265    }
266}
267
268#[derive(Clone)]
269pub struct GrpcServer<S>
270where
271    S: Storage,
272{
273    state: WorkerState<S>,
274    shard_id: ShardId,
275    network: ValidatorInternalNetworkConfig,
276    cross_chain_sender: CrossChainSender,
277    notification_sender: NotificationSender,
278}
279
280pub struct GrpcServerHandle {
281    handle: TaskHandle<Result<(), GrpcError>>,
282}
283
284impl GrpcServerHandle {
285    pub async fn join(self) -> Result<(), GrpcError> {
286        self.handle.await?
287    }
288}
289
290#[derive(Clone)]
291pub struct GrpcPrometheusMetricsMiddlewareLayer;
292
293#[derive(Clone)]
294pub struct GrpcPrometheusMetricsMiddlewareService<T> {
295    service: T,
296}
297
298impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
299    type Service = GrpcPrometheusMetricsMiddlewareService<S>;
300
301    fn layer(&self, service: S) -> Self::Service {
302        GrpcPrometheusMetricsMiddlewareService { service }
303    }
304}
305
306impl<S, B> Service<http::Request<B>> for GrpcPrometheusMetricsMiddlewareService<S>
307where
308    S::Future: Send + 'static,
309    S: Service<http::Request<B>> + std::marker::Send,
310    B: Send + 'static,
311{
312    type Response = S::Response;
313    type Error = S::Error;
314    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
315
316    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
317        self.service.poll_ready(cx)
318    }
319
320    fn call(&mut self, request: http::Request<B>) -> Self::Future {
321        #[cfg(with_metrics)]
322        let start = Instant::now();
323
324        // Extract the gRPC method name from the URI path. gRPC paths have the form
325        // `/{package}.{Service}/{Method}` — the first segment always contains a dot.
326        // Non-gRPC requests (bot probes, health checks, etc.) are bucketed as
327        // "non_grpc" to prevent unbounded label cardinality.
328        #[cfg(with_metrics)]
329        let method_name = {
330            let path = request.uri().path();
331            let parts: Vec<&str> = path.splitn(3, '/').collect();
332            if parts.len() == 3 && parts[1].contains('.') {
333                parts[2].to_owned()
334            } else {
335                "non_grpc".to_owned()
336            }
337        };
338
339        // Extract traffic type from request extensions (set by OtelContextLayer).
340        // When opentelemetry is enabled but no baggage is set, defaults to "organic".
341        // When opentelemetry is disabled, defaults to "unknown".
342        #[cfg(all(with_metrics, feature = "opentelemetry"))]
343        let traffic_type: &'static str = get_traffic_type_from_request(&request);
344        #[cfg(all(with_metrics, not(feature = "opentelemetry")))]
345        let traffic_type: &'static str = "unknown";
346
347        let future = self.service.call(request);
348        async move {
349            let response = future.await?;
350            #[cfg(with_metrics)]
351            {
352                metrics::SERVER_REQUEST_LATENCY
353                    .with_label_values(&[&method_name, traffic_type])
354                    .observe(start.elapsed().as_secs_f64() * 1000.0);
355                metrics::SERVER_REQUEST_COUNT
356                    .with_label_values(&[&method_name, traffic_type])
357                    .inc();
358            }
359            Ok(response)
360        }
361        .boxed()
362    }
363}
364
365impl<S> GrpcServer<S>
366where
367    S: Storage + Clone + Send + Sync + 'static,
368{
369    #[expect(clippy::too_many_arguments)]
370    pub fn spawn(
371        host: String,
372        port: u16,
373        state: WorkerState<S>,
374        shard_id: ShardId,
375        internal_network: ValidatorInternalNetworkConfig,
376        cross_chain_config: &CrossChainConfig,
377        notification_config: &NotificationConfig,
378        shutdown_signal: CancellationToken,
379        join_set: &mut JoinSet,
380    ) -> GrpcServerHandle {
381        info!(
382            "spawning gRPC server on {}:{} for shard {}",
383            host, port, shard_id
384        );
385
386        let (cross_chain_sender, cross_chain_receiver) =
387            mpsc::channel(cross_chain_config.queue_size);
388
389        let (notification_sender, _) =
390            tokio::sync::broadcast::channel(notification_config.notification_queue_size);
391
392        join_set.spawn_task({
393            info!(
394                nickname = state.nickname(),
395                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
396            );
397            Self::forward_cross_chain_queries(
398                state.nickname().to_string(),
399                internal_network.clone(),
400                cross_chain_config.max_retries,
401                Duration::from_millis(cross_chain_config.retry_delay_ms),
402                Duration::from_millis(cross_chain_config.max_backoff_ms),
403                Duration::from_millis(cross_chain_config.sender_delay_ms),
404                cross_chain_config.sender_failure_rate,
405                shard_id,
406                cross_chain_receiver,
407            )
408        });
409
410        let mut exporter_forwarded = false;
411        for proxy in &internal_network.proxies {
412            let receiver = notification_sender.subscribe();
413            join_set.spawn_task({
414                info!(
415                    nickname = state.nickname(),
416                    "spawning notifications thread on {} for shard {}", host, shard_id
417                );
418                let exporter_addresses = if exporter_forwarded {
419                    vec![]
420                } else {
421                    exporter_forwarded = true;
422                    internal_network.exporter_addresses()
423                };
424                Self::forward_notifications(
425                    state.nickname().to_string(),
426                    proxy.internal_address(&internal_network.protocol),
427                    exporter_addresses,
428                    receiver,
429                    notification_config.clone(),
430                )
431            });
432        }
433
434        let (health_reporter, health_service) = tonic_health::server::health_reporter();
435
436        let grpc_server = GrpcServer {
437            state,
438            shard_id,
439            network: internal_network,
440            cross_chain_sender,
441            notification_sender,
442        };
443
444        let worker_node = ValidatorWorkerServer::new(grpc_server)
445            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
446            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
447
448        let handle = join_set.spawn_task(async move {
449            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
450
451            let reflection_service = tonic_reflection::server::Builder::configure()
452                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
453                .build_v1()?;
454
455            health_reporter
456                .set_serving::<ValidatorWorkerServer<Self>>()
457                .await;
458
459            #[cfg(feature = "opentelemetry")]
460            let mut server = tonic::transport::Server::builder().layer(
461                ServiceBuilder::new()
462                    .layer(crate::propagation::OtelContextLayer)
463                    .layer(GrpcPrometheusMetricsMiddlewareLayer)
464                    .into_inner(),
465            );
466            #[cfg(not(feature = "opentelemetry"))]
467            let mut server = tonic::transport::Server::builder().layer(
468                ServiceBuilder::new()
469                    .layer(GrpcPrometheusMetricsMiddlewareLayer)
470                    .into_inner(),
471            );
472            server
473                .add_service(health_service)
474                .add_service(reflection_service)
475                .add_service(worker_node)
476                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
477                .await?;
478
479            Ok(())
480        });
481
482        GrpcServerHandle { handle }
483    }
484
485    /// Continuously waits for receiver to receive notifications and sends them to
486    /// the proxy in batches for improved throughput.
487    #[instrument(skip(receiver, config))]
488    async fn forward_notifications(
489        nickname: String,
490        proxy_address: String,
491        exporter_addresses: Vec<String>,
492        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
493        config: NotificationConfig,
494    ) {
495        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
496            .expect("Proxy URI should be valid")
497            .connect_lazy();
498        let client = NotifierServiceClient::new(channel)
499            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
500            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
501
502        let exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
503            .iter()
504            .map(|address| {
505                let channel = tonic::transport::Channel::from_shared(address.clone())
506                    .expect("Exporter URI should be valid")
507                    .connect_lazy();
508                NotifierServiceClient::new(channel)
509                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
510                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
511            })
512            .collect::<Vec<_>>();
513
514        let mut forwarder = BatchForwarder {
515            nickname: nickname.clone(),
516            client,
517            exporter_clients,
518            pending_notifications: Vec::new(),
519            futures: FuturesUnordered::new(),
520            batch_limit: config.notification_batch_size,
521            max_tasks: config.notification_max_in_flight,
522        };
523
524        loop {
525            tokio::select! {
526                biased;
527
528                result = receiver.recv() => {
529                    match result {
530                        Ok(notification) => {
531                            forwarder.pending_notifications.push(notification);
532
533                            if forwarder.futures.is_empty()
534                               || (forwarder.pending_notifications.len() >= forwarder.batch_limit
535                                   && forwarder.futures.len() < forwarder.max_tasks) {
536                                forwarder.spawn_batches();
537                            }
538                        }
539                        Err(RecvError::Lagged(skipped_count)) => {
540                            warn!(
541                                nickname,
542                                skipped_count, "notification receiver lagged, messages were skipped"
543                            );
544                            #[cfg(with_metrics)]
545                            metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
546                                .with_label_values(&[])
547                                .inc_by(skipped_count);
548                        }
549                        Err(RecvError::Closed) => {
550                            warn!(
551                                nickname,
552                                "notification channel closed, draining pending notifications"
553                            );
554                            // Drain all pending notifications before exiting
555                            loop {
556                                forwarder.spawn_batches();
557                                if forwarder.is_fully_drained() {
558                                    break;
559                                }
560                                forwarder.futures.next().await;
561                            }
562                            break;
563                        }
564                    }
565                }
566
567                Some(()) = forwarder.futures.next() => {
568                    forwarder.spawn_batches();
569                }
570            }
571        }
572    }
573
574    fn handle_network_actions(&self, actions: NetworkActions) {
575        let mut cross_chain_sender = self.cross_chain_sender.clone();
576        let notification_sender = self.notification_sender.clone();
577
578        for request in actions.cross_chain_requests {
579            let shard_id = self.network.get_shard_id(request.target_chain_id());
580            trace!(
581                source_shard_id = self.shard_id,
582                target_shard_id = shard_id,
583                "Scheduling cross-chain query",
584            );
585
586            if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
587                error!(%error, "dropping cross-chain request");
588                #[cfg(with_metrics)]
589                if error.is_full() {
590                    metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
591                        .with_label_values(&[])
592                        .inc();
593                }
594            }
595        }
596
597        for notification in actions.notifications {
598            trace!("Scheduling notification query");
599            if let Err(error) = notification_sender.send(notification) {
600                error!(%error, "dropping notification");
601                #[cfg(with_metrics)]
602                metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
603                    .with_label_values(&[])
604                    .inc();
605            }
606        }
607    }
608
609    #[instrument(skip_all, fields(nickname, %this_shard))]
610    #[expect(clippy::too_many_arguments)]
611    async fn forward_cross_chain_queries(
612        nickname: String,
613        network: ValidatorInternalNetworkConfig,
614        cross_chain_max_retries: u32,
615        cross_chain_retry_delay: Duration,
616        cross_chain_max_backoff: Duration,
617        cross_chain_sender_delay: Duration,
618        cross_chain_sender_failure_rate: f32,
619        this_shard: ShardId,
620        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
621    ) {
622        let pool = GrpcConnectionPool::default();
623        let handle_request =
624            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
625                let channel_result = pool.channel(network.shard(shard_id).http_address());
626                async move {
627                    let mut client = ValidatorWorkerClient::new(channel_result?)
628                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
629                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
630                    client
631                        .handle_cross_chain_request(Request::new(request.try_into()?))
632                        .await?;
633                    anyhow::Result::<_, anyhow::Error>::Ok(())
634                }
635            };
636        cross_chain_message_queue::forward_cross_chain_queries(
637            nickname,
638            cross_chain_max_retries,
639            cross_chain_retry_delay,
640            cross_chain_max_backoff,
641            cross_chain_sender_delay,
642            cross_chain_sender_failure_rate,
643            this_shard,
644            receiver,
645            handle_request,
646        )
647        .await;
648    }
649
650    fn log_request_outcome(success: bool, method_name: &str, traffic_type: &str) {
651        #![cfg_attr(not(with_metrics), allow(unused_variables))]
652        #[cfg(with_metrics)]
653        {
654            if success {
655                metrics::SERVER_REQUEST_SUCCESS
656                    .with_label_values(&[method_name, traffic_type])
657                    .inc();
658            } else {
659                metrics::SERVER_REQUEST_ERROR
660                    .with_label_values(&[method_name, traffic_type])
661                    .inc();
662            }
663        }
664    }
665
666    /// Extracts traffic type from a tonic request's extensions.
667    #[cfg(feature = "opentelemetry")]
668    fn get_traffic_type<R>(request: &Request<R>) -> &'static str {
669        get_traffic_type_from_request(request)
670    }
671
672    /// Returns "unknown" when opentelemetry feature is disabled.
673    #[cfg(not(feature = "opentelemetry"))]
674    fn get_traffic_type<R>(_request: &Request<R>) -> &'static str {
675        "unknown"
676    }
677
678    fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
679        let nickname = self.state.nickname();
680        if error.is_local() {
681            error!(nickname, %error, "{}", context);
682        } else {
683            debug!(nickname, %error, "{}", context);
684        }
685    }
686}
687
688#[tonic::async_trait]
689impl<S> ValidatorWorkerRpc for GrpcServer<S>
690where
691    S: Storage + Clone + Send + Sync + 'static,
692{
693    #[instrument(
694        target = "grpc_server",
695        skip_all,
696        err,
697        fields(
698            nickname = self.state.nickname(),
699            chain_id = ?request.get_ref().chain_id()
700        )
701    )]
702    async fn handle_block_proposal(
703        &self,
704        request: Request<BlockProposal>,
705    ) -> Result<Response<ChainInfoResult>, Status> {
706        let traffic_type = Self::get_traffic_type(&request);
707        let proposal = request.into_inner().try_into()?;
708        trace!(?proposal, "Handling block proposal");
709        Ok(Response::new(
710            match self.state.clone().handle_block_proposal(proposal).await {
711                Ok((info, actions)) => {
712                    Self::log_request_outcome(true, "handle_block_proposal", traffic_type);
713                    self.handle_network_actions(actions);
714                    info.try_into()?
715                }
716                Err(error) => {
717                    Self::log_request_outcome(false, "handle_block_proposal", traffic_type);
718                    self.log_error(&error, "Failed to handle block proposal");
719                    NodeError::from(error).try_into()?
720                }
721            },
722        ))
723    }
724
725    #[instrument(
726        target = "grpc_server",
727        skip_all,
728        err,
729        fields(
730            nickname = self.state.nickname(),
731            chain_id = ?request.get_ref().chain_id()
732        )
733    )]
734    async fn handle_lite_certificate(
735        &self,
736        request: Request<LiteCertificate>,
737    ) -> Result<Response<ChainInfoResult>, Status> {
738        let traffic_type = Self::get_traffic_type(&request);
739        let HandleLiteCertRequest {
740            certificate,
741            wait_for_outgoing_messages,
742        } = request.into_inner().try_into()?;
743        trace!(?certificate, "Handling lite certificate");
744        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
745        match Box::pin(
746            self.state
747                .clone()
748                .handle_lite_certificate(certificate, sender),
749        )
750        .await
751        {
752            Ok((info, actions)) => {
753                Self::log_request_outcome(true, "handle_lite_certificate", traffic_type);
754                self.handle_network_actions(actions);
755                if let Some(receiver) = receiver {
756                    if let Err(e) = receiver.await {
757                        error!("Failed to wait for message delivery: {e}");
758                    }
759                }
760                Ok(Response::new(info.try_into()?))
761            }
762            Err(error) => {
763                Self::log_request_outcome(false, "handle_lite_certificate", traffic_type);
764                self.log_error(&error, "Failed to handle lite certificate");
765                Ok(Response::new(NodeError::from(error).try_into()?))
766            }
767        }
768    }
769
770    #[instrument(
771        target = "grpc_server",
772        skip_all,
773        err,
774        fields(
775            nickname = self.state.nickname(),
776            chain_id = ?request.get_ref().chain_id()
777        )
778    )]
779    async fn handle_confirmed_certificate(
780        &self,
781        request: Request<api::HandleConfirmedCertificateRequest>,
782    ) -> Result<Response<ChainInfoResult>, Status> {
783        let traffic_type = Self::get_traffic_type(&request);
784        let HandleConfirmedCertificateRequest {
785            certificate,
786            wait_for_outgoing_messages,
787        } = request.into_inner().try_into()?;
788        trace!(?certificate, "Handling certificate");
789        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
790        match self
791            .state
792            .clone()
793            .handle_confirmed_certificate(certificate, sender)
794            .await
795        {
796            Ok((info, actions)) => {
797                Self::log_request_outcome(true, "handle_confirmed_certificate", traffic_type);
798                self.handle_network_actions(actions);
799                if let Some(receiver) = receiver {
800                    if let Err(e) = receiver.await {
801                        error!("Failed to wait for message delivery: {e}");
802                    }
803                }
804                Ok(Response::new(info.try_into()?))
805            }
806            Err(error) => {
807                Self::log_request_outcome(false, "handle_confirmed_certificate", traffic_type);
808                self.log_error(&error, "Failed to handle confirmed certificate");
809                Ok(Response::new(NodeError::from(error).try_into()?))
810            }
811        }
812    }
813
814    #[instrument(
815        target = "grpc_server",
816        skip_all,
817        err,
818        fields(
819            nickname = self.state.nickname(),
820            chain_id = ?request.get_ref().chain_id()
821        )
822    )]
823    async fn handle_validated_certificate(
824        &self,
825        request: Request<api::HandleValidatedCertificateRequest>,
826    ) -> Result<Response<ChainInfoResult>, Status> {
827        let traffic_type = Self::get_traffic_type(&request);
828        let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
829        trace!(?certificate, "Handling certificate");
830        match self
831            .state
832            .clone()
833            .handle_validated_certificate(certificate)
834            .await
835        {
836            Ok((info, actions)) => {
837                Self::log_request_outcome(true, "handle_validated_certificate", traffic_type);
838                self.handle_network_actions(actions);
839                Ok(Response::new(info.try_into()?))
840            }
841            Err(error) => {
842                Self::log_request_outcome(false, "handle_validated_certificate", traffic_type);
843                self.log_error(&error, "Failed to handle validated certificate");
844                Ok(Response::new(NodeError::from(error).try_into()?))
845            }
846        }
847    }
848
849    #[instrument(
850        target = "grpc_server",
851        skip_all,
852        err,
853        fields(
854            nickname = self.state.nickname(),
855            chain_id = ?request.get_ref().chain_id()
856        )
857    )]
858    async fn handle_timeout_certificate(
859        &self,
860        request: Request<api::HandleTimeoutCertificateRequest>,
861    ) -> Result<Response<ChainInfoResult>, Status> {
862        let traffic_type = Self::get_traffic_type(&request);
863        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
864        trace!(?certificate, "Handling Timeout certificate");
865        match self
866            .state
867            .clone()
868            .handle_timeout_certificate(certificate)
869            .await
870        {
871            Ok((info, _actions)) => {
872                Self::log_request_outcome(true, "handle_timeout_certificate", traffic_type);
873                Ok(Response::new(info.try_into()?))
874            }
875            Err(error) => {
876                Self::log_request_outcome(false, "handle_timeout_certificate", traffic_type);
877                self.log_error(&error, "Failed to handle timeout certificate");
878                Ok(Response::new(NodeError::from(error).try_into()?))
879            }
880        }
881    }
882
883    #[instrument(
884        target = "grpc_server",
885        skip_all,
886        err,
887        fields(
888            nickname = self.state.nickname(),
889            chain_id = ?request.get_ref().chain_id()
890        )
891    )]
892    async fn handle_chain_info_query(
893        &self,
894        request: Request<ChainInfoQuery>,
895    ) -> Result<Response<ChainInfoResult>, Status> {
896        let traffic_type = Self::get_traffic_type(&request);
897        let query = request.into_inner().try_into()?;
898        trace!(?query, "Handling chain info query");
899        match self.state.clone().handle_chain_info_query(query).await {
900            Ok((info, actions)) => {
901                Self::log_request_outcome(true, "handle_chain_info_query", traffic_type);
902                self.handle_network_actions(actions);
903                Ok(Response::new(info.try_into()?))
904            }
905            Err(error) => {
906                Self::log_request_outcome(false, "handle_chain_info_query", traffic_type);
907                self.log_error(&error, "Failed to handle chain info query");
908                Ok(Response::new(NodeError::from(error).try_into()?))
909            }
910        }
911    }
912
913    #[instrument(
914        target = "grpc_server",
915        skip_all,
916        err,
917        fields(
918            nickname = self.state.nickname(),
919            chain_id = ?request.get_ref().chain_id()
920        )
921    )]
922    async fn download_pending_blob(
923        &self,
924        request: Request<PendingBlobRequest>,
925    ) -> Result<Response<PendingBlobResult>, Status> {
926        let traffic_type = Self::get_traffic_type(&request);
927        let (chain_id, blob_id) = request.into_inner().try_into()?;
928        trace!(?chain_id, ?blob_id, "Download pending blob");
929        match self
930            .state
931            .clone()
932            .download_pending_blob(chain_id, blob_id)
933            .await
934        {
935            Ok(blob) => {
936                Self::log_request_outcome(true, "download_pending_blob", traffic_type);
937                Ok(Response::new(blob.into_content().try_into()?))
938            }
939            Err(error) => {
940                Self::log_request_outcome(false, "download_pending_blob", traffic_type);
941                self.log_error(&error, "Failed to download pending blob");
942                Ok(Response::new(NodeError::from(error).try_into()?))
943            }
944        }
945    }
946
947    #[instrument(
948        target = "grpc_server",
949        skip_all,
950        err,
951        fields(
952            nickname = self.state.nickname(),
953            chain_id = ?request.get_ref().chain_id
954        )
955    )]
956    async fn handle_pending_blob(
957        &self,
958        request: Request<HandlePendingBlobRequest>,
959    ) -> Result<Response<ChainInfoResult>, Status> {
960        let traffic_type = Self::get_traffic_type(&request);
961        let (chain_id, blob_content) = request.into_inner().try_into()?;
962        let blob = Blob::new(blob_content);
963        let blob_id = blob.id();
964        trace!(?chain_id, ?blob_id, "Handle pending blob");
965        match self.state.clone().handle_pending_blob(chain_id, blob).await {
966            Ok(info) => {
967                Self::log_request_outcome(true, "handle_pending_blob", traffic_type);
968                Ok(Response::new(info.try_into()?))
969            }
970            Err(error) => {
971                Self::log_request_outcome(false, "handle_pending_blob", traffic_type);
972                self.log_error(&error, "Failed to handle pending blob");
973                Ok(Response::new(NodeError::from(error).try_into()?))
974            }
975        }
976    }
977
978    #[instrument(
979        target = "grpc_server",
980        skip_all,
981        err,
982        fields(
983            nickname = self.state.nickname(),
984            chain_id = ?request.get_ref().chain_id()
985        )
986    )]
987    async fn handle_cross_chain_request(
988        &self,
989        request: Request<CrossChainRequest>,
990    ) -> Result<Response<()>, Status> {
991        let traffic_type = Self::get_traffic_type(&request);
992        let cross_chain_request = request.into_inner().try_into()?;
993        trace!(?cross_chain_request, "Handling cross-chain request");
994        match self
995            .state
996            .clone()
997            .handle_cross_chain_request(cross_chain_request)
998            .await
999        {
1000            Ok(actions) => {
1001                Self::log_request_outcome(true, "handle_cross_chain_request", traffic_type);
1002                self.handle_network_actions(actions)
1003            }
1004            Err(error) => {
1005                Self::log_request_outcome(false, "handle_cross_chain_request", traffic_type);
1006                let nickname = self.state.nickname();
1007                error!(nickname, %error, "Failed to handle cross-chain request");
1008            }
1009        }
1010        Ok(Response::new(()))
1011    }
1012}
1013
1014/// Types which are proxyable and expose the appropriate methods to be handled
1015/// by the `GrpcProxy`
1016pub trait GrpcProxyable {
1017    fn chain_id(&self) -> Option<ChainId>;
1018}
1019
1020impl GrpcProxyable for BlockProposal {
1021    fn chain_id(&self) -> Option<ChainId> {
1022        self.chain_id.clone()?.try_into().ok()
1023    }
1024}
1025
1026impl GrpcProxyable for LiteCertificate {
1027    fn chain_id(&self) -> Option<ChainId> {
1028        self.chain_id.clone()?.try_into().ok()
1029    }
1030}
1031
1032impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
1033    fn chain_id(&self) -> Option<ChainId> {
1034        self.chain_id.clone()?.try_into().ok()
1035    }
1036}
1037
1038impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
1039    fn chain_id(&self) -> Option<ChainId> {
1040        self.chain_id.clone()?.try_into().ok()
1041    }
1042}
1043
1044impl GrpcProxyable for api::HandleValidatedCertificateRequest {
1045    fn chain_id(&self) -> Option<ChainId> {
1046        self.chain_id.clone()?.try_into().ok()
1047    }
1048}
1049
1050impl GrpcProxyable for ChainInfoQuery {
1051    fn chain_id(&self) -> Option<ChainId> {
1052        self.chain_id.clone()?.try_into().ok()
1053    }
1054}
1055
1056impl GrpcProxyable for PendingBlobRequest {
1057    fn chain_id(&self) -> Option<ChainId> {
1058        self.chain_id.clone()?.try_into().ok()
1059    }
1060}
1061
1062impl GrpcProxyable for HandlePendingBlobRequest {
1063    fn chain_id(&self) -> Option<ChainId> {
1064        self.chain_id.clone()?.try_into().ok()
1065    }
1066}
1067
1068impl GrpcProxyable for CrossChainRequest {
1069    fn chain_id(&self) -> Option<ChainId> {
1070        use super::api::cross_chain_request::Inner;
1071
1072        match self.inner.as_ref()? {
1073            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
1074            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
1075                recipient.clone()?.try_into().ok()
1076            }
1077        }
1078    }
1079}