linera_rpc/grpc/server.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    net::{IpAddr, SocketAddr},
6    str::FromStr,
7    task::{Context, Poll},
8};
9
10use futures::{
11    channel::mpsc, future::BoxFuture, stream::FuturesUnordered, FutureExt as _, StreamExt as _,
12};
13#[cfg(with_metrics)]
14use linera_base::time::Instant;
15use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
16use linera_core::{
17    join_set_ext::JoinSet,
18    node::NodeError,
19    worker::{NetworkActions, Notification, Reason, WorkerState},
20    JoinSetExt as _, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::{broadcast::error::RecvError, oneshot};
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30    api::{
31        self,
32        notifier_service_client::NotifierServiceClient,
33        validator_worker_client::ValidatorWorkerClient,
34        validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35        BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36        HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37    },
38    pool::GrpcConnectionPool,
39    GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41#[cfg(feature = "opentelemetry")]
42use crate::propagation::get_traffic_type_from_request;
43use crate::{
44    config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
45    cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
46    HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
47};
48
49type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
50type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
51
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus metrics recorded by the worker's gRPC server and its
    //! background forwarding tasks.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, linear_bucket_interval, register_histogram_vec,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    use super::super::{ERROR_TYPE_LABEL, METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL};

    /// Per-request latency, labeled by gRPC method and traffic type.
    /// The middleware records values in milliseconds, matching the
    /// 1..5000 linear bucket range below.
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
            linear_bucket_interval(1.0, 50.0, 5000.0),
        )
    });

    /// Requests whose handler future completed, incremented alongside
    /// `SERVER_REQUEST_LATENCY` in the metrics middleware.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_count",
            "Server request count",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Requests the worker handled successfully; incremented by the
    /// individual RPC handlers.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Requests that failed in the worker, with an extra label for the
    /// error type.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL, ERROR_TYPE_LABEL],
        )
    });

    /// Handler futures dropped before producing a response; incremented by
    /// `ServerRequestCancellationGuard` on drop.
    pub static SERVER_REQUEST_CANCELLED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_cancelled",
            "Server requests whose handler future was dropped before completion (e.g. client-side timeout / disconnect)",
            &[METHOD_NAME_LABEL, TRAFFIC_TYPE_LABEL],
        )
    });

    /// Cross-chain requests dropped because the bounded mpsc channel to the
    /// forwarding task was full (unlabeled counter).
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });

    /// Notifications lost because a broadcast receiver lagged behind the
    /// sender (unlabeled counter).
    pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_skipped_receiver_lag",
            "Number of notifications skipped because receiver lagged behind sender",
            &[],
        )
    });

    /// Notifications dropped because the broadcast channel had no active
    /// receiver (unlabeled counter).
    pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notifications_dropped_no_receiver",
            "Number of notifications dropped because no receiver was available",
            &[],
        )
    });

    /// Size distribution of notification batches sent to the proxy.
    pub static NOTIFICATION_BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "notification_batch_size",
            "Number of notifications per batch sent to proxy",
            &[],
            exponential_bucket_interval(1.0, 250.0),
        )
    });

    /// Batches sent to the proxy, labeled by "success"/"error" status.
    pub static NOTIFICATION_BATCHES_SENT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "notification_batches_sent",
            "Total notification batches sent",
            &["status"],
        )
    });
}
146
/// Handles batched forwarding of notifications to proxy and exporters.
struct BatchForwarder {
    // Worker nickname, used only for log messages.
    nickname: String,
    // Client for the proxy's notifier service.
    client: NotifierServiceClient<Channel>,
    // Clients for the block exporters; `send_batch` forwards only `NewBlock`
    // notifications to these.
    exporter_clients: Vec<NotifierServiceClient<Channel>>,
    // Notifications accumulated but not yet handed to a send task.
    pending_notifications: Vec<Notification>,
    // In-flight batch-send tasks.
    futures: FuturesUnordered<BoxFuture<'static, ()>>,
    // Maximum number of notifications per batch.
    batch_limit: usize,
    // Maximum number of concurrent in-flight send tasks.
    max_tasks: usize,
}
157
158impl BatchForwarder {
159    /// Spawns batch send tasks up to max_tasks limit.
160    fn spawn_batches(&mut self) {
161        while !self.pending_notifications.is_empty() && self.futures.len() < self.max_tasks {
162            let chunk_size = std::cmp::min(self.batch_limit, self.pending_notifications.len());
163            let batch: Vec<Notification> = self.pending_notifications.drain(..chunk_size).collect();
164
165            #[cfg(with_metrics)]
166            metrics::NOTIFICATION_BATCH_SIZE
167                .with_label_values(&[])
168                .observe(batch.len() as f64);
169
170            let client = self.client.clone();
171            let exporter_clients = self.exporter_clients.clone();
172            let nickname = self.nickname.clone();
173
174            self.futures.push(
175                async move {
176                    Self::send_batch(nickname, client, exporter_clients, batch).await;
177                }
178                .boxed(),
179            );
180        }
181    }
182
183    /// Returns true if there are no pending notifications and no in-flight tasks.
184    fn is_fully_drained(&self) -> bool {
185        self.pending_notifications.is_empty() && self.futures.is_empty()
186    }
187
188    /// Sends a batch of notifications to the proxy and exporters.
189    async fn send_batch(
190        nickname: String,
191        mut client: NotifierServiceClient<Channel>,
192        mut exporter_clients: Vec<NotifierServiceClient<Channel>>,
193        batch: Vec<Notification>,
194    ) {
195        // Convert to proto notifications, logging any deserialization errors
196        let mut proto_notifications = Vec::with_capacity(batch.len());
197        for notification in &batch {
198            match notification.clone().try_into() {
199                Ok(proto) => proto_notifications.push(proto),
200                Err(error) => {
201                    warn!(
202                        %error,
203                        nickname,
204                        ?notification.chain_id,
205                        ?notification.reason,
206                        "could not deserialize notification"
207                    );
208                }
209            }
210        }
211
212        // Collect chain_ids for error logging
213        let chain_ids: Vec<_> = batch.iter().map(|n| n.chain_id).collect();
214
215        // Send batch to proxy
216        let request = Request::new(api::NotificationBatch {
217            notifications: proto_notifications.clone(),
218        });
219        let result = client.notify_batch(request).await;
220
221        #[cfg(with_metrics)]
222        {
223            let status = if result.is_ok() { "success" } else { "error" };
224            metrics::NOTIFICATION_BATCHES_SENT
225                .with_label_values(&[status])
226                .inc();
227        }
228
229        if let Err(error) = result {
230            error!(
231                %error,
232                nickname,
233                batch_size = proto_notifications.len(),
234                ?chain_ids,
235                "proxy: could not send notification batch",
236            );
237        }
238
239        // Send NewBlock notifications to exporters
240        let new_block_notifications: Vec<_> = batch
241            .iter()
242            .filter(|n| matches!(n.reason, Reason::NewBlock { .. }))
243            .collect();
244
245        let exporter_notifications: Vec<api::Notification> = new_block_notifications
246            .iter()
247            .filter_map(|n| (*n).clone().try_into().ok())
248            .collect();
249
250        if !exporter_notifications.is_empty() {
251            let exporter_chain_ids: Vec<_> =
252                new_block_notifications.iter().map(|n| n.chain_id).collect();
253
254            for exporter_client in &mut exporter_clients {
255                let request = Request::new(api::NotificationBatch {
256                    notifications: exporter_notifications.clone(),
257                });
258                if let Err(error) = exporter_client.notify_batch(request).await {
259                    error!(
260                        %error,
261                        nickname,
262                        batch_size = exporter_notifications.len(),
263                        ?exporter_chain_ids,
264                        "block exporter: could not send notification batch",
265                    );
266                }
267            }
268        }
269    }
270}
271
/// The per-shard gRPC server implementing the validator worker service.
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    // The worker state that actually handles proposals and certificates.
    state: WorkerState<S>,
    // The shard this server instance serves.
    shard_id: ShardId,
    // Internal network topology: shards, proxies, protocol, exporters.
    network: ValidatorInternalNetworkConfig,
    // Bounded queue feeding the cross-chain forwarding task.
    cross_chain_sender: CrossChainSender,
    // Broadcast channel feeding the per-proxy notification forwarders.
    notification_sender: NotificationSender,
}
283
/// Handle to the background task running the gRPC server.
pub struct GrpcServerHandle {
    // Join handle yielding the server's final result.
    handle: TaskHandle<Result<(), GrpcError>>,
}
287
impl GrpcServerHandle {
    /// Waits for the server task to finish, surfacing both task-join
    /// failures (via `?`) and the server's own error result as `GrpcError`.
    pub async fn join(self) -> Result<(), GrpcError> {
        self.handle.await?
    }
}
293
/// Drop guard used by the metrics middleware: if dropped while `completed`
/// is still `false`, the request's handler future was abandoned before a
/// response was produced, and the drop impl counts it as cancelled.
#[cfg(with_metrics)]
struct ServerRequestCancellationGuard {
    // gRPC method name, used as a metric label.
    method_name: String,
    // Traffic type metric label (e.g. "organic" / "unknown").
    traffic_type: &'static str,
    // Set to `true` by the middleware once the response is produced.
    completed: bool,
}
300
#[cfg(with_metrics)]
impl Drop for ServerRequestCancellationGuard {
    /// Counts the request as cancelled unless the middleware marked it
    /// complete before the guard was dropped.
    fn drop(&mut self) {
        if self.completed {
            return;
        }
        metrics::SERVER_REQUEST_CANCELLED
            .with_label_values(&[&self.method_name, self.traffic_type])
            .inc();
    }
}
311
/// Tower layer that installs the Prometheus metrics middleware around a
/// service.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;
314
/// Middleware recording per-request latency, counts and cancellations
/// around an inner HTTP/gRPC service.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    // The wrapped inner service.
    service: T,
}
319
320impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
321    type Service = GrpcPrometheusMetricsMiddlewareService<S>;
322
323    fn layer(&self, service: S) -> Self::Service {
324        GrpcPrometheusMetricsMiddlewareService { service }
325    }
326}
327
impl<S, B> Service<http::Request<B>> for GrpcPrometheusMetricsMiddlewareService<S>
where
    S::Future: Send + 'static,
    S: Service<http::Request<B>> + std::marker::Send,
    B: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<S::Response, S::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Purely delegating: this middleware adds no backpressure of its own.
        self.service.poll_ready(cx)
    }

    /// Calls the inner service, recording latency (in milliseconds) and a
    /// request count on completion; a drop guard counts requests whose
    /// future is dropped before completing.
    fn call(&mut self, request: http::Request<B>) -> Self::Future {
        #[cfg(with_metrics)]
        let start = Instant::now();

        #[cfg(with_metrics)]
        let method_name = super::extract_grpc_method_name(request.uri().path()).to_owned();

        // Extract traffic type from request extensions (set by OtelContextLayer).
        // When opentelemetry is enabled but no baggage is set, defaults to "organic".
        // When opentelemetry is disabled, defaults to "unknown".
        #[cfg(all(with_metrics, feature = "opentelemetry"))]
        let traffic_type: &'static str = get_traffic_type_from_request(&request);
        #[cfg(all(with_metrics, not(feature = "opentelemetry")))]
        let traffic_type: &'static str = "unknown";

        let future = self.service.call(request);
        async move {
            // If this async block is dropped before `completed` is set, the
            // guard's `Drop` impl records the request as cancelled.
            // NOTE(review): an inner-service `Err` propagated by `?` below
            // also leaves `completed` false and is thus counted as a
            // cancellation — confirm this is intended.
            #[cfg(with_metrics)]
            let mut cancellation_guard = ServerRequestCancellationGuard {
                method_name,
                traffic_type,
                completed: false,
            };
            let response = future.await?;
            #[cfg(with_metrics)]
            {
                cancellation_guard.completed = true;
                // Latency is observed in milliseconds to match the metric's
                // bucket configuration.
                metrics::SERVER_REQUEST_LATENCY
                    .with_label_values(&[&cancellation_guard.method_name, traffic_type])
                    .observe(start.elapsed().as_secs_f64() * 1000.0);
                metrics::SERVER_REQUEST_COUNT
                    .with_label_values(&[&cancellation_guard.method_name, traffic_type])
                    .inc();
            }
            Ok(response)
        }
        .boxed()
    }
}
381
382impl<S> GrpcServer<S>
383where
384    S: Storage + Clone + Send + Sync + 'static,
385{
    /// Spawns the gRPC server for one shard along with its background tasks:
    /// a cross-chain request forwarder and one notification forwarder per
    /// proxy. Returns a handle that resolves when the server stops (or fails
    /// to start).
    #[expect(clippy::too_many_arguments)]
    pub fn spawn(
        host: String,
        port: u16,
        state: WorkerState<S>,
        shard_id: ShardId,
        internal_network: ValidatorInternalNetworkConfig,
        cross_chain_config: &CrossChainConfig,
        notification_config: &NotificationConfig,
        shutdown_signal: CancellationToken,
        join_set: &mut JoinSet,
    ) -> GrpcServerHandle {
        info!(
            "spawning gRPC server on {}:{} for shard {}",
            host, port, shard_id
        );

        // Bounded queue of outbound cross-chain requests; senders use
        // `try_send`, so requests are dropped (with a log) when it is full.
        let (cross_chain_sender, cross_chain_receiver) =
            mpsc::channel(cross_chain_config.queue_size);

        // Give the worker a shard-routing sender for cross-chain requests generated
        // outside the normal `NetworkActions` return path (specifically, the
        // `RevertConfirm`s emitted after resetting a corrupted chain).
        let state = {
            let routing_network = internal_network.clone();
            let routing_sender = cross_chain_sender.clone();
            state.with_outbound_cross_chain_sender(std::sync::Arc::new(move |request| {
                let shard_id = routing_network.get_shard_id(request.target_chain_id());
                if let Err(error) = routing_sender.clone().try_send((request, shard_id)) {
                    error!(%error, "dropping cross-chain request");
                }
            }))
        };

        // Broadcast channel fanning notifications out to one forwarding task
        // per proxy; the initial receiver is discarded, tasks subscribe below.
        let (notification_sender, _) =
            tokio::sync::broadcast::channel(notification_config.notification_queue_size);

        join_set.spawn_task({
            info!(
                nickname = state.nickname(),
                "spawning cross-chain queries thread on {} for shard {}", host, shard_id
            );
            Self::forward_cross_chain_queries(
                state.nickname().to_string(),
                internal_network.clone(),
                cross_chain_config.max_retries,
                Duration::from_millis(cross_chain_config.retry_delay_ms),
                Duration::from_millis(cross_chain_config.max_backoff_ms),
                Duration::from_millis(cross_chain_config.sender_delay_ms),
                cross_chain_config.sender_failure_rate,
                shard_id,
                cross_chain_receiver,
            )
        });

        // Only the first proxy's forwarding task also feeds the block
        // exporters, so each notification reaches the exporters exactly once.
        let mut exporter_forwarded = false;
        for proxy in &internal_network.proxies {
            let receiver = notification_sender.subscribe();
            join_set.spawn_task({
                info!(
                    nickname = state.nickname(),
                    "spawning notifications thread on {} for shard {}", host, shard_id
                );
                let exporter_addresses = if exporter_forwarded {
                    vec![]
                } else {
                    exporter_forwarded = true;
                    internal_network.exporter_addresses()
                };
                Self::forward_notifications(
                    state.nickname().to_string(),
                    proxy.internal_address(&internal_network.protocol),
                    exporter_addresses,
                    receiver,
                    notification_config.clone(),
                )
            });
        }

        let (health_reporter, health_service) = tonic_health::server::health_reporter();

        let grpc_server = GrpcServer {
            state,
            shard_id,
            network: internal_network,
            cross_chain_sender,
            notification_sender,
        };

        let worker_node = ValidatorWorkerServer::new(grpc_server)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let handle = join_set.spawn_task(async move {
            let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));

            let reflection_service = tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
                .build_v1()?;

            health_reporter
                .set_serving::<ValidatorWorkerServer<Self>>()
                .await;

            // With the `opentelemetry` feature, the context-propagation layer
            // runs before the metrics layer so the traffic type is available
            // in request extensions when metrics are recorded.
            #[cfg(feature = "opentelemetry")]
            let mut server = tonic::transport::Server::builder().layer(
                ServiceBuilder::new()
                    .layer(crate::propagation::OtelContextLayer)
                    .layer(GrpcPrometheusMetricsMiddlewareLayer)
                    .into_inner(),
            );
            #[cfg(not(feature = "opentelemetry"))]
            let mut server = tonic::transport::Server::builder().layer(
                ServiceBuilder::new()
                    .layer(GrpcPrometheusMetricsMiddlewareLayer)
                    .into_inner(),
            );
            server
                .add_service(health_service)
                .add_service(reflection_service)
                .add_service(worker_node)
                .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
                .await?;

            Ok(())
        });

        GrpcServerHandle { handle }
    }
