1use std::{
7 collections::{BTreeMap, HashSet},
8 fmt,
9 sync::{self, Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use futures::FutureExt;
14use linera_base::{
15 crypto::{CryptoHash, ValidatorPublicKey},
16 data_types::{ApplicationDescription, Blob, BlockHeight, Epoch, TimeDelta, Timestamp},
17 hashed::Hashed,
18 identifiers::{ApplicationId, BlobId, ChainId},
19};
20use linera_chain::{
21 data_types::{BlockProposal, MessageBundle, ProposedBlock},
22 types::{Block, ConfirmedBlockCertificate, TimeoutCertificate, ValidatedBlockCertificate},
23 ChainStateView,
24};
25use linera_execution::{
26 ExecutionStateView, Query, QueryContext, QueryOutcome, ServiceRuntimeEndpoint,
27 ServiceSyncRuntime,
28};
29use linera_storage::{Clock as _, Storage};
30use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
31use tracing::{debug, instrument, trace, warn, Instrument as _};
32
33use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
34use crate::{
35 data_types::{ChainInfoQuery, ChainInfoResponse},
36 value_cache::ValueCache,
37 worker::{NetworkActions, WorkerError},
38};
39
40#[derive(Debug)]
42pub enum ChainWorkerRequest<Context>
43where
44 Context: linera_views::context::Context + Clone + Send + Sync + 'static,
45{
46 #[cfg(with_testing)]
48 ReadCertificate {
49 height: BlockHeight,
50 #[debug(skip)]
51 callback: oneshot::Sender<Result<Option<ConfirmedBlockCertificate>, WorkerError>>,
52 },
53
54 #[cfg(with_testing)]
56 FindBundleInInbox {
57 inbox_id: ChainId,
58 certificate_hash: CryptoHash,
59 height: BlockHeight,
60 index: u32,
61 #[debug(skip)]
62 callback: oneshot::Sender<Result<Option<MessageBundle>, WorkerError>>,
63 },
64
65 GetChainStateView {
67 #[debug(skip)]
68 callback:
69 oneshot::Sender<Result<OwnedRwLockReadGuard<ChainStateView<Context>>, WorkerError>>,
70 },
71
72 QueryApplication {
74 query: Query,
75 #[debug(skip)]
76 callback: oneshot::Sender<Result<QueryOutcome, WorkerError>>,
77 },
78
79 DescribeApplication {
81 application_id: ApplicationId,
82 #[debug(skip)]
83 callback: oneshot::Sender<Result<ApplicationDescription, WorkerError>>,
84 },
85
86 StageBlockExecution {
88 block: ProposedBlock,
89 round: Option<u32>,
90 published_blobs: Vec<Blob>,
91 #[debug(skip)]
92 callback: oneshot::Sender<Result<(Block, ChainInfoResponse), WorkerError>>,
93 },
94
95 ProcessTimeout {
97 certificate: TimeoutCertificate,
98 #[debug(skip)]
99 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
100 },
101
102 HandleBlockProposal {
104 proposal: BlockProposal,
105 #[debug(skip)]
106 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
107 },
108
109 ProcessValidatedBlock {
111 certificate: ValidatedBlockCertificate,
112 #[debug(skip)]
113 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions, bool), WorkerError>>,
114 },
115
116 ProcessConfirmedBlock {
118 certificate: ConfirmedBlockCertificate,
119 #[debug(with = "elide_option")]
120 notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
121 #[debug(skip)]
122 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
123 },
124
125 ProcessCrossChainUpdate {
127 origin: ChainId,
128 bundles: Vec<(Epoch, MessageBundle)>,
129 #[debug(skip)]
130 callback: oneshot::Sender<Result<Option<BlockHeight>, WorkerError>>,
131 },
132
133 ConfirmUpdatedRecipient {
135 recipient: ChainId,
136 latest_height: BlockHeight,
137 #[debug(skip)]
138 callback: oneshot::Sender<Result<(), WorkerError>>,
139 },
140
141 HandleChainInfoQuery {
143 query: ChainInfoQuery,
144 #[debug(skip)]
145 callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions), WorkerError>>,
146 },
147
148 DownloadPendingBlob {
150 blob_id: BlobId,
151 #[debug(skip)]
152 callback: oneshot::Sender<Result<Blob, WorkerError>>,
153 },
154
155 HandlePendingBlob {
157 blob: Blob,
158 #[debug(skip)]
159 callback: oneshot::Sender<Result<ChainInfoResponse, WorkerError>>,
160 },
161
162 UpdateReceivedCertificateTrackers {
164 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
165 callback: oneshot::Sender<Result<(), WorkerError>>,
166 },
167}
168
169pub struct ChainWorkerActor<StorageClient>
171where
172 StorageClient: Storage + Clone + Send + Sync + 'static,
173{
174 chain_id: ChainId,
175 config: ChainWorkerConfig,
176 storage: StorageClient,
177 block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
178 execution_state_cache: Arc<ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>>,
179 tracked_chains: Option<Arc<sync::RwLock<HashSet<ChainId>>>>,
180 delivery_notifier: DeliveryNotifier,
181}
182
183impl<StorageClient> ChainWorkerActor<StorageClient>
184where
185 StorageClient: Storage + Clone + Send + Sync + 'static,
186{
187 #[expect(clippy::too_many_arguments)]
193 pub async fn run(
194 config: ChainWorkerConfig,
195 storage: StorageClient,
196 block_cache: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
197 execution_state_cache: Arc<
198 ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
199 >,
200 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
201 delivery_notifier: DeliveryNotifier,
202 chain_id: ChainId,
203 mut incoming_requests: mpsc::UnboundedReceiver<(
204 ChainWorkerRequest<StorageClient::Context>,
205 tracing::Span,
206 )>,
207 ) {
208 let actor = loop {
209 let load_result = Self::load(
210 config.clone(),
211 storage.clone(),
212 block_cache.clone(),
213 execution_state_cache.clone(),
214 tracked_chains.clone(),
215 delivery_notifier.clone(),
216 chain_id,
217 )
218 .await
219 .inspect_err(|error| warn!("Failed to load chain state: {error:?}"));
220
221 match load_result {
222 Ok(actor) => break actor,
223 Err(error) => match incoming_requests.recv().await {
224 Some((request, _span)) => request.send_error(error),
225 None => return,
226 },
227 }
228 };
229
230 if let Err(err) = actor.handle_requests(incoming_requests).await {
231 tracing::error!("Chain actor error: {err}");
232 }
233 }
234
235 pub async fn load(
238 config: ChainWorkerConfig,
239 storage: StorageClient,
240 block_values: Arc<ValueCache<CryptoHash, Hashed<Block>>>,
241 execution_state_cache: Arc<
242 ValueCache<CryptoHash, ExecutionStateView<StorageClient::Context>>,
243 >,
244 tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
245 delivery_notifier: DeliveryNotifier,
246 chain_id: ChainId,
247 ) -> Result<Self, WorkerError> {
248 Ok(ChainWorkerActor {
249 config,
250 storage,
251 block_values,
252 execution_state_cache,
253 tracked_chains,
254 delivery_notifier,
255 chain_id,
256 })
257 }
258
259 async fn spawn_service_runtime_actor(
263 chain_id: ChainId,
264 ) -> (linera_base::task::Blocking, ServiceRuntimeEndpoint) {
265 let context = QueryContext {
266 chain_id,
267 next_block_height: BlockHeight(0),
268 local_time: Timestamp::from(0),
269 };
270
271 let (execution_state_sender, incoming_execution_requests) =
272 futures::channel::mpsc::unbounded();
273 let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();
274
275 let service_runtime_thread = linera_base::task::Blocking::spawn(move |_| async move {
276 ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver)
277 })
278 .await;
279
280 let endpoint = ServiceRuntimeEndpoint {
281 incoming_execution_requests,
282 runtime_request_sender,
283 };
284 (service_runtime_thread, endpoint)
285 }
286
287 pub(super) async fn sleep_until_timeout(&self) {
289 let now = self.storage.clock().current_time();
290 let ttl =
291 TimeDelta::from_micros(u64::try_from(self.config.ttl.as_micros()).unwrap_or(u64::MAX));
292 let timeout = now.saturating_add(ttl);
293 self.storage.clock().sleep_until(timeout).await
294 }
295
296 #[instrument(
298 skip_all,
299 fields(chain_id = format!("{:.8}", self.chain_id)),
300 )]
301 pub async fn handle_requests(
302 self,
303 mut incoming_requests: mpsc::UnboundedReceiver<(
304 ChainWorkerRequest<StorageClient::Context>,
305 tracing::Span,
306 )>,
307 ) -> Result<(), WorkerError> {
308 trace!("Starting `ChainWorkerActor`");
309
310 while let Some((request, span)) = incoming_requests.recv().await {
311 let (service_runtime_thread, service_runtime_endpoint) = {
312 if self.config.long_lived_services {
313 let (thread, endpoint) = Self::spawn_service_runtime_actor(self.chain_id).await;
314 (Some(thread), Some(endpoint))
315 } else {
316 (None, None)
317 }
318 };
319
320 let mut worker = ChainWorkerState::load(
321 self.config.clone(),
322 self.storage.clone(),
323 self.block_values.clone(),
324 self.execution_state_cache.clone(),
325 self.tracked_chains.clone(),
326 self.delivery_notifier.clone(),
327 self.chain_id,
328 service_runtime_endpoint,
329 )
330 .await?;
331
332 Box::pin(worker.handle_request(request).instrument(span)).await;
333
334 loop {
335 futures::select! {
336 () = self.sleep_until_timeout().fuse() => break,
337 maybe_request = incoming_requests.recv().fuse() => {
338 let Some((request, span)) = maybe_request else {
339 break; };
341 Box::pin(worker.handle_request(request).instrument(span)).await;
342 }
343 }
344 }
345
346 worker.clear_shared_chain_view().await;
347 drop(worker);
348 if let Some(thread) = service_runtime_thread {
349 thread.join().await
350 }
351 }
352
353 trace!("`ChainWorkerActor` finished");
354 Ok(())
355 }
356}
357
358impl<Context> ChainWorkerRequest<Context>
359where
360 Context: linera_views::context::Context + Clone + Send + Sync + 'static,
361{
362 pub fn send_error(self, error: WorkerError) {
364 debug!("Immediately sending error to chain worker request {self:?}");
365
366 let responded = match self {
367 #[cfg(with_testing)]
368 ChainWorkerRequest::ReadCertificate { callback, .. } => {
369 callback.send(Err(error)).is_ok()
370 }
371 #[cfg(with_testing)]
372 ChainWorkerRequest::FindBundleInInbox { callback, .. } => {
373 callback.send(Err(error)).is_ok()
374 }
375 ChainWorkerRequest::GetChainStateView { callback } => callback.send(Err(error)).is_ok(),
376 ChainWorkerRequest::QueryApplication { callback, .. } => {
377 callback.send(Err(error)).is_ok()
378 }
379 ChainWorkerRequest::DescribeApplication { callback, .. } => {
380 callback.send(Err(error)).is_ok()
381 }
382 ChainWorkerRequest::StageBlockExecution { callback, .. } => {
383 callback.send(Err(error)).is_ok()
384 }
385 ChainWorkerRequest::ProcessTimeout { callback, .. } => {
386 callback.send(Err(error)).is_ok()
387 }
388 ChainWorkerRequest::HandleBlockProposal { callback, .. } => {
389 callback.send(Err(error)).is_ok()
390 }
391 ChainWorkerRequest::ProcessValidatedBlock { callback, .. } => {
392 callback.send(Err(error)).is_ok()
393 }
394 ChainWorkerRequest::ProcessConfirmedBlock { callback, .. } => {
395 callback.send(Err(error)).is_ok()
396 }
397 ChainWorkerRequest::ProcessCrossChainUpdate { callback, .. } => {
398 callback.send(Err(error)).is_ok()
399 }
400 ChainWorkerRequest::ConfirmUpdatedRecipient { callback, .. } => {
401 callback.send(Err(error)).is_ok()
402 }
403 ChainWorkerRequest::HandleChainInfoQuery { callback, .. } => {
404 callback.send(Err(error)).is_ok()
405 }
406 ChainWorkerRequest::DownloadPendingBlob { callback, .. } => {
407 callback.send(Err(error)).is_ok()
408 }
409 ChainWorkerRequest::HandlePendingBlob { callback, .. } => {
410 callback.send(Err(error)).is_ok()
411 }
412 ChainWorkerRequest::UpdateReceivedCertificateTrackers { callback, .. } => {
413 callback.send(Err(error)).is_ok()
414 }
415 };
416
417 if !responded {
418 warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
419 }
420 }
421}
422
/// Formats an [`Option`] without showing its contents.
///
/// Writes `Some(..)` when a value is present and `None` otherwise, so it can be used as a
/// `Debug` formatter for options whose payload does not need to be printed.
fn elide_option<T>(option: &Option<T>, f: &mut fmt::Formatter) -> fmt::Result {
    f.write_str(if option.is_some() { "Some(..)" } else { "None" })
}