linera_rpc/grpc/
server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    net::{IpAddr, SocketAddr},
6    str::FromStr,
7    task::{Context, Poll},
8    time::{Duration, Instant},
9};
10
11use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
12use linera_base::{data_types::Blob, identifiers::ChainId};
13use linera_core::{
14    join_set_ext::JoinSet,
15    node::NodeError,
16    worker::{NetworkActions, Notification, Reason, WorkerError, WorkerState},
17    JoinSetExt as _, TaskHandle,
18};
19use linera_storage::Storage;
20use tokio::sync::oneshot;
21use tokio_util::sync::CancellationToken;
22use tonic::{transport::Channel, Request, Response, Status};
23use tower::{builder::ServiceBuilder, Layer, Service};
24use tracing::{debug, error, info, instrument, trace, warn};
25
26use super::{
27    api::{
28        self,
29        notifier_service_client::NotifierServiceClient,
30        validator_worker_client::ValidatorWorkerClient,
31        validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
32        BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
33        HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
34    },
35    pool::GrpcConnectionPool,
36    GrpcError, GRPC_MAX_MESSAGE_SIZE,
37};
38use crate::{
39    config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
40    cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
41    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
42};
43
44type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
45type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
46
47#[cfg(with_metrics)]
48mod metrics {
49    use std::sync::LazyLock;
50
51    use linera_base::prometheus_util::{
52        linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
53    };
54    use prometheus::{HistogramVec, IntCounterVec};
55
56    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
57        register_histogram_vec(
58            "server_request_latency",
59            "Server request latency",
60            &[],
61            linear_bucket_interval(1.0, 25.0, 2000.0),
62        )
63    });
64
65    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
66        register_int_counter_vec("server_request_count", "Server request count", &[])
67    });
68
69    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
70        register_int_counter_vec(
71            "server_request_success",
72            "Server request success",
73            &["method_name"],
74        )
75    });
76
77    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
78        register_int_counter_vec(
79            "server_request_error",
80            "Server request error",
81            &["method_name"],
82        )
83    });
84
85    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
86        LazyLock::new(|| {
87            register_histogram_vec(
88                "server_request_latency_per_request_type",
89                "Server request latency per request type",
90                &["method_name"],
91                linear_bucket_interval(1.0, 25.0, 2000.0),
92            )
93        });
94
95    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
96        register_int_counter_vec(
97            "cross_chain_message_channel_full",
98            "Cross-chain message channel full",
99            &[],
100        )
101    });
102}
103
104#[derive(Clone)]
105pub struct GrpcServer<S>
106where
107    S: Storage,
108{
109    state: WorkerState<S>,
110    shard_id: ShardId,
111    network: ValidatorInternalNetworkConfig,
112    cross_chain_sender: CrossChainSender,
113    notification_sender: NotificationSender,
114}
115
116pub struct GrpcServerHandle {
117    handle: TaskHandle<Result<(), GrpcError>>,
118}
119
120impl GrpcServerHandle {
121    pub async fn join(self) -> Result<(), GrpcError> {
122        self.handle.await?
123    }
124}
125
126#[derive(Clone)]
127pub struct GrpcPrometheusMetricsMiddlewareLayer;
128
129#[derive(Clone)]
130pub struct GrpcPrometheusMetricsMiddlewareService<T> {
131    service: T,
132}
133
134impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
135    type Service = GrpcPrometheusMetricsMiddlewareService<S>;
136
137    fn layer(&self, service: S) -> Self::Service {
138        GrpcPrometheusMetricsMiddlewareService { service }
139    }
140}
141
142impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
143where
144    S::Future: Send + 'static,
145    S: Service<Req> + std::marker::Send,
146{
147    type Response = S::Response;
148    type Error = S::Error;
149    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
150
151    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152        self.service.poll_ready(cx)
153    }
154
155    fn call(&mut self, request: Req) -> Self::Future {
156        #[cfg(with_metrics)]
157        let start = Instant::now();
158        let future = self.service.call(request);
159        async move {
160            let response = future.await?;
161            #[cfg(with_metrics)]
162            {
163                metrics::SERVER_REQUEST_LATENCY
164                    .with_label_values(&[])
165                    .observe(start.elapsed().as_secs_f64() * 1000.0);
166                metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
167            }
168            Ok(response)
169        }
170        .boxed()
171    }
172}
173
174impl<S> GrpcServer<S>
175where
176    S: Storage + Clone + Send + Sync + 'static,
177{
178    #[expect(clippy::too_many_arguments)]
179    pub fn spawn(
180        host: String,
181        port: u16,
182        state: WorkerState<S>,
183        shard_id: ShardId,
184        internal_network: ValidatorInternalNetworkConfig,
185        cross_chain_config: CrossChainConfig,
186        notification_config: NotificationConfig,
187        shutdown_signal: CancellationToken,
188        join_set: &mut JoinSet,
189    ) -> GrpcServerHandle {
190        info!(
191            "spawning gRPC server on {}:{} for shard {}",
192            host, port, shard_id
193        );
194
195        let (cross_chain_sender, cross_chain_receiver) =
196            mpsc::channel(cross_chain_config.queue_size);
197
198        let (notification_sender, _) =
199            tokio::sync::broadcast::channel(notification_config.notification_queue_size);
200
201        join_set.spawn_task({
202            info!(
203                nickname = state.nickname(),
204                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
205            );
206            Self::forward_cross_chain_queries(
207                state.nickname().to_string(),
208                internal_network.clone(),
209                cross_chain_config.max_retries,
210                Duration::from_millis(cross_chain_config.retry_delay_ms),
211                Duration::from_millis(cross_chain_config.sender_delay_ms),
212                cross_chain_config.sender_failure_rate,
213                shard_id,
214                cross_chain_receiver,
215            )
216        });
217
218        for proxy in &internal_network.proxies {
219            let receiver = notification_sender.subscribe();
220            join_set.spawn_task({
221                info!(
222                    nickname = state.nickname(),
223                    "spawning notifications thread on {} for shard {}", host, shard_id
224                );
225                Self::forward_notifications(
226                    state.nickname().to_string(),
227                    proxy.internal_address(&internal_network.protocol),
228                    internal_network.exporter_addresses(),
229                    receiver,
230                )
231            });
232        }
233
234        let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
235
236        let grpc_server = GrpcServer {
237            state,
238            shard_id,
239            network: internal_network,
240            cross_chain_sender,
241            notification_sender,
242        };
243
244        let worker_node = ValidatorWorkerServer::new(grpc_server)
245            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
246            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
247
248        let handle = join_set.spawn_task(async move {
249            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
250
251            let reflection_service = tonic_reflection::server::Builder::configure()
252                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
253                .build_v1()?;
254
255            health_reporter
256                .set_serving::<ValidatorWorkerServer<Self>>()
257                .await;
258
259            tonic::transport::Server::builder()
260                .layer(
261                    ServiceBuilder::new()
262                        .layer(GrpcPrometheusMetricsMiddlewareLayer)
263                        .into_inner(),
264                )
265                .add_service(health_service)
266                .add_service(reflection_service)
267                .add_service(worker_node)
268                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
269                .await?;
270
271            Ok(())
272        });
273
274        GrpcServerHandle { handle }
275    }
276
277    /// Continuously waits for receiver to receive a notification which is then sent to
278    /// the proxy.
279    #[instrument(skip(receiver))]
280    async fn forward_notifications(
281        nickname: String,
282        proxy_address: String,
283        exporter_addresses: Vec<String>,
284        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
285    ) {
286        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
287            .expect("Proxy URI should be valid")
288            .connect_lazy();
289        let mut client = NotifierServiceClient::new(channel)
290            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
291            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
292
293        let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
294            .iter()
295            .map(|address| {
296                let channel = tonic::transport::Channel::from_shared(address.clone())
297                    .expect("Exporter URI should be valid")
298                    .connect_lazy();
299                NotifierServiceClient::new(channel)
300                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
301                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
302            })
303            .collect::<Vec<_>>();
304
305        while let Ok(notification) = receiver.recv().await {
306            let reason = &notification.reason;
307            let notification: api::Notification = match notification.clone().try_into() {
308                Ok(notification) => notification,
309                Err(error) => {
310                    warn!(%error, nickname, "could not deserialize notification");
311                    continue;
312                }
313            };
314            let request = tonic::Request::new(notification.clone());
315            if let Err(error) = client.notify(request).await {
316                error!(
317                    %error,
318                    nickname,
319                    ?notification,
320                    "could not send notification",
321                )
322            }
323
324            if let Reason::NewBlock { height: _, hash: _ } = reason {
325                for exporter_client in &mut exporter_clients {
326                    let request = tonic::Request::new(notification.clone());
327                    if let Err(error) = exporter_client.notify(request).await {
328                        error!(
329                            %error,
330                            nickname,
331                            ?notification,
332                            "could not send notification",
333                        )
334                    }
335                }
336            }
337        }
338    }
339
340    fn handle_network_actions(&self, actions: NetworkActions) {
341        let mut cross_chain_sender = self.cross_chain_sender.clone();
342        let notification_sender = self.notification_sender.clone();
343
344        for request in actions.cross_chain_requests {
345            let shard_id = self.network.get_shard_id(request.target_chain_id());
346            trace!(
347                source_shard_id = self.shard_id,
348                target_shard_id = shard_id,
349                "Scheduling cross-chain query",
350            );
351
352            if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
353                error!(%error, "dropping cross-chain request");
354                #[cfg(with_metrics)]
355                if error.is_full() {
356                    metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
357                        .with_label_values(&[])
358                        .inc();
359                }
360            }
361        }
362
363        for notification in actions.notifications {
364            trace!("Scheduling notification query");
365            if let Err(error) = notification_sender.send(notification) {
366                error!(%error, "dropping notification");
367                break;
368            }
369        }
370    }
371
372    #[instrument(skip_all, fields(nickname, %this_shard))]
373    #[expect(clippy::too_many_arguments)]
374    async fn forward_cross_chain_queries(
375        nickname: String,
376        network: ValidatorInternalNetworkConfig,
377        cross_chain_max_retries: u32,
378        cross_chain_retry_delay: Duration,
379        cross_chain_sender_delay: Duration,
380        cross_chain_sender_failure_rate: f32,
381        this_shard: ShardId,
382        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
383    ) {
384        let pool = GrpcConnectionPool::default();
385        let handle_request =
386            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
387                let channel_result = pool.channel(network.shard(shard_id).http_address());
388                async move {
389                    let mut client = ValidatorWorkerClient::new(channel_result?)
390                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
391                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
392                    client
393                        .handle_cross_chain_request(Request::new(request.try_into()?))
394                        .await?;
395                    anyhow::Result::<_, anyhow::Error>::Ok(())
396                }
397            };
398        cross_chain_message_queue::forward_cross_chain_queries(
399            nickname,
400            cross_chain_max_retries,
401            cross_chain_retry_delay,
402            cross_chain_sender_delay,
403            cross_chain_sender_failure_rate,
404            this_shard,
405            receiver,
406            handle_request,
407        )
408        .await;
409    }
410
411    fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
412        #![allow(unused_variables)]
413        #[cfg(with_metrics)]
414        {
415            metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
416                .with_label_values(&[method_name])
417                .observe(start.elapsed().as_secs_f64() * 1000.0);
418            if success {
419                metrics::SERVER_REQUEST_SUCCESS
420                    .with_label_values(&[method_name])
421                    .inc();
422            } else {
423                metrics::SERVER_REQUEST_ERROR
424                    .with_label_values(&[method_name])
425                    .inc();
426            }
427        }
428    }
429}
430
431#[tonic::async_trait]
432impl<S> ValidatorWorkerRpc for GrpcServer<S>
433where
434    S: Storage + Clone + Send + Sync + 'static,
435{
436    #[instrument(
437        target = "grpc_server",
438        skip_all,
439        err,
440        fields(
441            nickname = self.state.nickname(),
442            chain_id = ?request.get_ref().chain_id()
443        )
444    )]
445    async fn handle_block_proposal(
446        &self,
447        request: Request<BlockProposal>,
448    ) -> Result<Response<ChainInfoResult>, Status> {
449        let start = Instant::now();
450        let proposal = request.into_inner().try_into()?;
451        trace!(?proposal, "Handling block proposal");
452        Ok(Response::new(
453            match self.state.clone().handle_block_proposal(proposal).await {
454                Ok((info, actions)) => {
455                    Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
456                    self.handle_network_actions(actions);
457                    info.try_into()?
458                }
459                Err(error) => {
460                    Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
461                    let nickname = self.state.nickname();
462                    warn!(nickname, %error, "Failed to handle block proposal");
463                    NodeError::from(error).try_into()?
464                }
465            },
466        ))
467    }
468
469    #[instrument(
470        target = "grpc_server",
471        skip_all,
472        err,
473        fields(
474            nickname = self.state.nickname(),
475            chain_id = ?request.get_ref().chain_id()
476        )
477    )]
478    async fn handle_lite_certificate(
479        &self,
480        request: Request<LiteCertificate>,
481    ) -> Result<Response<ChainInfoResult>, Status> {
482        let start = Instant::now();
483        let HandleLiteCertRequest {
484            certificate,
485            wait_for_outgoing_messages,
486        } = request.into_inner().try_into()?;
487        trace!(?certificate, "Handling lite certificate");
488        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
489        match Box::pin(
490            self.state
491                .clone()
492                .handle_lite_certificate(certificate, sender),
493        )
494        .await
495        {
496            Ok((info, actions)) => {
497                Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
498                self.handle_network_actions(actions);
499                if let Some(receiver) = receiver {
500                    if let Err(e) = receiver.await {
501                        error!("Failed to wait for message delivery: {e}");
502                    }
503                }
504                Ok(Response::new(info.try_into()?))
505            }
506            Err(error) => {
507                Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
508                let nickname = self.state.nickname();
509                if let WorkerError::MissingCertificateValue = &error {
510                    debug!(nickname, %error, "Failed to handle lite certificate");
511                } else {
512                    error!(nickname, %error, "Failed to handle lite certificate");
513                }
514                Ok(Response::new(NodeError::from(error).try_into()?))
515            }
516        }
517    }
518
519    #[instrument(
520        target = "grpc_server",
521        skip_all,
522        err,
523        fields(
524            nickname = self.state.nickname(),
525            chain_id = ?request.get_ref().chain_id()
526        )
527    )]
528    async fn handle_confirmed_certificate(
529        &self,
530        request: Request<api::HandleConfirmedCertificateRequest>,
531    ) -> Result<Response<ChainInfoResult>, Status> {
532        let start = Instant::now();
533        let HandleConfirmedCertificateRequest {
534            certificate,
535            wait_for_outgoing_messages,
536        } = request.into_inner().try_into()?;
537        trace!(?certificate, "Handling certificate");
538        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
539        match self
540            .state
541            .clone()
542            .handle_confirmed_certificate(certificate, sender)
543            .await
544        {
545            Ok((info, actions)) => {
546                Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
547                self.handle_network_actions(actions);
548                if let Some(receiver) = receiver {
549                    if let Err(e) = receiver.await {
550                        error!("Failed to wait for message delivery: {e}");
551                    }
552                }
553                Ok(Response::new(info.try_into()?))
554            }
555            Err(error) => {
556                Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
557                let nickname = self.state.nickname();
558                error!(nickname, %error, "Failed to handle confirmed certificate");
559                Ok(Response::new(NodeError::from(error).try_into()?))
560            }
561        }
562    }
563
564    #[instrument(
565        target = "grpc_server",
566        skip_all,
567        err,
568        fields(
569            nickname = self.state.nickname(),
570            chain_id = ?request.get_ref().chain_id()
571        )
572    )]
573    async fn handle_validated_certificate(
574        &self,
575        request: Request<api::HandleValidatedCertificateRequest>,
576    ) -> Result<Response<ChainInfoResult>, Status> {
577        let start = Instant::now();
578        let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
579        trace!(?certificate, "Handling certificate");
580        match self
581            .state
582            .clone()
583            .handle_validated_certificate(certificate)
584            .await
585        {
586            Ok((info, actions)) => {
587                Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
588                self.handle_network_actions(actions);
589                Ok(Response::new(info.try_into()?))
590            }
591            Err(error) => {
592                Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
593                let nickname = self.state.nickname();
594                error!(nickname, %error, "Failed to handle validated certificate");
595                Ok(Response::new(NodeError::from(error).try_into()?))
596            }
597        }
598    }
599
600    #[instrument(
601        target = "grpc_server",
602        skip_all,
603        err,
604        fields(
605            nickname = self.state.nickname(),
606            chain_id = ?request.get_ref().chain_id()
607        )
608    )]
609    async fn handle_timeout_certificate(
610        &self,
611        request: Request<api::HandleTimeoutCertificateRequest>,
612    ) -> Result<Response<ChainInfoResult>, Status> {
613        let start = Instant::now();
614        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
615        trace!(?certificate, "Handling Timeout certificate");
616        match self
617            .state
618            .clone()
619            .handle_timeout_certificate(certificate)
620            .await
621        {
622            Ok((info, _actions)) => {
623                Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
624                Ok(Response::new(info.try_into()?))
625            }
626            Err(error) => {
627                Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
628                let nickname = self.state.nickname();
629                error!(nickname, %error, "Failed to handle timeout certificate");
630                Ok(Response::new(NodeError::from(error).try_into()?))
631            }
632        }
633    }
634
635    #[instrument(
636        target = "grpc_server",
637        skip_all,
638        err,
639        fields(
640            nickname = self.state.nickname(),
641            chain_id = ?request.get_ref().chain_id()
642        )
643    )]
644    async fn handle_chain_info_query(
645        &self,
646        request: Request<ChainInfoQuery>,
647    ) -> Result<Response<ChainInfoResult>, Status> {
648        let start = Instant::now();
649        let query = request.into_inner().try_into()?;
650        trace!(?query, "Handling chain info query");
651        match self.state.clone().handle_chain_info_query(query).await {
652            Ok((info, actions)) => {
653                Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
654                self.handle_network_actions(actions);
655                Ok(Response::new(info.try_into()?))
656            }
657            Err(error) => {
658                Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
659                let nickname = self.state.nickname();
660                error!(nickname, %error, "Failed to handle chain info query");
661                Ok(Response::new(NodeError::from(error).try_into()?))
662            }
663        }
664    }
665
666    #[instrument(
667        target = "grpc_server",
668        skip_all,
669        err,
670        fields(
671            nickname = self.state.nickname(),
672            chain_id = ?request.get_ref().chain_id()
673        )
674    )]
675    async fn download_pending_blob(
676        &self,
677        request: Request<PendingBlobRequest>,
678    ) -> Result<Response<PendingBlobResult>, Status> {
679        let start = Instant::now();
680        let (chain_id, blob_id) = request.into_inner().try_into()?;
681        trace!(?chain_id, ?blob_id, "Download pending blob");
682        match self
683            .state
684            .clone()
685            .download_pending_blob(chain_id, blob_id)
686            .await
687        {
688            Ok(blob) => {
689                Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
690                Ok(Response::new(blob.into_content().try_into()?))
691            }
692            Err(error) => {
693                Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
694                let nickname = self.state.nickname();
695                error!(nickname, %error, "Failed to download pending blob");
696                Ok(Response::new(NodeError::from(error).try_into()?))
697            }
698        }
699    }
700
701    #[instrument(
702        target = "grpc_server",
703        skip_all,
704        err,
705        fields(
706            nickname = self.state.nickname(),
707            chain_id = ?request.get_ref().chain_id
708        )
709    )]
710    async fn handle_pending_blob(
711        &self,
712        request: Request<HandlePendingBlobRequest>,
713    ) -> Result<Response<ChainInfoResult>, Status> {
714        let start = Instant::now();
715        let (chain_id, blob_content) = request.into_inner().try_into()?;
716        let blob = Blob::new(blob_content);
717        let blob_id = blob.id();
718        trace!(?chain_id, ?blob_id, "Handle pending blob");
719        match self.state.clone().handle_pending_blob(chain_id, blob).await {
720            Ok(info) => {
721                Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
722                Ok(Response::new(info.try_into()?))
723            }
724            Err(error) => {
725                Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
726                let nickname = self.state.nickname();
727                error!(nickname, %error, "Failed to handle pending blob");
728                Ok(Response::new(NodeError::from(error).try_into()?))
729            }
730        }
731    }
732
733    #[instrument(
734        target = "grpc_server",
735        skip_all,
736        err,
737        fields(
738            nickname = self.state.nickname(),
739            chain_id = ?request.get_ref().chain_id()
740        )
741    )]
742    async fn handle_cross_chain_request(
743        &self,
744        request: Request<CrossChainRequest>,
745    ) -> Result<Response<()>, Status> {
746        let start = Instant::now();
747        let request = request.into_inner().try_into()?;
748        trace!(?request, "Handling cross-chain request");
749        match self.state.clone().handle_cross_chain_request(request).await {
750            Ok(actions) => {
751                Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
752                self.handle_network_actions(actions)
753            }
754            Err(error) => {
755                Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
756                let nickname = self.state.nickname();
757                error!(nickname, %error, "Failed to handle cross-chain request");
758            }
759        }
760        Ok(Response::new(()))
761    }
762}
763
764/// Types which are proxyable and expose the appropriate methods to be handled
765/// by the `GrpcProxy`
766pub trait GrpcProxyable {
767    fn chain_id(&self) -> Option<ChainId>;
768}
769
770impl GrpcProxyable for BlockProposal {
771    fn chain_id(&self) -> Option<ChainId> {
772        self.chain_id.clone()?.try_into().ok()
773    }
774}
775
776impl GrpcProxyable for LiteCertificate {
777    fn chain_id(&self) -> Option<ChainId> {
778        self.chain_id.clone()?.try_into().ok()
779    }
780}
781
782impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
783    fn chain_id(&self) -> Option<ChainId> {
784        self.chain_id.clone()?.try_into().ok()
785    }
786}
787
788impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
789    fn chain_id(&self) -> Option<ChainId> {
790        self.chain_id.clone()?.try_into().ok()
791    }
792}
793
794impl GrpcProxyable for api::HandleValidatedCertificateRequest {
795    fn chain_id(&self) -> Option<ChainId> {
796        self.chain_id.clone()?.try_into().ok()
797    }
798}
799
800impl GrpcProxyable for ChainInfoQuery {
801    fn chain_id(&self) -> Option<ChainId> {
802        self.chain_id.clone()?.try_into().ok()
803    }
804}
805
806impl GrpcProxyable for PendingBlobRequest {
807    fn chain_id(&self) -> Option<ChainId> {
808        self.chain_id.clone()?.try_into().ok()
809    }
810}
811
812impl GrpcProxyable for HandlePendingBlobRequest {
813    fn chain_id(&self) -> Option<ChainId> {
814        self.chain_id.clone()?.try_into().ok()
815    }
816}
817
818impl GrpcProxyable for CrossChainRequest {
819    fn chain_id(&self) -> Option<ChainId> {
820        use super::api::cross_chain_request::Inner;
821
822        match self.inner.as_ref()? {
823            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
824            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
825                recipient.clone()?.try_into().ok()
826            }
827        }
828    }
829}