515
    /// Continuously waits for receiver to receive notifications and sends them to
    /// the proxy in batches for improved throughput.
    ///
    /// `NewBlock` notifications are additionally forwarded to the given block
    /// exporters. The task exits once the broadcast channel closes, after
    /// draining everything still pending.
    #[instrument(skip(receiver, config))]
    async fn forward_notifications(
        nickname: String,
        proxy_address: String,
        exporter_addresses: Vec<String>,
        mut receiver: tokio::sync::broadcast::Receiver<Notification>,
        config: NotificationConfig,
    ) {
        // `connect_lazy` defers establishing the connection until first use.
        let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
            .expect("Proxy URI should be valid")
            .connect_lazy();
        let client = NotifierServiceClient::new(channel)
            .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);

        let exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
            .iter()
            .map(|address| {
                let channel = tonic::transport::Channel::from_shared(address.clone())
                    .expect("Exporter URI should be valid")
                    .connect_lazy();
                NotifierServiceClient::new(channel)
                    .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                    .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
            })
            .collect::<Vec<_>>();

        let mut forwarder = BatchForwarder {
            nickname: nickname.clone(),
            client,
            exporter_clients,
            pending_notifications: Vec::new(),
            futures: FuturesUnordered::new(),
            batch_limit: config.notification_batch_size,
            max_tasks: config.notification_max_in_flight,
        };

        loop {
            tokio::select! {
                // `biased` polls the arms in order, so receiving new
                // notifications takes priority over reaping finished sends.
                biased;

                result = receiver.recv() => {
                    match result {
                        Ok(notification) => {
                            forwarder.pending_notifications.push(notification);

                            // Send immediately when idle (low latency);
                            // otherwise wait for a full batch and a free slot.
                            if forwarder.futures.is_empty()
                               || (forwarder.pending_notifications.len() >= forwarder.batch_limit
                                   && forwarder.futures.len() < forwarder.max_tasks) {
                                forwarder.spawn_batches();
                            }
                        }
                        Err(RecvError::Lagged(skipped_count)) => {
                            // The broadcast ring buffer overwrote entries we
                            // never saw; they are lost, only counted.
                            warn!(
                                nickname,
                                skipped_count, "notification receiver lagged, messages were skipped"
                            );
                            #[cfg(with_metrics)]
                            metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
                                .with_label_values(&[])
                                .inc_by(skipped_count);
                        }
                        Err(RecvError::Closed) => {
                            warn!(
                                nickname,
                                "notification channel closed, draining pending notifications"
                            );
                            // Drain all pending notifications before exiting
                            loop {
                                forwarder.spawn_batches();
                                if forwarder.is_fully_drained() {
                                    break;
                                }
                                forwarder.futures.next().await;
                            }
                            break;
                        }
                    }
                }

                // A send task finished; refill the in-flight set.
                Some(()) = forwarder.futures.next() => {
                    forwarder.spawn_batches();
                }
            }
        }
    }
