1use std::{
7 collections::{BTreeMap, HashSet},
8 fmt,
9 sync::{Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use linera_base::{
14 crypto::{CryptoHash, ValidatorPublicKey},
15 data_types::{ApplicationDescription, Blob, BlockHeight, Epoch, Timestamp},
16 hashed::Hashed,
17 identifiers::{ApplicationId, BlobId, ChainId},
18};
19use linera_chain::{
20 data_types::{BlockProposal, MessageBundle, ProposedBlock},
21 types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
22 ChainStateView,
23};
24use linera_execution::{
25 ExecutionStateView, Query, QueryContext, QueryOutcome, ServiceRuntimeEndpoint,
26 ServiceSyncRuntime,
27};
28use linera_storage::Storage;
29use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
30use tracing::{debug, instrument, trace, warn, Instrument as _};
31
32use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
33use crate::{
34 data_types::{ChainInfoQuery, ChainInfoResponse},
35 value_cache::ValueCache,
36 worker::{NetworkActions, WorkerError},
37};
38
39#[derive(Debug)]
41pub enum ChainWorkerRequest<Context>
42where
43 Context: linera_views::context::Context + Clone + Send + Sync + 'static,
44{
45 #[cfg(with_testing)]
47 ReadCertificate {
48 height: BlockHeight,
49 #[debug(skip)]
50 callback: oneshot::Sender<Result<Option<ConfirmedBlockCertificate>, WorkerError>>,
51 },
52
53 #[cfg(with_testing)]
55 FindBundleInInbox {
56 inbox_id: ChainId,
57 certificate_hash: CryptoHash,
58 height: BlockHeight,
59 index: u32,
60 #[debug(skip)]
61 callback: oneshot::Sender<Result<Option<MessageBundle>, WorkerError>>,
62 },
63
64 GetChainStateView {
66 #[debug(skip)]
67 callback:
68 oneshot::Sender<Result<OwnedRwLockReadGuard<ChainStateView<Context>>, WorkerError>>,
69 },
70
71 QueryApplication {
73 query: Query,
74 #[debug(skip)]
75 callback: oneshot::Sender<Result<QueryOutcome, WorkerError>>,
76 },
77
78 DescribeApplication {
80 application_id: ApplicationId,
81 #[debug(skip)]
82 callback: oneshot::Sender<Result<ApplicationDescription, WorkerError>>,
83 },
84
85 StageBlockExecution {
87 block: ProposedBlock,
88 round: Option<u32>,
89 published_blobs: Vec<Blob>,
90 #[debug(skip)]
91 callback: oneshot::Sender<Result<(Block, ChainInfoResponse), WorkerError>>,
92 },
93
94 ProcessTimeout {
96 certificate: TimeoutCertificate,
97 #[debug(skip)]
98 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
99 },
100
101 HandleBlockProposal {
103 proposal: BlockProposal,
104 #[debug(skip)]
105 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
106 },
107
108 ProcessValidatedBlock {
110 certificate: ValidatedBlockCertificate,
111 #[debug(skip)]
112 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions, bool), WorkerError>>,
113 },
114
115 ProcessConfirmedBlock {
117 certificate: ConfirmedBlockCertificate,
118 #[debug(with = "elide_option")]
119 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
120 #[debug(skip)]
121 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
122 },
123
124 PreprocessCertificate {
126 certificate: ConfirmedBlockCertificate,
127 #[debug(skip)]
128 callback: oneshot::Sender<Result<NetworkActions, WorkerError>>,
129 },
130
131 ProcessCrossChainUpdate {
133 origin: ChainId,
134 bundles: Vec<(Epoch, MessageBundle)>,
135 #[debug(skip)]
136 callback: oneshot::Sender<Result<Option<BlockHeight>, WorkerError>>,
137 },
138
139 ConfirmUpdatedRecipient {
141 recipient: ChainId,
142 latest_height: BlockHeight,
143 #[debug(skip)]
144 callback: oneshot::Sender<Result<(), WorkerError>>,
145 },
146
147 HandleChainInfoQuery {
149 query: ChainInfoQuery,
150 #[debug(skip)]
151 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
152 },
153
154 DownloadPendingBlob {
156 blob_id: BlobId,
157 #[debug(skip)]
158 callback: oneshot::Sender<Result<Blob, WorkerError>>,
159 },
160
161 HandlePendingBlob {
163 blob: Blob,
164 #[debug(skip)]
165 callback: oneshot::Sender<Result<ChainInfoResponse, WorkerError>>,
166 },
167
168 UpdateReceivedCertificateTrackers {
170 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
171 callback: oneshot::Sender<Result<(), WorkerError>>,
172 },
173}
174
175pub struct ChainWorkerActor<StorageClient>
177where
178 StorageClient: Storage + Clone + Send + Sync + 'static,
179{
180 worker: ChainWorkerState<StorageClient>,
181 service_runtime_thread: Option<linera_base::task::Blocking>,
182}
183
184impl<StorageClient> ChainWorkerActor<StorageClient>
185where
186 StorageClient: Storage + Clone + Send + Sync + 'static,
187{
188 #[expect(clippy::too_many_arguments)]
194 pub async fn run(
195 config: ChainWorkerConfig,
196 storage: StorageClient,
197 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
198 execution_state_cache: Arc<
199 ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
200 >,
201 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
202 delivery_notifier: DeliveryNotifier,
203 chain_id: ChainId,
204 mut incoming_requests: mpsc::UnboundedReceiver<(
205 ChainWorkerRequest<StorageClient::Context>,
206 tracing::Span,
207 )>,
208 ) {
209 let actor = loop {
210 let load_result = Self::load(
211 config.clone(),
212 storage.clone(),
213 block_cache.clone(),
214 execution_state_cache.clone(),
215 tracked_chains.clone(),
216 delivery_notifier.clone(),
217 chain_id,
218 )
219 .await
220 .inspect_err(|error| warn!("Failed to load chain state: {error:?}"));
221
222 match load_result {
223 Ok(actor) => break actor,
224 Err(error) => match incoming_requests.recv().await {
225 Some((request, _span)) => request.send_error(error),
226 None => return,
227 },
228 }
229 };
230
231 actor.handle_requests(incoming_requests).await;
232 }
233
234 pub async fn load(
237 config: ChainWorkerConfig,
238 storage: StorageClient,
239 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
240 execution_state_cache: Arc<
241 ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
242 >,
243 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
244 delivery_notifier: DeliveryNotifier,
245 chain_id: ChainId,
246 ) -> Result<Self, WorkerError> {
247 let (service_runtime_thread, service_runtime_endpoint) = {
248 if config.long_lived_services {
249 let (thread, endpoint) = Self::spawn_service_runtime_actor(chain_id).await;
250 (Some(thread), Some(endpoint))
251 } else {
252 (None, None)
253 }
254 };
255
256 let worker = ChainWorkerState::load(
257 config,
258 storage,
259 block_cache,
260 execution_state_cache,
261 tracked_chains,
262 delivery_notifier,
263 chain_id,
264 service_runtime_endpoint,
265 )
266 .await?;
267
268 Ok(ChainWorkerActor {
269 worker,
270 service_runtime_thread,
271 })
272 }
273
274 async fn spawn_service_runtime_actor(
278 chain_id: ChainId,
279 ) -> (linera_base::task::Blocking, ServiceRuntimeEndpoint) {
280 let context = QueryContext {
281 chain_id,
282 next_block_height: BlockHeight(0),
283 local_time: Timestamp::from(0),
284 };
285
286 let (execution_state_sender, incoming_execution_requests) =
287 futures::channel::mpsc::unbounded();
288 let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();
289
290 let service_runtime_thread = linera_base::task::Blocking::spawn(move |_| async move {
291 ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver)
292 })
293 .await;
294
295 let endpoint = ServiceRuntimeEndpoint {
296 incoming_execution_requests,
297 runtime_request_sender,
298 };
299 (service_runtime_thread, endpoint)
300 }
301
302 #[instrument(
304 name = "ChainWorkerActor::handle_requests",
305 skip_all,
306 fields(chain_id = format!("{:.8}", self.worker.chain_id())),
307 )]
308 pub async fn handle_requests(
309 mut self,
310 mut incoming_requests: mpsc::UnboundedReceiver<(
311 ChainWorkerRequest<StorageClient::Context>,
312 tracing::Span,
313 )>,
314 ) {
315 trace!("Starting `ChainWorkerActor`");
316
317 while let Some((request, span)) = incoming_requests.recv().await {
318 Box::pin(self.handle_request(request).instrument(span)).await;
319 }
320
321 if let Some(thread) = self.service_runtime_thread {
322 drop(self.worker);
323 thread.join().await
324 }
325
326 trace!("`ChainWorkerActor` finished");
327 }
328
329 #[instrument(skip(self, request))]
331 pub async fn handle_request(&mut self, request: ChainWorkerRequest<StorageClient::Context>) {
332 let responded = match request {
334 #[cfg(with_testing)]
335 ChainWorkerRequest::ReadCertificate { height, callback } => callback
336 .send(self.worker.read_certificate(height).await)
337 .is_ok(),
338 #[cfg(with_testing)]
339 ChainWorkerRequest::FindBundleInInbox {
340 inbox_id,
341 certificate_hash,
342 height,
343 index,
344 callback,
345 } => callback
346 .send(
347 self.worker
348 .find_bundle_in_inbox(inbox_id, certificate_hash, height, index)
349 .await,
350 )
351 .is_ok(),
352 ChainWorkerRequest::GetChainStateView { callback } => {
353 callback.send(self.worker.chain_state_view().await).is_ok()
354 }
355 ChainWorkerRequest::QueryApplication { query, callback } => callback
356 .send(self.worker.query_application(query).await)
357 .is_ok(),
358 ChainWorkerRequest::DescribeApplication {
359 application_id,
360 callback,
361 } => callback
362 .send(self.worker.describe_application(application_id).await)
363 .is_ok(),
364 ChainWorkerRequest::StageBlockExecution {
365 block,
366 round,
367 published_blobs,
368 callback,
369 } => callback
370 .send(
371 self.worker
372 .stage_block_execution(block, round, &published_blobs)
373 .await,
374 )
375 .is_ok(),
376 ChainWorkerRequest::ProcessTimeout {
377 certificate,
378 callback,
379 } => callback
380 .send(self.worker.process_timeout(certificate).await)
381 .is_ok(),
382 ChainWorkerRequest::HandleBlockProposal { proposal, callback } => callback
383 .send(self.worker.handle_block_proposal(proposal).await)
384 .is_ok(),
385 ChainWorkerRequest::ProcessValidatedBlock {
386 certificate,
387 callback,
388 } => callback
389 .send(self.worker.process_validated_block(certificate).await)
390 .is_ok(),
391 ChainWorkerRequest::ProcessConfirmedBlock {
392 certificate,
393 notify_when_messages_are_delivered,
394 callback,
395 } => callback
396 .send(
397 self.worker
398 .process_confirmed_block(certificate, notify_when_messages_are_delivered)
399 .await,
400 )
401 .is_ok(),
402 ChainWorkerRequest::PreprocessCertificate {
403 certificate,
404 callback,
405 } => callback
406 .send(self.worker.preprocess_certificate(certificate).await)
407 .is_ok(),
408 ChainWorkerRequest::ProcessCrossChainUpdate {
409 origin,
410 bundles,
411 callback,
412 } => callback
413 .send(
414 self.worker
415 .process_cross_chain_update(origin, bundles)
416 .await,
417 )
418 .is_ok(),
419 ChainWorkerRequest::ConfirmUpdatedRecipient {
420 recipient,
421 latest_height,
422 callback,
423 } => callback
424 .send(
425 self.worker
426 .confirm_updated_recipient(recipient, latest_height)
427 .await,
428 )
429 .is_ok(),
430 ChainWorkerRequest::HandleChainInfoQuery { query, callback } => callback
431 .send(self.worker.handle_chain_info_query(query).await)
432 .is_ok(),
433 ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback
434 .send(self.worker.download_pending_blob(blob_id).await)
435 .is_ok(),
436 ChainWorkerRequest::HandlePendingBlob { blob, callback } => callback
437 .send(self.worker.handle_pending_blob(blob).await)
438 .is_ok(),
439 ChainWorkerRequest::UpdateReceivedCertificateTrackers {
440 new_trackers,
441 callback,
442 } => callback
443 .send(
444 self.worker
445 .update_received_certificate_trackers(new_trackers)
446 .await,
447 )
448 .is_ok(),
449 };
450
451 if !responded {
452 warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
453 }
454 }
455}
456
457impl<Context> ChainWorkerRequest<Context>
458where
459 Context: linera_views::context::Context + Clone + Send + Sync + 'static,
460{
461 pub fn send_error(self, error: WorkerError) {
463 debug!("Immediately sending error to chain worker request {self:?}");
464
465 let responded = match self {
466 #[cfg(with_testing)]
467 ChainWorkerRequest::ReadCertificate { callback, .. } => {
468 callback.send(Err(error)).is_ok()
469 }
470 #[cfg(with_testing)]
471 ChainWorkerRequest::FindBundleInInbox { callback, .. } => {
472 callback.send(Err(error)).is_ok()
473 }
474 ChainWorkerRequest::GetChainStateView { callback } => callback.send(Err(error)).is_ok(),
475 ChainWorkerRequest::QueryApplication { callback, .. } => {
476 callback.send(Err(error)).is_ok()
477 }
478 ChainWorkerRequest::DescribeApplication { callback, .. } => {
479 callback.send(Err(error)).is_ok()
480 }
481 ChainWorkerRequest::StageBlockExecution { callback, .. } => {
482 callback.send(Err(error)).is_ok()
483 }
484 ChainWorkerRequest::ProcessTimeout { callback, .. } => {
485 callback.send(Err(error)).is_ok()
486 }
487 ChainWorkerRequest::HandleBlockProposal { callback, .. } => {
488 callback.send(Err(error)).is_ok()
489 }
490 ChainWorkerRequest::ProcessValidatedBlock { callback, .. } => {
491 callback.send(Err(error)).is_ok()
492 }
493 ChainWorkerRequest::ProcessConfirmedBlock { callback, .. } => {
494 callback.send(Err(error)).is_ok()
495 }
496 ChainWorkerRequest::PreprocessCertificate { callback, .. } => {
497 callback.send(Err(error)).is_ok()
498 }
499 ChainWorkerRequest::ProcessCrossChainUpdate { callback, .. } => {
500 callback.send(Err(error)).is_ok()
501 }
502 ChainWorkerRequest::ConfirmUpdatedRecipient { callback, .. } => {
503 callback.send(Err(error)).is_ok()
504 }
505 ChainWorkerRequest::HandleChainInfoQuery { callback, .. } => {
506 callback.send(Err(error)).is_ok()
507 }
508 ChainWorkerRequest::DownloadPendingBlob { callback, .. } => {
509 callback.send(Err(error)).is_ok()
510 }
511 ChainWorkerRequest::HandlePendingBlob { callback, .. } => {
512 callback.send(Err(error)).is_ok()
513 }
514 ChainWorkerRequest::UpdateReceivedCertificateTrackers { callback, .. } => {
515 callback.send(Err(error)).is_ok()
516 }
517 };
518
519 if !responded {
520 warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
521 }
522 }
523}
524
525fn elide_option<T>(option: &Option<T>, f: &mut fmt::Formatter) -> fmt::Result {
527 match option {
528 Some(_) => write!(f, "Some(..)"),
529 None => write!(f, "None"),
530 }
531}