1use std::{
10 cmp::Reverse,
11 collections::{BTreeMap, BTreeSet, BinaryHeap},
12 path::PathBuf,
13 sync::Arc,
14};
15
16use async_graphql::{scalar, InputType as _};
17use futures::{stream::StreamExt, FutureExt};
18use linera_base::{
19 data_types::{TimeDelta, Timestamp},
20 identifiers::{ApplicationId, ChainId},
21};
22use linera_core::{client::ChainClient, node::NotificationStream, worker::Reason};
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use tokio::{io::AsyncWriteExt, process::Command, select, sync::mpsc};
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error, info};
28
29use crate::controller::Update;
30
31pub type OperatorMap = Arc<BTreeMap<String, PathBuf>>;
33
34#[derive(Default, Debug, Serialize, Deserialize)]
47pub struct ProcessorActions {
48 pub request_callback: Option<Timestamp>,
50 pub execute_tasks: Vec<Task>,
52}
53
54scalar!(ProcessorActions);
55
56#[derive(Debug, Serialize, Deserialize)]
58pub struct Task {
59 pub operator: String,
61 pub input: String,
63}
64
65#[derive(Debug, Serialize, Deserialize)]
67pub struct TaskOutcome {
68 pub operator: String,
70 pub output: String,
72}
73
74scalar!(TaskOutcome);
75
76pub fn parse_operator(s: &str) -> Result<(String, PathBuf), String> {
79 if let Some((name, path)) = s.split_once('=') {
80 Ok((name.to_string(), PathBuf::from(path)))
81 } else {
82 Ok((s.to_string(), PathBuf::from(s)))
83 }
84}
85
86type Deadline = Reverse<(Timestamp, Option<ApplicationId>)>;
87
88pub struct TaskProcessor<Env: linera_core::Environment> {
90 chain_id: ChainId,
91 application_ids: Vec<ApplicationId>,
92 last_requested_callbacks: BTreeMap<ApplicationId, Timestamp>,
93 chain_client: ChainClient<Env>,
94 cancellation_token: CancellationToken,
95 notifications: NotificationStream,
96 outcome_sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
97 outcome_receiver: mpsc::UnboundedReceiver<(ApplicationId, TaskOutcome)>,
98 update_receiver: mpsc::UnboundedReceiver<Update>,
99 deadlines: BinaryHeap<Deadline>,
100 operators: OperatorMap,
101}
102
103impl<Env: linera_core::Environment> TaskProcessor<Env> {
104 pub fn new(
106 chain_id: ChainId,
107 application_ids: Vec<ApplicationId>,
108 chain_client: ChainClient<Env>,
109 cancellation_token: CancellationToken,
110 operators: OperatorMap,
111 update_receiver: Option<mpsc::UnboundedReceiver<Update>>,
112 ) -> Self {
113 let notifications = chain_client.subscribe().expect("client subscription");
114 let (outcome_sender, outcome_receiver) = mpsc::unbounded_channel();
115 let update_receiver = update_receiver.unwrap_or_else(|| mpsc::unbounded_channel().1);
116 Self {
117 chain_id,
118 application_ids,
119 last_requested_callbacks: BTreeMap::new(),
120 chain_client,
121 cancellation_token,
122 outcome_sender,
123 outcome_receiver,
124 notifications,
125 deadlines: BinaryHeap::new(),
126 operators,
127 update_receiver,
128 }
129 }
130
131 pub async fn run(mut self) {
133 info!("Watching for notifications for chain {}", self.chain_id);
134 self.process_actions(self.application_ids.clone()).await;
135 loop {
136 select! {
137 Some(notification) = self.notifications.next() => {
138 if let Reason::NewBlock { .. } = notification.reason {
139 debug!(%self.chain_id, "Processing notification");
140 self.process_actions(self.application_ids.clone()).await;
141 }
142 }
143 _ = tokio::time::sleep(Self::duration_until_next_deadline(&self.deadlines)) => {
144 debug!("Processing event");
145 let application_ids = self.process_events();
146 self.process_actions(application_ids).await;
147 }
148 Some((application_id, outcome)) = self.outcome_receiver.recv() => {
149 if let Err(e) = self.submit_task_outcome(application_id, &outcome).await {
150 error!("Error while processing task outcome {outcome:?}: {e}");
151 }
152 }
153 Some(update) = self.update_receiver.recv() => {
154 self.apply_update(update).await;
155 }
156 _ = self.cancellation_token.cancelled().fuse() => {
157 break;
158 }
159 }
160 }
161 debug!("Notification stream ended.");
162 }
163
164 fn duration_until_next_deadline(deadlines: &BinaryHeap<Deadline>) -> tokio::time::Duration {
165 deadlines
166 .peek()
167 .map_or(tokio::time::Duration::MAX, |Reverse((x, _))| {
168 x.delta_since(Timestamp::now()).as_duration()
169 })
170 }
171
172 async fn apply_update(&mut self, update: Update) {
173 info!(
174 "Applying update for chain {}: {:?}",
175 self.chain_id, update.application_ids
176 );
177
178 let new_app_set: BTreeSet<_> = update.application_ids.iter().cloned().collect();
179 let old_app_set: BTreeSet<_> = self.application_ids.iter().cloned().collect();
180
181 self.last_requested_callbacks
183 .retain(|app_id, _| new_app_set.contains(app_id));
184
185 self.application_ids = update.application_ids;
187
188 let new_apps: Vec<_> = self
190 .application_ids
191 .iter()
192 .filter(|app_id| !old_app_set.contains(app_id))
193 .cloned()
194 .collect();
195 if !new_apps.is_empty() {
196 self.process_actions(new_apps).await;
197 }
198 }
199
200 fn process_events(&mut self) -> Vec<ApplicationId> {
201 let now = Timestamp::now();
202 let mut application_ids = Vec::new();
203 while let Some(deadline) = self.deadlines.pop() {
204 if let Reverse((_, Some(id))) = deadline {
205 application_ids.push(id);
206 }
207 let Some(Reverse((ts, _))) = self.deadlines.peek() else {
208 break;
209 };
210 if *ts > now {
211 break;
212 }
213 }
214 application_ids
215 }
216
217 async fn process_actions(&mut self, application_ids: Vec<ApplicationId>) {
218 for application_id in application_ids {
219 debug!("Processing actions for {application_id}");
220 let now = Timestamp::now();
221 let last_requested_callback =
222 self.last_requested_callbacks.get(&application_id).cloned();
223 let actions = match self
224 .query_actions(application_id, last_requested_callback, now)
225 .await
226 {
227 Ok(actions) => actions,
228 Err(error) => {
229 error!("Error reading application actions: {error}");
230 self.deadlines.push(Reverse((
232 now.saturating_add(TimeDelta::from_secs(60)),
233 None,
234 )));
235 continue;
236 }
237 };
238 if let Some(timestamp) = actions.request_callback {
239 self.last_requested_callbacks.insert(application_id, now);
240 self.deadlines
241 .push(Reverse((timestamp, Some(application_id))));
242 }
243 for task in actions.execute_tasks {
244 let sender = self.outcome_sender.clone();
245 let operators = self.operators.clone();
246 tokio::spawn(async move {
247 if let Err(e) = Self::execute_task(
248 application_id,
249 task.operator,
250 task.input,
251 sender,
252 operators,
253 )
254 .await
255 {
256 error!("Error executing task for {application_id}: {e}");
257 }
258 });
259 }
260 }
261 }
262
263 async fn execute_task(
264 application_id: ApplicationId,
265 operator: String,
266 input: String,
267 sender: mpsc::UnboundedSender<(ApplicationId, TaskOutcome)>,
268 operators: OperatorMap,
269 ) -> Result<(), anyhow::Error> {
270 let binary_path = operators
271 .get(&operator)
272 .ok_or_else(|| anyhow::anyhow!("unsupported operator: {}", operator))?;
273 debug!("Executing task {operator} ({binary_path:?}) for {application_id}");
274 let mut child = Command::new(binary_path)
275 .stdin(std::process::Stdio::piped())
276 .stdout(std::process::Stdio::piped())
277 .spawn()?;
278
279 let mut stdin = child.stdin.take().expect("stdin should be configured");
280 stdin.write_all(input.as_bytes()).await?;
281 drop(stdin);
282
283 let output = child.wait_with_output().await?;
284 anyhow::ensure!(
285 output.status.success(),
286 "operator {} exited with status: {}",
287 operator,
288 output.status
289 );
290 let outcome = TaskOutcome {
291 operator,
292 output: String::from_utf8_lossy(&output.stdout).into(),
293 };
294 debug!("Done executing task for {application_id}");
295 sender.send((application_id, outcome))?;
296 Ok(())
297 }
298
299 async fn query_actions(
300 &mut self,
301 application_id: ApplicationId,
302 last_requested_callback: Option<Timestamp>,
303 now: Timestamp,
304 ) -> Result<ProcessorActions, anyhow::Error> {
305 let query = format!(
306 "query {{ nextActions(lastRequestedCallback: {}, now: {}) }}",
307 last_requested_callback.to_value(),
308 now.to_value(),
309 );
310 let bytes = serde_json::to_vec(&json!({"query": query}))?;
311 let query = linera_execution::Query::User {
312 application_id,
313 bytes,
314 };
315 let linera_execution::QueryOutcome {
316 response,
317 operations: _,
318 } = self.chain_client.query_application(query, None).await?;
319 let linera_execution::QueryResponse::User(response) = response else {
320 anyhow::bail!("cannot get a system response for a user query");
321 };
322 let mut response: serde_json::Value = serde_json::from_slice(&response)?;
323 let actions: ProcessorActions =
324 serde_json::from_value(response["data"]["nextActions"].take())?;
325 Ok(actions)
326 }
327
328 async fn submit_task_outcome(
329 &mut self,
330 application_id: ApplicationId,
331 task_outcome: &TaskOutcome,
332 ) -> Result<(), anyhow::Error> {
333 info!("Submitting task outcome for {application_id}: {task_outcome:?}");
334 let query = format!(
335 "query {{ processTaskOutcome(outcome: {{ operator: {}, output: {} }}) }}",
336 task_outcome.operator.to_value(),
337 task_outcome.output.to_value(),
338 );
339 let bytes = serde_json::to_vec(&json!({"query": query}))?;
340 let query = linera_execution::Query::User {
341 application_id,
342 bytes,
343 };
344 let linera_execution::QueryOutcome {
345 response: _,
346 operations,
347 } = self.chain_client.query_application(query, None).await?;
348 if !operations.is_empty() {
349 if let Err(e) = self
350 .chain_client
351 .execute_operations(operations, vec![])
352 .await
353 {
354 error!("Failed to execute on-chain operations for {application_id}: {e}");
356 }
357 }
358 Ok(())
359 }
360}