604
605    fn handle_network_actions(&self, actions: NetworkActions) {
606        let mut cross_chain_sender = self.cross_chain_sender.clone();
607        let notification_sender = self.notification_sender.clone();
608
609        for request in actions.cross_chain_requests {
610            let shard_id = self.network.get_shard_id(request.target_chain_id());
611            trace!(
612                source_shard_id = self.shard_id,
613                target_shard_id = shard_id,
614                "Scheduling cross-chain query",
615            );
616
617            if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
618                error!(%error, "dropping cross-chain request");
619                #[cfg(with_metrics)]
620                if error.is_full() {
621                    metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
622                        .with_label_values(&[])
623                        .inc();
624                }
625            }
626        }
627
628        for notification in actions.notifications {
629            trace!("Scheduling notification query");
630            if let Err(error) = notification_sender.send(notification) {
631                error!(%error, "dropping notification");
632                #[cfg(with_metrics)]
633                metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
634                    .with_label_values(&[])
635                    .inc();
636            }
637        }
638    }
639
    /// Sends queued cross-chain requests to their target shards over gRPC,
    /// delegating the retry/backoff/throttling policy to
    /// `cross_chain_message_queue::forward_cross_chain_queries`.
    #[instrument(skip_all, fields(nickname, %this_shard))]
    #[expect(clippy::too_many_arguments)]
    async fn forward_cross_chain_queries(
        nickname: String,
        network: ValidatorInternalNetworkConfig,
        cross_chain_max_retries: u32,
        cross_chain_retry_delay: Duration,
        cross_chain_max_backoff: Duration,
        cross_chain_sender_delay: Duration,
        cross_chain_sender_failure_rate: f32,
        this_shard: ShardId,
        receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
    ) {
        // Channels to sibling shards are pooled and reused across requests.
        let pool = GrpcConnectionPool::default();
        let handle_request =
            move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
                // Channel lookup happens synchronously in the closure; only
                // the actual RPC is deferred into the returned future.
                let channel_result = pool.channel(network.shard(shard_id).http_address());
                async move {
                    let mut client = ValidatorWorkerClient::new(channel_result?)
                        .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
                        .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
                    client
                        .handle_cross_chain_request(Request::new(request.try_into()?))
                        .await?;
                    anyhow::Result::<_, anyhow::Error>::Ok(())
                }
            };
        cross_chain_message_queue::forward_cross_chain_queries(
            nickname,
            cross_chain_max_retries,
            cross_chain_retry_delay,
            cross_chain_max_backoff,
            cross_chain_sender_delay,
            cross_chain_sender_failure_rate,
            this_shard,
            receiver,
            handle_request,
        )
        .await;
    }
