linera_service/
task_processor.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Task processor for executing off-chain operators on behalf of on-chain applications.
5//!
6//! The task processor watches specified applications for requests to execute off-chain tasks,
7//! runs external operator binaries, and submits the results back to the chain.
8
9use std::{
10    cmp::Reverse,
11    collections::{BTreeMap, BTreeSet, BinaryHeap},
12    path::PathBuf,
13    sync::Arc,
14};
15
16use async_graphql::{scalar, InputType as _};
17use futures::{stream::StreamExt, FutureExt};
18use linera_base::{
19    data_types::{TimeDelta, Timestamp},
20    identifiers::{ApplicationId, ChainId},
21};
22use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason};
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error, info};
28
29use crate::controller::Update;
30
/// A map from operator names to their binary paths.
///
/// The keys are the operator names that tasks refer to; the values are the paths of the
/// executables spawned to handle those tasks. The map is shared behind an [`Arc`] so it can
/// be handed to every spawned task without copying.
pub type OperatorMap = Arc<BTreeMap<String, PathBuf>>;
33
/// The off-chain actions requested by the service of an on-chain application.
///
/// On-chain applications should be ready to respond to GraphQL queries of the form:
/// ```ignore
/// query {
///   nextActions(lastRequestedCallback: Timestamp, now: Timestamp!): ProcessorActions!
/// }
///
/// query {
///   processTaskOutcome(outcome: TaskOutcome!)
/// }
/// ```
///
/// The first query is how the task processor obtains a `ProcessorActions` value; the second
/// is how it reports the result of each executed task back to the application.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct ProcessorActions {
    /// The application is requesting to be called back no later than the given timestamp.
    /// `None` means that no callback is requested.
    pub request_callback: Option<Timestamp>,
    /// The application is requesting the execution of the given tasks.
    pub execute_tasks: Vec<Task>,
}

// Expose `ProcessorActions` to GraphQL as a custom scalar.
scalar!(ProcessorActions);
55
/// An off-chain task requested by an on-chain application.
///
/// A task is executed by spawning the binary registered for `operator` and writing `input`
/// to its standard input.
#[derive(Debug, Serialize, Deserialize)]
pub struct Task {
    /// The operator handling the task. This is the name looked up in the [`OperatorMap`].
    pub operator: String,
    /// The input argument in JSON. It is passed to the operator verbatim.
    pub input: String,
}
64
/// The result of executing an off-chain operator.
///
/// Outcomes are reported back to the requesting application through its
/// `processTaskOutcome` GraphQL query.
#[derive(Debug, Serialize, Deserialize)]
pub struct TaskOutcome {
    /// The operator handling the task.
    pub operator: String,
    /// The JSON output. This is whatever the operator wrote to its standard output,
    /// decoded lossily as UTF-8.
    pub output: String,
}

