linera_service/
task_processor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Task processor for executing off-chain operators on behalf of on-chain applications.
5//!
6//! The task processor watches specified applications for requests to execute off-chain tasks,
7//! runs external operator binaries, and submits the results back to the chain.
8
9use std::{
10    cmp::Reverse,
11    collections::{BTreeMap, BTreeSet, BinaryHeap},
12    path::PathBuf,
13    sync::Arc,
14};
15
16use async_graphql::InputType as _;
17use futures::{stream::StreamExt, FutureExt};
18use linera_base::{
19    data_types::{TimeDelta, Timestamp},
20    identifiers::{ApplicationId, ChainId},
21    task_processor::{ProcessorActions, TaskOutcome},
22};
23use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason};
24use serde_json::json;
25use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error, info};
28
29use crate::controller::Update;
30
/// Shared lookup table from operator names to the paths of the binaries implementing them.
///
/// Wrapped in an `Arc` so that every spawned task can hold the table cheaply.
pub type OperatorMap = std::sync::Arc<std::collections::BTreeMap<String, std::path::PathBuf>>;
33
/// Parses an operator mapping in the format `name=path` or just `name`.
///
/// If only `name` is provided, the path defaults to the name itself, so the binary is
/// resolved like any other command name. Only the first `=` separates the name from the
/// path, so the path itself may contain `=`.
///
/// # Errors
///
/// Returns an error if the operator name or the binary path is empty (e.g. `""`, `"=path"`
/// or `"name="`), which almost certainly indicates a malformed argument: an empty path can
/// never be spawned.
pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
    // Without an `=`, the name doubles as the path.
    let (name, path) = s.split_once('=').unwrap_or((s, s));
    if name.is_empty() {
        return Err(format!("invalid operator `{s}`: the operator name is empty"));
    }
    if path.is_empty() {
        return Err(format!(
            "invalid operator `{s}`: the binary path for `{name}` is empty"
        ));
    }
    Ok((name.to_string(), PathBuf::from(path)))
}
43
44type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;
45
46/// A task processor that watches applications and executes off-chain operators.
47pub struct TaskProcessor<Env: linera_core::Environment> {
48    chain_id: ChainId,
49    application_ids: Vec<ApplicationId>,
50    last_requested_callbacks: BTreeMap<ApplicationId, Timestamp>,
51    chain_client: ChainClient<Env>,
52    cancellation_token: CancellationToken,
53    notifications: NotificationStream,
54    outcome_sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
55    outcome_receiver: mpsc::UnboundedReceiver<(ApplicationId, TaskOutcome)>,
56    update_receiver: mpsc::UnboundedReceiver<Update>,
57    deadlines: BinaryHeap<Deadline>,
58    operators: OperatorMap,
59}
60
61impl<Env: linera_core::Environment> TaskProcessor<Env> {
62    /// Creates a new task processor.
63    pub fn new(
64        chain_id: ChainId,
65        application_ids: Vec<ApplicationId>,
66        chain_client: ChainClient<Env>,
67        cancellation_token: CancellationToken,
68        operators: OperatorMap,
69        update_receiver: Option<mpsc::UnboundedReceiver<Update>>,
70    ) -> Self {
71        let notifications = chain_client.subscribe().expect("client subscription");
72        let (outcome_sender, outcome_receiver) = mpsc::unbounded_channel();
73        let update_receiver = update_receiver.unwrap_or_else(|| mpsc::unbounded_channel().1);
74        Self {
75            chain_id,
76            application_ids,
77            last_requested_callbacks: BTreeMap::new(),
78            chain_client,
79            cancellation_token,
80            outcome_sender,
81            outcome_receiver,
82            notifications,
83            deadlines: BinaryHeap::new(),
84            operators,
85            update_receiver,
86        }
87    }
88
89    /// Runs the task processor until the cancellation token is triggered.
90    pub async fn run(mut self) {
91        info!("Watching for notifications for chain {}", self.chain_id);
92        self.process_actions(self.application_ids.clone()).await;
93        loop {
94            select! {
95                Some(notification) = self.notifications.next() => {
96                    if let Reason::NewBlock { .. } = notification.reason {
97                        debug!(%self.chain_id, "Processing notification");
98                        self.process_actions(self.application_ids.clone()).await;
99                    }
100                }
101                _ = tokio::time::sleep(Self::duration_until_next_deadline(&self.deadlines)) => {
102                    debug!("Processing event");
103                    let application_ids = self.process_events();
104                    self.process_actions(application_ids).await;
105                }
106                Some((application_id, outcome)) = self.outcome_receiver.recv() => {
107                    if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
108                        error!("Error while processing task outcome {outcome:?}: {e}");
109                    }
110                }
111                Some(update) = self.update_receiver.recv() => {
112                    self.apply_update(update).await;
113                }
114                _ = self.cancellation_token.cancelled().fuse() => {
115                    break;
116                }
117            }
118        }
119        debug!("Notification stream ended.");
120    }
121
122    fn duration_until_next_deadline(deadlines: &BinaryHeap<Deadline>) -> tokio::time::Duration {
123        deadlines
124            .peek()
125            .map_or(tokio::time::Duration::MAX, |Reverse((x, _))| {
126                x.delta_since(Timestamp::now()).as_duration()
127            })
128    }
129
130    async fn apply_update(&mut self, update: Update) {
131        info!(
132            "Applying update for chain {}: {:?}",
133            self.chain_id, update.application_ids
134        );
135
136        let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
137        let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();
138
139        // Retain only last_requested_callbacks for applications that are still active
140        self.last_requested_callbacks
141            .retain(|app_id, _| new_app_set.contains(app_id));
142
143        // Update the application_ids
144        self.application_ids = update.application_ids;
145
146        // Process actions for newly added applications
147        let new_apps: Vec<_> = self
148            .application_ids
149            .iter()
150            .filter(|app_id| !old_app_set.contains(app_id))
151            .cloned()
152            .collect();
153        if !new_apps.is_empty() {
154            self.process_actions(new_apps).await;
155        }
156    }
157
158    fn process_events(&mut self) -> Vec<ApplicationId> {
159        let now = Timestamp::now();
160        let mut application_ids = Vec::new();
161        while let Some(deadline) = self.deadlines.pop() {
162            if let Reverse((_, Some(id))) = deadline {
163                application_ids.push(id);
164            }
165            let Some(Reverse((ts, _))) = self.deadlines.peek() else {
166                break;
167            };
168            if *ts > now {
169                break;
170            }
171        }
172        application_ids
173    }
174
175    async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
176        for application_id in application_ids {
177            debug!("Processing actions for {application_id}");
178            let now = Timestamp::now();
179            let last_requested_callback =
180                self.last_requested_callbacks.get(&application_id).cloned();
181            let actions = match self
182                .query_actions(application_id, last_requested_callback, now)
183                .await
184            {
185                Ok(actions) => actions,
186                Err(error) => {
187                    error!("Error reading application actions: {error}");
188                    // Retry in at most 1 minute.
189                    self.deadlines.push(Reverse((
190                        now.saturating_add(TimeDelta::from_secs(60)),
191                        None,
192                    )));
193                    continue;
194                }
195            };
196            if let Some(timestamp) = actions.request_callback {
197                self.last_requested_callbacks.insert(application_id, now);
198                self.deadlines
199                    .push(Reverse((timestamp, Some(application_id))));
200            }
201            for task in actions.execute_tasks {
202                let sender = self.outcome_sender.clone();
203                let operators = self.operators.clone();
204                tokio::spawn(async move {
205                    if let Err(e) = Self::execute_task(
206                        application_id,
207                        task.operator,
208                        task.input,
209                        sender,
210                        operators,
211                    )
212                    .await
213                    {
214                        error!("Error executing task for {application_id}: {e}");
215                    }
216                });
217            }
218        }
219    }
220
221    async fn execute_task(
222        application_id: ApplicationId,
223        operator: String,
224        input: String,
225        sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
226        operators: OperatorMap,
227    ) -> Result<(), anyhow::Error> {
228        let binary_path = operators
229            .get(&operator)
230            .ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?;
231        debug!("Executing task {operator} ({binary_path:?}) for {application_id}");
232        let mut child = Command::new(binary_path)
233            .stdin(std::process::Stdio::piped())
234            .stdout(std::process::Stdio::piped())
235            .spawn()?;
236
237        let mut stdin = child.stdin.take().expect("stdin should be configured");
238        stdin.write_all(input.as_bytes()).await?;
239        drop(stdin);
240
241        let output = child.wait_with_output().await?;
242        anyhow::ensure!(
243            output.status.success(),
244            "operator {} exited with status: {}",
245            operator,
246            output.status
247        );
248        let outcome = TaskOutcome {
249            operator,
250            output: String::from_utf8_lossy(&output.stdout).into(),
251        };
252        debug!("Done executing task for {application_id}");
253        sender.send((application_id, outcome))?;
254        Ok(())
255    }
256
257    async fn query_actions(
258        &mut self,
259        application_id: ApplicationId,
260        last_requested_callback: Option<Timestamp>,
261        now: Timestamp,
262    ) -> Result<ProcessorActions, anyhow::Error> {
263        let query = format!(
264            "query {{ nextActions(lastRequestedCallback: {}, now: {}) }}",
265            last_requested_callback.to_value(),
266            now.to_value(),
267        );
268        let bytes = serde_json::to_vec(&json!({"query": query}))?;
269        let query = linera_execution::Query::User {
270            application_id,
271            bytes,
272        };
273        let linera_execution::QueryOutcome {
274            response,
275            operations: _,
276        } = self.chain_client.query_application(query, None).await?;
277        let linera_execution::QueryResponse::User(response) = response else {
278            anyhow::bail!("cannot get a system response for a user query");
279        };
280        let mut response: serde_json::Value = serde_json::from_slice(&response)?;
281        let actions: ProcessorActions =
282            serde_json::from_value(response["data"]["nextActions"].take())?;
283        Ok(actions)
284    }
285
286    async fn submit_task_outcome(
287        &mut self,
288        application_id: ApplicationId,
289        task_outcome: &TaskOutcome,
290    ) -> Result<(), anyhow::Error> {
291        info!("Submitting task outcome for {application_id}: {task_outcome:?}");
292        let query = format!(
293            "query {{ processTaskOutcome(outcome: {{ operator: {}, output: {} }}) }}",
294            task_outcome.operator.to_value(),
295            task_outcome.output.to_value(),
296        );
297        let bytes = serde_json::to_vec(&json!({"query": query}))?;
298        let query = linera_execution::Query::User {
299            application_id,
300            bytes,
301        };
302        let linera_execution::QueryOutcome {
303            response: _,
304            operations,
305        } = self.chain_client.query_application(query, None).await?;
306        if !operations.is_empty() {
307            if let Err(e) = self
308                .chain_client
309                .execute_operations(operations, vec![])
310                .await
311            {
312                // TODO: handle leader timeouts.
313                error!("Failed to execute on-chain operations for {application_id}: {e}");
314            }
315        }
316        Ok(())
317    }
318}