680
    /// Records a successfully handled request in metrics; a no-op when
    /// built without `with_metrics`.
    fn log_request_success(method_name: &str, traffic_type: &str) {
        #![cfg_attr(not(with_metrics), allow(unused_variables))]
        #[cfg(with_metrics)]
        metrics::SERVER_REQUEST_SUCCESS
            .with_label_values(&[method_name, traffic_type])
            .inc();
    }
688
    /// Records a failed request (with its error type) in metrics; a no-op
    /// when built without `with_metrics`.
    fn log_request_error(method_name: &str, traffic_type: &str, error_type: &str) {
        #![cfg_attr(not(with_metrics), allow(unused_variables))]
        #[cfg(with_metrics)]
        metrics::SERVER_REQUEST_ERROR
            .with_label_values(&[method_name, traffic_type, error_type])
            .inc();
    }
696
    /// Extracts traffic type from a tonic request's extensions.
    /// The value is placed there by the OpenTelemetry context layer.
    #[cfg(feature = "opentelemetry")]
    fn get_traffic_type<R>(request: &Request<R>) -> &'static str {
        get_traffic_type_from_request(request)
    }

    /// Returns "unknown" when opentelemetry feature is disabled.
    #[cfg(not(feature = "opentelemetry"))]
    fn get_traffic_type<R>(_request: &Request<R>) -> &'static str {
        "unknown"
    }
