linera_rpc/grpc/server.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    net::{IpAddr, SocketAddr},
    str::FromStr,
    task::{Context, Poll},
};

use futures::{
    channel::mpsc, future::BoxFuture, stream::FuturesUnordered, FutureExt as _, StreamExt as _,
};
use linera_base::{
    data_types::Blob,
    identifiers::ChainId,
    time::{Duration, Instant},
};
use linera_core::{
    join_set_ext::JoinSet,
    node::NodeError,
    worker::{NetworkActions, Notification, Reason, WorkerState},
    JoinSetExt as _, TaskHandle,
};
use linera_storage::Storage;
use tokio::sync::{broadcast::error::RecvError, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::{transport::Channel, Request, Response, Status};
use tower::{builder::ServiceBuilder, Layer, Service};
use tracing::{debug, error, info, instrument, trace, warn};

use super::{
    api::{
        self,
        notifier_service_client::NotifierServiceClient,
        validator_worker_client::ValidatorWorkerClient,
        validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
        BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
        HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
    },
    pool::GrpcConnectionPool,
    GrpcError, GRPC_MAX_MESSAGE_SIZE,
};
use crate::{
    config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
    cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
};

type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;

#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });

    pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_skipped_receiver_lag",
            "Number of notifications skipped because receiver lagged behind sender",
            &[],
        )
    });

    pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_dropped_no_receiver",
            "Number of notifications dropped because no receiver was available",
            &[],
        )
    });

    pub static NOTIFICATION_BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "notification_batch_size",
            "Number of notifications per batch sent to proxy",
            &[],
            exponential_bucket_interval(1.0, 250.0),
        )
    });

    pub static NOTIFICATION_BATCHES_SENT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notification_batches_sent",
            "Total notification batches sent",
            &["status"],
        )
    });
}

/// Handles batched forwarding of notifications to the proxy and block exporters.
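///
/// Incoming notifications accumulate in `pending_notifications`; `spawn_batches`
/// drains them in chunks of at most `batch_limit`, sending each chunk as its own
/// task while keeping at most `max_tasks` sends in flight. For example, with
/// `batch_limit = 100` and 250 pending notifications, up to three sends of 100,
/// 100 and 50 notifications are started, fewer if the in-flight limit is hit.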
struct BatchForwarder {
    nickname: String,
    client: NotifierServiceClient<Channel>,
    exporter_clients: Vec<NotifierServiceClient<Channel>>,
    pending_notifications: Vec<Notification>,
    futures: FuturesUnordered<BoxFuture<'static, ()>>,
    batch_limit: usize,
    max_tasks: usize,
}

impl BatchForwarder {
    /// Spawns batch-send tasks until the pending queue is empty or the
    /// `max_tasks` in-flight limit is reached.
    fn spawn_batches(&mut self) {
        while !self.pending_notifications.is_empty() && self.futures.len() < self.max_tasks {
            let chunk_size = std::cmp::min(self.batch_limit, self.pending_notifications.len());
            let batch: Vec<Notification> = self.pending_notifications.drain(..chunk_size).collect();

            #[cfg(with_metrics)]
            metrics::NOTIFICATION_BATCH_SIZE
                .with_label_values(&[])
                .observe(batch.len() as f64);

            let client = self.client.clone();
            let exporter_clients = self.exporter_clients.clone();
            let nickname = self.nickname.clone();

            self.futures.push(
                async move {
                    Self::send_batch(nickname, client, exporter_clients, batch).await;
                }
                .boxed(),
            );
        }
    }

    /// Returns true if there are no pending notifications and no in-flight tasks.
    fn is_fully_drained(&self) -> bool {
        self.pending_notifications.is_empty() && self.futures.is_empty()
    }

    /// Sends a batch of notifications to the proxy and exporters.
    async fn send_batch(
        nickname: String,
        mut client: NotifierServiceClient<Channel>,
        mut exporter_clients: Vec<NotifierServiceClient<Channel>>,
        batch: Vec<Notification>,
    ) {
        // Convert to proto notifications, logging any conversion errors.
        let mut proto_notifications = Vec::with_capacity(batch.len());
        for notification in &batch {
            match notification.clone().try_into() {
                Ok(proto) => proto_notifications.push(proto),
                Err(error) => {
                    warn!(
                        %error,
                        nickname,
                        ?notification.chain_id,
                        ?notification.reason,
                        "could not serialize notification"
                    );
                }
            }
        }

        // Collect chain IDs for error logging.
        let chain_ids: Vec<_> = batch.iter().map(|n| n.chain_id).collect();

        // Send the batch to the proxy.
        let request = Request::new(api::NotificationBatch {
            notifications: proto_notifications.clone(),
        });
        let result = client.notify_batch(request).await;

        #[cfg(with_metrics)]
        {
            let status = if result.is_ok() { "success" } else { "error" };
            metrics::NOTIFICATION_BATCHES_SENT
                .with_label_values(&[status])
                .inc();
        }

        if let Err(error) = result {
            error!(
                %error,
                nickname,
                batch_size = proto_notifications.len(),
                ?chain_ids,
                "proxy: could not send notification batch",
            );
        }

        // Send `NewBlock` notifications to the exporters.
        let new_block_notifications: Vec<_> = batch
            .iter()
            .filter(|n| matches!(n.reason, Reason::NewBlock { .. }))
            .collect();

        let exporter_notifications: Vec<api::Notification> = new_block_notifications
            .iter()
            .filter_map(|n| (*n).clone().try_into().ok())
            .collect();

        if !exporter_notifications.is_empty() {
            let exporter_chain_ids: Vec<_> =
                new_block_notifications.iter().map(|n| n.chain_id).collect();

            for exporter_client in &mut exporter_clients {
                let request = Request::new(api::NotificationBatch {
                    notifications: exporter_notifications.clone(),
                });
                if let Err(error) = exporter_client.notify_batch(request).await {
                    error!(
                        %error,
                        nickname,
                        batch_size = exporter_notifications.len(),
                        ?exporter_chain_ids,
                        "block exporter: could not send notification batch",
                    );
                }
            }
        }
    }
}

#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    state: WorkerState<S>,
    shard_id: ShardId,
    network: ValidatorInternalNetworkConfig,
    cross_chain_sender: CrossChainSender,
    notification_sender: NotificationSender,
}

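/// Handle for a spawned [`GrpcServer`] task. Awaiting [`join`](Self::join)
/// resolves once the server has shut down.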
pub struct GrpcServerHandle {
    handle: TaskHandle<Result<(), GrpcError>>,
}

impl GrpcServerHandle {
    pub async fn join(self) -> Result<(), GrpcError> {
        self.handle.await?
    }
}

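/// Tower layer that records request count and latency for every gRPC request
/// (only when compiled `with_metrics`).
///
/// A minimal sketch of how the layer is stacked onto a tonic server, mirroring
/// its use in [`GrpcServer::spawn`] below:
///
/// ```ignore
/// let builder = tonic::transport::Server::builder().layer(
///     ServiceBuilder::new()
///         .layer(GrpcPrometheusMetricsMiddlewareLayer)
///         .into_inner(),
/// );
/// ```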
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;

#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    service: T,
}

impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
    type Service = GrpcPrometheusMetricsMiddlewareService<S>;

    fn layer(&self, service: S) -> Self::Service {
        GrpcPrometheusMetricsMiddlewareService { service }
    }
}

impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
where
    S: Service<Req> + Send,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Req) -> Self::Future {
        #[cfg(with_metrics)]
        let start = Instant::now();
        let future = self.service.call(request);
        async move {
            let response = future.await?;
            #[cfg(with_metrics)]
            {
                metrics::SERVER_REQUEST_LATENCY
                    .with_label_values(&[])
                    .observe(start.elapsed().as_secs_f64() * 1000.0);
                metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
            }
            Ok(response)
        }
        .boxed()
    }
}

impl<S> GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    #[expect(clippy::too_many_arguments)]
    pub fn spawn(
        host: String,
        port: u16,
        state: WorkerState<S>,
        shard_id: ShardId,
        internal_network: ValidatorInternalNetworkConfig,
        cross_chain_config: CrossChainConfig,
        notification_config: NotificationConfig,
        shutdown_signal: CancellationToken,
        join_set: &mut JoinSet,
    ) -> GrpcServerHandle {
        info!(
            "spawning gRPC server on {}:{} for shard {}",
            host, port, shard_id
        );

        let (cross_chain_sender, cross_chain_receiver) =
            mpsc::channel(cross_chain_config.queue_size);

        let (notification_sender, _) =
            tokio::sync::broadcast::channel(notification_config.notification_queue_size);

        join_set.spawn_task({
            info!(
                nickname = state.nickname(),
                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
            );
            Self::forward_cross_chain_queries(
                state.nickname().to_string(),
                internal_network.clone(),
                cross_chain_config.max_retries,
                Duration::from_millis(cross_chain_config.retry_delay_ms),
                Duration::from_millis(cross_chain_config.sender_delay_ms),
                cross_chain_config.sender_failure_rate,
                shard_id,
                cross_chain_receiver,
            )
        });

        // Only the first notification task also forwards to the block exporters.
        let mut exporter_forwarded = false;
        for proxy in &internal_network.proxies {
            let receiver = notification_sender.subscribe();
            join_set.spawn_task({
                info!(
                    nickname = state.nickname(),
                    "spawning notifications thread on {} for shard {}", host, shard_id
                );
                let exporter_addresses = if exporter_forwarded {
                    vec![]
                } else {
                    exporter_forwarded = true;
                    internal_network.exporter_addresses()
                };
                Self::forward_notifications(
                    state.nickname().to_string(),
                    proxy.internal_address(&internal_network.protocol),
                    exporter_addresses,
                    receiver,
                    notification_config.clone(),
                )
            });
        }

        let (health_reporter, health_service) = tonic_health::server::health_reporter();

        let grpc_server = GrpcServer {
            state,
            shard_id,
            network: internal_network,
            cross_chain_sender,
            notification_sender,
        };

        let worker_node = ValidatorWorkerServer::new(grpc_server)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let handle = join_set.spawn_task(async move {
            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));

            let reflection_service = tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
                .build_v1()?;

            health_reporter
                .set_serving::<ValidatorWorkerServer<Self>>()
                .await;

            tonic::transport::Server::builder()
                .layer(
                    ServiceBuilder::new()
                        .layer(GrpcPrometheusMetricsMiddlewareLayer)
                        .into_inner(),
                )
                .add_service(health_service)
                .add_service(reflection_service)
                .add_service(worker_node)
                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
                .await?;

            Ok(())
        });

        GrpcServerHandle { handle }
    }

    /// Continuously receives notifications from the broadcast channel and
    /// forwards them to the proxy in batches for improved throughput.
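    ///
    /// The proxy and exporter connections are created lazily, so they need not
    /// be reachable when this task starts. The `biased` select below prefers
    /// receiving over reaping finished sends; once the channel closes, all
    /// pending notifications are flushed before the task exits.
    ///
    /// The knobs consumed here (a sketch; other `NotificationConfig` fields
    /// are elided):
    ///
    /// ```ignore
    /// config.notification_batch_size;    // max notifications per batch
    /// config.notification_max_in_flight; // max concurrent batch sends
    /// ```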
    #[instrument(skip(receiver, config))]
    async fn forward_notifications(
        nickname: String,
        proxy_address: String,
        exporter_addresses: Vec<String>,
        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
        config: NotificationConfig,
    ) {
        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
            .expect("Proxy URI should be valid")
            .connect_lazy();
        let client = NotifierServiceClient::new(channel)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
            .iter()
            .map(|address| {
                let channel = tonic::transport::Channel::from_shared(address.clone())
                    .expect("Exporter URI should be valid")
                    .connect_lazy();
                NotifierServiceClient::new(channel)
                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            })
            .collect::<Vec<_>>();

        let mut forwarder = BatchForwarder {
            nickname: nickname.clone(),
            client,
            exporter_clients,
            pending_notifications: Vec::new(),
            futures: FuturesUnordered::new(),
            batch_limit: config.notification_batch_size,
            max_tasks: config.notification_max_in_flight,
        };

        loop {
            tokio::select! {
                biased;

                result = receiver.recv() => {
                    match result {
                        Ok(notification) => {
                            forwarder.pending_notifications.push(notification);

                            if forwarder.futures.is_empty()
                               || (forwarder.pending_notifications.len() >= forwarder.batch_limit
                                   && forwarder.futures.len() < forwarder.max_tasks) {
                                forwarder.spawn_batches();
                            }
                        }
                        Err(RecvError::Lagged(skipped_count)) => {
                            warn!(
                                nickname,
                                skipped_count, "notification receiver lagged, messages were skipped"
                            );
                            #[cfg(with_metrics)]
                            metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
                                .with_label_values(&[])
                                .inc_by(skipped_count);
                        }
                        Err(RecvError::Closed) => {
                            warn!(
                                nickname,
                                "notification channel closed, draining pending notifications"
                            );
                            // Drain all pending notifications before exiting.
                            loop {
                                forwarder.spawn_batches();
                                if forwarder.is_fully_drained() {
                                    break;
                                }
                                forwarder.futures.next().await;
                            }
                            break;
                        }
                    }
                }

                Some(()) = forwarder.futures.next() => {
                    forwarder.spawn_batches();
                }
            }
        }
    }

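    /// Dispatches the outcome of a worker operation: queues each cross-chain
    /// request towards its target shard and broadcasts each notification to the
    /// forwarding tasks. Both sends are best-effort; failures are logged, counted
    /// in the metrics above, and the item is dropped.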
    fn handle_network_actions(&self, actions: NetworkActions) {
        let mut cross_chain_sender = self.cross_chain_sender.clone();
        let notification_sender = self.notification_sender.clone();

        for request in actions.cross_chain_requests {
            let shard_id = self.network.get_shard_id(request.target_chain_id());
            trace!(
                source_shard_id = self.shard_id,
                target_shard_id = shard_id,
                "Scheduling cross-chain query",
            );

            if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
                error!(%error, "dropping cross-chain request");
                #[cfg(with_metrics)]
                if error.is_full() {
                    metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
                        .with_label_values(&[])
                        .inc();
                }
            }
        }

        for notification in actions.notifications {
            trace!("Scheduling notification query");
            if let Err(error) = notification_sender.send(notification) {
                error!(%error, "dropping notification");
                #[cfg(with_metrics)]
                metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
                    .with_label_values(&[])
                    .inc();
            }
        }
    }

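    /// Pulls queued cross-chain requests and forwards each one to its target
    /// shard over a pooled gRPC connection, delegating retry and pacing logic to
    /// [`cross_chain_message_queue::forward_cross_chain_queries`].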
    #[instrument(skip_all, fields(nickname, %this_shard))]
    #[expect(clippy::too_many_arguments)]
    async fn forward_cross_chain_queries(
        nickname: String,
        network: ValidatorInternalNetworkConfig,
        cross_chain_max_retries: u32,
        cross_chain_retry_delay: Duration,
        cross_chain_sender_delay: Duration,
        cross_chain_sender_failure_rate: f32,
        this_shard: ShardId,
        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
    ) {
        let pool = GrpcConnectionPool::default();
        let handle_request =
            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
                let channel_result = pool.channel(network.shard(shard_id).http_address());
                async move {
                    let mut client = ValidatorWorkerClient::new(channel_result?)
                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
                    client
                        .handle_cross_chain_request(Request::new(request.try_into()?))
                        .await?;
                    anyhow::Result::<_, anyhow::Error>::Ok(())
                }
            };
        cross_chain_message_queue::forward_cross_chain_queries(
            nickname,
            cross_chain_max_retries,
            cross_chain_retry_delay,
            cross_chain_sender_delay,
            cross_chain_sender_failure_rate,
            this_shard,
            receiver,
            handle_request,
        )
        .await;
    }

    fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
        #![cfg_attr(not(with_metrics), allow(unused_variables))]
        #[cfg(with_metrics)]
        {
            metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
                .with_label_values(&[method_name])
                .observe(start.elapsed().as_secs_f64() * 1000.0);
            if success {
                metrics::SERVER_REQUEST_SUCCESS
                    .with_label_values(&[method_name])
                    .inc();
            } else {
                metrics::SERVER_REQUEST_ERROR
                    .with_label_values(&[method_name])
                    .inc();
            }
        }
    }

    fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
        let nickname = self.state.nickname();
        if error.is_local() {
            error!(nickname, %error, "{}", context);
        } else {
            debug!(nickname, %error, "{}", context);
        }
    }
}

#[tonic::async_trait]
impl<S> ValidatorWorkerRpc for GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_block_proposal(
        &self,
        request: Request<BlockProposal>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let proposal = request.into_inner().try_into()?;
        trace!(?proposal, "Handling block proposal");
        Ok(Response::new(
            match self.state.clone().handle_block_proposal(proposal).await {
                Ok((info, actions)) => {
                    Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
                    self.handle_network_actions(actions);
                    info.try_into()?
                }
                Err(error) => {
                    Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
                    self.log_error(&error, "Failed to handle block proposal");
                    NodeError::from(error).try_into()?
                }
            },
        ))
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_lite_certificate(
        &self,
        request: Request<LiteCertificate>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleLiteCertRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling lite certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match Box::pin(
            self.state
                .clone()
                .handle_lite_certificate(certificate, sender),
        )
        .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
                self.log_error(&error, "Failed to handle lite certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_confirmed_certificate(
        &self,
        request: Request<api::HandleConfirmedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleConfirmedCertificateRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling confirmed certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match self
            .state
            .clone()
            .handle_confirmed_certificate(certificate, sender)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
                self.log_error(&error, "Failed to handle confirmed certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_validated_certificate(
        &self,
        request: Request<api::HandleValidatedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling validated certificate");
        match self
            .state
            .clone()
            .handle_validated_certificate(certificate)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
                self.log_error(&error, "Failed to handle validated certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_timeout_certificate(
        &self,
        request: Request<api::HandleTimeoutCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling timeout certificate");
        match self
            .state
            .clone()
            .handle_timeout_certificate(certificate)
            .await
        {
            Ok((info, _actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
                self.log_error(&error, "Failed to handle timeout certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_chain_info_query(
        &self,
        request: Request<ChainInfoQuery>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let query = request.into_inner().try_into()?;
        trace!(?query, "Handling chain info query");
        match self.state.clone().handle_chain_info_query(query).await {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
                self.log_error(&error, "Failed to handle chain info query");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn download_pending_blob(
        &self,
        request: Request<PendingBlobRequest>,
    ) -> Result<Response<PendingBlobResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_id) = request.into_inner().try_into()?;
        trace!(?chain_id, ?blob_id, "Download pending blob");
        match self
            .state
            .clone()
            .download_pending_blob(chain_id, blob_id)
            .await
        {
            Ok(blob) => {
                Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
                Ok(Response::new(blob.into_content().try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
                self.log_error(&error, "Failed to download pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id
        )
    )]
    async fn handle_pending_blob(
        &self,
        request: Request<HandlePendingBlobRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_content) = request.into_inner().try_into()?;
        let blob = Blob::new(blob_content);
        let blob_id = blob.id();
        trace!(?chain_id, ?blob_id, "Handle pending blob");
        match self.state.clone().handle_pending_blob(chain_id, blob).await {
            Ok(info) => {
                Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
                self.log_error(&error, "Failed to handle pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_cross_chain_request(
        &self,
        request: Request<CrossChainRequest>,
    ) -> Result<Response<()>, Status> {
        let start = Instant::now();
        let request = request.into_inner().try_into()?;
        trace!(?request, "Handling cross-chain request");
        match self.state.clone().handle_cross_chain_request(request).await {
            Ok(actions) => {
                Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
                self.handle_network_actions(actions)
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle cross-chain request");
            }
        }
        Ok(Response::new(()))
    }
}

/// Types that can be proxied by the `GrpcProxy`, exposing the chain ID used to
/// route each request.
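///
/// A sketch of the intended routing step (hypothetical; the proxy itself lives
/// elsewhere, and `get_shard_id` is the same call used by
/// `handle_network_actions` above):
///
/// ```ignore
/// let chain_id = request.chain_id().ok_or_else(reject_unroutable)?;
/// let shard_id = network.get_shard_id(chain_id);
/// ```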
pub trait GrpcProxyable {
    fn chain_id(&self) -> Option<ChainId>;
}

impl GrpcProxyable for BlockProposal {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for LiteCertificate {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleValidatedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for ChainInfoQuery {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for PendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for HandlePendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for CrossChainRequest {
    fn chain_id(&self) -> Option<ChainId> {
        use super::api::cross_chain_request::Inner;

        match self.inner.as_ref()? {
            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
                recipient.clone()?.try_into().ok()
            }
        }
    }
}