1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, lock::Mutex};
8use linera_base::{data_types::Blob, time::Duration};
9use linera_core::{
10 data_types::CrossChainRequest,
11 node::NodeError,
12 worker::{NetworkActions, WorkerError, WorkerState},
13 JoinSetExt as _,
14};
15use linera_storage::Storage;
16use tokio::{sync::oneshot, task::JoinSet};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use super::transport::{MessageHandler, ServerHandle, TransportProtocol};
21use crate::{
22 config::{CrossChainConfig, ShardId, ValidatorInternalNetworkPreConfig},
23 cross_chain_message_queue, RpcMessage,
24};
25
26#[derive(Clone)]
27pub struct Server<S>
28where
29 S: Storage,
30{
31 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
32 host: String,
33 port: u16,
34 state: WorkerState<S>,
35 shard_id: ShardId,
36 cross_chain_config: CrossChainConfig,
37 packets_processed: u64,
39 user_errors: u64,
40}
41
42impl<S> Server<S>
43where
44 S: Storage,
45{
46 pub fn new(
47 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
48 host: String,
49 port: u16,
50 state: WorkerState<S>,
51 shard_id: ShardId,
52 cross_chain_config: CrossChainConfig,
53 ) -> Self {
54 Self {
55 network,
56 host,
57 port,
58 state,
59 shard_id,
60 cross_chain_config,
61 packets_processed: 0,
62 user_errors: 0,
63 }
64 }
65
66 pub fn packets_processed(&self) -> u64 {
67 self.packets_processed
68 }
69
70 pub fn user_errors(&self) -> u64 {
71 self.user_errors
72 }
73}
74
75impl<S> Server<S>
76where
77 S: Storage + Clone + Send + Sync + 'static,
78{
79 #[expect(clippy::too_many_arguments)]
80 async fn forward_cross_chain_queries(
81 nickname: String,
82 network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
83 cross_chain_max_retries: u32,
84 cross_chain_retry_delay: Duration,
85 cross_chain_sender_delay: Duration,
86 cross_chain_sender_failure_rate: f32,
87 this_shard: ShardId,
88 receiver: mpsc::Receiver<(CrossChainRequest, ShardId)>,
89 ) {
90 let pool = Arc::new(Mutex::new(
91 network
92 .protocol
93 .make_outgoing_connection_pool()
94 .await
95 .expect("Initialization should not fail"),
96 ));
97 let handle_request = move |shard_id, request| {
98 let pool = pool.clone();
99 let shard = network.shard(shard_id);
100 let remote_address = format!("{}:{}", shard.host, shard.port);
101 let message = RpcMessage::CrossChainRequest(Box::new(request));
102 async move {
103 pool.lock()
104 .await
105 .send_message_to(message.clone(), &remote_address)
106 .await?;
107 anyhow::Result::<_, anyhow::Error>::Ok(())
108 }
109 };
110 cross_chain_message_queue::forward_cross_chain_queries(
111 nickname,
112 cross_chain_max_retries,
113 cross_chain_retry_delay,
114 cross_chain_sender_delay,
115 cross_chain_sender_failure_rate,
116 this_shard,
117 receiver,
118 handle_request,
119 )
120 .await;
121 }
122
123 pub fn spawn(
124 self,
125 shutdown_signal: CancellationToken,
126 join_set: &mut JoinSet<()>,
127 ) -> ServerHandle {
128 info!(
129 "Listening to {:?} traffic on {}:{}",
130 self.network.protocol, self.host, self.port
131 );
132 let address = (self.host.clone(), self.port);
133
134 let (cross_chain_sender, cross_chain_receiver) =
135 mpsc::channel(self.cross_chain_config.queue_size);
136
137 join_set.spawn_task(Self::forward_cross_chain_queries(
138 self.state.nickname().to_string(),
139 self.network.clone(),
140 self.cross_chain_config.max_retries,
141 Duration::from_millis(self.cross_chain_config.retry_delay_ms),
142 Duration::from_millis(self.cross_chain_config.sender_delay_ms),
143 self.cross_chain_config.sender_failure_rate,
144 self.shard_id,
145 cross_chain_receiver,
146 ));
147
148 let protocol = self.network.protocol;
149 let state = RunningServerState {
150 server: self,
151 cross_chain_sender,
152 };
153 protocol.spawn_server(address, state, shutdown_signal, join_set)
155 }
156}
157
158#[derive(Clone)]
159struct RunningServerState<S>
160where
161 S: Storage,
162{
163 server: Server<S>,
164 cross_chain_sender: mpsc::Sender<(CrossChainRequest, ShardId)>,
165}
166
167#[async_trait]
168impl<S> MessageHandler for RunningServerState<S>
169where
170 S: Storage + Clone + Send + Sync + 'static,
171{
172 #[instrument(
173 target = "simple_server",
174 skip_all,
175 fields(
176 nickname = self.server.state.nickname(),
177 chain_id = ?message.target_chain_id()
178 )
179 )]
180 async fn handle_message(&mut self, message: RpcMessage) -> Option<RpcMessage> {
181 let reply = match message {
182 RpcMessage::BlockProposal(message) => {
183 match self.server.state.handle_block_proposal(*message).await {
184 Ok((info, actions)) => {
185 self.handle_network_actions(actions);
187 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
189 }
190 Err(error) => {
191 let nickname = self.server.state.nickname();
192 warn!(nickname, %error, "Failed to handle block proposal");
193 Err(error.into())
194 }
195 }
196 }
197 RpcMessage::LiteCertificate(request) => {
198 let (sender, receiver) = request
199 .wait_for_outgoing_messages
200 .then(oneshot::channel)
201 .unzip();
202 match Box::pin(
203 self.server
204 .state
205 .handle_lite_certificate(request.certificate, sender),
206 )
207 .await
208 {
209 Ok((info, actions)) => {
210 self.handle_network_actions(actions);
212 if let Some(receiver) = receiver {
213 if let Err(e) = receiver.await {
214 error!("Failed to wait for message delivery: {e}");
215 }
216 }
217 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
219 }
220 Err(error) => {
221 let nickname = self.server.state.nickname();
222 if let WorkerError::MissingCertificateValue = &error {
223 debug!(nickname, %error, "Failed to handle lite certificate");
224 } else {
225 error!(nickname, %error, "Failed to handle lite certificate");
226 }
227 Err(error.into())
228 }
229 }
230 }
231 RpcMessage::TimeoutCertificate(request) => {
232 match self
233 .server
234 .state
235 .handle_timeout_certificate(request.certificate)
236 .await
237 {
238 Ok((info, actions)) => {
239 self.handle_network_actions(actions);
241 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
243 }
244 Err(error) => {
245 let nickname = self.server.state.nickname();
246 error!(nickname, %error, "Failed to handle timeout certificate");
247 Err(error.into())
248 }
249 }
250 }
251 RpcMessage::ValidatedCertificate(request) => {
252 match self
253 .server
254 .state
255 .handle_validated_certificate(request.certificate)
256 .await
257 {
258 Ok((info, actions)) => {
259 self.handle_network_actions(actions);
261 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
263 }
264 Err(error) => {
265 error!(
266 nickname = self.server.state.nickname(), %error,
267 "Failed to handle validated certificate"
268 );
269 Err(error.into())
270 }
271 }
272 }
273 RpcMessage::ConfirmedCertificate(request) => {
274 let (sender, receiver) = request
275 .wait_for_outgoing_messages
276 .then(oneshot::channel)
277 .unzip();
278 match self
279 .server
280 .state
281 .handle_confirmed_certificate(request.certificate, sender)
282 .await
283 {
284 Ok((info, actions)) => {
285 self.handle_network_actions(actions);
287 if let Some(receiver) = receiver {
288 if let Err(e) = receiver.await {
289 error!("Failed to wait for message delivery: {e}");
290 }
291 }
292 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
294 }
295 Err(error) => {
296 let nickname = self.server.state.nickname();
297 error!(nickname, %error, "Failed to handle confirmed certificate");
298 Err(error.into())
299 }
300 }
301 }
302 RpcMessage::ChainInfoQuery(message) => {
303 match self.server.state.handle_chain_info_query(*message).await {
304 Ok((info, actions)) => {
305 self.handle_network_actions(actions);
307 Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info))))
309 }
310 Err(error) => {
311 let nickname = self.server.state.nickname();
312 error!(nickname, %error, "Failed to handle chain info query");
313 Err(error.into())
314 }
315 }
316 }
317 RpcMessage::CrossChainRequest(request) => {
318 match self.server.state.handle_cross_chain_request(*request).await {
319 Ok(actions) => {
320 self.handle_network_actions(actions);
321 }
322 Err(error) => {
323 let nickname = self.server.state.nickname();
324 error!(nickname, %error, "Failed to handle cross-chain request");
325 }
326 }
327 Ok(None)
329 }
330 RpcMessage::DownloadPendingBlob(request) => {
331 let (chain_id, blob_id) = *request;
332 match self
333 .server
334 .state
335 .download_pending_blob(chain_id, blob_id)
336 .await
337 {
338 Ok(blob) => Ok(Some(RpcMessage::DownloadPendingBlobResponse(Box::new(
339 blob.into(),
340 )))),
341 Err(error) => {
342 let nickname = self.server.state.nickname();
343 error!(nickname, %error, "Failed to handle pending blob request");
344 Err(error.into())
345 }
346 }
347 }
348 RpcMessage::HandlePendingBlob(request) => {
349 let (chain_id, blob_content) = *request;
350 match self
351 .server
352 .state
353 .handle_pending_blob(chain_id, Blob::new(blob_content))
354 .await
355 {
356 Ok(info) => Ok(Some(RpcMessage::ChainInfoResponse(Box::new(info)))),
357 Err(error) => {
358 let nickname = self.server.state.nickname();
359 error!(nickname, %error, "Failed to handle pending blob");
360 Err(error.into())
361 }
362 }
363 }
364
365 RpcMessage::VersionInfoQuery => {
366 Ok(Some(RpcMessage::VersionInfoResponse(Box::default())))
367 }
368
369 RpcMessage::Vote(_)
370 | RpcMessage::Error(_)
371 | RpcMessage::ChainInfoResponse(_)
372 | RpcMessage::VersionInfoResponse(_)
373 | RpcMessage::NetworkDescriptionQuery
374 | RpcMessage::NetworkDescriptionResponse(_)
375 | RpcMessage::DownloadBlob(_)
376 | RpcMessage::DownloadBlobResponse(_)
377 | RpcMessage::DownloadPendingBlobResponse(_)
378 | RpcMessage::DownloadConfirmedBlock(_)
379 | RpcMessage::DownloadConfirmedBlockResponse(_)
380 | RpcMessage::BlobLastUsedBy(_)
381 | RpcMessage::BlobLastUsedByResponse(_)
382 | RpcMessage::MissingBlobIds(_)
383 | RpcMessage::MissingBlobIdsResponse(_)
384 | RpcMessage::DownloadCertificates(_)
385 | RpcMessage::DownloadCertificatesResponse(_)
386 | RpcMessage::UploadBlob(_)
387 | RpcMessage::UploadBlobResponse(_) => Err(NodeError::UnexpectedMessage),
388 };
389
390 self.server.packets_processed += 1;
391 if self.server.packets_processed % 5000 == 0 {
392 debug!(
393 "[{}] {}:{} (shard {}) has processed {} packets",
394 self.server.state.nickname(),
395 self.server.host,
396 self.server.port,
397 self.server.shard_id,
398 self.server.packets_processed
399 );
400 }
401
402 match reply {
403 Ok(x) => x,
404 Err(error) => {
405 debug!(
407 "[{}] User query failed: {}",
408 self.server.state.nickname(),
409 error
410 );
411 self.server.user_errors += 1;
412 Some(error.into())
413 }
414 }
415 }
416}
417
418impl<S> RunningServerState<S>
419where
420 S: Storage + Send,
421{
422 fn handle_network_actions(&mut self, actions: NetworkActions) {
423 for request in actions.cross_chain_requests {
424 let shard_id = self.server.network.get_shard_id(request.target_chain_id());
425 debug!(
426 "[{}] Scheduling cross-chain query: {} -> {}",
427 self.server.state.nickname(),
428 self.server.shard_id,
429 shard_id
430 );
431 if let Err(error) = self.cross_chain_sender.try_send((request, shard_id)) {
432 error!(%error, "dropping cross-chain request");
433 break;
434 }
435 }
436 }
437}