708
709    fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
710        let nickname = self.state.nickname();
711        if error.is_local() {
712            error!(nickname, %error, "{}", context);
713        } else {
714            debug!(nickname, %error, "{}", context);
715        }
716    }
717}
718
719#[tonic::async_trait]
720impl<S> ValidatorWorkerRpc for GrpcServer<S>
721where
722    S: Storage + Clone + Send + Sync + 'static,
723{
724    #[instrument(
725        target = "grpc_server",
726        skip_all,
727        err,
728        fields(
729            nickname = self.state.nickname(),
730            chain_id = ?request.get_ref().chain_id()
731        )
732    )]
733    async fn handle_block_proposal(
734        &self,
735        request: Request<BlockProposal>,
736    ) -> Result<Response<ChainInfoResult>, Status> {
737        let traffic_type = Self::get_traffic_type(&request);
738        let proposal = request.into_inner().try_into()?;
739        trace!(?proposal, "Handling block proposal");
740        Ok(Response::new(
741            match self.state.clone().handle_block_proposal(proposal).await {
742                Ok((info, actions)) => {
743                    Self::log_request_success("handle_block_proposal", traffic_type);
744                    self.handle_network_actions(actions);
745                    info.try_into()?
746                }
747                Err(error) => {
748                    Self::log_request_error(
749                        "handle_block_proposal",
750                        traffic_type,
751                        &error.error_type(),
752                    );
753                    self.log_error(&error, "Failed to handle block proposal");
754                    NodeError::from(error).try_into()?
755                }
756            },
757        ))
758    }
759
    /// Handles a lite certificate: processes it in the worker, dispatches
    /// resulting network actions, and — if the client requested it — waits
    /// for outgoing messages to be delivered before responding.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_lite_certificate(
        &self,
        request: Request<LiteCertificate>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let HandleLiteCertRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling lite certificate");
        // Only create the delivery-notification channel if the client asked
        // to wait for outgoing messages.
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        // NOTE(review): unlike the other handlers, this future is boxed —
        // presumably to keep a large future off the stack; confirm.
        match Box::pin(
            self.state
                .clone()
                .handle_lite_certificate(certificate, sender),
        )
        .await
        {
            Ok((info, actions)) => {
                Self::log_request_success("handle_lite_certificate", traffic_type);
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    // Delay the response until message delivery is signaled.
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_error(
                    "handle_lite_certificate",
                    traffic_type,
                    &error.error_type(),
                );
                self.log_error(&error, "Failed to handle lite certificate");
                // Worker errors are returned in-band as a `ChainInfoResult`.
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }
808
    /// Handles a confirmed certificate: processes it in the worker,
    /// dispatches resulting network actions, and — if the client requested
    /// it — waits for outgoing messages to be delivered before responding.
    #[instrument(
        target = "grpc_server",
        skip_all,
        err,
        fields(
            nickname = self.state.nickname(),
            chain_id = ?request.get_ref().chain_id()
        )
    )]
    async fn handle_confirmed_certificate(
        &self,
        request: Request<api::HandleConfirmedCertificateRequest>,
    ) -> Result<Response<ChainInfoResult>, Status> {
        let traffic_type = Self::get_traffic_type(&request);
        let HandleConfirmedCertificateRequest {
            certificate,
            wait_for_outgoing_messages,
        } = request.into_inner().try_into()?;
        trace!(?certificate, "Handling certificate");
        // Only create the delivery-notification channel if the client asked
        // to wait for outgoing messages.
        let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
        match self
            .state
            .clone()
            .handle_confirmed_certificate(certificate, sender)
            .await
        {
            Ok((info, actions)) => {
                Self::log_request_success("handle_confirmed_certificate", traffic_type);
                self.handle_network_actions(actions);
                if let Some(receiver) = receiver {
                    // Delay the response until message delivery is signaled.
                    if let Err(e) = receiver.await {
                        error!("Failed to wait for message delivery: {e}");
                    }
                }
                Ok(Response::new(info.try_into()?))
            }
            Err(error) => {
                Self::log_request_error(
                    "handle_confirmed_certificate",
                    traffic_type,
                    &error.error_type(),
                );
                self.log_error(&error, "Failed to handle confirmed certificate");
                // Worker errors are returned in-band as a `ChainInfoResult`.
                Ok(Response::new(NodeError::from(error).try_into()?))
            }
        }
    }
