1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex};
8use linera_base::{data_types::Blob, time::Duration};
9use linera_core::{
10 data_types::CrossChainRequest,
11 node::NodeError,
12 worker::{NetworkActions, WorkerError, WorkerState},
13 JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync::oneshot, task::JoinSet};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument};
19
20use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
21use crate::{
22 config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
23 cross_chain_message_queue, RpcMessage,
24};
25
26#[derive(Clone)]
27pub struct Server<S>
28where
29 S: Storage,
30{
31 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
32 host: String,
33 port: u16,
34 state: WorkerState<S>,
35 shard_id: ShardId,
36 cross_chain_config: CrossChainConfig,
37 packets_processed: u64,
39 user_errors: u64,
40}
41
42impl<S> Server<S>
43where
44 S: Storage,
45{
46 pub fn new(
47 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
48 host: String,
49 port: u16,
50 state: WorkerState<S>,
51 shard_id: ShardId,
52 cross_chain_config: CrossChainConfig,
53 ) -> Self {
54 Self {
55 network,
56 host,
57 port,
58 state,
59 shard_id,
60 cross_chain_config,
61 packets_processed: 0,
62 user_errors: 0,
63 }
64 }
65
66 pub fn packets_processed(&self) -> u64 {
67 self.packets_processed
68 }
69
70 pub fn user_errors(&self) -> u64 {
71 self.user_errors
72 }
73}
74
75impl<S> Server<S>
76where
77 S: Storage + Clone + Send + Sync + 'static,
78{
79 #[expect(clippy::too_many_arguments)]
80 async fn forward_cross_chain_queries(
81 nickname: String,
82 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
83 cross_chain_max_retries: u32,
84 cross_chain_retry_delay: Duration,
85 cross_chain_sender_delay: Duration,
86 cross_chain_sender_failure_rate: f32,
87 this_shard: ShardId,
88 receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
89 ) {
90 let pool = Arc::new(Mutex::new(
91 network
92 .protocol
93 .make_outgoing_connection_pool()
94 .await
95 .expect("Initialization should not fail"),
96 ));
97 let handle_request = move |shard_id, request| {
98 let pool = pool.clone();
99 let shard = network.shard(shard_id);
100 let remote_address = format!("{}:{}", shard.host, shard.port);
101 let message = RpcMessage::CrossChainRequest(Box::new(request));
102 async move {
103 pool.lock()
104 .await
105 .send_message_to(message.clone(), &remote_address)
106 .await?;
107 anyhow::Result::<_, anyhow::Error>::Ok(())
108 }
109 };
110 cross_chain_message_queue::forward_cross_chain_queries(
111 nickname,
112 cross_chain_max_retries,
113 cross_chain_retry_delay,
114 cross_chain_sender_delay,
115 cross_chain_sender_failure_rate,
116 this_shard,
117 receiver,
118 handle_request,
119 )
120 .await;
121 }
122
123 pub fn spawn(
124 self,
125 shutdown_signal: CancellationToken,
126 join_set: &mut JoinSet<()>,
127 ) -> ServerHandle {
128 info!(
129 "Listening to {:?} traffic on {}:{}",
130 self.network.protocol, self.host, self.port
131 );
132 let address = (self.host.clone(), self.port);
133
134 let (cross_chain_sender, cross_chain_receiver) =
135 mpsc::channel(self.cross_chain_config.queue_size);
136
137 join_set.spawn_task(Self::forward_cross_chain_queries(
138 self.state.nickname().to_string(),
139 self.network.clone(),
140 self.cross_chain_config.max_retries,
141 Duration::from_millis(self.cross_chain_config.retry_delay_ms),
142 Duration::from_millis(self.cross_chain_config.sender_delay_ms),
143 self.cross_chain_config.sender_failure_rate,
144 self.shard_id,
145 cross_chain_receiver,
146 ));
147
148 let protocol = self.network.protocol;
149 let state = RunningServerState {
150 server: self,
151 cross_chain_sender,
152 };
153 protocol.spawn_server(address, state, shutdown_signal, join_set)
155 }
156}
157
158#[derive(Clone)]
159struct RunningServerState<S>
160where
161 S: Storage,
162{
163 server: Server<S>,
164 cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
165}
166
167#[async_trait]
168impl<S> MessageHandler for RunningServerState<S>
169where
170 S: Storage + Clone + Send + Sync + 'static,
171{
172 #[instrument(
173 target = "simple_server",
174 skip_all,
175 fields(
176 nickname = self.server.state.nickname(),
177 chain_id = ?message.target_chain_id()
178 )
179 )]
180 async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
181 let reply = match message {
182 RpcMessage::BlockProposal(message) => {
183 match self.server.state.handle_block_proposal(*message).await {
184 Ok((info, actions)) => {
185 self.handle_network_actions(actions);
187 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
189 }
190 Err(error) => {
191 self.log_error(&error, "Failed to handle block proposal");
192 Err(error.into())
193 }
194 }
195 }
196 RpcMessage::LiteCertificate(request) => {
197 let (sender, receiver) = request
198 .wait_for_outgoing_messages
199 .then(oneshot::channel)
200 .unzip();
201 match Box::pin(
202 self.server
203 .state
204 .handle_lite_certificate(request.certificate, sender),
205 )
206 .await
207 {
208 Ok((info, actions)) => {
209 self.handle_network_actions(actions);
211 if let Some(receiver) = receiver {
212 if let Err(e) = receiver.await {
213 error!("Failed to wait for message delivery: {e}");
214 }
215 }
216 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
218 }
219 Err(error) => {
220 let nickname = self.server.state.nickname();
221 if let WorkerError::MissingCertificateValue = &error {
222 debug!(nickname, %error, "Failed to handle lite certificate");
223 } else {
224 error!(nickname, %error, "Failed to handle lite certificate");
225 }
226 Err(error.into())
227 }
228 }
229 }
230 RpcMessage::TimeoutCertificate(request) => {
231 match self
232 .server
233 .state
234 .handle_timeout_certificate(request.certificate)
235 .await
236 {
237 Ok((info, actions)) => {
238 self.handle_network_actions(actions);
240 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
242 }
243 Err(error) => {
244 self.log_error(&error, "Failed to handle timeout certificate");
245 Err(error.into())
246 }
247 }
248 }
249 RpcMessage::ValidatedCertificate(request) => {
250 match self
251 .server
252 .state
253 .handle_validated_certificate(request.certificate)
254 .await
255 {
256 Ok((info, actions)) => {
257 self.handle_network_actions(actions);
259 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
261 }
262 Err(error) => {
263 self.log_error(&error, "Failed to handle validated certificate");
264 Err(error.into())
265 }
266 }
267 }
268 RpcMessage::ConfirmedCertificate(request) => {
269 let (sender, receiver) = request
270 .wait_for_outgoing_messages
271 .then(oneshot::channel)
272 .unzip();
273 match self
274 .server
275 .state
276 .handle_confirmed_certificate(request.certificate, sender)
277 .await
278 {
279 Ok((info, actions)) => {
280 self.handle_network_actions(actions);
282 if let Some(receiver) = receiver {
283 if let Err(e) = receiver.await {
284 error!("Failed to wait for message delivery: {e}");
285 }
286 }
287 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
289 }
290 Err(error) => {
291 self.log_error(&error, "Failed to handle confirmed certificate");
292 Err(error.into())
293 }
294 }
295 }
296 RpcMessage::ChainInfoQuery(message) => {
297 match self.server.state.handle_chain_info_query(*message).await {
298 Ok((info, actions)) => {
299 self.handle_network_actions(actions);
301 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
303 }
304 Err(error) => {
305 self.log_error(&error, "Failed to handle chain info query");
306 Err(error.into())
307 }
308 }
309 }
310 RpcMessage::CrossChainRequest(request) => {
311 match self.server.state.handle_cross_chain_request(*request).await {
312 Ok(actions) => {
313 self.handle_network_actions(actions);
314 }
315 Err(error) => {
316 self.log_error(&error, "Failed to handle cross-chain request");
317 }
318 }
319 Ok(None)
321 }
322 RpcMessage::DownloadPendingBlob(request) => {
323 let (chain_id, blob_id) = *request;
324 match self
325 .server
326 .state
327 .download_pending_blob(chain_id, blob_id)
328 .await
329 {
330 Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
331 blob.into(),
332 )))),
333 Err(error) => {
334 self.log_error(&error, "Failed to handle pending blob request");
335 Err(error.into())
336 }
337 }
338 }
339 RpcMessage::HandlePendingBlob(request) => {
340 let (chain_id, blob_content) = *request;
341 match self
342 .server
343 .state
344 .handle_pending_blob(chain_id, Blob::new(blob_content))
345 .await
346 {
347 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
348 Err(error) => {
349 self.log_error(&error, "Failed to handle pending blob");
350 Err(error.into())
351 }
352 }
353 }
354
355 RpcMessage::VersionInfoQuery => {
356 Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
357 }
358
359 RpcMessage::Vote(_)
360 | RpcMessage::Error(_)
361 | RpcMessage::ChainInfoResponse(_)
362 | RpcMessage::VersionInfoResponse(_)
363 | RpcMessage::NetworkDescriptionQuery
364 | RpcMessage::NetworkDescriptionResponse(_)
365 | RpcMessage::ShardInfoQuery(_)
366 | RpcMessage::ShardInfoResponse(_)
367 | RpcMessage::DownloadBlob(_)
368 | RpcMessage::DownloadBlobResponse(_)
369 | RpcMessage::DownloadPendingBlobResponse(_)
370 | RpcMessage::DownloadConfirmedBlock(_)
371 | RpcMessage::DownloadConfirmedBlockResponse(_)
372 | RpcMessage::BlobLastUsedBy(_)
373 | RpcMessage::BlobLastUsedByResponse(_)
374 | RpcMessage::BlobLastUsedByCertificate(_)
375 | RpcMessage::BlobLastUsedByCertificateResponse(_)
376 | RpcMessage::MissingBlobIds(_)
377 | RpcMessage::MissingBlobIdsResponse(_)
378 | RpcMessage::DownloadCertificates(_)
379 | RpcMessage::DownloadCertificatesResponse(_)
380 | RpcMessage::UploadBlob(_)
381 | RpcMessage::UploadBlobResponse(_)
382 | RpcMessage::DownloadCertificatesByHeights(_, _)
383 | RpcMessage::DownloadCertificatesByHeightsResponse(_) => {
384 Err(NodeError::UnexpectedMessage)
385 }
386 };
387
388 self.server.packets_processed += 1;
389 if self.server.packets_processed % 5000 == 0 {
390 debug!(
391 "[{}] {}:{} (shard {}) has processed {} packets",
392 self.server.state.nickname(),
393 self.server.host,
394 self.server.port,
395 self.server.shard_id,
396 self.server.packets_processed
397 );
398 }
399
400 match reply {
401 Ok(x) => x,
402 Err(error) => {
403 debug!(
405 "[{}] User query failed: {}",
406 self.server.state.nickname(),
407 error
408 );
409 self.server.user_errors += 1;
410 Some(error.into())
411 }
412 }
413 }
414}
415
416impl<S> RunningServerState<S>
417where
418 S: Storage + Send,
419{
420 fn handle_network_actions(&mut self, actions: NetworkActions) {
421 for request in actions.cross_chain_requests {
422 let shard_id = self.server.network.get_shard_id(request.target_chain_id());
423 debug!(
424 "[{}] Scheduling cross-chain query: {} -> {}",
425 self.server.state.nickname(),
426 self.server.shard_id,
427 shard_id
428 );
429 if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
430 error!(%error, "dropping cross-chain request");
431 break;
432 }
433 }
434 }
435
436 fn log_error(&self, error: &WorkerError, context: &str) {
437 let nickname = self.server.state.nickname();
438 if error.is_local() {
439 error!(nickname, %error, "{}", context);
440 } else {
441 debug!(nickname, %error, "{}", context);
442 }
443 }
444}