1use std::{
5 net::{IpAddr, SocketAddr},
6 str::FromStr,
7 task::{Context, Poll},
8};
9
10use futures::{channel::mpsc, future::BoxFuture, FutureExt as _};
11use linera_base::{
12 data_types::Blob,
13 identifiers::ChainId,
14 time::{Duration, Instant},
15};
16use linera_core::{
17 join_set_ext::JoinSet,
18 node::NodeError,
19 worker::{NetworkActions, Notification, Reason, WorkerState},
20 JoinSetExt as _, TaskHandle,
21};
22use linera_storage::Storage;
23use tokio::sync::{broadcast::error::RecvError, oneshot};
24use tokio_util::sync::CancellationToken;
25use tonic::{transport::Channel, Request, Response, Status};
26use tower::{builder::ServiceBuilder, Layer, Service};
27use tracing::{debug, error, info, instrument, trace, warn};
28
29use super::{
30 api::{
31 self,
32 notifier_service_client::NotifierServiceClient,
33 validator_worker_client::ValidatorWorkerClient,
34 validator_worker_server::{ValidatorWorker as ValidatorWorkerRpc, ValidatorWorkerServer},
35 BlockProposal, ChainInfoQuery, ChainInfoResult, CrossChainRequest,
36 HandlePendingBlobRequest, LiteCertificate, PendingBlobRequest, PendingBlobResult,
37 },
38 pool::GrpcConnectionPool,
39 GrpcError, GRPC_MAX_MESSAGE_SIZE,
40};
41use crate::{
42 config::{CrossChainConfig, NotificationConfig, ShardId, ValidatorInternalNetworkConfig},
43 cross_chain_message_queue, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
44 HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
45};
46
47type CrossChainSender = mpsc::Sender<(linera_core::data_types::CrossChainRequest, ShardId)>;
48type NotificationSender = tokio::sync::broadcast::Sender<Notification>;
49
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus collectors updated by the gRPC server.

    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Unlabelled request latency; the server observes it in milliseconds.
    pub static SERVER_REQUEST_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        prometheus_util::register_histogram_vec(
            "server_request_latency",
            "Server request latency",
            &[],
            prometheus_util::linear_bucket_interval(1.0, 25.0, 2000.0),
        )
    });

    /// Request latency broken down by RPC method; observed in milliseconds.
    pub static SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            prometheus_util::register_histogram_vec(
                "server_request_latency_per_request_type",
                "Server request latency per request type",
                &["method_name"],
                prometheus_util::linear_bucket_interval(1.0, 25.0, 2000.0),
            )
        });

    /// Unlabelled counter of requests seen by the metrics middleware.
    pub static SERVER_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        prometheus_util::register_int_counter_vec(
            "server_request_count",
            "Server request count",
            &[],
        )
    });

    /// Per-method counter of requests the worker handled successfully.
    pub static SERVER_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        prometheus_util::register_int_counter_vec(
            "server_request_success",
            "Server request success",
            &["method_name"],
        )
    });

    /// Per-method counter of requests for which the worker returned an error.
    pub static SERVER_REQUEST_ERROR: LazyLock<IntCounterVec> = LazyLock::new(|| {
        prometheus_util::register_int_counter_vec(
            "server_request_error",
            "Server request error",
            &["method_name"],
        )
    });

    /// Counter of cross-chain requests dropped because the outgoing queue was full.
    pub static CROSS_CHAIN_MESSAGE_CHANNEL_FULL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        prometheus_util::register_int_counter_vec(
            "cross_chain_message_channel_full",
            "Cross-chain message channel full",
            &[],
        )
    });
}
106
107#[derive(Clone)]
108pub struct GrpcServer<S>
109where
110 S: Storage,
111{
112 state: WorkerState<S>,
113 shard_id: ShardId,
114 network: ValidatorInternalNetworkConfig,
115 cross_chain_sender: CrossChainSender,
116 notification_sender: NotificationSender,
117}
118
119pub struct GrpcServerHandle {
120 handle: TaskHandle<Result<(), GrpcError>>,
121}
122
123impl GrpcServerHandle {
124 pub async fn join(self) -> Result<(), GrpcError> {
125 self.handle.await?
126 }
127}
128
/// Tower layer that wraps a service in the Prometheus metrics middleware.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareLayer;
131
/// Service wrapper that records request metrics around the service it holds.
#[derive(Clone)]
pub struct GrpcPrometheusMetricsMiddlewareService<Inner> {
    /// The wrapped service every call is delegated to.
    service: Inner,
}
136
137impl<S> Layer<S> for GrpcPrometheusMetricsMiddlewareLayer {
138 type Service = GrpcPrometheusMetricsMiddlewareService<S>;
139
140 fn layer(&self, service: S) -> Self::Service {
141 GrpcPrometheusMetricsMiddlewareService { service }
142 }
143}
144
145impl<S, Req> Service<Req> for GrpcPrometheusMetricsMiddlewareService<S>
146where
147 S::Future: Send + 'static,
148 S: Service<Req> + std::marker::Send,
149{
150 type Response = S::Response;
151 type Error = S::Error;
152 type Future = BoxFuture<'static, Result<S::Response, S::Error>>;
153
154 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155 self.service.poll_ready(cx)
156 }
157
158 fn call(&mut self, request: Req) -> Self::Future {
159 #[cfg(with_metrics)]
160 let start = Instant::now();
161 let future = self.service.call(request);
162 async move {
163 let response = future.await?;
164 #[cfg(with_metrics)]
165 {
166 metrics::SERVER_REQUEST_LATENCY
167 .with_label_values(&[])
168 .observe(start.elapsed().as_secs_f64() * 1000.0);
169 metrics::SERVER_REQUEST_COUNT.with_label_values(&[]).inc();
170 }
171 Ok(response)
172 }
173 .boxed()
174 }
175}
176
177impl<S> GrpcServer<S>
178where
179 S: Storage + Clone + Send + Sync + 'static,
180{
181 #[expect(clippy::too_many_arguments)]
182 pub fn spawn(
183 host: String,
184 port: u16,
185 state: WorkerState<S>,
186 shard_id: ShardId,
187 internal_network: ValidatorInternalNetworkConfig,
188 cross_chain_config: CrossChainConfig,
189 notification_config: NotificationConfig,
190 shutdown_signal: CancellationToken,
191 join_set: &mut JoinSet,
192 ) -> GrpcServerHandle {
193 info!(
194 "spawning gRPC server on {}:{} for shard {}",
195 host, port, shard_id
196 );
197
198 let (cross_chain_sender, cross_chain_receiver) =
199 mpsc::channel(cross_chain_config.queue_size);
200
201 let (notification_sender, _) =
202 tokio::sync::broadcast::channel(notification_config.notification_queue_size);
203
204 join_set.spawn_task({
205 info!(
206 nickname = state.nickname(),
207 "spawning cross-chain queries thread on {} for shard {}", host, shard_id
208 );
209 Self::forward_cross_chain_queries(
210 state.nickname().to_string(),
211 internal_network.clone(),
212 cross_chain_config.max_retries,
213 Duration::from_millis(cross_chain_config.retry_delay_ms),
214 Duration::from_millis(cross_chain_config.sender_delay_ms),
215 cross_chain_config.sender_failure_rate,
216 shard_id,
217 cross_chain_receiver,
218 )
219 });
220
221 let mut exporter_forwarded = false;
222 for proxy in &internal_network.proxies {
223 let receiver = notification_sender.subscribe();
224 join_set.spawn_task({
225 info!(
226 nickname = state.nickname(),
227 "spawning notifications thread on {} for shard {}", host, shard_id
228 );
229 let exporter_addresses = if exporter_forwarded {
230 vec![]
231 } else {
232 exporter_forwarded = true;
233 internal_network.exporter_addresses()
234 };
235 Self::forward_notifications(
236 state.nickname().to_string(),
237 proxy.internal_address(&internal_network.protocol),
238 exporter_addresses,
239 receiver,
240 )
241 });
242 }
243
244 let (health_reporter, health_service) = tonic_health::server::health_reporter();
245
246 let grpc_server = GrpcServer {
247 state,
248 shard_id,
249 network: internal_network,
250 cross_chain_sender,
251 notification_sender,
252 };
253
254 let worker_node = ValidatorWorkerServer::new(grpc_server)
255 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
256 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
257
258 let handle = join_set.spawn_task(async move {
259 let server_address = SocketAddr::from((IpAddr::from_str(&host)?, port));
260
261 let reflection_service = tonic_reflection::server::Builder::configure()
262 .register_encoded_file_descriptor_set(crate::FILE_DESCRIPTOR_SET)
263 .build_v1()?;
264
265 health_reporter
266 .set_serving::<ValidatorWorkerServer<Self>>()
267 .await;
268
269 tonic::transport::Server::builder()
270 .layer(
271 ServiceBuilder::new()
272 .layer(GrpcPrometheusMetricsMiddlewareLayer)
273 .into_inner(),
274 )
275 .add_service(health_service)
276 .add_service(reflection_service)
277 .add_service(worker_node)
278 .serve_with_shutdown(server_address, shutdown_signal.cancelled_owned())
279 .await?;
280
281 Ok(())
282 });
283
284 GrpcServerHandle { handle }
285 }
286
287 #[instrument(skip(receiver))]
290 async fn forward_notifications(
291 nickname: String,
292 proxy_address: String,
293 exporter_addresses: Vec<String>,
294 mut receiver: tokio::sync::broadcast::Receiver<Notification>,
295 ) {
296 let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
297 .expect("Proxy URI should be valid")
298 .connect_lazy();
299 let mut client = NotifierServiceClient::new(channel)
300 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
301 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
302
303 let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
304 .iter()
305 .map(|address| {
306 let channel = tonic::transport::Channel::from_shared(address.clone())
307 .expect("Exporter URI should be valid")
308 .connect_lazy();
309 NotifierServiceClient::new(channel)
310 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
311 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
312 })
313 .collect::<Vec<_>>();
314
315 loop {
316 let notification = match receiver.recv().await {
317 Ok(notification) => notification,
318 Err(RecvError::Lagged(skipped_count)) => {
319 warn!(
320 nickname,
321 skipped_count, "notification receiver lagged, messages were skipped"
322 );
323 continue;
324 }
325 Err(RecvError::Closed) => {
326 warn!(
327 nickname,
328 "notification channel closed, exiting forwarding loop"
329 );
330 break;
331 }
332 };
333
334 let reason = ¬ification.reason;
335 let notification: api::Notification = match notification.clone().try_into() {
336 Ok(notification) => notification,
337 Err(error) => {
338 warn!(%error, nickname, "could not deserialize notification");
339 continue;
340 }
341 };
342 let request = tonic::Request::new(notification.clone());
343 if let Err(error) = client.notify(request).await {
344 error!(
345 %error,
346 nickname,
347 ?notification,
348 "proxy: could not send notification",
349 )
350 }
351
352 if let Reason::NewBlock { height: _, hash: _ } = reason {
353 for exporter_client in &mut exporter_clients {
354 let request = tonic::Request::new(notification.clone());
355 if let Err(error) = exporter_client.notify(request).await {
356 error!(
357 %error,
358 nickname,
359 ?notification,
360 "block exporter: could not send notification",
361 )
362 }
363 }
364 }
365 }
366 }
367
368 fn handle_network_actions(&self, actions: NetworkActions) {
369 let mut cross_chain_sender = self.cross_chain_sender.clone();
370 let notification_sender = self.notification_sender.clone();
371
372 for request in actions.cross_chain_requests {
373 let shard_id = self.network.get_shard_id(request.target_chain_id());
374 trace!(
375 source_shard_id = self.shard_id,
376 target_shard_id = shard_id,
377 "Scheduling cross-chain query",
378 );
379
380 if let Err(error) = cross_chain_sender.try_send((request, shard_id)) {
381 error!(%error, "dropping cross-chain request");
382 #[cfg(with_metrics)]
383 if error.is_full() {
384 metrics::CROSS_CHAIN_MESSAGE_CHANNEL_FULL
385 .with_label_values(&[])
386 .inc();
387 }
388 }
389 }
390
391 for notification in actions.notifications {
392 trace!("Scheduling notification query");
393 if let Err(error) = notification_sender.send(notification) {
394 error!(%error, "dropping notification");
395 break;
396 }
397 }
398 }
399
400 #[instrument(skip_all, fields(nickname, %this_shard))]
401 #[expect(clippy::too_many_arguments)]
402 async fn forward_cross_chain_queries(
403 nickname: String,
404 network: ValidatorInternalNetworkConfig,
405 cross_chain_max_retries: u32,
406 cross_chain_retry_delay: Duration,
407 cross_chain_sender_delay: Duration,
408 cross_chain_sender_failure_rate: f32,
409 this_shard: ShardId,
410 receiver: mpsc::Receiver<(linera_core::data_types::CrossChainRequest, ShardId)>,
411 ) {
412 let pool = GrpcConnectionPool::default();
413 let handle_request =
414 move |shard_id: ShardId, request: linera_core::data_types::CrossChainRequest| {
415 let channel_result = pool.channel(network.shard(shard_id).http_address());
416 async move {
417 let mut client = ValidatorWorkerClient::new(channel_result?)
418 .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
419 .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
420 client
421 .handle_cross_chain_request(Request::new(request.try_into()?))
422 .await?;
423 anyhow::Result::<_, anyhow::Error>::Ok(())
424 }
425 };
426 cross_chain_message_queue::forward_cross_chain_queries(
427 nickname,
428 cross_chain_max_retries,
429 cross_chain_retry_delay,
430 cross_chain_sender_delay,
431 cross_chain_sender_failure_rate,
432 this_shard,
433 receiver,
434 handle_request,
435 )
436 .await;
437 }
438
/// Records the latency and the outcome of one request under the `method_name` label.
///
/// Does nothing unless the crate is built with metrics.
fn log_request_outcome_and_latency(start: Instant, success: bool, method_name: &str) {
    #![cfg_attr(not(with_metrics), allow(unused_variables))]
    #[cfg(with_metrics)]
    {
        let labels = [method_name];
        // Latency is observed in milliseconds.
        metrics::SERVER_REQUEST_LATENCY_PER_REQUEST_TYPE
            .with_label_values(&labels)
            .observe(start.elapsed().as_secs_f64() * 1000.0);
        let outcome_counter = if success {
            &metrics::SERVER_REQUEST_SUCCESS
        } else {
            &metrics::SERVER_REQUEST_ERROR
        };
        outcome_counter.with_label_values(&labels).inc();
    }
}
457
458 fn log_error(&self, error: &linera_core::worker::WorkerError, context: &str) {
459 let nickname = self.state.nickname();
460 if error.is_local() {
461 error!(nickname, %error, "{}", context);
462 } else {
463 debug!(nickname, %error, "{}", context);
464 }
465 }
466}
467
468#[tonic::async_trait]
469impl<S> ValidatorWorkerRpc for GrpcServer<S>
470where
471 S: Storage + Clone + Send + Sync + 'static,
472{
473 #[instrument(
474 target = "grpc_server",
475 skip_all,
476 err,
477 fields(
478 nickname = self.state.nickname(),
479 chain_id = ?request.get_ref().chain_id()
480 )
481 )]
482 async fn handle_block_proposal(
483 &self,
484 request: Request<BlockProposal>,
485 ) -> Result<Response<ChainInfoResult>, Status> {
486 let start = Instant::now();
487 let proposal = request.into_inner().try_into()?;
488 trace!(?proposal, "Handling block proposal");
489 Ok(Response::new(
490 match self.state.clone().handle_block_proposal(proposal).await {
491 Ok((info, actions)) => {
492 Self::log_request_outcome_and_latency(start, true, "handle_block_proposal");
493 self.handle_network_actions(actions);
494 info.try_into()?
495 }
496 Err(error) => {
497 Self::log_request_outcome_and_latency(start, false, "handle_block_proposal");
498 let nickname = self.state.nickname();
499 warn!(nickname, %error, "Failed to handle block proposal");
500 NodeError::from(error).try_into()?
501 }
502 },
503 ))
504 }
505
506 #[instrument(
507 target = "grpc_server",
508 skip_all,
509 err,
510 fields(
511 nickname = self.state.nickname(),
512 chain_id = ?request.get_ref().chain_id()
513 )
514 )]
515 async fn handle_lite_certificate(
516 &self,
517 request: Request<LiteCertificate>,
518 ) -> Result<Response<ChainInfoResult>, Status> {
519 let start = Instant::now();
520 let HandleLiteCertRequest {
521 certificate,
522 wait_for_outgoing_messages,
523 } = request.into_inner().try_into()?;
524 trace!(?certificate, "Handling lite certificate");
525 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
526 match Box::pin(
527 self.state
528 .clone()
529 .handle_lite_certificate(certificate, sender),
530 )
531 .await
532 {
533 Ok((info, actions)) => {
534 Self::log_request_outcome_and_latency(start, true, "handle_lite_certificate");
535 self.handle_network_actions(actions);
536 if let Some(receiver) = receiver {
537 if let Err(e) = receiver.await {
538 error!("Failed to wait for message delivery: {e}");
539 }
540 }
541 Ok(Response::new(info.try_into()?))
542 }
543 Err(error) => {
544 Self::log_request_outcome_and_latency(start, false, "handle_lite_certificate");
545 self.log_error(&error, "Failed to handle lite certificate");
546 Ok(Response::new(NodeError::from(error).try_into()?))
547 }
548 }
549 }
550
551 #[instrument(
552 target = "grpc_server",
553 skip_all,
554 err,
555 fields(
556 nickname = self.state.nickname(),
557 chain_id = ?request.get_ref().chain_id()
558 )
559 )]
560 async fn handle_confirmed_certificate(
561 &self,
562 request: Request<api::HandleConfirmedCertificateRequest>,
563 ) -> Result<Response<ChainInfoResult>, Status> {
564 let start = Instant::now();
565 let HandleConfirmedCertificateRequest {
566 certificate,
567 wait_for_outgoing_messages,
568 } = request.into_inner().try_into()?;
569 trace!(?certificate, "Handling certificate");
570 let (sender, receiver) = wait_for_outgoing_messages.then(oneshot::channel).unzip();
571 match self
572 .state
573 .clone()
574 .handle_confirmed_certificate(certificate, sender)
575 .await
576 {
577 Ok((info, actions)) => {
578 Self::log_request_outcome_and_latency(start, true, "handle_confirmed_certificate");
579 self.handle_network_actions(actions);
580 if let Some(receiver) = receiver {
581 if let Err(e) = receiver.await {
582 error!("Failed to wait for message delivery: {e}");
583 }
584 }
585 Ok(Response::new(info.try_into()?))
586 }
587 Err(error) => {
588 Self::log_request_outcome_and_latency(start, false, "handle_confirmed_certificate");
589 self.log_error(&error, "Failed to handle confirmed certificate");
590 Ok(Response::new(NodeError::from(error).try_into()?))
591 }
592 }
593 }
594
595 #[instrument(
596 target = "grpc_server",
597 skip_all,
598 err,
599 fields(
600 nickname = self.state.nickname(),
601 chain_id = ?request.get_ref().chain_id()
602 )
603 )]
604 async fn handle_validated_certificate(
605 &self,
606 request: Request<api::HandleValidatedCertificateRequest>,
607 ) -> Result<Response<ChainInfoResult>, Status> {
608 let start = Instant::now();
609 let HandleValidatedCertificateRequest { certificate } = request.into_inner().try_into()?;
610 trace!(?certificate, "Handling certificate");
611 match self
612 .state
613 .clone()
614 .handle_validated_certificate(certificate)
615 .await
616 {
617 Ok((info, actions)) => {
618 Self::log_request_outcome_and_latency(start, true, "handle_validated_certificate");
619 self.handle_network_actions(actions);
620 Ok(Response::new(info.try_into()?))
621 }
622 Err(error) => {
623 Self::log_request_outcome_and_latency(start, false, "handle_validated_certificate");
624 self.log_error(&error, "Failed to handle validated certificate");
625 Ok(Response::new(NodeError::from(error).try_into()?))
626 }
627 }
628 }
629
630 #[instrument(
631 target = "grpc_server",
632 skip_all,
633 err,
634 fields(
635 nickname = self.state.nickname(),
636 chain_id = ?request.get_ref().chain_id()
637 )
638 )]
639 async fn handle_timeout_certificate(
640 &self,
641 request: Request<api::HandleTimeoutCertificateRequest>,
642 ) -> Result<Response<ChainInfoResult>, Status> {
643 let start = Instant::now();
644 let HandleTimeoutCertificateRequest { certificate } = request.into_inner().try_into()?;
645 trace!(?certificate, "Handling Timeout certificate");
646 match self
647 .state
648 .clone()
649 .handle_timeout_certificate(certificate)
650 .await
651 {
652 Ok((info, _actions)) => {
653 Self::log_request_outcome_and_latency(start, true, "handle_timeout_certificate");
654 Ok(Response::new(info.try_into()?))
655 }
656 Err(error) => {
657 Self::log_request_outcome_and_latency(start, false, "handle_timeout_certificate");
658 self.log_error(&error, "Failed to handle timeout certificate");
659 Ok(Response::new(NodeError::from(error).try_into()?))
660 }
661 }
662 }
663
664 #[instrument(
665 target = "grpc_server",
666 skip_all,
667 err,
668 fields(
669 nickname = self.state.nickname(),
670 chain_id = ?request.get_ref().chain_id()
671 )
672 )]
673 async fn handle_chain_info_query(
674 &self,
675 request: Request<ChainInfoQuery>,
676 ) -> Result<Response<ChainInfoResult>, Status> {
677 let start = Instant::now();
678 let query = request.into_inner().try_into()?;
679 trace!(?query, "Handling chain info query");
680 match self.state.clone().handle_chain_info_query(query).await {
681 Ok((info, actions)) => {
682 Self::log_request_outcome_and_latency(start, true, "handle_chain_info_query");
683 self.handle_network_actions(actions);
684 Ok(Response::new(info.try_into()?))
685 }
686 Err(error) => {
687 Self::log_request_outcome_and_latency(start, false, "handle_chain_info_query");
688 self.log_error(&error, "Failed to handle chain info query");
689 Ok(Response::new(NodeError::from(error).try_into()?))
690 }
691 }
692 }
693
694 #[instrument(
695 target = "grpc_server",
696 skip_all,
697 err,
698 fields(
699 nickname = self.state.nickname(),
700 chain_id = ?request.get_ref().chain_id()
701 )
702 )]
703 async fn download_pending_blob(
704 &self,
705 request: Request<PendingBlobRequest>,
706 ) -> Result<Response<PendingBlobResult>, Status> {
707 let start = Instant::now();
708 let (chain_id, blob_id) = request.into_inner().try_into()?;
709 trace!(?chain_id, ?blob_id, "Download pending blob");
710 match self
711 .state
712 .clone()
713 .download_pending_blob(chain_id, blob_id)
714 .await
715 {
716 Ok(blob) => {
717 Self::log_request_outcome_and_latency(start, true, "download_pending_blob");
718 Ok(Response::new(blob.into_content().try_into()?))
719 }
720 Err(error) => {
721 Self::log_request_outcome_and_latency(start, false, "download_pending_blob");
722 self.log_error(&error, "Failed to download pending blob");
723 Ok(Response::new(NodeError::from(error).try_into()?))
724 }
725 }
726 }
727
728 #[instrument(
729 target = "grpc_server",
730 skip_all,
731 err,
732 fields(
733 nickname = self.state.nickname(),
734 chain_id = ?request.get_ref().chain_id
735 )
736 )]
737 async fn handle_pending_blob(
738 &self,
739 request: Request<HandlePendingBlobRequest>,
740 ) -> Result<Response<ChainInfoResult>, Status> {
741 let start = Instant::now();
742 let (chain_id, blob_content) = request.into_inner().try_into()?;
743 let blob = Blob::new(blob_content);
744 let blob_id = blob.id();
745 trace!(?chain_id, ?blob_id, "Handle pending blob");
746 match self.state.clone().handle_pending_blob(chain_id, blob).await {
747 Ok(info) => {
748 Self::log_request_outcome_and_latency(start, true, "handle_pending_blob");
749 Ok(Response::new(info.try_into()?))
750 }
751 Err(error) => {
752 Self::log_request_outcome_and_latency(start, false, "handle_pending_blob");
753 self.log_error(&error, "Failed to handle pending blob");
754 Ok(Response::new(NodeError::from(error).try_into()?))
755 }
756 }
757 }
758
759 #[instrument(
760 target = "grpc_server",
761 skip_all,
762 err,
763 fields(
764 nickname = self.state.nickname(),
765 chain_id = ?request.get_ref().chain_id()
766 )
767 )]
768 async fn handle_cross_chain_request(
769 &self,
770 request: Request<CrossChainRequest>,
771 ) -> Result<Response<()>, Status> {
772 let start = Instant::now();
773 let request = request.into_inner().try_into()?;
774 trace!(?request, "Handling cross-chain request");
775 match self.state.clone().handle_cross_chain_request(request).await {
776 Ok(actions) => {
777 Self::log_request_outcome_and_latency(start, true, "handle_cross_chain_request");
778 self.handle_network_actions(actions)
779 }
780 Err(error) => {
781 Self::log_request_outcome_and_latency(start, false, "handle_cross_chain_request");
782 let nickname = self.state.nickname();
783 error!(nickname, %error, "Failed to handle cross-chain request");
784 }
785 }
786 Ok(Response::new(()))
787 }
788}
789
790pub trait GrpcProxyable {
793 fn chain_id(&self) -> Option<ChainId>;
794}
795
796impl GrpcProxyable for BlockProposal {
797 fn chain_id(&self) -> Option<ChainId> {
798 self.chain_id.clone()?.try_into().ok()
799 }
800}
801
802impl GrpcProxyable for LiteCertificate {
803 fn chain_id(&self) -> Option<ChainId> {
804 self.chain_id.clone()?.try_into().ok()
805 }
806}
807
808impl GrpcProxyable for api::HandleConfirmedCertificateRequest {
809 fn chain_id(&self) -> Option<ChainId> {
810 self.chain_id.clone()?.try_into().ok()
811 }
812}
813
814impl GrpcProxyable for api::HandleTimeoutCertificateRequest {
815 fn chain_id(&self) -> Option<ChainId> {
816 self.chain_id.clone()?.try_into().ok()
817 }
818}
819
820impl GrpcProxyable for api::HandleValidatedCertificateRequest {
821 fn chain_id(&self) -> Option<ChainId> {
822 self.chain_id.clone()?.try_into().ok()
823 }
824}
825
826impl GrpcProxyable for ChainInfoQuery {
827 fn chain_id(&self) -> Option<ChainId> {
828 self.chain_id.clone()?.try_into().ok()
829 }
830}
831
832impl GrpcProxyable for PendingBlobRequest {
833 fn chain_id(&self) -> Option<ChainId> {
834 self.chain_id.clone()?.try_into().ok()
835 }
836}
837
838impl GrpcProxyable for HandlePendingBlobRequest {
839 fn chain_id(&self) -> Option<ChainId> {
840 self.chain_id.clone()?.try_into().ok()
841 }
842}
843
844impl GrpcProxyable for CrossChainRequest {
845 fn chain_id(&self) -> Option<ChainId> {
846 use super::api::cross_chain_request::Inner;
847
848 match self.inner.as_ref()? {
849 Inner::UpdateRecipient(api::UpdateRecipient { recipient, .. })
850 | Inner::ConfirmUpdatedRecipient(api::ConfirmUpdatedRecipient { recipient, .. }) => {
851 recipient.clone()?.try_into().ok()
852 }
853 }
854 }
855}