1use std::{pin::Pin, sync::Arc};
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex, Stream, StreamExt as _};
8use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
9use linera_core::{
10 data_types::CrossChainRequest,
11 node::NodeError,
12 worker::{NetworkActions, Notification, WorkerError, WorkerState},
13 JoinSetExt as _, ProcessConfirmedBlockMode,
14};
15use linera_storage::Storage;
16use tokio::{sync, sync::oneshot, task::JoinSet};
17use tokio_stream::wrappers::BroadcastStream;
18use tokio_util::sync::CancellationToken;
19use tracing::{debug, error, info, instrument};
20
21use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
22use crate::{
23 config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
24 cross_chain_message_queue, RpcMessage,
25};
26
27#[derive(Clone)]
28pub struct Server<S>
29where
30 S: Storage,
31{
32 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
33 host: String,
34 port: u16,
35 state: WorkerState<S>,
36 shard_id: ShardId,
37 cross_chain_config: CrossChainConfig,
38 packets_processed: u64,
40 user_errors: u64,
41}
42
43impl<S> Server<S>
44where
45 S: Storage,
46{
47 pub fn new(
48 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
49 host: String,
50 port: u16,
51 state: WorkerState<S>,
52 shard_id: ShardId,
53 cross_chain_config: CrossChainConfig,
54 ) -> Self {
55 Self {
56 network,
57 host,
58 port,
59 state,
60 shard_id,
61 cross_chain_config,
62 packets_processed: 0,
63 user_errors: 0,
64 }
65 }
66
67 pub fn packets_processed(&self) -> u64 {
68 self.packets_processed
69 }
70
71 pub fn user_errors(&self) -> u64 {
72 self.user_errors
73 }
74}
75
76impl<S> Server<S>
77where
78 S: Storage + Clone + Send + Sync + 'static,
79{
80 #[expect(clippy::too_many_arguments)]
81 async fn forward_cross_chain_queries(
82 nickname: String,
83 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
84 cross_chain_max_retries: u32,
85 cross_chain_retry_delay: Duration,
86 cross_chain_max_backoff: Duration,
87 cross_chain_sender_delay: Duration,
88 cross_chain_sender_failure_rate: f32,
89 this_shard: ShardId,
90 receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
91 ) {
92 let pool = Arc::new(Mutex::new(
93 network
94 .protocol
95 .make_outgoing_connection_pool()
96 .await
97 .expect("Initialization should not fail"),
98 ));
99 let handle_request = move |shard_id, request| {
100 let pool = pool.clone();
101 let shard = network.shard(shard_id);
102 let remote_address = format!("{}:{}", shard.host, shard.port);
103 let message = RpcMessage::CrossChainRequest(Box::new(request));
104 async move {
105 pool.lock()
106 .await
107 .send_message_to(message.clone(), &remote_address)
108 .await?;
109 anyhow::Result::<_, anyhow::Error>::Ok(())
110 }
111 };
112 cross_chain_message_queue::forward_cross_chain_queries(
113 nickname,
114 cross_chain_max_retries,
115 cross_chain_retry_delay,
116 cross_chain_max_backoff,
117 cross_chain_sender_delay,
118 cross_chain_sender_failure_rate,
119 this_shard,
120 receiver,
121 handle_request,
122 )
123 .await;
124 }
125
126 pub fn spawn(
127 mut self,
128 shutdown_signal: CancellationToken,
129 join_set: &mut JoinSet<()>,
130 ) -> ServerHandle {
131 info!(
132 "Listening to {:?} traffic on {}:{}",
133 self.network.protocol, self.host, self.port
134 );
135 let address = (self.host.clone(), self.port);
136
137 let (cross_chain_sender, cross_chain_receiver) =
138 mpsc::channel(self.cross_chain_config.queue_size);
139
140 let (notification_sender, _) = sync::broadcast::channel(1000);
141
142 {
146 let routing_network = self.network.clone();
147 let routing_sender = cross_chain_sender.clone();
148 self.state = self
149 .state
150 .clone()
151 .with_outbound_cross_chain_sender(Arc::new(move |request| {
152 let shard_id = routing_network.get_shard_id(request.target_chain_id());
153 if let Err(error) = routing_sender.clone().try_send((request, shard_id)) {
154 tracing::error!(%error, "dropping cross-chain request");
155 }
156 }));
157 }
158
159 join_set.spawn_task(Self::forward_cross_chain_queries(
160 self.state.nickname().to_string(),
161 self.network.clone(),
162 self.cross_chain_config.max_retries,
163 Duration::from_millis(self.cross_chain_config.retry_delay_ms),
164 Duration::from_millis(self.cross_chain_config.max_backoff_ms),
165 Duration::from_millis(self.cross_chain_config.sender_delay_ms),
166 self.cross_chain_config.sender_failure_rate,
167 self.shard_id,
168 cross_chain_receiver,
169 ));
170
171 let protocol = self.network.protocol;
172 let state = RunningServerState {
173 server: self,
174 cross_chain_sender,
175 notification_sender,
176 };
177 protocol.spawn_server(address, state, shutdown_signal, join_set)
179 }
180}
181
182#[derive(Clone)]
183struct RunningServerState<S>
184where
185 S: Storage,
186{
187 server: Server<S>,
188 cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
189 notification_sender: sync::broadcast::Sender<Notification>,
190}
191
192#[async_trait]
193impl<S> MessageHandler for RunningServerState<S>
194where
195 S: Storage + Clone + Send + Sync + 'static,
196{
197 #[instrument(
198 target = "simple_server",
199 skip_all,
200 fields(
201 nickname = self.server.state.nickname(),
202 chain_id = ?message.target_chain_id()
203 )
204 )]
205 async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
206 let reply = match message {
207 RpcMessage::BlockProposal(message) => {
208 let (result, actions) = self.server.state.handle_block_proposal(*message).await;
209 self.handle_network_actions(actions);
215 match result {
216 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
217 Err(error) => {
218 self.log_error(&error, "Failed to handle block proposal");
219 Err(error.into())
220 }
221 }
222 }
223 RpcMessage::LiteCertificate(request) => {
224 let (sender, receiver) = request
225 .wait_for_outgoing_messages
226 .then(oneshot::channel)
227 .unzip();
228 match Box::pin(
229 self.server
230 .state
231 .handle_lite_certificate(request.certificate, sender),
232 )
233 .await
234 {
235 Ok((info, actions)) => {
236 self.handle_network_actions(actions);
238 if let Some(receiver) = receiver {
239 if let Err(e) = receiver.await {
240 error!("Failed to wait for message delivery: {e}");
241 }
242 }
243 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
245 }
246 Err(error) => {
247 let nickname = self.server.state.nickname();
248 if let WorkerError::MissingCertificateValue = &error {
249 debug!(nickname, %error, "Failed to handle lite certificate");
250 } else {
251 error!(nickname, %error, "Failed to handle lite certificate");
252 }
253 Err(error.into())
254 }
255 }
256 }
257 RpcMessage::TimeoutCertificate(request) => {
258 match self
259 .server
260 .state
261 .handle_timeout_certificate(request.certificate)
262 .await
263 {
264 Ok((info, actions)) => {
265 self.handle_network_actions(actions);
267 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
269 }
270 Err(error) => {
271 self.log_error(&error, "Failed to handle timeout certificate");
272 Err(error.into())
273 }
274 }
275 }
276 RpcMessage::ValidatedCertificate(request) => {
277 match self
278 .server
279 .state
280 .handle_validated_certificate(request.certificate)
281 .await
282 {
283 Ok((info, actions)) => {
284 self.handle_network_actions(actions);
286 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
288 }
289 Err(error) => {
290 self.log_error(&error, "Failed to handle validated certificate");
291 Err(error.into())
292 }
293 }
294 }
295 RpcMessage::ConfirmedCertificate(request) => {
296 let (sender, receiver) = request
297 .wait_for_outgoing_messages
298 .then(oneshot::channel)
299 .unzip();
300 match self
301 .server
302 .state
303 .handle_confirmed_certificate(
304 request.certificate,
305 ProcessConfirmedBlockMode::Auto,
306 sender,
307 )
308 .await
309 {
310 Ok((info, actions)) => {
311 self.handle_network_actions(actions);
313 if let Some(receiver) = receiver {
314 if let Err(e) = receiver.await {
315 error!("Failed to wait for message delivery: {e}");
316 }
317 }
318 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
320 }
321 Err(error) => {
322 self.log_error(&error, "Failed to handle confirmed certificate");
323 Err(error.into())
324 }
325 }
326 }
327 RpcMessage::ChainInfoQuery(message) => {
328 match self.server.state.handle_chain_info_query(*message).await {
329 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
330 Err(error) => {
331 self.log_error(&error, "Failed to handle chain info query");
332 Err(error.into())
333 }
334 }
335 }
336 RpcMessage::CrossChainRequest(request) => {
337 match self.server.state.handle_cross_chain_request(*request).await {
338 Ok(actions) => {
339 self.handle_network_actions(actions);
340 }
341 Err(error) => {
342 self.log_error(&error, "Failed to handle cross-chain request");
343 }
344 }
345 Ok(None)
347 }
348 RpcMessage::DownloadPendingBlob(request) => {
349 let (chain_id, blob_id) = *request;
350 match self
351 .server
352 .state
353 .download_pending_blob(chain_id, blob_id)
354 .await
355 {
356 Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
357 blob.content().clone(),
358 )))),
359 Err(error) => {
360 self.log_error(&error, "Failed to handle pending blob request");
361 Err(error.into())
362 }
363 }
364 }
365 RpcMessage::HandlePendingBlob(request) => {
366 let (chain_id, blob_content) = *request;
367 match self
368 .server
369 .state
370 .handle_pending_blob(chain_id, Blob::new(blob_content))
371 .await
372 {
373 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
374 Err(error) => {
375 self.log_error(&error, "Failed to handle pending blob");
376 Err(error.into())
377 }
378 }
379 }
380
381 RpcMessage::VersionInfoQuery => {
382 Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
383 }
384
385 RpcMessage::SubscribeNotifications(_) | RpcMessage::Notification(_) => {
386 Err(NodeError::UnexpectedMessage)
388 }
389
390 RpcMessage::Vote(_)
391 | RpcMessage::Error(_)
392 | RpcMessage::ChainInfoResponse(_)
393 | RpcMessage::VersionInfoResponse(_)
394 | RpcMessage::NetworkDescriptionQuery
395 | RpcMessage::NetworkDescriptionResponse(_)
396 | RpcMessage::ShardInfoQuery(_)
397 | RpcMessage::ShardInfoResponse(_)
398 | RpcMessage::DownloadBlob(_)
399 | RpcMessage::DownloadBlobs(_)
400 | RpcMessage::DownloadBlobResponse(_)
401 | RpcMessage::DownloadPendingBlobResponse(_)
402 | RpcMessage::DownloadConfirmedBlock(_)
403 | RpcMessage::DownloadConfirmedBlockResponse(_)
404 | RpcMessage::BlobLastUsedBy(_)
405 | RpcMessage::BlobLastUsedByResponse(_)
406 | RpcMessage::BlobLastUsedByCertificate(_)
407 | RpcMessage::BlobLastUsedByCertificateResponse(_)
408 | RpcMessage::MissingBlobIds(_)
409 | RpcMessage::MissingBlobIdsResponse(_)
410 | RpcMessage::EventBlockHeights(_)
411 | RpcMessage::EventBlockHeightsResponse(_)
412 | RpcMessage::DownloadCertificates(_)
413 | RpcMessage::DownloadCertificatesResponse(_)
414 | RpcMessage::UploadBlob(_)
415 | RpcMessage::UploadBlobResponse(_)
416 | RpcMessage::DownloadCertificatesByHeights(_, _)
417 | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
418 Err(NodeError::UnexpectedMessage)
419 }
420 };
421
422 self.server.packets_processed += 1;
423 #[allow(unknown_lints)]
425 #[expect(clippy::manual_is_multiple_of)]
426 if self.server.packets_processed % 5000 == 0 {
427 debug!(
428 "[{}] {}:{} (shard {}) has processed {} packets",
429 self.server.state.nickname(),
430 self.server.host,
431 self.server.port,
432 self.server.shard_id,
433 self.server.packets_processed
434 );
435 }
436
437 match reply {
438 Ok(x) => x,
439 Err(error) => {
440 debug!(
442 "[{}] User query failed: {}",
443 self.server.state.nickname(),
444 error
445 );
446 self.server.user_errors += 1;
447 Some(error.into())
448 }
449 }
450 }
451
452 async fn handle_subscribe(
453 &mut self,
454 chains: Vec<ChainId>,
455 ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
456 RunningServerState::subscribe_to_notifications(self, chains).await
457 }
458}
459
460impl<S> RunningServerState<S>
461where
462 S: Storage + Clone + Send + Sync + 'static,
463{
464 async fn subscribe_to_notifications(
465 &self,
466 chains: Vec<ChainId>,
467 ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
468 let receiver = self.notification_sender.subscribe();
469 let stream = BroadcastStream::new(receiver).filter_map(move |result| {
470 let chains = chains.clone();
471 async move {
472 match result {
473 Ok(notification) if chains.contains(¬ification.chain_id) => {
474 Some(RpcMessage::Notification(Box::new(notification)))
475 }
476 _ => None,
477 }
478 }
479 });
480 Some(Box::pin(stream))
481 }
482}
483
484impl<S> RunningServerState<S>
485where
486 S: Storage + Send,
487{
488 fn handle_network_actions(&mut self, actions: NetworkActions) {
489 for request in actions.cross_chain_requests {
490 let shard_id = self.server.network.get_shard_id(request.target_chain_id());
491 debug!(
492 "[{}] Scheduling cross-chain query: {} -> {}",
493 self.server.state.nickname(),
494 self.server.shard_id,
495 shard_id
496 );
497 if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
498 error!(%error, "dropping cross-chain request");
499 break;
500 }
501 }
502 for notification in actions.notifications {
503 debug!("Scheduling notification query");
504 if let Err(error) = self.notification_sender.send(notification) {
505 debug!(%error, "dropping notification (no receivers)");
506 }
507 }
508 }
509
510 fn log_error(&self, error: &WorkerError, context: &str) {
511 let nickname = self.server.state.nickname();
512 if error.is_local() {
513 error!(nickname, %error, "{}", context);
514 } else {
515 debug!(nickname, %error, "{}", context);
516 }
517 }
518}