1use std::{
10 cmp::Reverse,
11 collections::{BTreeMap, BTreeSet, BinaryHeap},
12 path::PathBuf,
13 sync::Arc,
14};
15
16use async_graphql::InputType as _;
17use futures::{future, stream::StreamExt, FutureExt};
18use linera_base::{
19 data_types::{TimeDelta, Timestamp},
20 identifiers::{ApplicationId, ChainId},
21 task_processor::{ProcessorActions, TaskOutcome},
22};
23use linera_core::{
24 client::ChainClient, data_types::ClientOutcome, node::NotificationStream, worker::Reason,
25};
26use serde_json::json;
27use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, error, info};
30
31use crate::controller::Update;
32
/// Maps an operator name to the path of the binary that implements it.
/// Shared via `Arc` so spawned task batches can look up operators cheaply.
pub type OperatorMap = Arc<BTreeMap<String, PathBuf>>;
35
/// Parses an operator specification of the form `NAME=PATH`.
///
/// If no `=` is present, the whole string is used as both the operator name
/// and the binary path. Splits at the *first* `=`, so the path may itself
/// contain `=` characters. Currently always returns `Ok`; the `Result`
/// signature matches the value-parser convention of CLI argument parsing.
pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
    match s.split_once('=') {
        Some((name, path)) => Ok((name.to_string(), PathBuf::from(path))),
        None => Ok((s.to_string(), PathBuf::from(s))),
    }
}
45
/// A scheduled wakeup: `Reverse` makes the max-`BinaryHeap` behave as a
/// min-heap, so the earliest `(timestamp, application)` pair sorts to the top.
/// A `None` application means the deadline is not tied to a specific app.
type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;
47
/// Outcome of a spawned batch of tasks, reported back to the processor loop.
struct BatchResult {
    /// The application whose task batch finished.
    application_id: ApplicationId,
    /// `Some(t)`: the batch failed and should be retried at time `t`.
    /// `None`: the batch succeeded; actions can be re-queried immediately.
    retry_at: Option<Timestamp>,
}
54
/// Drives task processing for a single chain: reacts to new-block
/// notifications, queries assigned applications for pending actions, runs
/// operator binaries for those actions, and submits the outcomes back.
pub struct TaskProcessor<Env: linera_core::Environment> {
    /// The chain this processor serves.
    chain_id: ChainId,
    /// The applications currently assigned to this processor.
    application_ids: Vec<ApplicationId>,
    /// Per-application pagination cursors returned by `nextActions`.
    cursors: BTreeMap<ApplicationId, String>,
    /// Client used to query applications and execute operations.
    chain_client: ChainClient<Env>,
    /// Cancelling this token stops the `run` loop.
    cancellation_token: CancellationToken,
    /// Stream of chain notifications (new blocks etc.) from the client.
    notifications: NotificationStream,
    /// Sender cloned into spawned batches to report their `BatchResult`.
    batch_sender: mpsc::UnboundedSender<BatchResult>,
    /// Receives `BatchResult`s from spawned batches.
    batch_receiver: mpsc::UnboundedReceiver<BatchResult>,
    /// Receives reassignment updates from the controller.
    update_receiver: mpsc::UnboundedReceiver<Update>,
    /// Pending wakeups, earliest first (see `Deadline`).
    deadlines: BinaryHeap<Deadline>,
    /// Operator name -> binary path.
    operators: OperatorMap,
    /// Delay before retrying after a task or submission failure.
    retry_delay: TimeDelta,
    /// Applications with a task batch currently executing; skipped by
    /// `process_actions` until their batch reports back.
    in_flight_apps: BTreeSet<ApplicationId>,
}
71
72impl<Env: linera_core::Environment> TaskProcessor<Env> {
73 pub fn new(
75 chain_id: ChainId,
76 application_ids: Vec<ApplicationId>,
77 chain_client: ChainClient<Env>,
78 cancellation_token: CancellationToken,
79 operators: OperatorMap,
80 retry_delay: TimeDelta,
81 update_receiver: Option<mpsc::UnboundedReceiver<Update>>,
82 ) -> Self {
83 let notifications = chain_client.subscribe().expect("client subscription");
84 let (batch_sender, batch_receiver) = mpsc::unbounded_channel();
85 let update_receiver = update_receiver.unwrap_or_else(|| mpsc::unbounded_channel().1);
86 Self {
87 chain_id,
88 application_ids,
89 cursors: BTreeMap::new(),
90 chain_client,
91 cancellation_token,
92 notifications,
93 batch_sender,
94 batch_receiver,
95 update_receiver,
96 deadlines: BinaryHeap::new(),
97 operators,
98 retry_delay,
99 in_flight_apps: BTreeSet::new(),
100 }
101 }
102
    /// Runs the processor's event loop until the cancellation token fires.
    ///
    /// Processes all assigned applications once up front, then reacts to:
    /// new-block notifications, deadline expirations, completed task
    /// batches, and assignment updates.
    pub async fn run(mut self) {
        info!("Watching for notifications for chain {}", self.chain_id);
        // Initial pass over every assigned application.
        self.process_actions(self.application_ids.clone()).await;
        loop {
            select! {
                Some(notification) = self.notifications.next() => {
                    // Only new blocks can produce new actions; other
                    // notification reasons are ignored.
                    if let Reason::NewBlock { .. } = notification.reason {
                        debug!(%self.chain_id, "Processing notification");
                        self.process_actions(self.application_ids.clone()).await;
                    }
                }
                // The sleep is re-created on every loop iteration, so a newly
                // pushed earlier deadline takes effect on the next pass.
                _ = tokio::time::sleep(Self::duration_until_next_deadline(&self.deadlines)) => {
                    debug!("Processing event");
                    let application_ids = self.process_events();
                    self.process_actions(application_ids).await;
                }
                Some(result) = self.batch_receiver.recv() => {
                    // The batch finished; the app may accept new work again.
                    self.in_flight_apps.remove(&result.application_id);
                    // Ignore results for apps reassigned away in the meantime.
                    if self.application_ids.contains(&result.application_id) {
                        if let Some(retry_at) = result.retry_at {
                            // Batch failed: schedule a retry.
                            self.deadlines.push(Reverse((
                                retry_at,
                                Some(result.application_id),
                            )));
                        } else {
                            // Batch succeeded: immediately look for more work.
                            self.process_actions(vec![result.application_id]).await;
                        }
                    }
                }
                Some(update) = self.update_receiver.recv() => {
                    self.apply_update(update).await;
                }
                _ = self.cancellation_token.cancelled().fuse() => {
                    break;
                }
            }
        }
        debug!("Notification stream ended.");
    }
146
147 fn duration_until_next_deadline(deadlines: &BinaryHeap<Deadline>) -> tokio::time::Duration {
148 deadlines
149 .peek()
150 .map_or(tokio::time::Duration::MAX, |Reverse((x, _))| {
151 x.delta_since(Timestamp::now()).as_duration()
152 })
153 }
154
155 async fn apply_update(&mut self, update: Update) {
156 info!(
157 "Applying update for chain {}: {:?}",
158 self.chain_id, update.application_ids
159 );
160
161 let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
162 let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();
163
164 self.cursors
165 .retain(|app_id, _| new_app_set.contains(app_id));
166 self.in_flight_apps
167 .retain(|app_id| new_app_set.contains(app_id));
168
169 self.application_ids = update.application_ids;
171
172 let new_apps = self
174 .application_ids
175 .iter()
176 .filter(|app_id| !old_app_set.contains(app_id))
177 .cloned()
178 .collect::<Vec<_>>();
179 if !new_apps.is_empty() {
180 self.process_actions(new_apps).await;
181 }
182 }
183
184 fn process_events(&mut self) -> Vec<ApplicationId> {
185 let now = Timestamp::now();
186 let mut application_ids = Vec::new();
187 while let Some(deadline) = self.deadlines.pop() {
188 if let Reverse((_, Some(id))) = deadline {
189 application_ids.push(id);
190 }
191 let Some(Reverse((ts, _))) = self.deadlines.peek() else {
192 break;
193 };
194 if *ts > now {
195 break;
196 }
197 }
198 application_ids
199 }
200
    /// Queries each given application for its next actions and acts on them:
    /// schedules callbacks, updates cursors, and spawns a background batch to
    /// execute any requested tasks.
    ///
    /// Applications that are no longer assigned, or that already have a
    /// batch in flight, are skipped. Query failures schedule a retry in 60
    /// seconds rather than aborting the loop.
    async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
        for application_id in application_ids {
            if !self.application_ids.contains(&application_id) {
                debug!("Skipping {application_id}: it's no longer assigned to this processor");
                continue;
            }
            if self.in_flight_apps.contains(&application_id) {
                debug!("Skipping {application_id}: tasks already in flight");
                continue;
            }
            debug!("Processing actions for {application_id}");
            let now = Timestamp::now();
            let app_cursor = self.cursors.get(&application_id).cloned();
            let actions = match self.query_actions(application_id, app_cursor, now).await {
                Ok(actions) => actions,
                Err(error) => {
                    error!("Error reading application actions: {error}");
                    // Retry this application in a minute.
                    self.deadlines.push(Reverse((
                        now.saturating_add(TimeDelta::from_secs(60)),
                        Some(application_id),
                    )));
                    continue;
                }
            };
            // The application may ask to be polled again at a given time.
            if let Some(timestamp) = actions.request_callback {
                self.deadlines
                    .push(Reverse((timestamp, Some(application_id))));
            }
            if let Some(cursor) = actions.set_cursor {
                self.cursors.insert(application_id, cursor);
            }
            if !actions.execute_tasks.is_empty() {
                // Mark in-flight *before* spawning so this app is skipped
                // until the batch reports back via `batch_sender`.
                self.in_flight_apps.insert(application_id);
                let chain_client = self.chain_client.clone();
                let batch_sender = self.batch_sender.clone();
                let retry_delay = self.retry_delay;
                let operators = self.operators.clone();
                tokio::spawn(async move {
                    // Run all tasks of the batch concurrently.
                    let handles: Vec<_> = actions
                        .execute_tasks
                        .into_iter()
                        .map(|task| {
                            let operators = operators.clone();
                            tokio::spawn(Self::execute_task(
                                application_id,
                                task.operator,
                                task.input,
                                operators,
                            ))
                        })
                        .collect();
                    let results = future::join_all(handles).await;
                    // Submit outcomes sequentially; stop at the first failure
                    // and record when to retry the whole application.
                    let mut retry_at = None;
                    for result in results {
                        match result {
                            Ok(Ok(outcome)) => {
                                if let Err(timestamp) = Self::submit_task_outcome(
                                    &chain_client,
                                    application_id,
                                    &outcome,
                                    retry_delay,
                                )
                                .await
                                {
                                    retry_at = Some(timestamp);
                                    break;
                                }
                            }
                            Ok(Err(error)) => {
                                error!(%application_id, %error, "Error executing task");
                                retry_at = Some(Timestamp::now().saturating_add(retry_delay));
                                break;
                            }
                            Err(error) => {
                                // The spawned task itself panicked or was cancelled.
                                error!(%application_id, %error, "Task panicked");
                                retry_at = Some(Timestamp::now().saturating_add(retry_delay));
                                break;
                            }
                        }
                    }
                    // Report back so the main loop clears the in-flight flag.
                    if batch_sender
                        .send(BatchResult {
                            application_id,
                            retry_at,
                        })
                        .is_err()
                    {
                        error!("Batch receiver dropped for {application_id}");
                    }
                });
            }
        }
    }