// Expose `TaskOutcome` to GraphQL as a custom scalar.
scalar!(TaskOutcome);
75
/// Parses an operator mapping in the format `name=path` or just `name`.
///
/// If only `name` is provided, the path defaults to the name itself. Only the first `=`
/// separates the name from the path, so the path may itself contain `=` characters.
///
/// # Errors
///
/// Returns a human-readable message if the operator name or the binary path is empty,
/// since such a mapping could never be matched or executed.
pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
    let (name, path) = s.split_once('=').unwrap_or((s, s));
    if name.is_empty() {
        return Err(format!("missing operator name in {s:?}"));
    }
    if path.is_empty() {
        return Err(format!("missing binary path for operator {name:?}"));
    }
    Ok((name.to_string(), PathBuf::from(path)))
}
85
/// An entry in the processor's deadline queue.
///
/// Wrapping the pair in [`Reverse`] turns the max-heap [`BinaryHeap`] into a min-heap, so
/// the earliest timestamp is always at the top. The optional application ID names the
/// application to query again when the deadline fires; an entry without one only wakes
/// the processor up.
type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;
87
/// A task processor that watches applications and executes off-chain operators.
pub struct TaskProcessor<Env: linera_core::Environment> {
    /// The chain being watched; only used for logging in this file.
    chain_id: ChainId,
    /// The applications currently being watched for requested actions.
    application_ids: Vec<ApplicationId>,
    /// For each application, the local time at which it last requested a callback. This
    /// value is passed back to the application as `lastRequestedCallback`.
    last_requested_callbacks: BTreeMap<ApplicationId, Timestamp>,
    /// The client used to query applications and to submit on-chain operations.
    chain_client: ChainClient<Env>,
    /// Stops the main loop in `run` when triggered.
    cancellation_token: CancellationToken,
    /// The notification stream obtained from `chain_client.subscribe()`.
    notifications: NotificationStream,
    /// The sending half handed to spawned tasks so they can report their outcomes.
    outcome_sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
    /// The receiving half on which the main loop collects task outcomes.
    outcome_receiver: mpsc::UnboundedReceiver<(ApplicationId, TaskOutcome)>,
    /// Updates to the set of watched applications.
    update_receiver: mpsc::UnboundedReceiver<Update>,
    /// Pending wake-up times, earliest first.
    deadlines: BinaryHeap<Deadline>,
    /// The binaries available for executing tasks, by operator name.
    operators: OperatorMap,
}
102
103impl<Env: linera_core::Environment> TaskProcessor<Env> {
104    /// Creates a new task processor.
105    pub fn new(
106        chain_id: ChainId,
107        application_ids: Vec<ApplicationId>,
108        chain_client: ChainClient<Env>,
109        cancellation_token: CancellationToken,
110        operators: OperatorMap,
111        update_receiver: Option<mpsc::UnboundedReceiver<Update>>,
112    ) -> Self {
113        let notifications = chain_client.subscribe().expect("client subscription");
114        let (outcome_sender, outcome_receiver) = mpsc::unbounded_channel();
115        let update_receiver = update_receiver.unwrap_or_else(|| mpsc::unbounded_channel().1);
116        Self {
117            chain_id,
118            application_ids,
119            last_requested_callbacks: BTreeMap::new(),
120            chain_client,
121            cancellation_token,
122            outcome_sender,
123            outcome_receiver,
124            notifications,
125            deadlines: BinaryHeap::new(),
126            operators,
127            update_receiver,
128        }
129    }
130
    /// Runs the task processor until the cancellation token is triggered.
    ///
    /// All watched applications are queried once at start-up. After that, the loop reacts
    /// to whichever of the following happens first:
    /// * a new-block notification: every watched application is queried again;
    /// * the earliest deadline elapsing: the applications with due deadlines are queried;
    /// * a finished task: its outcome is submitted to the requesting application;
    /// * an update to the set of watched applications;
    /// * cancellation: the loop exits.
    pub async fn run(mut self) {
        info!("Watching for notifications for chain {}", self.chain_id);
        self.process_actions(self.application_ids.clone()).await;
        loop {
            select! {
                // A `Some(..)` pattern that fails to match (closed stream or channel)
                // disables its branch for this iteration instead of ending the loop.
                Some(notification) = self.notifications.next() => {
                    if let Reason::NewBlock { .. } = notification.reason {
                        debug!(%self.chain_id, "Processing notification");
                        self.process_actions(self.application_ids.clone()).await;
                    }
                }
                // The sleep is re-created on every iteration, so it always reflects the
                // current earliest deadline.
                _ = tokio::time::sleep(Self::duration_until_next_deadline(&self.deadlines)) => {
                    debug!("Processing event");
                    let application_ids = self.process_events();
                    self.process_actions(application_ids).await;
                }
                Some((application_id, outcome)) = self.outcome_receiver.recv() => {
                    if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
                        error!("Error while processing task outcome {outcome:?}: {e}");
                    }
                }
                Some(update) = self.update_receiver.recv() => {
                    self.apply_update(update).await;
                }
                _ = self.cancellation_token.cancelled().fuse() => {
                    break;
                }
            }
        }
        // NOTE(review): the loop only exits on cancellation, so this message is logged on
        // cancellation rather than when the notification stream ends.
        debug!("Notification stream ended.");
    }
