linera_rpc/grpc/server.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    net::{IpAddr, SocketAddr},
    str::FromStr,
    task::{Context, Poll},
};

use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
use linera_base::{
    data_types::Blob,
    identifiers::ChainId,
    time::{Duration, Instant},
};
use linera_core::{
    join_set_ext::JoinSet,
    node::NodeError,
    worker::{NetworkActions, Notification, Reason, WorkerState},
    JoinSetExt as _, TaskHandle,
};
use linera_storage::Storage;
use tokio::sync::{broadcast::error::RecvError, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::{transport::Channel, Request, Response, Status};
use tower::{builder::ServiceBuilder, Layer, Service};
use tracing::{debug, error, info, instrument, trace, warn};

use super::{
    api::{
        self,
        notifier_service_client::NotifierServiceClient,
        validator_worker_client::ValidatorWorkerClient,
        validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
        BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
        HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
    },
    pool::GrpcConnectionPool,
    GrpcError, GRPC_MAX_MESSAGE_SIZE,
};
use crate::{
    config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
    cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
};

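/// Sender used to schedule cross-chain requests together with their target shard.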
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
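/// Broadcast sender used to fan notifications out to the proxies and block exporters.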
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;

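/// Prometheus metrics recorded by the gRPC server.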
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });
}

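/// The gRPC service implementation run by each worker shard of a validator: it answers
/// `ValidatorWorker` requests against the local `WorkerState` and schedules the resulting
/// cross-chain requests and notifications.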
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    state: WorkerState<S>,
    shard_id: ShardId,
    network: ValidatorInternalNetworkConfig,
    cross_chain_sender: CrossChainSender,
    notification_sender: NotificationSender,
}

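/// Handle to the spawned gRPC server task; `join` waits for the server to terminate.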
pub struct GrpcServerHandle {
    handle: TaskHandle<Result<(), GrpcError>>,
}

impl GrpcServerHandle {
    pub async fn join(self) -> Result<(), GrpcError> {
        self.handle.await?
    }
}

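/// A `tower` layer that wraps services in [`GrpcPrometheusMetricsMiddlewareService`].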
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;

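/// Middleware that records overall request latency and count metrics around the wrapped service.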
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    service: T,
}

impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
    type Service = GrpcPrometheusMetricsMiddlewareService<S>;

    fn layer(&self, service: S) -> Self::Service {
        GrpcPrometheusMetricsMiddlewareService { service }
    }
}

impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
where
    S::Future: Send + 'static,
    S: Service<Req> + std::marker::Send,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Req) -> Self::Future {
        #[cfg(with_metrics)]
        let start = Instant::now();
        let future = self.service.call(request);
        async move {
            let response = future.await?;
            #[cfg(with_metrics)]
            {
                metrics::SERVER_REQUEST_LATENCY
                    .with_label_values(&[])
                    .observe(start.elapsed().as_secs_f64() * 1000.0);
                metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
            }
            Ok(response)
        }
        .boxed()
    }
}

impl<S> GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
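    /// Spawns the gRPC server for this shard on `host:port`, together with the background
    /// tasks that forward cross-chain queries and notifications, and returns a handle to
    /// the server task.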
    #[expect(clippy::too_many_arguments)]
    pub fn spawn(
        host: String,
        port: u16,
        state: WorkerState<S>,
        shard_id: ShardId,
        internal_network: ValidatorInternalNetworkConfig,
        cross_chain_config: CrossChainConfig,
        notification_config: NotificationConfig,
        shutdown_signal: CancellationToken,
        join_set: &mut JoinSet,
    ) -> GrpcServerHandle {
        info!(
            "spawning gRPC server on {}:{} for shard {}",
            host, port, shard_id
        );

        let (cross_chain_sender, cross_chain_receiver) =
            mpsc::channel(cross_chain_config.queue_size);

        let (notification_sender, _) =
            tokio::sync::broadcast::channel(notification_config.notification_queue_size);

        join_set.spawn_task({
            info!(
                nickname = state.nickname(),
                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
            );
            Self::forward_cross_chain_queries(
                state.nickname().to_string(),
                internal_network.clone(),
                cross_chain_config.max_retries,
                Duration::from_millis(cross_chain_config.retry_delay_ms),
                Duration::from_millis(cross_chain_config.sender_delay_ms),
                cross_chain_config.sender_failure_rate,
                shard_id,
                cross_chain_receiver,
            )
        });

        let mut exporter_forwarded = false;
        for proxy in &internal_network.proxies {
            let receiver = notification_sender.subscribe();
            join_set.spawn_task({
                info!(
                    nickname = state.nickname(),
                    "spawning notifications thread on {} for shard {}", host, shard_id
                );
                let exporter_addresses = if exporter_forwarded {
                    vec![]
                } else {
                    exporter_forwarded = true;
                    internal_network.exporter_addresses()
                };
                Self::forward_notifications(
                    state.nickname().to_string(),
                    proxy.internal_address(&internal_network.protocol),
                    exporter_addresses,
                    receiver,
                )
            });
        }

        let (health_reporter, health_service) = tonic_health::server::health_reporter();

        let grpc_server = GrpcServer {
            state,
            shard_id,
            network: internal_network,
            cross_chain_sender,
            notification_sender,
        };

        let worker_node = ValidatorWorkerServer::new(grpc_server)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let handle = join_set.spawn_task(async move {
            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));

            let reflection_service = tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
                .build_v1()?;

            health_reporter
                .set_serving::<ValidatorWorkerServer<Self>>()
                .await;

            tonic::transport::Server::builder()
                .layer(
                    ServiceBuilder::new()
                        .layer(GrpcPrometheusMetricsMiddlewareLayer)
                        .into_inner(),
                )
                .add_service(health_service)
                .add_service(reflection_service)
                .add_service(worker_node)
                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
                .await?;

            Ok(())
        });

        GrpcServerHandle { handle }
    }

    /// Continuously receives notifications from the broadcast channel and forwards each one
    /// to the proxy; `NewBlock` notifications are also forwarded to the block exporters.
    #[instrument(skip(receiver))]
    async fn forward_notifications(
        nickname: String,
        proxy_address: String,
        exporter_addresses: Vec<String>,
        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
    ) {
        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
            .expect("Proxy URI should be valid")
            .connect_lazy();
        let mut client = NotifierServiceClient::new(channel)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
            .iter()
            .map(|address| {
                let channel = tonic::transport::Channel::from_shared(address.clone())
                    .expect("Exporter URI should be valid")
                    .connect_lazy();
                NotifierServiceClient::new(channel)
                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            })
            .collect::<Vec<_>>();

        loop {
            let notification = match receiver.recv().await {
                Ok(notification) => notification,
                Err(RecvError::Lagged(skipped_count)) => {
                    warn!(
                        nickname,
                        skipped_count, "notification receiver lagged, messages were skipped"
                    );
                    continue;
                }
                Err(RecvError::Closed) => {
                    warn!(
                        nickname,
                        "notification channel closed, exiting forwarding loop"
                    );
                    break;
                }
            };

            let reason = &notification.reason;
            let notification: api::Notification = match notification.clone().try_into() {
                Ok(notification) => notification,
                Err(error) => {
                    warn!(%error, nickname, "could not serialize notification");
                    continue;
                }
            };
            let request = tonic::Request::new(notification.clone());
            if let Err(error) = client.notify(request).await {
                error!(
                    %error,
                    nickname,
                    ?notification,
                    "proxy: could not send notification",
                )
            }

            if let Reason::NewBlock { height: _, hash: _ } = reason {
                for exporter_client in &mut exporter_clients {
                    let request = tonic::Request::new(notification.clone());
                    if let Err(error) = exporter_client.notify(request).await {
                        error!(
                            %error,
                            nickname,
                            ?notification,
                            "block exporter: could not send notification",
                        )
                    }
                }
            }
        }
    }

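    /// Dispatches the `NetworkActions` produced by the worker: cross-chain requests are
    /// queued for their target shard and notifications are broadcast to the forwarding tasks.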
    fn handle_network_actions(&self, actions: NetworkActions) {
        let mut cross_chain_sender = self.cross_chain_sender.clone();
        let notification_sender = self.notification_sender.clone();

        for request in actions.cross_chain_requests {
            let shard_id = self.network.get_shard_id(request.target_chain_id());
            trace!(
                source_shard_id = self.shard_id,
                target_shard_id = shard_id,
                "Scheduling cross-chain query",
            );

            if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
                error!(%error, "dropping cross-chain request");
                #[cfg(with_metrics)]
                if error.is_full() {
                    metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
                        .with_label_values(&[])
                        .inc();
                }
            }
        }

        for notification in actions.notifications {
            trace!("Scheduling notification query");
            if let Err(error) = notification_sender.send(notification) {
                error!(%error, "dropping notification");
                break;
            }
        }
    }

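    /// Receives cross-chain requests from the channel and delivers each one over gRPC to the
    /// worker of its target shard, using the configured retry and delay behavior.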
    #[instrument(skip_all, fields(nickname, %this_shard))]
    #[expect(clippy::too_many_arguments)]
    async fn forward_cross_chain_queries(
        nickname: String,
        network: ValidatorInternalNetworkConfig,
        cross_chain_max_retries: u32,
        cross_chain_retry_delay: Duration,
        cross_chain_sender_delay: Duration,
        cross_chain_sender_failure_rate: f32,
        this_shard: ShardId,
        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
    ) {
        let pool = GrpcConnectionPool::default();
        let handle_request =
            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
                let channel_result = pool.channel(network.shard(shard_id).http_address());
                async move {
                    let mut client = ValidatorWorkerClient::new(channel_result?)
                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
                    client
                        .handle_cross_chain_request(Request::new(request.try_into()?))
                        .await?;
                    anyhow::Result::<_, anyhow::Error>::Ok(())
                }
            };
        cross_chain_message_queue::forward_cross_chain_queries(
            nickname,
            cross_chain_max_retries,
            cross_chain_retry_delay,
            cross_chain_sender_delay,
            cross_chain_sender_failure_rate,
            this_shard,
            receiver,
            handle_request,
        )
        .await;
    }

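    /// Records per-method latency and success/error counters for a completed request
    /// (a no-op unless metrics are enabled).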
    fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
        #![cfg_attr(not(with_metrics), allow(unused_variables))]
        #[cfg(with_metrics)]
        {
            metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
                .with_label_values(&[method_name])
                .observe(start.elapsed().as_secs_f64() * 1000.0);
            if success {
                metrics::SERVER_REQUEST_SUCCESS
                    .with_label_values(&[method_name])
                    .inc();
            } else {
                metrics::SERVER_REQUEST_ERROR
                    .with_label_values(&[method_name])
                    .inc();
            }
        }
    }

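    /// Logs a worker error, at `error` level if the error is local and at `debug` level otherwise.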
    fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
        let nickname = self.state.nickname();
        if error.is_local() {
            error!(nickname, %error, "{}", context);
        } else {
            debug!(nickname, %error, "{}", context);
        }
    }
}

#[tonic::async_trait]
impl<S> ValidatorWorkerRpc for GrpcServer<S>
where
    S: Storage + Clone + Send + Sync + 'static,
{
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_block_proposal(
        &self,
        request: Request<BlockProposal>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let proposal = request.into_inner().try_into()?;
        trace!(?proposal, "Handling block proposal");
        Ok(Response::new(
            match self.state.clone().handle_block_proposal(proposal).await {
                Ok((info, actions)) => {
                    Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
                    self.handle_network_actions(actions);
                    info.try_into()?
                }
                Err(error) => {
                    Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
                    let nickname = self.state.nickname();
                    warn!(nickname, %error, "Failed to handle block proposal");
                    NodeError::from(error).try_into()?
                }
            },
        ))
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_lite_certificate(
        &self,
        request: Request<LiteCertificate>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleLiteCertRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling lite certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match Box::pin(
            self.state
                .clone()
                .handle_lite_certificate(certificate, sender),
        )
        .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
                self.log_error(&error, "Failed to handle lite certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_confirmed_certificate(
        &self,
        request: Request<api::HandleConfirmedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleConfirmedCertificateRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling confirmed certificate");
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match self
            .state
            .clone()
            .handle_confirmed_certificate(certificate, sender)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
                self.log_error(&error, "Failed to handle confirmed certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_validated_certificate(
        &self,
        request: Request<api::HandleValidatedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling validated certificate");
        match self
            .state
            .clone()
            .handle_validated_certificate(certificate)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
                self.log_error(&error, "Failed to handle validated certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_timeout_certificate(
        &self,
        request: Request<api::HandleTimeoutCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling timeout certificate");
        match self
            .state
            .clone()
            .handle_timeout_certificate(certificate)
            .await
        {
            Ok((info, _actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
                self.log_error(&error, "Failed to handle timeout certificate");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_chain_info_query(
        &self,
        request: Request<ChainInfoQuery>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let query = request.into_inner().try_into()?;
        trace!(?query, "Handling chain info query");
        match self.state.clone().handle_chain_info_query(query).await {
            Ok((info, actions)) => {
                Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
                self.handle_network_actions(actions);
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
                self.log_error(&error, "Failed to handle chain info query");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn download_pending_blob(
        &self,
        request: Request<PendingBlobRequest>,
    ) -> Result<Response<PendingBlobResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_id) = request.into_inner().try_into()?;
        trace!(?chain_id, ?blob_id, "Download pending blob");
        match self
            .state
            .clone()
            .download_pending_blob(chain_id, blob_id)
            .await
        {
            Ok(blob) => {
                Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
                Ok(Response::new(blob.into_content().try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
                self.log_error(&error, "Failed to download pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id
        )
    )]
    async fn handle_pending_blob(
        &self,
        request: Request<HandlePendingBlobRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let start = Instant::now();
        let (chain_id, blob_content) = request.into_inner().try_into()?;
        let blob = Blob::new(blob_content);
        let blob_id = blob.id();
        trace!(?chain_id, ?blob_id, "Handle pending blob");
        match self.state.clone().handle_pending_blob(chain_id, blob).await {
            Ok(info) => {
                Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
                self.log_error(&error, "Failed to handle pending blob");
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }

    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_cross_chain_request(
        &self,
        request: Request<CrossChainRequest>,
    ) -> Result<Response<()>, Status> {
        let start = Instant::now();
        let request = request.into_inner().try_into()?;
        trace!(?request, "Handling cross-chain request");
        match self.state.clone().handle_cross_chain_request(request).await {
            Ok(actions) => {
                Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
                self.handle_network_actions(actions)
            }
            Err(error) => {
                Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
                let nickname = self.state.nickname();
                error!(nickname, %error, "Failed to handle cross-chain request");
            }
        }
        Ok(Response::new(()))
    }
}

/// Types that can be proxied by the `GrpcProxy`; they expose the chain ID used to route
/// each request to the right shard.
pub trait GrpcProxyable {
    fn chain_id(&self) -> Option<ChainId>;
}

impl GrpcProxyable for BlockProposal {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for LiteCertificate {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for api::HandleValidatedCertificateRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for ChainInfoQuery {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for PendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for HandlePendingBlobRequest {
    fn chain_id(&self) -> Option<ChainId> {
        self.chain_id.clone()?.try_into().ok()
    }
}

impl GrpcProxyable for CrossChainRequest {
    fn chain_id(&self) -> Option<ChainId> {
        use super::api::cross_chain_request::Inner;

        match self.inner.as_ref()? {
            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
                recipient.clone()?.try_into().ok()
            }
        }
    }
}