301
302 async fn execute_task(
303 application_id: ApplicationId,
304 operator: String,
305 input: String,
306 operators: OperatorMap,
307 ) -> Result<TaskOutcome, anyhow::Error> {
308 let binary_path = operators
309 .get(&operator)
310 .ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?;
311 debug!("Executing task {operator} ({binary_path:?}) for {application_id}");
312 let mut child = Command::new(binary_path)
313 .stdin(std::process::Stdio::piped())
314 .stdout(std::process::Stdio::piped())
315 .spawn()?;
316
317 let mut stdin = child.stdin.take().expect("stdin should be configured");
318 stdin.write_all(input.as_bytes()).await?;
319 drop(stdin);
320
321 let output = child.wait_with_output().await?;
322 anyhow::ensure!(
323 output.status.success(),
324 "operator {} exited with status: {}",
325 operator,
326 output.status
327 );
328 let outcome = TaskOutcome {
329 operator,
330 output: String::from_utf8_lossy(&output.stdout).into(),
331 };
332 debug!("Done executing task for {application_id}");
333 Ok(outcome)
334 }
335
    /// Queries the application's `nextActions` GraphQL field for the actions
    /// it wants executed, given the current `cursor` and timestamp `now`.
    ///
    /// # Errors
    /// Fails if the query cannot be serialized, the chain client query
    /// fails, the response is a system (non-user) response, or the JSON
    /// payload cannot be decoded into `ProcessorActions`.
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn query_actions(
        &mut self,
        application_id: ApplicationId,
        cursor: Option<String>,
        now: Timestamp,
    ) -> Result<ProcessorActions, anyhow::Error> {
        // `to_value` renders the arguments as GraphQL literals
        // (e.g. quoted strings, `null` for a missing cursor).
        let query = format!(
            "query {{ nextActions(cursor: {}, now: {}) }}",
            cursor.to_value(),
            now.to_value(),
        );
        let bytes = serde_json::to_vec(&json!({"query": query}))?;
        let query = linera_execution::Query::User {
            application_id,
            bytes,
        };
        let (
            linera_execution::QueryOutcome {
                response,
                // A read-only query; any scheduled operations are ignored here.
                operations: _,
            },
            _,
        ) = self.chain_client.query_application(query, None).await?;
        let linera_execution::QueryResponse::User(response) = response else {
            anyhow::bail!("cannot get a system response for a user query");
        };
        let mut response: serde_json::Value = serde_json::from_slice(&response)?;
        let actions: ProcessorActions =
            serde_json::from_value(response["data"]["nextActions"].take())?;
        Ok(actions)
    }