856
857    #[instrument(
858        target = "grpc_server",
859        skip_all,
860        err,
861        fields(
862            nickname = self.state.nickname(),
863            chain_id = ?request.get_ref().chain_id()
864        )
865    )]
866    async fn handle_validated_certificate(
867        &self,
868        request: Request<api::HandleValidatedCertificateRequest>,
869    ) -> Result<Response<ChainInfoResult>, Status> {
870        let traffic_type = Self::get_traffic_type(&request);
871        let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
872        trace!(?certificate, "Handling certificate");
873        match self
874            .state
875            .clone()
876            .handle_validated_certificate(certificate)
877            .await
878        {
879            Ok((info, actions)) => {
880                Self::log_request_success("handle_validated_certificate", traffic_type);
881                self.handle_network_actions(actions);
882                Ok(Response::new(info.try_into()?))
883            }
884            Err(error) => {
885                Self::log_request_error(
886                    "handle_validated_certificate",
887                    traffic_type,
888                    &error.error_type(),
889                );
890                self.log_error(&error, "Failed to handle validated certificate");
891                Ok(Response::new(NodeError::from(error).try_into()?))
892            }
893        }
894    }
895
896    #[instrument(
897        target = "grpc_server",
898        skip_all,
899        err,
900        fields(
901            nickname = self.state.nickname(),
902            chain_id = ?request.get_ref().chain_id()
903        )
904    )]
905    async fn handle_timeout_certificate(
906        &self,
907        request: Request<api::HandleTimeoutCertificateRequest>,
908    ) -> Result<Response<ChainInfoResult>, Status> {
909        let traffic_type = Self::get_traffic_type(&request);
910        let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
911        trace!(?certificate, "Handling Timeout certificate");
912        match self
913            .state
914            .clone()
915            .handle_timeout_certificate(certificate)
916            .await
917        {
918            Ok((info, _actions)) => {
919                Self::log_request_success("handle_timeout_certificate", traffic_type);
920                Ok(Response::new(info.try_into()?))
921            }
922            Err(error) => {
923                Self::log_request_error(
924                    "handle_timeout_certificate",
925                    traffic_type,
926                    &error.error_type(),
927                );
928                self.log_error(&error, "Failed to handle timeout certificate");
929                Ok(Response::new(NodeError::from(error).try_into()?))
930            }
931        }
932    }
933
934    #[instrument(
935        target = "grpc_server",
936        skip_all,
937        err,
938        fields(
939            nickname = self.state.nickname(),
940            chain_id = ?request.get_ref().chain_id()
941        )
942    )]
943    async fn handle_chain_info_query(
944        &self,
945        request: Request<ChainInfoQuery>,
946    ) -> Result<Response<ChainInfoResult>, Status> {
947        let traffic_type = Self::get_traffic_type(&request);
948        let query = request.into_inner().try_into()?;
949        trace!(?query, "Handling chain info query");
950        match self.state.clone().handle_chain_info_query(query).await {
951            Ok(info) => {
952                Self::log_request_success("handle_chain_info_query", traffic_type);
953                Ok(Response::new(info.try_into()?))
954            }
955            Err(error) => {
956                Self::log_request_error(
957                    "handle_chain_info_query",
958                    traffic_type,
959                    &error.error_type(),
960                );
961                self.log_error(&error, "Failed to handle chain info query");
962                Ok(Response::new(NodeError::from(error).try_into()?))
963            }
964        }
965    }
966
967    #[instrument(
968        target = "grpc_server",
969        skip_all,
970        err,
971        fields(
972            nickname = self.state.nickname(),
973            chain_id = ?request.get_ref().chain_id()
974        )
975    )]
976    async fn download_pending_blob(
977        &self,
978        request: Request<PendingBlobRequest>,
979    ) -> Result<Response<PendingBlobResult>, Status> {
980        let traffic_type = Self::get_traffic_type(&request);
981        let (chain_id, blob_id) = request.into_inner().try_into()?;
982        trace!(?blob_id, "Download pending blob");
983        match self
984            .state
985            .clone()
986            .download_pending_blob(chain_id, blob_id)
987            .await
988        {
989            Ok(blob) => {
990                Self::log_request_success("download_pending_blob", traffic_type);
991                Ok(Response::new(blob.content().clone().try_into()?))
992            }
993            Err(error) => {
994                Self::log_request_error("download_pending_blob", traffic_type, &error.error_type());
995                self.log_error(&error, "Failed to download pending blob");
996                Ok(Response::new(NodeError::from(error).try_into()?))
997            }
998        }
999    }
1000
1001    #[instrument(
1002        target = "grpc_server",
1003        skip_all,
1004        err,
1005        fields(
1006            nickname = self.state.nickname(),
1007            chain_id = ?request.get_ref().chain_id
1008        )
1009    )]
1010    async fn handle_pending_blob(
1011        &self,
1012        request: Request<HandlePendingBlobRequest>,
1013    ) -> Result<Response<ChainInfoResult>, Status> {
1014        let traffic_type = Self::get_traffic_type(&request);
1015        let (chain_id, blob_content) = request.into_inner().try_into()?;
1016        let blob = Blob::new(blob_content);
1017        let blob_id = blob.id();
1018        trace!(?blob_id, "Handle pending blob");
1019        match self.state.clone().handle_pending_blob(chain_id, blob).await {
1020            Ok(info) => {
1021                Self::log_request_success("handle_pending_blob", traffic_type);
1022                Ok(Response::new(info.try_into()?))
1023            }
1024            Err(error) => {
1025                Self::log_request_error("handle_pending_blob", traffic_type, &error.error_type());
1026                self.log_error(&error, "Failed to handle pending blob");
1027                Ok(Response::new(NodeError::from(error).try_into()?))
1028            }
1029        }
1030    }
1031
1032    #[instrument(
1033        target = "grpc_server",
1034        skip_all,
1035        err,
1036        fields(
1037            nickname = self.state.nickname(),
1038            chain_id = ?request.get_ref().chain_id()
1039        )
1040    )]
1041    async fn handle_cross_chain_request(
1042        &self,
1043        request: Request<CrossChainRequest>,
1044    ) -> Result<Response<()>, Status> {
1045        let traffic_type = Self::get_traffic_type(&request);
1046        let cross_chain_request = request.into_inner().try_into()?;
1047        trace!(?cross_chain_request, "Handling cross-chain request");
1048        match self
1049            .state
1050            .clone()
1051            .handle_cross_chain_request(cross_chain_request)
1052            .await
1053        {
1054            Ok(actions) => {
1055                Self::log_request_success("handle_cross_chain_request", traffic_type);
1056                self.handle_network_actions(actions)
1057            }
1058            Err(error) => {
1059                Self::log_request_error(
1060                    "handle_cross_chain_request",
1061                    traffic_type,
1062                    &error.error_type(),
1063                );
1064                self.log_error(&error, "Failed to handle cross-chain request");
1065            }
1066        }
1067        Ok(Response::new(()))
1068    }
1069}
1070
/// Types which are proxyable and expose the appropriate methods to be handled
/// by the `GrpcProxy`
pub trait GrpcProxyable {
    /// Returns the chain ID carried by the request, if one is set and can be
    /// converted into the domain [`ChainId`] type; `None` otherwise.
    fn chain_id(&self) -> Option<ChainId>;
}
1076
1077impl GrpcProxyable for BlockProposal {
1078    fn chain_id(&self) -> Option<ChainId> {
1079        self.chain_id.clone()?.try_into().ok()
1080    }
1081}
1082
1083impl GrpcProxyable for LiteCertificate {
1084    fn chain_id(&self) -> Option<ChainId> {
1085        self.chain_id.clone()?.try_into().ok()
1086    }
1087}
1088
1089impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
1090    fn chain_id(&self) -> Option<ChainId> {
1091        self.chain_id.clone()?.try_into().ok()
1092    }
1093}
1094
1095impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
1096    fn chain_id(&self) -> Option<ChainId> {
1097        self.chain_id.clone()?.try_into().ok()
1098    }
1099}
1100
1101impl GrpcProxyable for api::HandleValidatedCertificateRequest {
1102    fn chain_id(&self) -> Option<ChainId> {
1103        self.chain_id.clone()?.try_into().ok()
1104    }
1105}
1106
1107impl GrpcProxyable for ChainInfoQuery {
1108    fn chain_id(&self) -> Option<ChainId> {
1109        self.chain_id.clone()?.try_into().ok()
1110    }
1111}
1112
1113impl GrpcProxyable for PendingBlobRequest {
1114    fn chain_id(&self) -> Option<ChainId> {
1115        self.chain_id.clone()?.try_into().ok()
1116    }
1117}
1118
1119impl GrpcProxyable for HandlePendingBlobRequest {
1120    fn chain_id(&self) -> Option<ChainId> {
1121        self.chain_id.clone()?.try_into().ok()
1122    }
1123}
1124
1125impl GrpcProxyable for CrossChainRequest {
1126    fn chain_id(&self) -> Option<ChainId> {
1127        use super::api::cross_chain_request::Inner;
1128
1129        match self.inner.as_ref()? {
1130            Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
1131            | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. })
1132            | Inner::RevertConfirm(api::RevertConfirm { recipient, .. }) => {
1133                recipient.clone()?.try_into().ok()
1134            }
1135        }
1136    }
1137}