linera_rpc/grpc/server.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    net::{IpAddr, SocketAddr},
    str::FromStr,
    task::{Context, Poll},
};

use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
use linera_base::{
    data_types::Blob,
    identifiers::ChainId,
    time::{Duration, Instant},
};
use linera_core::{
    join_set_ext::JoinSet,
    node::NodeError,
    worker::{NetworkActions, Notification, Reason, WorkerError, WorkerState},
    JoinSetExt as _, TaskHandle,
};
use linera_storage::Storage;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tonic::{transport::Channel, Request, Response, Status};
use tower::{builder::ServiceBuilder, Layer, Service};
use tracing::{debug, error, info, instrument, trace, warn};

use super::{
    api::{
        self,
        notifier_service_client::NotifierServiceClient,
        validator_worker_client::ValidatorWorkerClient,
        validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
        BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
        HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
    },
    pool::GrpcConnectionPool,
    GrpcError, GRPC_MAX_MESSAGE_SIZE,
};
use crate::{
    config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
    cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
};

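/// A sender used to schedule a cross-chain request together with the shard it is
/// addressed to.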
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
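/// A broadcast sender used to distribute notifications to the tasks that forward them to
/// the proxies.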
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;

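/// Prometheus metrics recorded by the gRPC worker server.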
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });
}

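/// The gRPC server of a validator worker, serving the chains of a single shard.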
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    state: WorkerState<S>,
    shard_id: ShardId,
    network: ValidatorInternalNetworkConfig,
    cross_chain_sender: CrossChainSender,
    notification_sender: NotificationSender,
}

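/// A handle to a spawned [`GrpcServer`] task.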
pub struct GrpcServerHandle {
    handle: TaskHandle<Result<(), GrpcError>>,
}

impl GrpcServerHandle {
    pub async fn join(self) -> Result<(), GrpcError> {
        self.handle.await?
    }
}

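/// A [`tower`] layer that wraps services in [`GrpcPrometheusMetricsMiddlewareService`].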
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;

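/// Middleware that records overall request counts and latencies as Prometheus metrics
/// when metrics support is compiled in.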
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    service: T,
}

impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
    type Service = GrpcPrometheusMetricsMiddlewareService<S>;

    fn layer(&self, service: S) -> Self::Service {
        GrpcPrometheusMetricsMiddlewareService { service }
    }
}

impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
where
    S::Future: Send + 'static,
    S: Service<Req> + std::marker::Send,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Req) -> Self::Future {
        #[cfg(with_metrics)]
        let start = Instant::now();
        let future = self.service.call(request);
        async move {
            let response = future.await?;
            #[cfg(with_metrics)]
            {
                metrics::SERVER_REQUEST_LATENCY
                    .with_label_values(&[])
                    .observe(start.elapsed().as_secs_f64() * 1000.0);
                metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
            }
            Ok(response)
        }
        .boxed()
    }
}

impl<S> GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
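    /// Spawns a gRPC server for this shard, together with the tasks forwarding
    /// cross-chain requests and notifications, and returns a handle to the server task.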
    #[expect(clippy::too_many_arguments)]
    pub fn spawn(
        host: String,
        port: u16,
        state: WorkerState<S>,
        shard_id: ShardId,
        internal_network: ValidatorInternalNetworkConfig,
        cross_chain_config: CrossChainConfig,
        notification_config: NotificationConfig,
        shutdown_signal: CancellationToken,
        join_set: &mut JoinSet,
    ) -> GrpcServerHandle {
        info!(
            "spawning gRPC server on {}:{} for shard {}",
            host, port, shard_id
        );

        let (cross_chain_sender, cross_chain_receiver) =
            mpsc::channel(cross_chain_config.queue_size);

        let (notification_sender, _) =
            tokio::sync::broadcast::channel(notification_config.notification_queue_size);

        join_set.spawn_task({
            info!(
                nickname = state.nickname(),
                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
            );
            Self::forward_cross_chain_queries(
                state.nickname().to_string(),
                internal_network.clone(),
                cross_chain_config.max_retries,
                Duration::from_millis(cross_chain_config.retry_delay_ms),
                Duration::from_millis(cross_chain_config.sender_delay_ms),
                cross_chain_config.sender_failure_rate,
                shard_id,
                cross_chain_receiver,
            )
        });

        for proxy in &internal_network.proxies {
            let receiver = notification_sender.subscribe();
            join_set.spawn_task({
                info!(
                    nickname = state.nickname(),
                    "spawning notifications thread on {} for shard {}", host, shard_id
                );
                Self::forward_notifications(
                    state.nickname().to_string(),
                    proxy.internal_address(&internal_network.protocol),
                    internal_network.exporter_addresses(),
                    receiver,
                )
            });
        }

        let (health_reporter, health_service) = tonic_health::server::health_reporter();

        let grpc_server = GrpcServer {
            state,
            shard_id,
            network: internal_network,
            cross_chain_sender,
            notification_sender,
        };

        let worker_node = ValidatorWorkerServer::new(grpc_server)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let handle = join_set.spawn_task(async move {
            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));

            let reflection_service = tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
                .build_v1()?;

            health_reporter
                .set_serving::<ValidatorWorkerServer<Self>>()
                .await;

            tonic::transport::Server::builder()
                .layer(
                    ServiceBuilder::new()
                        .layer(GrpcPrometheusMetricsMiddlewareLayer)
                        .into_inner(),
                )
                .add_service(health_service)
                .add_service(reflection_service)
                .add_service(worker_node)
                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
                .await?;

            Ok(())
        });

        GrpcServerHandle { handle }
    }

    /// Continuously waits for the receiver to deliver notifications, forwarding each one
    /// to the proxy and, for new blocks, to the block exporters as well.
    #[instrument(skip(receiver))]
    async fn forward_notifications(
        nickname: String,
        proxy_address: String,
        exporter_addresses: Vec<String>,
        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
    ) {
        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
            .expect("Proxy URI should be valid")
            .connect_lazy();
        let mut client = NotifierServiceClient::new(channel)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
            .iter()
            .map(|address| {
                let channel = tonic::transport::Channel::from_shared(address.clone())
                    .expect("Exporter URI should be valid")
                    .connect_lazy();
                NotifierServiceClient::new(channel)
                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            })
            .collect::<Vec<_>>();

        while let Ok(notification) = receiver.recv().await {
            let reason = &notification.reason;
            let notification: api::Notification = match notification.clone().try_into() {
                Ok(notification) => notification,
                Err(error) => {
                    warn!(%error, nickname, "could not serialize notification");
                    continue;
                }
            };
            let request = tonic::Request::new(notification.clone());
            if let Err(error) = client.notify(request).await {
                error!(
                    %error,
                    nickname,
                    ?notification,
                    "proxy: could not send notification",
                )
            }

            if let Reason::NewBlock { height: _, hash: _ } = reason {
                for exporter_client in &mut exporter_clients {
                    let request = tonic::Request::new(notification.clone());
                    if let Err(error) = exporter_client.notify(request).await {
                        error!(
                            %error,
                            nickname,
                            ?notification,
                            "block exporter: could not send notification",
                        )
                    }
                }
            }
        }
    }

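    /// Schedules the cross-chain requests and notifications produced while handling a
    /// request, logging an error and dropping them if the corresponding channel is
    /// unavailable.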
    fn handle_network_actions(&self, actions: NetworkActions) {
        let mut cross_chain_sender = self.cross_chain_sender.clone();
        let notification_sender = self.notification_sender.clone();

        for request in actions.cross_chain_requests {
            let shard_id = self.network.get_shard_id(request.target_chain_id());
            trace!(
                source_shard_id = self.shard_id,
                target_shard_id = shard_id,
                "Scheduling cross-chain query",
            );

            if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
                error!(%error, "dropping cross-chain request");
                #[cfg(with_metrics)]
                if error.is_full() {
                    metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
                        .with_label_values(&[])
                        .inc();
                }
            }
        }

        for notification in actions.notifications {
            trace!("Scheduling notification query");
            if let Err(error) = notification_sender.send(notification) {
                error!(%error, "dropping notification");
                break;
            }
        }
    }

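    /// Forwards queued cross-chain requests to the shards responsible for their target
    /// chains, using a pool of gRPC connections.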
    #[instrument(skip_all, fields(nickname, %this_shard))]
    #[expect(clippy::too_many_arguments)]
    async fn forward_cross_chain_queries(
        nickname: String,
        network: ValidatorInternalNetworkConfig,
        cross_chain_max_retries: u32,
        cross_chain_retry_delay: Duration,
        cross_chain_sender_delay: Duration,
        cross_chain_sender_failure_rate: f32,
        this_shard: ShardId,
        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
    ) {
        let pool = GrpcConnectionPool::default();
        let handle_request =
            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
                let channel_result = pool.channel(network.shard(shard_id).http_address());
                async move {
                    let mut client = ValidatorWorkerClient::new(channel_result?)
                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
                    client
                        .handle_cross_chain_request(Request::new(request.try_into()?))
                        .await?;
                    anyhow::Result::<_, anyhow::Error>::Ok(())
                }
            };
        cross_chain_message_queue::forward_cross_chain_queries(
            nickname,
            cross_chain_max_retries,
            cross_chain_retry_delay,
            cross_chain_sender_delay,
            cross_chain_sender_failure_rate,
            this_shard,
            receiver,
            handle_request,
        )
        .await;
    }

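    /// Records the latency and the success or failure of a request in the per-method
    /// Prometheus metrics; a no-op unless metrics support is compiled in.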
    fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
        #![cfg_attr(not(with_metrics), allow(unused_variables))]
        #[cfg(with_metrics)]
        {
            metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
                .with_label_values(&[method_name])
                .observe(start.elapsed().as_secs_f64() * 1000.0);
            if success {
                metrics::SERVER_REQUEST_SUCCESS
                    .with_label_values(&[method_name])
                    .inc();
            } else {
                metrics::SERVER_REQUEST_ERROR
                    .with_label_values(&[method_name])
                    .inc();
            }
        }
    }
}

#[tonic::async_trait]
impl<S> ValidatorWorkerRpc for GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_block_proposal(
        &self,
        request: Request<BlockProposal>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let proposal = request.into_inner().try_into()?;
        trace!(?proposal, "Handling block proposal");
        Ok(Response::new(
            match self.state.clone().handle_block_proposal(proposal).await {
                Ok((info, actions)) => {
                    Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
                    self.handle_network_actions(actions);
                    info.try_into()?
                }
                Err(error) => {
                    Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
                    let nickname = self.state.nickname();
                    warn!(nickname, %error, "Failed to handle block proposal");
                    NodeError::from(error).try_into()?
                }
            },
        ))
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_lite_certificate(
        &self,
        request: Request<LiteCertificate>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleLiteCertRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling lite certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match Box::pin(
            self.state
                .clone()
                .handle_lite_certificate(certificate, sender),
        )
        .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
                let nickname = self.state.nickname();
                if let WorkerError::MissingCertificateValue = &error {
                    debug!(nickname, %error, "Failed to handle lite certificate");
                } else {
                    error!(nickname, %error, "Failed to handle lite certificate");
                }
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_confirmed_certificate(
        &self,
        request: Request<api::HandleConfirmedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleConfirmedCertificateRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling confirmed certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match self
            .state
            .clone()
            .handle_confirmed_certificate(certificate, sender)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle confirmed certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_validated_certificate(
        &self,
        request: Request<api::HandleValidatedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling validated certificate");
        match self
            .state
            .clone()
            .handle_validated_certificate(certificate)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle validated certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_timeout_certificate(
        &self,
        request: Request<api::HandleTimeoutCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling timeout certificate");
        match self
            .state
            .clone()
            .handle_timeout_certificate(certificate)
            .await
        {
            Ok((info, _actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle timeout certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_chain_info_query(
        &self,
        request: Request<ChainInfoQuery>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let query = request.into_inner().try_into()?;
        trace!(?query, "Handling chain info query");
        match self.state.clone().handle_chain_info_query(query).await {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle chain info query");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn download_pending_blob(
        &self,
        request: Request<PendingBlobRequest>,
    ) -> Result<Response<PendingBlobResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_id) = request.into_inner().try_into()?;
        trace!(?chain_id, ?blob_id, "Download pending blob");
        match self
            .state
            .clone()
            .download_pending_blob(chain_id, blob_id)
            .await
        {
            Ok(blob) => {
                Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
                Ok(Response::new(blob.into_content().try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to download pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id
        )
    )]
    async fn handle_pending_blob(
        &self,
        request: Request<HandlePendingBlobRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_content) = request.into_inner().try_into()?;
        let blob = Blob::new(blob_content);
        let blob_id = blob.id();
        trace!(?chain_id, ?blob_id, "Handle pending blob");
        match self.state.clone().handle_pending_blob(chain_id, blob).await {
            Ok(info) => {
                Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_cross_chain_request(
        &self,
        request: Request<CrossChainRequest>,
    ) -> Result<Response<()>, Status> {
        let start = Instant::now();
        let request = request.into_inner().try_into()?;
        trace!(?request, "Handling cross-chain request");
        match self.state.clone().handle_cross_chain_request(request).await {
            Ok(actions) => {
                Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
                self.handle_network_actions(actions)
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle cross-chain request");
            }
        }
        Ok(Response::new(()))
    }
}

/// Types that can be proxied by the `GrpcProxy`, exposing the chain ID used to route
/// the request to the appropriate shard.
pub trait GrpcProxyable {
    fn chain_id(&self) -> Option<ChainId>;
}

impl GrpcProxyable for BlockProposal {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for LiteCertificate {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleValidatedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for ChainInfoQuery {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for PendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for HandlePendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for CrossChainRequest {
    fn chain_id(&self) -> Option<ChainId> {
        use super::api::cross_chain_request::Inner;

        match self.inner.as_ref()? {
            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
                recipient.clone()?.try_into().ok()
            }
        }
    }
}