1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8};
9
10use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
11use linera_base::{
12 data_types::Blob,
13 identifiers::ChainId,
14 time::{Duration, Instant},
15};
16use linera_core::{
17 join_set_ext::JoinSet,
18 node::NodeError,
19 worker::{NetworkActions, Notification, Reason, WorkerError, WorkerState},
20 JoinSetExt as _, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::oneshot;
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30 api::{
31 self,
32 notifier_service_client::NotifierServiceClient,
33 validator_worker_client::ValidatorWorkerClient,
34 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37 },
38 pool::GrpcConnectionPool,
39 GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41use crate::{
42 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
43 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
44 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
45};
46
/// Sending half of the queue of outgoing cross-chain requests. Each queued request is paired
/// with the ID of the shard it is addressed to.
type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
/// Broadcast sender through which notifications are handed to the notification-forwarding
/// tasks.
type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
49
/// Prometheus metrics recorded by the gRPC server. Only compiled when `with_metrics` is set.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        linear_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// Latency of requests passing through the metrics middleware, without labels.
    /// Observations are made in milliseconds.
    // NOTE(review): the bucket arguments presumably mean (start, width, end) — confirm against
    // `linear_bucket_interval`.
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    /// Number of requests passing through the metrics middleware, without labels.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("server_request_count", "Server request count", &[])
    });

    /// Number of requests the worker handled successfully, labelled by RPC method name.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    /// Number of requests for which the worker returned an error, labelled by RPC method name.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    /// Latency of each RPC handler, labelled by RPC method name. Observations are made in
    /// milliseconds.
    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    /// Number of cross-chain requests dropped because the outgoing queue was full.
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });
}
106
/// gRPC front end of a single validator shard: serves the `ValidatorWorker` RPCs on top of a
/// [`WorkerState`] and queues the cross-chain requests and notifications those calls produce.
#[derive(Clone)]
pub struct GrpcServer<S>
where
    S: Storage,
{
    /// Worker state that every RPC handler delegates to.
    state: WorkerState<S>,
    /// ID of the shard this server runs for.
    shard_id: ShardId,
    /// Internal network configuration; used to look up the shard responsible for a chain.
    network: ValidatorInternalNetworkConfig,
    /// Queue of outgoing cross-chain requests.
    cross_chain_sender: CrossChainSender,
    /// Broadcast channel of outgoing notifications.
    notification_sender: NotificationSender,
}
118
/// Handle to the task running the gRPC server, as returned by [`GrpcServer::spawn`].
pub struct GrpcServerHandle {
    /// Task whose output is the result of serving.
    handle: TaskHandle<Result<(), GrpcError>>,
}
122
123impl GrpcServerHandle {
124 pub async fn join(self) -> Result<(), GrpcError> {
125 self.handle.await?
126 }
127}
128
/// Tower layer that wraps a service in a [`GrpcPrometheusMetricsMiddlewareService`].
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;
131
/// Tower service that forwards every request to the wrapped service and, when compiled with
/// metrics, records request latency and request count.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<T> {
    /// The wrapped service.
    service: T,
}
136
137impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
138 type Service = GrpcPrometheusMetricsMiddlewareService<S>;
139
140 fn layer(&self, service: S) -> Self::Service {
141 GrpcPrometheusMetricsMiddlewareService { service }
142 }
143}
144
145impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
146where
147 S::Future: Send + 'static,
148 S: Service<Req> + std::marker::Send,
149{
150 type Response = S::Response;
151 type Error = S::Error;
152 type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
153
154 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155 self.service.poll_ready(cx)
156 }
157
158 fn call(&mut self, request: Req) -> Self::Future {
159 #[cfg(with_metrics)]
160 let start = Instant::now();
161 let future = self.service.call(request);
162 async move {
163 let response = future.await?;
164 #[cfg(with_metrics)]
165 {
166 metrics::SERVER_REQUEST_LATENCY
167 .with_label_values(&[])
168 .observe(start.elapsed().as_secs_f64() * 1000.0);
169 metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
170 }
171 Ok(response)
172 }
173 .boxed()
174 }
175}
176
177impl<S> GrpcServer<S>
178where
179 S: Storage + Clone + Send + Sync + 'static,
180{
181 #[expect(clippy::too_many_arguments)]
182 pub fn spawn(
183 host: String,
184 port: u16,
185 state: WorkerState<S>,
186 shard_id: ShardId,
187 internal_network: ValidatorInternalNetworkConfig,
188 cross_chain_config: CrossChainConfig,
189 notification_config: NotificationConfig,
190 shutdown_signal: CancellationToken,
191 join_set: &mut JoinSet,
192 ) -> GrpcServerHandle {
193 info!(
194 "spawning gRPC server on {}:{} for shard {}",
195 host, port, shard_id
196 );
197
198 let (cross_chain_sender, cross_chain_receiver) =
199 mpsc::channel(cross_chain_config.queue_size);
200
201 let (notification_sender, _) =
202 tokio::sync::broadcast::channel(notification_config.notification_queue_size);
203
204 join_set.spawn_task({
205 info!(
206 nickname = state.nickname(),
207 "spawning cross-chain queries thread on {} for shard {}", host, shard_id
208 );
209 Self::forward_cross_chain_queries(
210 state.nickname().to_string(),
211 internal_network.clone(),
212 cross_chain_config.max_retries,
213 Duration::from_millis(cross_chain_config.retry_delay_ms),
214 Duration::from_millis(cross_chain_config.sender_delay_ms),
215 cross_chain_config.sender_failure_rate,
216 shard_id,
217 cross_chain_receiver,
218 )
219 });
220
221 for proxy in &internal_network.proxies {
222 let receiver = notification_sender.subscribe();
223 join_set.spawn_task({
224 info!(
225 nickname = state.nickname(),
226 "spawning notifications thread on {} for shard {}", host, shard_id
227 );
228 Self::forward_notifications(
229 state.nickname().to_string(),
230 proxy.internal_address(&internal_network.protocol),
231 internal_network.exporter_addresses(),
232 receiver,
233 )
234 });
235 }
236
237 let (health_reporter, health_service) = tonic_health::server::health_reporter();
238
239 let grpc_server = GrpcServer {
240 state,
241 shard_id,
242 network: internal_network,
243 cross_chain_sender,
244 notification_sender,
245 };
246
247 let worker_node = ValidatorWorkerServer::new(grpc_server)
248 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
249 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
250
251 let handle = join_set.spawn_task(async move {
252 let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
253
254 let reflection_service = tonic_reflection::server::Builder::configure()
255 .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
256 .build_v1()?;
257
258 health_reporter
259 .set_serving::<ValidatorWorkerServer<Self>>()
260 .await;
261
262 tonic::transport::Server::builder()
263 .layer(
264 ServiceBuilder::new()
265 .layer(GrpcPrometheusMetricsMiddlewareLayer)
266 .into_inner(),
267 )
268 .add_service(health_service)
269 .add_service(reflection_service)
270 .add_service(worker_node)
271 .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
272 .await?;
273
274 Ok(())
275 });
276
277 GrpcServerHandle { handle }
278 }
279
280 #[instrument(skip(receiver))]
283 async fn forward_notifications(
284 nickname: String,
285 proxy_address: String,
286 exporter_addresses: Vec<String>,
287 mut receiver: tokio::sync::broadcast::Receiver<Notification>,
288 ) {
289 let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
290 .expect("Proxy URI should be valid")
291 .connect_lazy();
292 let mut client = NotifierServiceClient::new(channel)
293 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
294 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
295
296 let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
297 .iter()
298 .map(|address| {
299 let channel = tonic::transport::Channel::from_shared(address.clone())
300 .expect("Exporter URI should be valid")
301 .connect_lazy();
302 NotifierServiceClient::new(channel)
303 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
304 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
305 })
306 .collect::<Vec<_>>();
307
308 while let Ok(notification) = receiver.recv().await {
309 let reason = ¬ification.reason;
310 let notification: api::Notification = match notification.clone().try_into() {
311 Ok(notification) => notification,
312 Err(error) => {
313 warn!(%error, nickname, "could not deserialize notification");
314 continue;
315 }
316 };
317 let request = tonic::Request::new(notification.clone());
318 if let Err(error) = client.notify(request).await {
319 error!(
320 %error,
321 nickname,
322 ?notification,
323 "proxy: could not send notification",
324 )
325 }
326
327 if let Reason::NewBlock { height: _, hash: _ } = reason {
328 for exporter_client in &mut exporter_clients {
329 let request = tonic::Request::new(notification.clone());
330 if let Err(error) = exporter_client.notify(request).await {
331 error!(
332 %error,
333 nickname,
334 ?notification,
335 "block exporter: could not send notification",
336 )
337 }
338 }
339 }
340 }
341 }
342
343 fn handle_network_actions(&self, actions: NetworkActions) {
344 let mut cross_chain_sender = self.cross_chain_sender.clone();
345 let notification_sender = self.notification_sender.clone();
346
347 for request in actions.cross_chain_requests {
348 let shard_id = self.network.get_shard_id(request.target_chain_id());
349 trace!(
350 source_shard_id = self.shard_id,
351 target_shard_id = shard_id,
352 "Scheduling cross-chain query",
353 );
354
355 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
356 error!(%error, "dropping cross-chain request");
357 #[cfg(with_metrics)]
358 if error.is_full() {
359 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
360 .with_label_values(&[])
361 .inc();
362 }
363 }
364 }
365
366 for notification in actions.notifications {
367 trace!("Scheduling notification query");
368 if let Err(error) = notification_sender.send(notification) {
369 error!(%error, "dropping notification");
370 break;
371 }
372 }
373 }
374
375 #[instrument(skip_all, fields(nickname, %this_shard))]
376 #[expect(clippy::too_many_arguments)]
377 async fn forward_cross_chain_queries(
378 nickname: String,
379 network: ValidatorInternalNetworkConfig,
380 cross_chain_max_retries: u32,
381 cross_chain_retry_delay: Duration,
382 cross_chain_sender_delay: Duration,
383 cross_chain_sender_failure_rate: f32,
384 this_shard: ShardId,
385 receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
386 ) {
387 let pool = GrpcConnectionPool::default();
388 let handle_request =
389 move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
390 let channel_result = pool.channel(network.shard(shard_id).http_address());
391 async move {
392 let mut client = ValidatorWorkerClient::new(channel_result?)
393 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
394 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
395 client
396 .handle_cross_chain_request(Request::new(request.try_into()?))
397 .await?;
398 anyhow::Result::<_, anyhow::Error>::Ok(())
399 }
400 };
401 cross_chain_message_queue::forward_cross_chain_queries(
402 nickname,
403 cross_chain_max_retries,
404 cross_chain_retry_delay,
405 cross_chain_sender_delay,
406 cross_chain_sender_failure_rate,
407 this_shard,
408 receiver,
409 handle_request,
410 )
411 .await;
412 }
413
414 fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
415 #![cfg_attr(not(with_metrics), allow(unused_variables))]
416 #[cfg(with_metrics)]
417 {
418 metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
419 .with_label_values(&[method_name])
420 .observe(start.elapsed().as_secs_f64() * 1000.0);
421 if success {
422 metrics::SERVER_REQUEST_SUCCESS
423 .with_label_values(&[method_name])
424 .inc();
425 } else {
426 metrics::SERVER_REQUEST_ERROR
427 .with_label_values(&[method_name])
428 .inc();
429 }
430 }
431 }
432}
433
434#[tonic::async_trait]
435impl<S> ValidatorWorkerRpc for GrpcServer<S>
436where
437 S: Storage + Clone + Send + Sync + 'static,
438{
439 #[instrument(
440 target = "grpc_server",
441 skip_all,
442 err,
443 fields(
444 nickname = self.state.nickname(),
445 chain_id = ?request.get_ref().chain_id()
446 )
447 )]
448 async fn handle_block_proposal(
449 &self,
450 request: Request<BlockProposal>,
451 ) -> Result<Response<ChainInfoResult>, Status> {
452 let start = Instant::now();
453 let proposal = request.into_inner().try_into()?;
454 trace!(?proposal, "Handling block proposal");
455 Ok(Response::new(
456 match self.state.clone().handle_block_proposal(proposal).await {
457 Ok((info, actions)) => {
458 Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
459 self.handle_network_actions(actions);
460 info.try_into()?
461 }
462 Err(error) => {
463 Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
464 let nickname = self.state.nickname();
465 warn!(nickname, %error, "Failed to handle block proposal");
466 NodeError::from(error).try_into()?
467 }
468 },
469 ))
470 }
471
472 #[instrument(
473 target = "grpc_server",
474 skip_all,
475 err,
476 fields(
477 nickname = self.state.nickname(),
478 chain_id = ?request.get_ref().chain_id()
479 )
480 )]
481 async fn handle_lite_certificate(
482 &self,
483 request: Request<LiteCertificate>,
484 ) -> Result<Response<ChainInfoResult>, Status> {
485 let start = Instant::now();
486 let HandleLiteCertRequest {
487 certificate,
488 wait_for_outgoing_messages,
489 } = request.into_inner().try_into()?;
490 trace!(?certificate, "Handling lite certificate");
491 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
492 match Box::pin(
493 self.state
494 .clone()
495 .handle_lite_certificate(certificate, sender),
496 )
497 .await
498 {
499 Ok((info, actions)) => {
500 Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
501 self.handle_network_actions(actions);
502 if let Some(receiver) = receiver {
503 if let Err(e) = receiver.await {
504 error!("Failed to wait for message delivery: {e}");
505 }
506 }
507 Ok(Response::new(info.try_into()?))
508 }
509 Err(error) => {
510 Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
511 let nickname = self.state.nickname();
512 if let WorkerError::MissingCertificateValue = &error {
513 debug!(nickname, %error, "Failed to handle lite certificate");
514 } else {
515 error!(nickname, %error, "Failed to handle lite certificate");
516 }
517 Ok(Response::new(NodeError::from(error).try_into()?))
518 }
519 }
520 }
521
522 #[instrument(
523 target = "grpc_server",
524 skip_all,
525 err,
526 fields(
527 nickname = self.state.nickname(),
528 chain_id = ?request.get_ref().chain_id()
529 )
530 )]
531 async fn handle_confirmed_certificate(
532 &self,
533 request: Request<api::HandleConfirmedCertificateRequest>,
534 ) -> Result<Response<ChainInfoResult>, Status> {
535 let start = Instant::now();
536 let HandleConfirmedCertificateRequest {
537 certificate,
538 wait_for_outgoing_messages,
539 } = request.into_inner().try_into()?;
540 trace!(?certificate, "Handling certificate");
541 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
542 match self
543 .state
544 .clone()
545 .handle_confirmed_certificate(certificate, sender)
546 .await
547 {
548 Ok((info, actions)) => {
549 Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
550 self.handle_network_actions(actions);
551 if let Some(receiver) = receiver {
552 if let Err(e) = receiver.await {
553 error!("Failed to wait for message delivery: {e}");
554 }
555 }
556 Ok(Response::new(info.try_into()?))
557 }
558 Err(error) => {
559 Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
560 let nickname = self.state.nickname();
561 error!(nickname, %error, "Failed to handle confirmed certificate");
562 Ok(Response::new(NodeError::from(error).try_into()?))
563 }
564 }
565 }
566
567 #[instrument(
568 target = "grpc_server",
569 skip_all,
570 err,
571 fields(
572 nickname = self.state.nickname(),
573 chain_id = ?request.get_ref().chain_id()
574 )
575 )]
576 async fn handle_validated_certificate(
577 &self,
578 request: Request<api::HandleValidatedCertificateRequest>,
579 ) -> Result<Response<ChainInfoResult>, Status> {
580 let start = Instant::now();
581 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
582 trace!(?certificate, "Handling certificate");
583 match self
584 .state
585 .clone()
586 .handle_validated_certificate(certificate)
587 .await
588 {
589 Ok((info, actions)) => {
590 Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
591 self.handle_network_actions(actions);
592 Ok(Response::new(info.try_into()?))
593 }
594 Err(error) => {
595 Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
596 let nickname = self.state.nickname();
597 error!(nickname, %error, "Failed to handle validated certificate");
598 Ok(Response::new(NodeError::from(error).try_into()?))
599 }
600 }
601 }
602
603 #[instrument(
604 target = "grpc_server",
605 skip_all,
606 err,
607 fields(
608 nickname = self.state.nickname(),
609 chain_id = ?request.get_ref().chain_id()
610 )
611 )]
612 async fn handle_timeout_certificate(
613 &self,
614 request: Request<api::HandleTimeoutCertificateRequest>,
615 ) -> Result<Response<ChainInfoResult>, Status> {
616 let start = Instant::now();
617 let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
618 trace!(?certificate, "Handling Timeout certificate");
619 match self
620 .state
621 .clone()
622 .handle_timeout_certificate(certificate)
623 .await
624 {
625 Ok((info, _actions)) => {
626 Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
627 Ok(Response::new(info.try_into()?))
628 }
629 Err(error) => {
630 Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
631 let nickname = self.state.nickname();
632 error!(nickname, %error, "Failed to handle timeout certificate");
633 Ok(Response::new(NodeError::from(error).try_into()?))
634 }
635 }
636 }
637
638 #[instrument(
639 target = "grpc_server",
640 skip_all,
641 err,
642 fields(
643 nickname = self.state.nickname(),
644 chain_id = ?request.get_ref().chain_id()
645 )
646 )]
647 async fn handle_chain_info_query(
648 &self,
649 request: Request<ChainInfoQuery>,
650 ) -> Result<Response<ChainInfoResult>, Status> {
651 let start = Instant::now();
652 let query = request.into_inner().try_into()?;
653 trace!(?query, "Handling chain info query");
654 match self.state.clone().handle_chain_info_query(query).await {
655 Ok((info, actions)) => {
656 Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
657 self.handle_network_actions(actions);
658 Ok(Response::new(info.try_into()?))
659 }
660 Err(error) => {
661 Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
662 let nickname = self.state.nickname();
663 error!(nickname, %error, "Failed to handle chain info query");
664 Ok(Response::new(NodeError::from(error).try_into()?))
665 }
666 }
667 }
668
669 #[instrument(
670 target = "grpc_server",
671 skip_all,
672 err,
673 fields(
674 nickname = self.state.nickname(),
675 chain_id = ?request.get_ref().chain_id()
676 )
677 )]
678 async fn download_pending_blob(
679 &self,
680 request: Request<PendingBlobRequest>,
681 ) -> Result<Response<PendingBlobResult>, Status> {
682 let start = Instant::now();
683 let (chain_id, blob_id) = request.into_inner().try_into()?;
684 trace!(?chain_id, ?blob_id, "Download pending blob");
685 match self
686 .state
687 .clone()
688 .download_pending_blob(chain_id, blob_id)
689 .await
690 {
691 Ok(blob) => {
692 Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
693 Ok(Response::new(blob.into_content().try_into()?))
694 }
695 Err(error) => {
696 Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
697 let nickname = self.state.nickname();
698 error!(nickname, %error, "Failed to download pending blob");
699 Ok(Response::new(NodeError::from(error).try_into()?))
700 }
701 }
702 }
703
704 #[instrument(
705 target = "grpc_server",
706 skip_all,
707 err,
708 fields(
709 nickname = self.state.nickname(),
710 chain_id = ?request.get_ref().chain_id
711 )
712 )]
713 async fn handle_pending_blob(
714 &self,
715 request: Request<HandlePendingBlobRequest>,
716 ) -> Result<Response<ChainInfoResult>, Status> {
717 let start = Instant::now();
718 let (chain_id, blob_content) = request.into_inner().try_into()?;
719 let blob = Blob::new(blob_content);
720 let blob_id = blob.id();
721 trace!(?chain_id, ?blob_id, "Handle pending blob");
722 match self.state.clone().handle_pending_blob(chain_id, blob).await {
723 Ok(info) => {
724 Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
725 Ok(Response::new(info.try_into()?))
726 }
727 Err(error) => {
728 Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
729 let nickname = self.state.nickname();
730 error!(nickname, %error, "Failed to handle pending blob");
731 Ok(Response::new(NodeError::from(error).try_into()?))
732 }
733 }
734 }
735
736 #[instrument(
737 target = "grpc_server",
738 skip_all,
739 err,
740 fields(
741 nickname = self.state.nickname(),
742 chain_id = ?request.get_ref().chain_id()
743 )
744 )]
745 async fn handle_cross_chain_request(
746 &self,
747 request: Request<CrossChainRequest>,
748 ) -> Result<Response<()>, Status> {
749 let start = Instant::now();
750 let request = request.into_inner().try_into()?;
751 trace!(?request, "Handling cross-chain request");
752 match self.state.clone().handle_cross_chain_request(request).await {
753 Ok(actions) => {
754 Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
755 self.handle_network_actions(actions)
756 }
757 Err(error) => {
758 Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
759 let nickname = self.state.nickname();
760 error!(nickname, %error, "Failed to handle cross-chain request");
761 }
762 }
763 Ok(Response::new(()))
764 }
765}
766
/// Implemented by gRPC request messages from which the ID of the chain they concern can be
/// extracted.
pub trait GrpcProxyable {
    /// Returns the chain ID carried by the message, or `None` if it is absent or cannot be
    /// converted into a [`ChainId`].
    fn chain_id(&self) -> Option<ChainId>;
}
772
773impl GrpcProxyable for BlockProposal {
774 fn chain_id(&self) -> Option<ChainId> {
775 self.chain_id.clone()?.try_into().ok()
776 }
777}
778
779impl GrpcProxyable for LiteCertificate {
780 fn chain_id(&self) -> Option<ChainId> {
781 self.chain_id.clone()?.try_into().ok()
782 }
783}
784
785impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
786 fn chain_id(&self) -> Option<ChainId> {
787 self.chain_id.clone()?.try_into().ok()
788 }
789}
790
791impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
792 fn chain_id(&self) -> Option<ChainId> {
793 self.chain_id.clone()?.try_into().ok()
794 }
795}
796
797impl GrpcProxyable for api::HandleValidatedCertificateRequest {
798 fn chain_id(&self) -> Option<ChainId> {
799 self.chain_id.clone()?.try_into().ok()
800 }
801}
802
803impl GrpcProxyable for ChainInfoQuery {
804 fn chain_id(&self) -> Option<ChainId> {
805 self.chain_id.clone()?.try_into().ok()
806 }
807}
808
809impl GrpcProxyable for PendingBlobRequest {
810 fn chain_id(&self) -> Option<ChainId> {
811 self.chain_id.clone()?.try_into().ok()
812 }
813}
814
815impl GrpcProxyable for HandlePendingBlobRequest {
816 fn chain_id(&self) -> Option<ChainId> {
817 self.chain_id.clone()?.try_into().ok()
818 }
819}
820
821impl GrpcProxyable for CrossChainRequest {
822 fn chain_id(&self) -> Option<ChainId> {
823 use super::api::cross_chain_request::Inner;
824
825 match self.inner.as_ref()? {
826 Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
827 | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
828 recipient.clone()?.try_into().ok()
829 }
830 }
831 }
832}