// linera_service/task_processor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Task processor for executing off-chain operators on behalf of on-chain applications.
5//!
6//! The task processor watches specified applications for requests to execute off-chain tasks,
7//! runs external operator binaries, and submits the results back to the chain.
8
9use std::{
10    cmp::Reverse,
11    collections::{BTreeMap, BTreeSet, BinaryHeap},
12    path::PathBuf,
13    sync::Arc,
14};
15
16use async_graphql::InputType as _;
17use futures::{future, stream::StreamExt, FutureExt};
18use linera_base::{
19    data_types::{TimeDelta, Timestamp},
20    identifiers::{ApplicationId, ChainId},
21    task_processor::{ProcessorActions, TaskOutcome},
22};
23use linera_core::{
24    client::ChainClient, data_types::ClientOutcome, node::NotificationStream, worker::Reason,
25};
26use serde_json::json;
27use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, error, info};
30
31use crate::controller::Update;
32
/// A map from operator names to their binary paths.
///
/// Wrapped in `Arc` so it can be shared cheaply with spawned batch tasks.
pub type OperatorMap = Arc<BTreeMap<String, PathBuf>>;
35
/// Parse an operator mapping in the format `name=path` or just `name`.
/// If only `name` is provided, the path defaults to the name itself.
///
/// # Errors
/// Returns an error if the operator name, or the path in the `name=path`
/// form, is empty.
pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
    let (name, path) = match s.split_once('=') {
        Some((name, path)) => (name, path),
        None => (s, s),
    };
    if name.is_empty() {
        return Err(format!("empty operator name in {s:?}"));
    }
    if path.is_empty() {
        return Err(format!("empty operator path in {s:?}"));
    }
    Ok((name.to_string(), PathBuf::from(path)))
}
45
/// A scheduled wake-up: `Reverse` makes `BinaryHeap<Deadline>` behave as a
/// min-heap, so the earliest timestamp surfaces first. The optional
/// `ApplicationId` names the application to re-process when the deadline
/// fires; `None` is a pure wake-up with no application attached.
type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;
47
/// Message sent from a background batch task to the main loop on completion.
struct BatchResult {
    /// The application whose batch of tasks finished.
    application_id: ApplicationId,
    /// If set, the batch failed and should be retried at this timestamp.
    retry_at: Option<Timestamp>,
}
54
/// A task processor that watches applications and executes off-chain operators.
pub struct TaskProcessor<Env: linera_core::Environment> {
    /// The chain this processor watches and submits blocks to.
    chain_id: ChainId,
    /// The applications currently assigned to this processor.
    application_ids: Vec<ApplicationId>,
    /// Per-application cursors, as returned by the applications' action queries.
    cursors: BTreeMap<ApplicationId, String>,
    /// Client used to query applications and execute operations on-chain.
    chain_client: ChainClient<Env>,
    /// Token that terminates the main loop when cancelled.
    cancellation_token: CancellationToken,
    /// Stream of chain notifications (e.g. new blocks).
    notifications: NotificationStream,
    /// Sender cloned into spawned batch tasks so they can report completion.
    batch_sender: mpsc::UnboundedSender<BatchResult>,
    /// Receives completion messages from spawned batch tasks.
    batch_receiver: mpsc::UnboundedReceiver<BatchResult>,
    /// Receives application-assignment updates from the controller.
    update_receiver: mpsc::UnboundedReceiver<Update>,
    /// Pending wake-up deadlines; a min-heap via `Reverse`.
    deadlines: BinaryHeap<Deadline>,
    /// Maps operator names to the binaries that implement them.
    operators: OperatorMap,
    /// Delay applied before retrying a failed batch.
    retry_delay: TimeDelta,
    /// Applications that currently have a batch of tasks executing.
    in_flight_apps: BTreeSet<ApplicationId>,
}
71
72impl<Env: linera_core::Environment> TaskProcessor<Env> {
73    /// Creates a new task processor.
74    pub fn new(
75        chain_id: ChainId,
76        application_ids: Vec<ApplicationId>,
77        chain_client: ChainClient<Env>,
78        cancellation_token: CancellationToken,
79        operators: OperatorMap,
80        retry_delay: TimeDelta,
81        update_receiver: Option<mpsc::UnboundedReceiver<Update>>,
82    ) -> Self {
83        let notifications = chain_client.subscribe().expect("client subscription");
84        let (batch_sender, batch_receiver) = mpsc::unbounded_channel();
85        let update_receiver = update_receiver.unwrap_or_else(|| mpsc::unbounded_channel().1);
86        Self {
87            chain_id,
88            application_ids,
89            cursors: BTreeMap::new(),
90            chain_client,
91            cancellation_token,
92            notifications,
93            batch_sender,
94            batch_receiver,
95            update_receiver,
96            deadlines: BinaryHeap::new(),
97            operators,
98            retry_delay,
99            in_flight_apps: BTreeSet::new(),
100        }
101    }
102
    /// Runs the task processor until the cancellation token is triggered.
    ///
    /// The main loop reacts to four sources of events: new-block
    /// notifications, expired deadlines, completed background batches, and
    /// assignment updates from the controller.
    pub async fn run(mut self) {
        info!("Watching for notifications for chain {}", self.chain_id);
        // Initial pass: pick up any tasks already pending on-chain before we
        // started listening for notifications.
        self.process_actions(self.application_ids.clone()).await;
        loop {
            select! {
                Some(notification) = self.notifications.next() => {
                    // Only new blocks can enqueue new tasks; other
                    // notification reasons are ignored.
                    if let Reason::NewBlock { .. } = notification.reason {
                        debug!(%self.chain_id, "Processing notification");
                        self.process_actions(self.application_ids.clone()).await;
                    }
                }
                _ = tokio::time::sleep(Self::duration_until_next_deadline(&self.deadlines)) => {
                    // A deadline (retry or requested callback) has expired.
                    debug!("Processing event");
                    let application_ids = self.process_events();
                    self.process_actions(application_ids).await;
                }
                Some(result) = self.batch_receiver.recv() => {
                    // A background batch finished; the application may accept
                    // new work again.
                    self.in_flight_apps.remove(&result.application_id);
                    // The application could have been unassigned from this processor
                    // in the meantime - do not retry if that is the case.
                    if self.application_ids.contains(&result.application_id) {
                        if let Some(retry_at) = result.retry_at {
                            self.deadlines.push(Reverse((
                                retry_at,
                                Some(result.application_id),
                            )));
                        } else {
                            // Re-process immediately to pick up new tasks.
                            self.process_actions(vec![result.application_id]).await;
                        }
                    }
                }
                Some(update) = self.update_receiver.recv() => {
                    // The set of applications assigned to this processor changed.
                    self.apply_update(update).await;
                }
                _ = self.cancellation_token.cancelled().fuse() => {
                    break;
                }
            }
        }
        debug!("Notification stream ended.");
    }
146
147    fn duration_until_next_deadline(deadlines: &BinaryHeap<Deadline>) -> tokio::time::Duration {
148        deadlines
149            .peek()
150            .map_or(tokio::time::Duration::MAX, |Reverse((x, _))| {
151                x.delta_since(Timestamp::now()).as_duration()
152            })
153    }
154
155    async fn apply_update(&mut self, update: Update) {
156        info!(
157            "Applying update for chain {}: {:?}",
158            self.chain_id, update.application_ids
159        );
160
161        let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
162        let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();
163
164        self.cursors
165            .retain(|app_id, _| new_app_set.contains(app_id));
166        self.in_flight_apps
167            .retain(|app_id| new_app_set.contains(app_id));
168
169        // Update the application_ids
170        self.application_ids = update.application_ids;
171
172        // Process actions for newly added applications
173        let new_apps = self
174            .application_ids
175            .iter()
176            .filter(|app_id| !old_app_set.contains(app_id))
177            .cloned()
178            .collect::<Vec<_>>();
179        if !new_apps.is_empty() {
180            self.process_actions(new_apps).await;
181        }
182    }
183
184    fn process_events(&mut self) -> Vec<ApplicationId> {
185        let now = Timestamp::now();
186        let mut application_ids = Vec::new();
187        while let Some(deadline) = self.deadlines.pop() {
188            if let Reverse((_, Some(id))) = deadline {
189                application_ids.push(id);
190            }
191            let Some(Reverse((ts, _))) = self.deadlines.peek() else {
192                break;
193            };
194            if *ts > now {
195                break;
196            }
197        }
198        application_ids
199    }
200
    /// Queries each of the given applications for pending actions and acts on
    /// them: schedules requested callbacks, advances cursors, and spawns a
    /// background batch executing any requested tasks.
    async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
        for application_id in application_ids {
            if !self.application_ids.contains(&application_id) {
                debug!("Skipping {application_id}: it's no longer assigned to this processor");
                continue;
            }
            // At most one batch per application runs at a time; the main loop
            // clears the in-flight flag when the batch reports back.
            if self.in_flight_apps.contains(&application_id) {
                debug!("Skipping {application_id}: tasks already in flight");
                continue;
            }
            debug!("Processing actions for {application_id}");
            let now = Timestamp::now();
            let app_cursor = self.cursors.get(&application_id).cloned();
            let actions = match self.query_actions(application_id, app_cursor, now).await {
                Ok(actions) => actions,
                Err(error) => {
                    error!("Error reading application actions: {error}");
                    // Retry in at most 1 minute.
                    self.deadlines.push(Reverse((
                        now.saturating_add(TimeDelta::from_secs(60)),
                        Some(application_id),
                    )));
                    continue;
                }
            };
            // The application may ask to be polled again at a specific time.
            if let Some(timestamp) = actions.request_callback {
                self.deadlines
                    .push(Reverse((timestamp, Some(application_id))));
            }
            if let Some(cursor) = actions.set_cursor {
                self.cursors.insert(application_id, cursor);
            }
            if !actions.execute_tasks.is_empty() {
                // Mark the application busy before handing the batch off to a
                // background task.
                self.in_flight_apps.insert(application_id);
                let chain_client = self.chain_client.clone();
                let batch_sender = self.batch_sender.clone();
                let retry_delay = self.retry_delay;
                let operators = self.operators.clone();
                tokio::spawn(async move {
                    // Spawn all tasks concurrently and join them.
                    let handles: Vec<_> = actions
                        .execute_tasks
                        .into_iter()
                        .map(|task| {
                            let operators = operators.clone();
                            tokio::spawn(Self::execute_task(
                                application_id,
                                task.operator,
                                task.input,
                                operators,
                            ))
                        })
                        .collect();
                    let results = future::join_all(handles).await;
                    // Submit outcomes in the original order. Stop on any failure to
                    // preserve ordering: the on-chain queue is FIFO, so skipping a
                    // failed task and submitting a later one would pop the wrong entry.
                    // Tasks are assumed idempotent, so on failure the whole batch is
                    // retried from scratch.
                    let mut retry_at = None;
                    for result in results {
                        match result {
                            Ok(Ok(outcome)) => {
                                if let Err(timestamp) = Self::submit_task_outcome(
                                    &chain_client,
                                    application_id,
                                    &outcome,
                                    retry_delay,
                                )
                                .await
                                {
                                    retry_at = Some(timestamp);
                                    break;
                                }
                            }
                            Ok(Err(error)) => {
                                error!(%application_id, %error, "Error executing task");
                                retry_at = Some(Timestamp::now().saturating_add(retry_delay));
                                break;
                            }
                            Err(error) => {
                                // The spawned task itself panicked.
                                error!(%application_id, %error, "Task panicked");
                                retry_at = Some(Timestamp::now().saturating_add(retry_delay));
                                break;
                            }
                        }
                    }
                    // Report completion (and an optional retry time) to the
                    // main loop so it can clear the in-flight flag.
                    if batch_sender
                        .send(BatchResult {
                            application_id,
                            retry_at,
                        })
                        .is_err()
                    {
                        error!("Batch receiver dropped for {application_id}");
                    }
                });
            }
        }
    }