163
164    fn duration_until_next_deadline(deadlines: &BinaryHeap<Deadline>) -> tokio::time::Duration {
165        deadlines
166            .peek()
167            .map_or(tokio::time::Duration::MAX, |Reverse((x, _))| {
168                x.delta_since(Timestamp::now()).as_duration()
169            })
170    }
171
172    async fn apply_update(&mut self, update: Update) {
173        info!(
174            "Applying update for chain {}: {:?}",
175            self.chain_id, update.application_ids
176        );
177
178        let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
179        let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();
180
181        // Retain only last_requested_callbacks for applications that are still active
182        self.last_requested_callbacks
183            .retain(|app_id, _| new_app_set.contains(app_id));
184
185        // Update the application_ids
186        self.application_ids = update.application_ids;
187
188        // Process actions for newly added applications
189        let new_apps: Vec<_> = self
190            .application_ids
191            .iter()
192            .filter(|app_id| !old_app_set.contains(app_id))
193            .cloned()
194            .collect();
195        if !new_apps.is_empty() {
196            self.process_actions(new_apps).await;
197        }
198    }
199
200    fn process_events(&mut self) -> Vec<ApplicationId> {
201        let now = Timestamp::now();
202        let mut application_ids = Vec::new();
203        while let Some(deadline) = self.deadlines.pop() {
204            if let Reverse((_, Some(id))) = deadline {
205                application_ids.push(id);
206            }
207            let Some(Reverse((ts, _))) = self.deadlines.peek() else {
208                break;
209            };
210            if *ts > now {
211                break;
212            }
213        }
214        application_ids
215    }
216
217    async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
218        for application_id in application_ids {
219            debug!("Processing actions for {application_id}");
220            let now = Timestamp::now();
221            let last_requested_callback =
222                self.last_requested_callbacks.get(&application_id).cloned();
223            let actions = match self
224                .query_actions(application_id, last_requested_callback, now)
225                .await
226            {
227                Ok(actions) => actions,
228                Err(error) => {
229                    error!("Error reading application actions: {error}");
230                    // Retry in at most 1 minute.
231                    self.deadlines.push(Reverse((
232                        now.saturating_add(TimeDelta::from_secs(60)),
233                        None,
234                    )));
235                    continue;
236                }
237            };
238            if let Some(timestamp) = actions.request_callback {
239                self.last_requested_callbacks.insert(application_id, now);
240                self.deadlines
241                    .push(Reverse((timestamp, Some(application_id))));
242            }
243            for task in actions.execute_tasks {
244                let sender = self.outcome_sender.clone();
245                let operators = self.operators.clone();
246                tokio::spawn(async move {
247                    if let Err(e) = Self::execute_task(
248                        application_id,
249                        task.operator,
250                        task.input,
251                        sender,
252                        operators,
253                    )
254                    .await
255                    {
256                        error!("Error executing task for {application_id}: {e}");
257                    }
258                });
259            }
260        }
261    }
262
263    async fn execute_task(
264        application_id: ApplicationId,
265        operator: String,
266        input: String,
267        sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
268        operators: OperatorMap,
269    ) -> Result<(), anyhow::Error> {
270        let binary_path = operators
271            .get(&operator)
272            .ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?;
273        debug!("Executing task {operator} ({binary_path:?}) for {application_id}");
274        let mut child = Command::new(binary_path)
275            .stdin(std::process::Stdio::piped())
276            .stdout(std::process::Stdio::piped())
277            .spawn()?;
278
279        let mut stdin = child.stdin.take().expect("stdin should be configured");
280        stdin.write_all(input.as_bytes()).await?;
281        drop(stdin);
282
283        let output = child.wait_with_output().await?;
284        anyhow::ensure!(
285            output.status.success(),
286            "operator {} exited with status: {}",
287            operator,
288            output.status
289        );
290        let outcome = TaskOutcome {
291            operator,
292            output: String::from_utf8_lossy(&output.stdout).into(),
293        };
294        debug!("Done executing task for {application_id}");
295        sender.send((application_id, outcome))?;
296        Ok(())
297    }
298
299    async fn query_actions(
300        &mut self,
301        application_id: ApplicationId,
302        last_requested_callback: Option<Timestamp>,
303        now: Timestamp,
304    ) -> Result<ProcessorActions, anyhow::Error> {
305        let query = format!(
306            "query {{ nextActions(lastRequestedCallback: {}, now: {}) }}",
307            last_requested_callback.to_value(),
308            now.to_value(),
309        );
310        let bytes = serde_json::to_vec(&json!({"query": query}))?;
311        let query = linera_execution::Query::User {
312            application_id,
313            bytes,
314        };
315        let linera_execution::QueryOutcome {
316            response,
317            operations: _,
318        } = self.chain_client.query_application(query, None).await?;
319        let linera_execution::QueryResponse::User(response) = response else {
320            anyhow::bail!("cannot get a system response for a user query");
321        };
322        let mut response: serde_json::Value = serde_json::from_slice(&response)?;
323        let actions: ProcessorActions =
324            serde_json::from_value(response["data"]["nextActions"].take())?;
325        Ok(actions)
326    }
327
328    async fn submit_task_outcome(
329        &mut self,
330        application_id: ApplicationId,
331        task_outcome: &TaskOutcome,
332    ) -> Result<(), anyhow::Error> {
333        info!("Submitting task outcome for {application_id}: {task_outcome:?}");
334        let query = format!(
335            "query {{ processTaskOutcome(outcome: {{ operator: {}, output: {} }}) }}",
336            task_outcome.operator.to_value(),
337            task_outcome.output.to_value(),
338        );
339        let bytes = serde_json::to_vec(&json!({"query": query}))?;
340        let query = linera_execution::Query::User {
341            application_id,
342            bytes,
343        };
344        let linera_execution::QueryOutcome {
345            response: _,
346            operations,
347        } = self.chain_client.query_application(query, None).await?;
348        if !operations.is_empty() {
349            if let Err(e) = self
350                .chain_client
351                .execute_operations(operations, vec![])
352                .await
353            {
354                // TODO: handle leader timeouts.
355                error!("Failed to execute on-chain operations for {application_id}: {e}");
356            }
357        }
358        Ok(())
359    }
360}