1use std::{
10 cmp::Reverse,
11 collections::{BTreeMap, BTreeSet, BinaryHeap},
12 path::PathBuf,
13 sync::Arc,
14};
15
16use async_graphql::InputType as _;
17use futures::{stream::StreamExt, FutureExt};
18use linera_base::{
19 data_types::{TimeDelta, Timestamp},
20 identifiers::{ApplicationId, ChainId},
21 task_processor::{ProcessorActions, TaskOutcome},
22};
23use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason};
24use serde_json::json;
25use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error, info};
28
29use crate::controller::Update;
30
31pub type OperatorMap = Arc<BTreeMap<String, PathBuf>>;
33
34pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
37 if let Some((name, path)) = s.split_once('=') {
38 Ok((name.to_string(), PathBuf::from(path)))
39 } else {
40 Ok((s.to_string(), PathBuf::from(s)))
41 }
42}
43
44type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;
45
46pub struct TaskProcessor<Env: linera_core::Environment> {
48 chain_id: ChainId,
49 application_ids: Vec<ApplicationId>,
50 last_requested_callbacks: BTreeMap<ApplicationId, Timestamp>,
51 chain_client: ChainClient<Env>,
52 cancellation_token: CancellationToken,
53 notifications: NotificationStream,
54 outcome_sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
55 outcome_receiver: mpsc::UnboundedReceiver<(ApplicationId, TaskOutcome)>,
56 update_receiver: mpsc::UnboundedReceiver<Update>,
57 deadlines: BinaryHeap<Deadline>,
58 operators: OperatorMap,
59}
60
61impl<Env: linera_core::Environment> TaskProcessor<Env> {
62 pub fn new(
64 chain_id: ChainId,
65 application_ids: Vec<ApplicationId>,
66 chain_client: ChainClient<Env>,
67 cancellation_token: CancellationToken,
68 operators: OperatorMap,
69 update_receiver: Option<mpsc::UnboundedReceiver<Update>>,
70 ) -> Self {
71 let notifications = chain_client.subscribe().expect("client subscription");
72 let (outcome_sender, outcome_receiver) = mpsc::unbounded_channel();
73 let update_receiver = update_receiver.unwrap_or_else(|| mpsc::unbounded_channel().1);
74 Self {
75 chain_id,
76 application_ids,
77 last_requested_callbacks: BTreeMap::new(),
78 chain_client,
79 cancellation_token,
80 outcome_sender,
81 outcome_receiver,
82 notifications,
83 deadlines: BinaryHeap::new(),
84 operators,
85 update_receiver,
86 }
87 }
88
89 pub async fn run(mut self) {
91 info!("Watching for notifications for chain {}", self.chain_id);
92 self.process_actions(self.application_ids.clone()).await;
93 loop {
94 select! {
95 Some(notification) = self.notifications.next() => {
96 if let Reason::NewBlock { .. } = notification.reason {
97 debug!(%self.chain_id, "Processing notification");
98 self.process_actions(self.application_ids.clone()).await;
99 }
100 }
101 _ = tokio::time::sleep(Self::duration_until_next_deadline(&self.deadlines)) => {
102 debug!("Processing event");
103 let application_ids = self.process_events();
104 self.process_actions(application_ids).await;
105 }
106 Some((application_id, outcome)) = self.outcome_receiver.recv() => {
107 if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
108 error!("Error while processing task outcome {outcome:?}: {e}");
109 }
110 }
111 Some(update) = self.update_receiver.recv() => {
112 self.apply_update(update).await;
113 }
114 _ = self.cancellation_token.cancelled().fuse() => {
115 break;
116 }
117 }
118 }
119 debug!("Notification stream ended.");
120 }
121
122 fn duration_until_next_deadline(deadlines: &BinaryHeap<Deadline>) -> tokio::time::Duration {
123 deadlines
124 .peek()
125 .map_or(tokio::time::Duration::MAX, |Reverse((x, _))| {
126 x.delta_since(Timestamp::now()).as_duration()
127 })
128 }
129
130 async fn apply_update(&mut self, update: Update) {
131 info!(
132 "Applying update for chain {}: {:?}",
133 self.chain_id, update.application_ids
134 );
135
136 let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
137 let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();
138
139 self.last_requested_callbacks
141 .retain(|app_id, _| new_app_set.contains(app_id));
142
143 self.application_ids = update.application_ids;
145
146 let new_apps: Vec<_> = self
148 .application_ids
149 .iter()
150 .filter(|app_id| !old_app_set.contains(app_id))
151 .cloned()
152 .collect();
153 if !new_apps.is_empty() {
154 self.process_actions(new_apps).await;
155 }
156 }
157
158 fn process_events(&mut self) -> Vec<ApplicationId> {
159 let now = Timestamp::now();
160 let mut application_ids = Vec::new();
161 while let Some(deadline) = self.deadlines.pop() {
162 if let Reverse((_, Some(id))) = deadline {
163 application_ids.push(id);
164 }
165 let Some(Reverse((ts, _))) = self.deadlines.peek() else {
166 break;
167 };
168 if *ts > now {
169 break;
170 }
171 }
172 application_ids
173 }
174
175 async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
176 for application_id in application_ids {
177 debug!("Processing actions for {application_id}");
178 let now = Timestamp::now();
179 let last_requested_callback =
180 self.last_requested_callbacks.get(&application_id).cloned();
181 let actions = match self
182 .query_actions(application_id, last_requested_callback, now)
183 .await
184 {
185 Ok(actions) => actions,
186 Err(error) => {
187 error!("Error reading application actions: {error}");
188 self.deadlines.push(Reverse((
190 now.saturating_add(TimeDelta::from_secs(60)),
191 None,
192 )));
193 continue;
194 }
195 };
196 if let Some(timestamp) = actions.request_callback {
197 self.last_requested_callbacks.insert(application_id, now);
198 self.deadlines
199 .push(Reverse((timestamp, Some(application_id))));
200 }
201 for task in actions.execute_tasks {
202 let sender = self.outcome_sender.clone();
203 let operators = self.operators.clone();
204 tokio::spawn(async move {
205 if let Err(e) = Self::execute_task(
206 application_id,
207 task.operator,
208 task.input,
209 sender,
210 operators,
211 )
212 .await
213 {
214 error!("Error executing task for {application_id}: {e}");
215 }
216 });
217 }
218 }
219 }
220
221 async fn execute_task(
222 application_id: ApplicationId,
223 operator: String,
224 input: String,
225 sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
226 operators: OperatorMap,
227 ) -> Result<(), anyhow::Error> {
228 let binary_path = operators
229 .get(&operator)
230 .ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?;
231 debug!("Executing task {operator} ({binary_path:?}) for {application_id}");
232 let mut child = Command::new(binary_path)
233 .stdin(std::process::Stdio::piped())
234 .stdout(std::process::Stdio::piped())
235 .spawn()?;
236
237 let mut stdin = child.stdin.take().expect("stdin should be configured");
238 stdin.write_all(input.as_bytes()).await?;
239 drop(stdin);
240
241 let output = child.wait_with_output().await?;
242 anyhow::ensure!(
243 output.status.success(),
244 "operator {} exited with status: {}",
245 operator,
246 output.status
247 );
248 let outcome = TaskOutcome {
249 operator,
250 output: String::from_utf8_lossy(&output.stdout).into(),
251 };
252 debug!("Done executing task for {application_id}");
253 sender.send((application_id, outcome))?;
254 Ok(())
255 }
256
257 async fn query_actions(
258 &mut self,
259 application_id: ApplicationId,
260 last_requested_callback: Option<Timestamp>,
261 now: Timestamp,
262 ) -> Result<ProcessorActions, anyhow::Error> {
263 let query = format!(
264 "query {{ nextActions(lastRequestedCallback: {}, now: {}) }}",
265 last_requested_callback.to_value(),
266 now.to_value(),
267 );
268 let bytes = serde_json::to_vec(&json!({"query": query}))?;
269 let query = linera_execution::Query::User {
270 application_id,
271 bytes,
272 };
273 let linera_execution::QueryOutcome {
274 response,
275 operations: _,
276 } = self.chain_client.query_application(query, None).await?;
277 let linera_execution::QueryResponse::User(response) = response else {
278 anyhow::bail!("cannot get a system response for a user query");
279 };
280 let mut response: serde_json::Value = serde_json::from_slice(&response)?;
281 let actions: ProcessorActions =
282 serde_json::from_value(response["data"]["nextActions"].take())?;
283 Ok(actions)
284 }
285
286 async fn submit_task_outcome(
287 &mut self,
288 application_id: ApplicationId,
289 task_outcome: &TaskOutcome,
290 ) -> Result<(), anyhow::Error> {
291 info!("Submitting task outcome for {application_id}: {task_outcome:?}");
292 let query = format!(
293 "query {{ processTaskOutcome(outcome: {{ operator: {}, output: {} }}) }}",
294 task_outcome.operator.to_value(),
295 task_outcome.output.to_value(),
296 );
297 let bytes = serde_json::to_vec(&json!({"query": query}))?;
298 let query = linera_execution::Query::User {
299 application_id,
300 bytes,
301 };
302 let linera_execution::QueryOutcome {
303 response: _,
304 operations,
305 } = self.chain_client.query_application(query, None).await?;
306 if !operations.is_empty() {
307 if let Err(e) = self
308 .chain_client
309 .execute_operations(operations, vec![])
310 .await
311 {
312 error!("Failed to execute on-chain operations for {application_id}: {e}");
314 }
315 }
316 Ok(())
317 }
318}