301
302    async fn execute_task(
303        application_id: ApplicationId,
304        operator: String,
305        input: String,
306        operators: OperatorMap,
307    ) -> Result<TaskOutcome, anyhow::Error> {
308        let binary_path = operators
309            .get(&operator)
310            .ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?;
311        debug!("Executing task {operator} ({binary_path:?}) for {application_id}");
312        let mut child = Command::new(binary_path)
313            .stdin(std::process::Stdio::piped())
314            .stdout(std::process::Stdio::piped())
315            .spawn()?;
316
317        let mut stdin = child.stdin.take().expect("stdin should be configured");
318        stdin.write_all(input.as_bytes()).await?;
319        drop(stdin);
320
321        let output = child.wait_with_output().await?;
322        anyhow::ensure!(
323            output.status.success(),
324            "operator {} exited with status: {}",
325            operator,
326            output.status
327        );
328        let outcome = TaskOutcome {
329            operator,
330            output: String::from_utf8_lossy(&output.stdout).into(),
331        };
332        debug!("Done executing task for {application_id}");
333        Ok(outcome)
334    }
335
336    // Keeping `&mut self` avoids borrowing `TaskProcessor` through `&self` across `.await`,
337    // which would make the spawned future require `TaskProcessor: Sync`.
338    #[allow(clippy::needless_pass_by_ref_mut)]
339    async fn query_actions(
340        &mut self,
341        application_id: ApplicationId,
342        cursor: Option<String>,
343        now: Timestamp,
344    ) -> Result<ProcessorActions, anyhow::Error> {
345        let query = format!(
346            "query {{ nextActions(cursor: {}, now: {}) }}",
347            cursor.to_value(),
348            now.to_value(),
349        );
350        let bytes = serde_json::to_vec(&json!({"query": query}))?;
351        let query = linera_execution::Query::User {
352            application_id,
353            bytes,
354        };
355        let (
356            linera_execution::QueryOutcome {
357                response,
358                operations: _,
359            },
360            _,
361        ) = self.chain_client.query_application(query, None).await?;
362        let linera_execution::QueryResponse::User(response) = response else {
363            anyhow::bail!("cannot get a system response for a user query");
364        };
365        let mut response: serde_json::Value = serde_json::from_slice(&response)?;
366        let actions: ProcessorActions =
367            serde_json::from_value(response["data"]["nextActions"].take())?;
368        Ok(actions)
369    }
370
    /// Submits a task outcome on-chain. On success returns `Ok(())`. On failure, logs the
    /// error and returns `Err(retry_at)` with the timestamp at which to retry.
    async fn submit_task_outcome(
        chain_client: &ChainClient<Env>,
        application_id: ApplicationId,
        task_outcome: &TaskOutcome,
        retry_delay: TimeDelta,
    ) -> Result<(), Timestamp> {
        info!("Submitting task outcome for {application_id}: {task_outcome:?}");
        // Default retry time for generic failures.
        let retry_with_delay = || Timestamp::now().saturating_add(retry_delay);
        // `to_value` renders the outcome fields as GraphQL literals.
        let query = format!(
            "query {{ processTaskOutcome(outcome: {{ operator: {}, output: {} }}) }}",
            task_outcome.operator.to_value(),
            task_outcome.output.to_value(),
        );
        let bytes = serde_json::to_vec(&json!({"query": query})).map_err(|error| {
            error!(%application_id, %error, "Error serializing task outcome query");
            retry_with_delay()
        })?;
        let query = linera_execution::Query::User {
            application_id,
            bytes,
        };
        // The query's direct response is ignored; what matters are the
        // operations the application schedules, which must be executed in a
        // block below for the outcome to take effect.
        let (
            linera_execution::QueryOutcome {
                response: _,
                operations,
            },
            _,
        ) = chain_client
            .query_application(query, None)
            .await
            .map_err(|error| {
                error!(%application_id, %error, "Error querying application");
                retry_with_delay()
            })?;
        if !operations.is_empty() {
            match chain_client
                .execute_operations(operations, vec![])
                .await
                .map_err(|error| {
                    error!(%application_id, %error, "Error executing operations");
                    retry_with_delay()
                })? {
                ClientOutcome::Committed(_) => {}
                ClientOutcome::WaitForTimeout(timeout) => {
                    // We cannot propose a block this round; retry once the
                    // round timeout has elapsed.
                    error!(%application_id, "Not the round leader, retrying after {}", timeout.timestamp);
                    return Err(timeout.timestamp);
                }
                ClientOutcome::Conflict(_) => {
                    debug!(%application_id, "Block conflict, retrying immediately");
                    return Err(Timestamp::now());
                }
            }
        }
        Ok(())
    }
428}