1use std::{pin::Pin, sync::Arc};
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex, Stream, StreamExt as _};
8use linera_base::{data_types::Blob, identifiers::ChainId, time::Duration};
9use linera_core::{
10 data_types::CrossChainRequest,
11 node::NodeError,
12 worker::{NetworkActions, Notification, WorkerError, WorkerState},
13 JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync, sync::oneshot, task::JoinSet};
17use tokio_stream::wrappers::BroadcastStream;
18use tokio_util::sync::CancellationToken;
19use tracing::{debug, error, info, instrument};
20
21use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
22use crate::{
23 config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
24 cross_chain_message_queue, RpcMessage,
25};
26
27#[derive(Clone)]
28pub struct Server<S>
29where
30 S: Storage,
31{
32 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
33 host: String,
34 port: u16,
35 state: WorkerState<S>,
36 shard_id: ShardId,
37 cross_chain_config: CrossChainConfig,
38 packets_processed: u64,
40 user_errors: u64,
41}
42
43impl<S> Server<S>
44where
45 S: Storage,
46{
47 pub fn new(
48 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
49 host: String,
50 port: u16,
51 state: WorkerState<S>,
52 shard_id: ShardId,
53 cross_chain_config: CrossChainConfig,
54 ) -> Self {
55 Self {
56 network,
57 host,
58 port,
59 state,
60 shard_id,
61 cross_chain_config,
62 packets_processed: 0,
63 user_errors: 0,
64 }
65 }
66
67 pub fn packets_processed(&self) -> u64 {
68 self.packets_processed
69 }
70
71 pub fn user_errors(&self) -> u64 {
72 self.user_errors
73 }
74}
75
76impl<S> Server<S>
77where
78 S: Storage + Clone + Send + Sync + 'static,
79{
80 #[expect(clippy::too_many_arguments)]
81 async fn forward_cross_chain_queries(
82 nickname: String,
83 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
84 cross_chain_max_retries: u32,
85 cross_chain_retry_delay: Duration,
86 cross_chain_max_backoff: Duration,
87 cross_chain_sender_delay: Duration,
88 cross_chain_sender_failure_rate: f32,
89 this_shard: ShardId,
90 receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
91 ) {
92 let pool = Arc::new(Mutex::new(
93 network
94 .protocol
95 .make_outgoing_connection_pool()
96 .await
97 .expect("Initialization should not fail"),
98 ));
99 let handle_request = move |shard_id, request| {
100 let pool = pool.clone();
101 let shard = network.shard(shard_id);
102 let remote_address = format!("{}:{}", shard.host, shard.port);
103 let message = RpcMessage::CrossChainRequest(Box::new(request));
104 async move {
105 pool.lock()
106 .await
107 .send_message_to(message.clone(), &remote_address)
108 .await?;
109 anyhow::Result::<_, anyhow::Error>::Ok(())
110 }
111 };
112 cross_chain_message_queue::forward_cross_chain_queries(
113 nickname,
114 cross_chain_max_retries,
115 cross_chain_retry_delay,
116 cross_chain_max_backoff,
117 cross_chain_sender_delay,
118 cross_chain_sender_failure_rate,
119 this_shard,
120 receiver,
121 handle_request,
122 )
123 .await;
124 }
125
126 pub fn spawn(
127 mut self,
128 shutdown_signal: CancellationToken,
129 join_set: &mut JoinSet<()>,
130 ) -> ServerHandle {
131 info!(
132 "Listening to {:?} traffic on {}:{}",
133 self.network.protocol, self.host, self.port
134 );
135 let address = (self.host.clone(), self.port);
136
137 let (cross_chain_sender, cross_chain_receiver) =
138 mpsc::channel(self.cross_chain_config.queue_size);
139
140 let (notification_sender, _) = sync::broadcast::channel(1000);
141
142 {
146 let routing_network = self.network.clone();
147 let routing_sender = cross_chain_sender.clone();
148 self.state = self
149 .state
150 .clone()
151 .with_outbound_cross_chain_sender(Arc::new(move |request| {
152 let shard_id = routing_network.get_shard_id(request.target_chain_id());
153 if let Err(error) = routing_sender.clone().try_send((request, shard_id)) {
154 tracing::error!(%error, "dropping cross-chain request");
155 }
156 }));
157 }
158
159 join_set.spawn_task(Self::forward_cross_chain_queries(
160 self.state.nickname().to_string(),
161 self.network.clone(),
162 self.cross_chain_config.max_retries,
163 Duration::from_millis(self.cross_chain_config.retry_delay_ms),
164 Duration::from_millis(self.cross_chain_config.max_backoff_ms),
165 Duration::from_millis(self.cross_chain_config.sender_delay_ms),
166 self.cross_chain_config.sender_failure_rate,
167 self.shard_id,
168 cross_chain_receiver,
169 ));
170
171 let protocol = self.network.protocol;
172 let state = RunningServerState {
173 server: self,
174 cross_chain_sender,
175 notification_sender,
176 };
177 protocol.spawn_server(address, state, shutdown_signal, join_set)
179 }
180}
181
182#[derive(Clone)]
183struct RunningServerState<S>
184where
185 S: Storage,
186{
187 server: Server<S>,
188 cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
189 notification_sender: sync::broadcast::Sender<Notification>,
190}
191
192#[async_trait]
193impl<S> MessageHandler for RunningServerState<S>
194where
195 S: Storage + Clone + Send + Sync + 'static,
196{
197 #[instrument(
198 target = "simple_server",
199 skip_all,
200 fields(
201 nickname = self.server.state.nickname(),
202 chain_id = ?message.target_chain_id()
203 )
204 )]
205 async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
206 let reply = match message {
207 RpcMessage::BlockProposal(message) => {
208 match self.server.state.handle_block_proposal(*message).await {
209 Ok((info, actions)) => {
210 self.handle_network_actions(actions);
212 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
214 }
215 Err(error) => {
216 self.log_error(&error, "Failed to handle block proposal");
217 Err(error.into())
218 }
219 }
220 }
221 RpcMessage::LiteCertificate(request) => {
222 let (sender, receiver) = request
223 .wait_for_outgoing_messages
224 .then(oneshot::channel)
225 .unzip();
226 match Box::pin(
227 self.server
228 .state
229 .handle_lite_certificate(request.certificate, sender),
230 )
231 .await
232 {
233 Ok((info, actions)) => {
234 self.handle_network_actions(actions);
236 if let Some(receiver) = receiver {
237 if let Err(e) = receiver.await {
238 error!("Failed to wait for message delivery: {e}");
239 }
240 }
241 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
243 }
244 Err(error) => {
245 let nickname = self.server.state.nickname();
246 if let WorkerError::MissingCertificateValue = &error {
247 debug!(nickname, %error, "Failed to handle lite certificate");
248 } else {
249 error!(nickname, %error, "Failed to handle lite certificate");
250 }
251 Err(error.into())
252 }
253 }
254 }
255 RpcMessage::TimeoutCertificate(request) => {
256 match self
257 .server
258 .state
259 .handle_timeout_certificate(request.certificate)
260 .await
261 {
262 Ok((info, actions)) => {
263 self.handle_network_actions(actions);
265 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
267 }
268 Err(error) => {
269 self.log_error(&error, "Failed to handle timeout certificate");
270 Err(error.into())
271 }
272 }
273 }
274 RpcMessage::ValidatedCertificate(request) => {
275 match self
276 .server
277 .state
278 .handle_validated_certificate(request.certificate)
279 .await
280 {
281 Ok((info, actions)) => {
282 self.handle_network_actions(actions);
284 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
286 }
287 Err(error) => {
288 self.log_error(&error, "Failed to handle validated certificate");
289 Err(error.into())
290 }
291 }
292 }
293 RpcMessage::ConfirmedCertificate(request) => {
294 let (sender, receiver) = request
295 .wait_for_outgoing_messages
296 .then(oneshot::channel)
297 .unzip();
298 match self
299 .server
300 .state
301 .handle_confirmed_certificate(request.certificate, sender)
302 .await
303 {
304 Ok((info, actions)) => {
305 self.handle_network_actions(actions);
307 if let Some(receiver) = receiver {
308 if let Err(e) = receiver.await {
309 error!("Failed to wait for message delivery: {e}");
310 }
311 }
312 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
314 }
315 Err(error) => {
316 self.log_error(&error, "Failed to handle confirmed certificate");
317 Err(error.into())
318 }
319 }
320 }
321 RpcMessage::ChainInfoQuery(message) => {
322 match self.server.state.handle_chain_info_query(*message).await {
323 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
324 Err(error) => {
325 self.log_error(&error, "Failed to handle chain info query");
326 Err(error.into())
327 }
328 }
329 }
330 RpcMessage::CrossChainRequest(request) => {
331 match self.server.state.handle_cross_chain_request(*request).await {
332 Ok(actions) => {
333 self.handle_network_actions(actions);
334 }
335 Err(error) => {
336 self.log_error(&error, "Failed to handle cross-chain request");
337 }
338 }
339 Ok(None)
341 }
342 RpcMessage::DownloadPendingBlob(request) => {
343 let (chain_id, blob_id) = *request;
344 match self
345 .server
346 .state
347 .download_pending_blob(chain_id, blob_id)
348 .await
349 {
350 Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
351 blob.content().clone(),
352 )))),
353 Err(error) => {
354 self.log_error(&error, "Failed to handle pending blob request");
355 Err(error.into())
356 }
357 }
358 }
359 RpcMessage::HandlePendingBlob(request) => {
360 let (chain_id, blob_content) = *request;
361 match self
362 .server
363 .state
364 .handle_pending_blob(chain_id, Blob::new(blob_content))
365 .await
366 {
367 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
368 Err(error) => {
369 self.log_error(&error, "Failed to handle pending blob");
370 Err(error.into())
371 }
372 }
373 }
374
375 RpcMessage::VersionInfoQuery => {
376 Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
377 }
378
379 RpcMessage::SubscribeNotifications(_) | RpcMessage::Notification(_) => {
380 Err(NodeError::UnexpectedMessage)
382 }
383
384 RpcMessage::Vote(_)
385 | RpcMessage::Error(_)
386 | RpcMessage::ChainInfoResponse(_)
387 | RpcMessage::VersionInfoResponse(_)
388 | RpcMessage::NetworkDescriptionQuery
389 | RpcMessage::NetworkDescriptionResponse(_)
390 | RpcMessage::ShardInfoQuery(_)
391 | RpcMessage::ShardInfoResponse(_)
392 | RpcMessage::DownloadBlob(_)
393 | RpcMessage::DownloadBlobs(_)
394 | RpcMessage::DownloadBlobResponse(_)
395 | RpcMessage::DownloadPendingBlobResponse(_)
396 | RpcMessage::DownloadConfirmedBlock(_)
397 | RpcMessage::DownloadConfirmedBlockResponse(_)
398 | RpcMessage::BlobLastUsedBy(_)
399 | RpcMessage::BlobLastUsedByResponse(_)
400 | RpcMessage::BlobLastUsedByCertificate(_)
401 | RpcMessage::BlobLastUsedByCertificateResponse(_)
402 | RpcMessage::MissingBlobIds(_)
403 | RpcMessage::MissingBlobIdsResponse(_)
404 | RpcMessage::EventBlockHeights(_)
405 | RpcMessage::EventBlockHeightsResponse(_)
406 | RpcMessage::DownloadCertificates(_)
407 | RpcMessage::DownloadCertificatesResponse(_)
408 | RpcMessage::UploadBlob(_)
409 | RpcMessage::UploadBlobResponse(_)
410 | RpcMessage::DownloadCertificatesByHeights(_, _)
411 | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
412 Err(NodeError::UnexpectedMessage)
413 }
414 };
415
416 self.server.packets_processed += 1;
417 #[allow(unknown_lints)]
419 #[expect(clippy::manual_is_multiple_of)]
420 if self.server.packets_processed % 5000 == 0 {
421 debug!(
422 "[{}] {}:{} (shard {}) has processed {} packets",
423 self.server.state.nickname(),
424 self.server.host,
425 self.server.port,
426 self.server.shard_id,
427 self.server.packets_processed
428 );
429 }
430
431 match reply {
432 Ok(x) => x,
433 Err(error) => {
434 debug!(
436 "[{}] User query failed: {}",
437 self.server.state.nickname(),
438 error
439 );
440 self.server.user_errors += 1;
441 Some(error.into())
442 }
443 }
444 }
445
446 async fn handle_subscribe(
447 &mut self,
448 chains: Vec<ChainId>,
449 ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
450 RunningServerState::subscribe_to_notifications(self, chains).await
451 }
452}
453
454impl<S> RunningServerState<S>
455where
456 S: Storage + Clone + Send + Sync + 'static,
457{
458 async fn subscribe_to_notifications(
459 &self,
460 chains: Vec<ChainId>,
461 ) -> Option<Pin<Box<dyn Stream<Item = RpcMessage> + Send>>> {
462 let receiver = self.notification_sender.subscribe();
463 let stream = BroadcastStream::new(receiver).filter_map(move |result| {
464 let chains = chains.clone();
465 async move {
466 match result {
467 Ok(notification) if chains.contains(¬ification.chain_id) => {
468 Some(RpcMessage::Notification(Box::new(notification)))
469 }
470 _ => None,
471 }
472 }
473 });
474 Some(Box::pin(stream))
475 }
476}
477
478impl<S> RunningServerState<S>
479where
480 S: Storage + Send,
481{
482 fn handle_network_actions(&mut self, actions: NetworkActions) {
483 for request in actions.cross_chain_requests {
484 let shard_id = self.server.network.get_shard_id(request.target_chain_id());
485 debug!(
486 "[{}] Scheduling cross-chain query: {} -> {}",
487 self.server.state.nickname(),
488 self.server.shard_id,
489 shard_id
490 );
491 if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
492 error!(%error, "dropping cross-chain request");
493 break;
494 }
495 }
496 for notification in actions.notifications {
497 debug!("Scheduling notification query");
498 if let Err(error) = self.notification_sender.send(notification) {
499 debug!(%error, "dropping notification (no receivers)");
500 }
501 }
502 }
503
504 fn log_error(&self, error: &WorkerError, context: &str) {
505 let nickname = self.server.state.nickname();
506 if error.is_local() {
507 error!(nickname, %error, "{}", context);
508 } else {
509 debug!(nickname, %error, "{}", context);
510 }
511 }
512}