1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex};
8use linera_base::{data_types::Blob, time::Duration};
9use linera_core::{
10 data_types::CrossChainRequest,
11 node::NodeError,
12 worker::{NetworkActions, WorkerError, WorkerState},
13 JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync::oneshot, task::JoinSet};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument};
19
20use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
21use crate::{
22 config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
23 cross_chain_message_queue, RpcMessage,
24};
25
26#[derive(Clone)]
27pub struct Server<S>
28where
29 S: Storage,
30{
31 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
32 host: String,
33 port: u16,
34 state: WorkerState<S>,
35 shard_id: ShardId,
36 cross_chain_config: CrossChainConfig,
37 packets_processed: u64,
39 user_errors: u64,
40}
41
42impl<S> Server<S>
43where
44 S: Storage,
45{
46 pub fn new(
47 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
48 host: String,
49 port: u16,
50 state: WorkerState<S>,
51 shard_id: ShardId,
52 cross_chain_config: CrossChainConfig,
53 ) -> Self {
54 Self {
55 network,
56 host,
57 port,
58 state,
59 shard_id,
60 cross_chain_config,
61 packets_processed: 0,
62 user_errors: 0,
63 }
64 }
65
66 pub fn packets_processed(&self) -> u64 {
67 self.packets_processed
68 }
69
70 pub fn user_errors(&self) -> u64 {
71 self.user_errors
72 }
73}
74
75impl<S> Server<S>
76where
77 S: Storage + Clone + Send + Sync + 'static,
78{
79 #[expect(clippy::too_many_arguments)]
80 async fn forward_cross_chain_queries(
81 nickname: String,
82 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
83 cross_chain_max_retries: u32,
84 cross_chain_retry_delay: Duration,
85 cross_chain_max_backoff: Duration,
86 cross_chain_sender_delay: Duration,
87 cross_chain_sender_failure_rate: f32,
88 this_shard: ShardId,
89 receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
90 ) {
91 let pool = Arc::new(Mutex::new(
92 network
93 .protocol
94 .make_outgoing_connection_pool()
95 .await
96 .expect("Initialization should not fail"),
97 ));
98 let handle_request = move |shard_id, request| {
99 let pool = pool.clone();
100 let shard = network.shard(shard_id);
101 let remote_address = format!("{}:{}", shard.host, shard.port);
102 let message = RpcMessage::CrossChainRequest(Box::new(request));
103 async move {
104 pool.lock()
105 .await
106 .send_message_to(message.clone(), &remote_address)
107 .await?;
108 anyhow::Result::<_, anyhow::Error>::Ok(())
109 }
110 };
111 cross_chain_message_queue::forward_cross_chain_queries(
112 nickname,
113 cross_chain_max_retries,
114 cross_chain_retry_delay,
115 cross_chain_max_backoff,
116 cross_chain_sender_delay,
117 cross_chain_sender_failure_rate,
118 this_shard,
119 receiver,
120 handle_request,
121 )
122 .await;
123 }
124
125 pub fn spawn(
126 self,
127 shutdown_signal: CancellationToken,
128 join_set: &mut JoinSet<()>,
129 ) -> ServerHandle {
130 info!(
131 "Listening to {:?} traffic on {}:{}",
132 self.network.protocol, self.host, self.port
133 );
134 let address = (self.host.clone(), self.port);
135
136 let (cross_chain_sender, cross_chain_receiver) =
137 mpsc::channel(self.cross_chain_config.queue_size);
138
139 join_set.spawn_task(Self::forward_cross_chain_queries(
140 self.state.nickname().to_string(),
141 self.network.clone(),
142 self.cross_chain_config.max_retries,
143 Duration::from_millis(self.cross_chain_config.retry_delay_ms),
144 Duration::from_millis(self.cross_chain_config.max_backoff_ms),
145 Duration::from_millis(self.cross_chain_config.sender_delay_ms),
146 self.cross_chain_config.sender_failure_rate,
147 self.shard_id,
148 cross_chain_receiver,
149 ));
150
151 let protocol = self.network.protocol;
152 let state = RunningServerState {
153 server: self,
154 cross_chain_sender,
155 };
156 protocol.spawn_server(address, state, shutdown_signal, join_set)
158 }
159}
160
161#[derive(Clone)]
162struct RunningServerState<S>
163where
164 S: Storage,
165{
166 server: Server<S>,
167 cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
168}
169
170#[async_trait]
171impl<S> MessageHandler for RunningServerState<S>
172where
173 S: Storage + Clone + Send + Sync + 'static,
174{
175 #[instrument(
176 target = "simple_server",
177 skip_all,
178 fields(
179 nickname = self.server.state.nickname(),
180 chain_id = ?message.target_chain_id()
181 )
182 )]
183 async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
184 let reply = match message {
185 RpcMessage::BlockProposal(message) => {
186 match self.server.state.handle_block_proposal(*message).await {
187 Ok((info, actions)) => {
188 self.handle_network_actions(actions);
190 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
192 }
193 Err(error) => {
194 self.log_error(&error, "Failed to handle block proposal");
195 Err(error.into())
196 }
197 }
198 }
199 RpcMessage::LiteCertificate(request) => {
200 let (sender, receiver) = request
201 .wait_for_outgoing_messages
202 .then(oneshot::channel)
203 .unzip();
204 match Box::pin(
205 self.server
206 .state
207 .handle_lite_certificate(request.certificate, sender),
208 )
209 .await
210 {
211 Ok((info, actions)) => {
212 self.handle_network_actions(actions);
214 if let Some(receiver) = receiver {
215 if let Err(e) = receiver.await {
216 error!("Failed to wait for message delivery: {e}");
217 }
218 }
219 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
221 }
222 Err(error) => {
223 let nickname = self.server.state.nickname();
224 if let WorkerError::MissingCertificateValue = &error {
225 debug!(nickname, %error, "Failed to handle lite certificate");
226 } else {
227 error!(nickname, %error, "Failed to handle lite certificate");
228 }
229 Err(error.into())
230 }
231 }
232 }
233 RpcMessage::TimeoutCertificate(request) => {
234 match self
235 .server
236 .state
237 .handle_timeout_certificate(request.certificate)
238 .await
239 {
240 Ok((info, actions)) => {
241 self.handle_network_actions(actions);
243 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
245 }
246 Err(error) => {
247 self.log_error(&error, "Failed to handle timeout certificate");
248 Err(error.into())
249 }
250 }
251 }
252 RpcMessage::ValidatedCertificate(request) => {
253 match self
254 .server
255 .state
256 .handle_validated_certificate(request.certificate)
257 .await
258 {
259 Ok((info, actions)) => {
260 self.handle_network_actions(actions);
262 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
264 }
265 Err(error) => {
266 self.log_error(&error, "Failed to handle validated certificate");
267 Err(error.into())
268 }
269 }
270 }
271 RpcMessage::ConfirmedCertificate(request) => {
272 let (sender, receiver) = request
273 .wait_for_outgoing_messages
274 .then(oneshot::channel)
275 .unzip();
276 match self
277 .server
278 .state
279 .handle_confirmed_certificate(request.certificate, sender)
280 .await
281 {
282 Ok((info, actions)) => {
283 self.handle_network_actions(actions);
285 if let Some(receiver) = receiver {
286 if let Err(e) = receiver.await {
287 error!("Failed to wait for message delivery: {e}");
288 }
289 }
290 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
292 }
293 Err(error) => {
294 self.log_error(&error, "Failed to handle confirmed certificate");
295 Err(error.into())
296 }
297 }
298 }
299 RpcMessage::ChainInfoQuery(message) => {
300 match self.server.state.handle_chain_info_query(*message).await {
301 Ok((info, actions)) => {
302 self.handle_network_actions(actions);
304 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
306 }
307 Err(error) => {
308 self.log_error(&error, "Failed to handle chain info query");
309 Err(error.into())
310 }
311 }
312 }
313 RpcMessage::CrossChainRequest(request) => {
314 match self.server.state.handle_cross_chain_request(*request).await {
315 Ok(actions) => {
316 self.handle_network_actions(actions);
317 }
318 Err(error) => {
319 self.log_error(&error, "Failed to handle cross-chain request");
320 }
321 }
322 Ok(None)
324 }
325 RpcMessage::DownloadPendingBlob(request) => {
326 let (chain_id, blob_id) = *request;
327 match self
328 .server
329 .state
330 .download_pending_blob(chain_id, blob_id)
331 .await
332 {
333 Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
334 blob.into(),
335 )))),
336 Err(error) => {
337 self.log_error(&error, "Failed to handle pending blob request");
338 Err(error.into())
339 }
340 }
341 }
342 RpcMessage::HandlePendingBlob(request) => {
343 let (chain_id, blob_content) = *request;
344 match self
345 .server
346 .state
347 .handle_pending_blob(chain_id, Blob::new(blob_content))
348 .await
349 {
350 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
351 Err(error) => {
352 self.log_error(&error, "Failed to handle pending blob");
353 Err(error.into())
354 }
355 }
356 }
357
358 RpcMessage::VersionInfoQuery => {
359 Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
360 }
361
362 RpcMessage::Vote(_)
363 | RpcMessage::Error(_)
364 | RpcMessage::ChainInfoResponse(_)
365 | RpcMessage::VersionInfoResponse(_)
366 | RpcMessage::NetworkDescriptionQuery
367 | RpcMessage::NetworkDescriptionResponse(_)
368 | RpcMessage::ShardInfoQuery(_)
369 | RpcMessage::ShardInfoResponse(_)
370 | RpcMessage::DownloadBlob(_)
371 | RpcMessage::DownloadBlobResponse(_)
372 | RpcMessage::DownloadPendingBlobResponse(_)
373 | RpcMessage::DownloadConfirmedBlock(_)
374 | RpcMessage::DownloadConfirmedBlockResponse(_)
375 | RpcMessage::BlobLastUsedBy(_)
376 | RpcMessage::BlobLastUsedByResponse(_)
377 | RpcMessage::BlobLastUsedByCertificate(_)
378 | RpcMessage::BlobLastUsedByCertificateResponse(_)
379 | RpcMessage::MissingBlobIds(_)
380 | RpcMessage::MissingBlobIdsResponse(_)
381 | RpcMessage::DownloadCertificates(_)
382 | RpcMessage::DownloadCertificatesResponse(_)
383 | RpcMessage::UploadBlob(_)
384 | RpcMessage::UploadBlobResponse(_)
385 | RpcMessage::DownloadCertificatesByHeights(_, _)
386 | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
387 Err(NodeError::UnexpectedMessage)
388 }
389 };
390
391 self.server.packets_processed += 1;
392 #[allow(unknown_lints)]
394 #[allow(clippy::manual_is_multiple_of)]
395 if self.server.packets_processed % 5000 == 0 {
396 debug!(
397 "[{}] {}:{} (shard {}) has processed {} packets",
398 self.server.state.nickname(),
399 self.server.host,
400 self.server.port,
401 self.server.shard_id,
402 self.server.packets_processed
403 );
404 }
405
406 match reply {
407 Ok(x) => x,
408 Err(error) => {
409 debug!(
411 "[{}] User query failed: {}",
412 self.server.state.nickname(),
413 error
414 );
415 self.server.user_errors += 1;
416 Some(error.into())
417 }
418 }
419 }
420}
421
422impl<S> RunningServerState<S>
423where
424 S: Storage + Send,
425{
426 fn handle_network_actions(&mut self, actions: NetworkActions) {
427 for request in actions.cross_chain_requests {
428 let shard_id = self.server.network.get_shard_id(request.target_chain_id());
429 debug!(
430 "[{}] Scheduling cross-chain query: {} -> {}",
431 self.server.state.nickname(),
432 self.server.shard_id,
433 shard_id
434 );
435 if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
436 error!(%error, "dropping cross-chain request");
437 break;
438 }
439 }
440 }
441
442 fn log_error(&self, error: &WorkerError, context: &str) {
443 let nickname = self.server.state.nickname();
444 if error.is_local() {
445 error!(nickname, %error, "{}", context);
446 } else {
447 debug!(nickname, %error, "{}", context);
448 }
449 }
450}