linera_rpc/grpc/server.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    net::{IpAddr, SocketAddr},
    str::FromStr,
    task::{Context, Poll},
};

use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
use linera_base::{
    data_types::Blob,
    identifiers::ChainId,
    time::{Duration, Instant},
};
use linera_core::{
    join_set_ext::JoinSet,
    node::NodeError,
    worker::{NetworkActions, Notification, Reason, WorkerState},
    JoinSetExt as _, TaskHandle,
};
use linera_storage::Storage;
use tokio::sync::{broadcast::error::RecvError, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::{transport::Channel, Request, Response, Status};
use tower::{builder::ServiceBuilder, Layer, Service};
use tracing::{debug, error, info, instrument, trace, warn};

use super::{
    api::{
        self,
        notifier_service_client::NotifierServiceClient,
        validator_worker_client::ValidatorWorkerClient,
        validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
        BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
        HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
    },
    pool::GrpcConnectionPool,
    GrpcError, GRPC_MAX_MESSAGE_SIZE,
};
use crate::{
    config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
    cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
};

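/// Sender for outgoing cross-chain requests, paired with the ID of the target shard.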
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
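/// Broadcast sender used to fan notifications out to the forwarding tasks.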
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;

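/// Prometheus metrics recorded by the gRPC server.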
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });
}

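/// The gRPC service implementing the validator worker for a single shard.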
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    state: WorkerState<S>,
    shard_id: ShardId,
    network: ValidatorInternalNetworkConfig,
    cross_chain_sender: CrossChainSender,
    notification_sender: NotificationSender,
}

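/// Handle to the spawned gRPC server task; joining it waits for the server to exit.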
pub struct GrpcServerHandle {
    handle: TaskHandle<Result<(), GrpcError>>,
}

impl GrpcServerHandle {
    pub async fn join(self) -> Result<(), GrpcError> {
        self.handle.await?
    }
}

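/// Tower layer that wraps services in a [`GrpcPrometheusMetricsMiddlewareService`].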
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;

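/// Middleware service recording request counts and latencies when metrics are enabled.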
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    service: T,
}

impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
    type Service = GrpcPrometheusMetricsMiddlewareService<S>;

    fn layer(&self, service: S) -> Self::Service {
        GrpcPrometheusMetricsMiddlewareService { service }
    }
}

impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
where
    S::Future: Send + 'static,
    S: Service<Req> + std::marker::Send,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Req) -> Self::Future {
        #[cfg(with_metrics)]
        let start = Instant::now();
        let future = self.service.call(request);
        async move {
            let response = future.await?;
            #[cfg(with_metrics)]
            {
                metrics::SERVER_REQUEST_LATENCY
                    .with_label_values(&[])
                    .observe(start.elapsed().as_secs_f64() * 1000.0);
                metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
            }
            Ok(response)
        }
        .boxed()
    }
}

impl<S> GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
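    /// Spawns the gRPC server on `host:port`, together with the background tasks that
    /// forward cross-chain requests and notifications.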
    #[expect(clippy::too_many_arguments)]
    pub fn spawn(
        host: String,
        port: u16,
        state: WorkerState<S>,
        shard_id: ShardId,
        internal_network: ValidatorInternalNetworkConfig,
        cross_chain_config: CrossChainConfig,
        notification_config: NotificationConfig,
        shutdown_signal: CancellationToken,
        join_set: &mut JoinSet,
    ) -> GrpcServerHandle {
        info!(
            "spawning gRPC server on {}:{} for shard {}",
            host, port, shard_id
        );

        let (cross_chain_sender, cross_chain_receiver) =
            mpsc::channel(cross_chain_config.queue_size);

        let (notification_sender, _) =
            tokio::sync::broadcast::channel(notification_config.notification_queue_size);

        join_set.spawn_task({
            info!(
                nickname = state.nickname(),
                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
            );
            Self::forward_cross_chain_queries(
                state.nickname().to_string(),
                internal_network.clone(),
                cross_chain_config.max_retries,
                Duration::from_millis(cross_chain_config.retry_delay_ms),
                Duration::from_millis(cross_chain_config.sender_delay_ms),
                cross_chain_config.sender_failure_rate,
                shard_id,
                cross_chain_receiver,
            )
        });

        let mut exporter_forwarded = false;
        for proxy in &internal_network.proxies {
            let receiver = notification_sender.subscribe();
            join_set.spawn_task({
                info!(
                    nickname = state.nickname(),
                    "spawning notifications thread on {} for shard {}", host, shard_id
                );
                let exporter_addresses = if exporter_forwarded {
                    vec![]
                } else {
                    exporter_forwarded = true;
                    internal_network.exporter_addresses()
                };
                Self::forward_notifications(
                    state.nickname().to_string(),
                    proxy.internal_address(&internal_network.protocol),
                    exporter_addresses,
                    receiver,
                )
            });
        }

        let (health_reporter, health_service) = tonic_health::server::health_reporter();

        let grpc_server = GrpcServer {
            state,
            shard_id,
            network: internal_network,
            cross_chain_sender,
            notification_sender,
        };

        let worker_node = ValidatorWorkerServer::new(grpc_server)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let handle = join_set.spawn_task(async move {
            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));

            let reflection_service = tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
                .build_v1()?;

            health_reporter
                .set_serving::<ValidatorWorkerServer<Self>>()
                .await;

            tonic::transport::Server::builder()
                .layer(
                    ServiceBuilder::new()
                        .layer(GrpcPrometheusMetricsMiddlewareLayer)
                        .into_inner(),
                )
                .add_service(health_service)
                .add_service(reflection_service)
                .add_service(worker_node)
                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
                .await?;

            Ok(())
        });

        GrpcServerHandle { handle }
    }

    /// Continuously waits for the receiver to yield notifications, which are then sent to
    /// the proxy and, for new blocks, to the block exporters.
    #[instrument(skip(receiver))]
    async fn forward_notifications(
        nickname: String,
        proxy_address: String,
        exporter_addresses: Vec<String>,
        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
    ) {
        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
            .expect("Proxy URI should be valid")
            .connect_lazy();
        let mut client = NotifierServiceClient::new(channel)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
            .iter()
            .map(|address| {
                let channel = tonic::transport::Channel::from_shared(address.clone())
                    .expect("Exporter URI should be valid")
                    .connect_lazy();
                NotifierServiceClient::new(channel)
                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            })
            .collect::<Vec<_>>();

        loop {
            let notification = match receiver.recv().await {
                Ok(notification) => notification,
                Err(RecvError::Lagged(skipped_count)) => {
                    warn!(
                        nickname,
                        skipped_count, "notification receiver lagged, messages were skipped"
                    );
                    continue;
                }
                Err(RecvError::Closed) => {
                    warn!(
                        nickname,
                        "notification channel closed, exiting forwarding loop"
                    );
                    break;
                }
            };

            let reason = &notification.reason;
            let chain_id = notification.chain_id;
            let notification: api::Notification = match notification.clone().try_into() {
                Ok(notification) => notification,
                Err(error) => {
                    warn!(%error, nickname, "could not serialize notification");
                    continue;
                }
            };
            let request = tonic::Request::new(notification.clone());
            if let Err(error) = client.notify(request).await {
                error!(
                    %error,
                    nickname,
                    ?chain_id,
                    ?reason,
                    "proxy: could not send notification",
                )
            }

            if let Reason::NewBlock { height: _, hash: _ } = reason {
                for exporter_client in &mut exporter_clients {
                    let request = tonic::Request::new(notification.clone());
                    if let Err(error) = exporter_client.notify(request).await {
                        error!(
                            %error,
                            nickname,
                            ?chain_id,
                            ?reason,
                            "block exporter: could not send notification",
                        )
                    }
                }
            }
        }
    }

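    /// Dispatches the cross-chain requests and notifications produced by the worker to
    /// their respective channels.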
    fn handle_network_actions(&self, actions: NetworkActions) {
        let mut cross_chain_sender = self.cross_chain_sender.clone();
        let notification_sender = self.notification_sender.clone();

        for request in actions.cross_chain_requests {
            let shard_id = self.network.get_shard_id(request.target_chain_id());
            trace!(
                source_shard_id = self.shard_id,
                target_shard_id = shard_id,
                "Scheduling cross-chain query",
            );

            if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
                error!(%error, "dropping cross-chain request");
                #[cfg(with_metrics)]
                if error.is_full() {
                    metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
                        .with_label_values(&[])
                        .inc();
                }
            }
        }

        for notification in actions.notifications {
            trace!("Scheduling notification query");
            if let Err(error) = notification_sender.send(notification) {
                error!(%error, "dropping notification");
                break;
            }
        }
    }

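    /// Drains the cross-chain request queue and delivers each request to the worker of
    /// its target shard.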
    #[instrument(skip_all, fields(nickname, %this_shard))]
    #[expect(clippy::too_many_arguments)]
    async fn forward_cross_chain_queries(
        nickname: String,
        network: ValidatorInternalNetworkConfig,
        cross_chain_max_retries: u32,
        cross_chain_retry_delay: Duration,
        cross_chain_sender_delay: Duration,
        cross_chain_sender_failure_rate: f32,
        this_shard: ShardId,
        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
    ) {
        let pool = GrpcConnectionPool::default();
        let handle_request =
            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
                let channel_result = pool.channel(network.shard(shard_id).http_address());
                async move {
                    let mut client = ValidatorWorkerClient::new(channel_result?)
                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
                    client
                        .handle_cross_chain_request(Request::new(request.try_into()?))
                        .await?;
                    anyhow::Result::<_, anyhow::Error>::Ok(())
                }
            };
        cross_chain_message_queue::forward_cross_chain_queries(
            nickname,
            cross_chain_max_retries,
            cross_chain_retry_delay,
            cross_chain_sender_delay,
            cross_chain_sender_failure_rate,
            this_shard,
            receiver,
            handle_request,
        )
        .await;
    }

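    /// Records the outcome and latency of a request under the given method name; a no-op
    /// when metrics are disabled.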
    fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
        #![cfg_attr(not(with_metrics), allow(unused_variables))]
        #[cfg(with_metrics)]
        {
            metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
                .with_label_values(&[method_name])
                .observe(start.elapsed().as_secs_f64() * 1000.0);
            if success {
                metrics::SERVER_REQUEST_SUCCESS
                    .with_label_values(&[method_name])
                    .inc();
            } else {
                metrics::SERVER_REQUEST_ERROR
                    .with_label_values(&[method_name])
                    .inc();
            }
        }
    }

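    /// Logs a worker error, at `error` level if it is local and at `debug` level otherwise.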
    fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
        let nickname = self.state.nickname();
        if error.is_local() {
            error!(nickname, %error, "{}", context);
        } else {
            debug!(nickname, %error, "{}", context);
        }
    }
}

#[tonic::async_trait]
impl<S> ValidatorWorkerRpc for GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_block_proposal(
        &self,
        request: Request<BlockProposal>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let proposal = request.into_inner().try_into()?;
        trace!(?proposal, "Handling block proposal");
        Ok(Response::new(
            match self.state.clone().handle_block_proposal(proposal).await {
                Ok((info, actions)) => {
                    Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
                    self.handle_network_actions(actions);
                    info.try_into()?
                }
                Err(error) => {
                    Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
                    self.log_error(&error, "Failed to handle block proposal");
                    NodeError::from(error).try_into()?
                }
            },
        ))
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_lite_certificate(
        &self,
        request: Request<LiteCertificate>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleLiteCertRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling lite certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match Box::pin(
            self.state
                .clone()
                .handle_lite_certificate(certificate, sender),
        )
        .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
                self.log_error(&error, "Failed to handle lite certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_confirmed_certificate(
        &self,
        request: Request<api::HandleConfirmedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleConfirmedCertificateRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match self
            .state
            .clone()
            .handle_confirmed_certificate(certificate, sender)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
                self.log_error(&error, "Failed to handle confirmed certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_validated_certificate(
        &self,
        request: Request<api::HandleValidatedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling certificate");
        match self
            .state
            .clone()
            .handle_validated_certificate(certificate)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
                self.log_error(&error, "Failed to handle validated certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_timeout_certificate(
        &self,
        request: Request<api::HandleTimeoutCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling timeout certificate");
        match self
            .state
            .clone()
            .handle_timeout_certificate(certificate)
            .await
        {
            Ok((info, _actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
                self.log_error(&error, "Failed to handle timeout certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_chain_info_query(
        &self,
        request: Request<ChainInfoQuery>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let query = request.into_inner().try_into()?;
        trace!(?query, "Handling chain info query");
        match self.state.clone().handle_chain_info_query(query).await {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
                self.log_error(&error, "Failed to handle chain info query");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn download_pending_blob(
        &self,
        request: Request<PendingBlobRequest>,
    ) -> Result<Response<PendingBlobResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_id) = request.into_inner().try_into()?;
        trace!(?chain_id, ?blob_id, "Download pending blob");
        match self
            .state
            .clone()
            .download_pending_blob(chain_id, blob_id)
            .await
        {
            Ok(blob) => {
                Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
                Ok(Response::new(blob.into_content().try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
                self.log_error(&error, "Failed to download pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id
        )
    )]
    async fn handle_pending_blob(
        &self,
        request: Request<HandlePendingBlobRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_content) = request.into_inner().try_into()?;
        let blob = Blob::new(blob_content);
        let blob_id = blob.id();
        trace!(?chain_id, ?blob_id, "Handle pending blob");
        match self.state.clone().handle_pending_blob(chain_id, blob).await {
            Ok(info) => {
                Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
                self.log_error(&error, "Failed to handle pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_cross_chain_request(
        &self,
        request: Request<CrossChainRequest>,
    ) -> Result<Response<()>, Status> {
        let start = Instant::now();
        let request = request.into_inner().try_into()?;
        trace!(?request, "Handling cross-chain request");
        match self.state.clone().handle_cross_chain_request(request).await {
            Ok(actions) => {
                Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
                self.handle_network_actions(actions)
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle cross-chain request");
            }
        }
        Ok(Response::new(()))
    }
}

/// Types that can be proxied and that expose the appropriate methods to be handled
/// by the `GrpcProxy`.
pub trait GrpcProxyable {
    fn chain_id(&self) -> Option<ChainId>;
}

impl GrpcProxyable for BlockProposal {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for LiteCertificate {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleValidatedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for ChainInfoQuery {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for PendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for HandlePendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for CrossChainRequest {
    fn chain_id(&self) -> Option<ChainId> {
        use super::api::cross_chain_request::Inner;

        match self.inner.as_ref()? {
            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
                recipient.clone()?.try_into().ok()
            }
        }
    }
}