370
    /// Submits a task outcome to the application via its
    /// `processTaskOutcome` GraphQL field and executes any operations the
    /// application schedules in response.
    ///
    /// # Errors
    /// Returns `Err(timestamp)` with the time at which the submission should
    /// be retried: `now + retry_delay` for serialization/query/execution
    /// failures, the round timeout when not the leader, or `now` on a block
    /// conflict.
    async fn submit_task_outcome(
        chain_client: &ChainClient<Env>,
        application_id: ApplicationId,
        task_outcome: &TaskOutcome,
        retry_delay: TimeDelta,
    ) -> Result<(), Timestamp> {
        info!("Submitting task outcome for {application_id}: {task_outcome:?}");
        let retry_with_delay = || Timestamp::now().saturating_add(retry_delay);
        // `to_value` renders the fields as GraphQL string literals.
        let query = format!(
            "query {{ processTaskOutcome(outcome: {{ operator: {}, output: {} }}) }}",
            task_outcome.operator.to_value(),
            task_outcome.output.to_value(),
        );
        let bytes = serde_json::to_vec(&json!({"query": query})).map_err(|error| {
            error!(%application_id, %error, "Error serializing task outcome query");
            retry_with_delay()
        })?;
        let query = linera_execution::Query::User {
            application_id,
            bytes,
        };
        let (
            linera_execution::QueryOutcome {
                response: _,
                operations,
            },
            _,
        ) = chain_client
            .query_application(query, None)
            .await
            .map_err(|error| {
                error!(%application_id, %error, "Error querying application");
                retry_with_delay()
            })?;
        // The application may schedule operations to record the outcome on
        // chain; execute them in a block if there are any.
        if !operations.is_empty() {
            match chain_client
                .execute_operations(operations, vec![])
                .await
                .map_err(|error| {
                    error!(%application_id, %error, "Error executing operations");
                    retry_with_delay()
                })? {
                ClientOutcome::Committed(_) => {}
                ClientOutcome::WaitForTimeout(timeout) => {
                    error!(%application_id, "Not the round leader, retrying after {}", timeout.timestamp);
                    return Err(timeout.timestamp);
                }
                ClientOutcome::Conflict(_) => {
                    debug!(%application_id, "Block conflict, retrying immediately");
                    return Err(Timestamp::now());
                }
            }
        }
        Ok(())
    }
428}