1use std::{
5 collections::HashMap,
6 sync::{Arc, Mutex},
7 time::Duration,
8};
9
10use futures::StreamExt as _;
11use linera_base::identifiers::{ApplicationId, ChainId};
12use linera_client::chain_listener::ClientContext;
13use linera_core::worker::Reason;
14use linera_execution::{Query, QueryResponse};
15use tokio::sync::watch;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, warn};
18
/// A subscription query together with the name it is registered under.
///
/// Instances are produced by `parse_allowed_subscription`, which fills `name` with the
/// operation name found after the `query` keyword and `query` with the trimmed input.
#[derive(Debug, Clone)]
pub struct RegisteredQuery {
    /// Name under which the query is looked up.
    pub name: String,
    /// Full text of the query.
    pub query: String,
}
25
26pub fn parse_allowed_subscription(s: &str) -> anyhow::Result<RegisteredQuery> {
28 let trimmed = s.trim();
29 let rest = trimmed
30 .strip_prefix("query")
31 .ok_or_else(|| anyhow::anyhow!("expected query to start with 'query', got: {s}"))?;
32 anyhow::ensure!(
34 rest.starts_with(char::is_whitespace),
35 "expected whitespace after 'query' keyword"
36 );
37 let rest = rest.trim_start();
38 let name = rest
40 .split(|c: char| !c.is_alphanumeric() && c != '_')
41 .next()
42 .unwrap_or_default();
43 anyhow::ensure!(
44 !name.is_empty(),
45 "expected an operation name after 'query', e.g. 'query MyQuery {{ ... }}'"
46 );
47 Ok(RegisteredQuery {
48 name: name.to_string(),
49 query: trimmed.to_string(),
50 })
51}
52
/// Parses a `Name=Secs` pair into the query name and a number of seconds.
///
/// The input is split at the first `=`. Whitespace around both halves is ignored, so
/// `"MyQuery = 30"` is accepted.
///
/// # Errors
///
/// Returns a human-readable message when there is no `=`, when the name is empty, or when
/// the part after `=` is not a valid `u64`.
pub fn parse_subscription_ttl(s: &str) -> Result<(String, u64), String> {
    let (name, secs) = s
        .split_once('=')
        .ok_or_else(|| format!("expected format Name=Secs, got: {s}"))?;

    let name = name.trim();
    if name.is_empty() {
        // An empty name could never match a registered query, so reject it up front.
        return Err(format!(
            "expected a non-empty query name before '=', got: {s}"
        ));
    }

    let raw_secs = secs.trim();
    let secs: u64 = raw_secs
        .parse()
        .map_err(|e| format!("invalid seconds value '{raw_secs}': {e}"))?;

    Ok((name.to_string(), secs))
}
63
64#[derive(Clone, Debug, Hash, Eq, PartialEq)]
66pub struct SubscriptionKey {
67 pub name: String,
68 pub chain_id: ChainId,
69 pub application_id: ApplicationId,
70}
71
72struct WatcherState {
76 sender: watch::Sender<Option<String>>,
77}
78
79pub struct QuerySubscriptionManager {
81 queries: HashMap<String, String>,
83 ttls: HashMap<String, Duration>,
85 watchers: Mutex<HashMap<SubscriptionKey, WatcherState>>,
87}
88
89impl QuerySubscriptionManager {
90 pub fn new(registered: Vec<RegisteredQuery>, ttls: HashMap<String, Duration>) -> Self {
92 let queries = registered
93 .into_iter()
94 .map(|rq| (rq.name, rq.query))
95 .collect();
96 Self {
97 queries,
98 ttls,
99 watchers: Mutex::new(HashMap::new()),
100 }
101 }
102
103 pub fn get_query(&self, name: &str) -> Option<&str> {
105 self.queries.get(name).map(|s| s.as_str())
106 }
107
108 pub fn subscribe<C: ClientContext + 'static>(
112 self: &Arc<Self>,
113 key: &SubscriptionKey,
114 context: Arc<futures::lock::Mutex<C>>,
115 token: CancellationToken,
116 ) -> anyhow::Result<watch::Receiver<Option<String>>> {
117 let query_string = self
118 .get_query(&key.name)
119 .ok_or_else(|| {
120 anyhow::anyhow!("no subscription query registered with name '{}'", key.name)
121 })?
122 .to_string();
123
124 let mut watchers = self.watchers.lock().unwrap();
125
126 if let Some(state) = watchers.get(key) {
128 return Ok(state.sender.subscribe());
129 }
130
131 let (sender, receiver) = watch::channel(None);
133 watchers.insert(
134 key.clone(),
135 WatcherState {
136 sender: sender.clone(),
137 },
138 );
139
140 let ttl = self.ttls.get(&key.name).copied();
141 let manager = Arc::clone(self);
142 let key_clone = key.clone();
143 tokio::spawn(run_query_subscription_watcher(
144 context,
145 manager,
146 key_clone,
147 query_string,
148 sender,
149 token,
150 ttl,
151 ));
152
153 Ok(receiver)
154 }
155}
156
157async fn run_query_subscription_watcher<C: ClientContext + 'static>(
159 context: Arc<futures::lock::Mutex<C>>,
160 manager: Arc<QuerySubscriptionManager>,
161 key: SubscriptionKey,
162 query_string: String,
163 sender: watch::Sender<Option<String>>,
164 token: CancellationToken,
165 ttl: Option<Duration>,
166) {
167 debug!(
168 name = %key.name,
169 chain_id = %key.chain_id,
170 application_id = %key.application_id,
171 ?ttl,
172 "starting query subscription watcher"
173 );
174
175 let notification_stream = {
176 let ctx = context.lock().await;
177 match ctx.make_chain_client(key.chain_id).await {
178 Ok(client) => match client.subscribe() {
179 Ok(stream) => stream,
180 Err(e) => {
181 warn!("failed to subscribe to chain notifications: {e}");
182 cleanup_watcher(&manager, &key);
183 return;
184 }
185 },
186 Err(e) => {
187 warn!("failed to create chain client: {e}");
188 cleanup_watcher(&manager, &key);
189 return;
190 }
191 }
192 };
193
194 let mut notification_stream = Box::pin(notification_stream);
195
196 let mut last_result: Option<String> = None;
198
199 execute_and_maybe_send(&context, &key, &query_string, &sender, &mut last_result).await;
201
202 let mut last_execution = tokio::time::Instant::now();
204 let mut pending_invalidation = false;
206
207 loop {
208 let ttl_sleep = if pending_invalidation {
210 if let Some(ttl) = ttl {
211 let elapsed = last_execution.elapsed();
212 if elapsed < ttl {
213 tokio::time::sleep(ttl - elapsed)
214 } else {
215 tokio::time::sleep(Duration::ZERO)
216 }
217 } else {
218 tokio::time::sleep(Duration::ZERO)
221 }
222 } else {
223 tokio::time::sleep(Duration::MAX)
225 };
226 tokio::pin!(ttl_sleep);
227
228 tokio::select! {
229 _ = token.cancelled() => {
230 debug!(name = %key.name, "watcher cancelled");
231 break;
232 }
233 () = &mut ttl_sleep, if pending_invalidation => {
234 pending_invalidation = false;
235 execute_and_maybe_send(
236 &context,
237 &key,
238 &query_string,
239 &sender,
240 &mut last_result,
241 )
242 .await;
243 last_execution = tokio::time::Instant::now();
244 }
245 notification = notification_stream.next() => {
246 match notification {
247 Some(n) => {
248 if matches!(n.reason, Reason::NewBlock { .. }) {
249 if ttl.is_some() {
250 if !pending_invalidation {
252 debug!(name = %key.name, "deferring invalidation until TTL expires");
253 }
254 pending_invalidation = true;
255 } else {
256 execute_and_maybe_send(
257 &context,
258 &key,
259 &query_string,
260 &sender,
261 &mut last_result,
262 )
263 .await;
264 last_execution = tokio::time::Instant::now();
265 }
266 }
267 }
268 None => {
269 debug!(name = %key.name, "notification stream ended");
270 break;
271 }
272 }
273
274 if sender.is_closed() {
276 debug!(name = %key.name, "no more subscribers, stopping watcher");
277 break;
278 }
279 }
280 }
281 }
282
283 cleanup_watcher(&manager, &key);
284}
285
286async fn execute_and_maybe_send<C: ClientContext + 'static>(
288 context: &Arc<futures::lock::Mutex<C>>,
289 key: &SubscriptionKey,
290 query_string: &str,
291 sender: &watch::Sender<Option<String>>,
292 last_result: &mut Option<String>,
293) {
294 let json_request = serde_json::json!({ "query": query_string });
296 let request_bytes = serde_json::to_vec(&json_request).unwrap();
297 let query = Query::User {
298 application_id: key.application_id,
299 bytes: request_bytes,
300 };
301
302 let result = {
303 let ctx = context.lock().await;
304 match ctx.make_chain_client(key.chain_id).await {
305 Ok(client) => client.query_application(query, None).await,
306 Err(e) => {
307 warn!(name = %key.name, "failed to create chain client: {e}");
308 return;
309 }
310 }
311 };
312
313 match result {
314 Ok((outcome, _height)) => {
315 let response_bytes = match outcome.response {
316 QueryResponse::User(bytes) => bytes,
317 QueryResponse::System(_) => {
318 warn!(name = %key.name, "unexpected system response for user query");
319 return;
320 }
321 };
322
323 let json_string = match String::from_utf8(response_bytes) {
326 Ok(s) => s,
327 Err(error) => {
328 warn!(%error, name = %key.name, "response bytes are not valid UTF-8");
329 return;
330 }
331 };
332
333 if last_result.as_ref() == Some(&json_string) {
335 return;
336 }
337
338 if let Err(e) = sender.send(Some(json_string.clone())) {
339 debug!(name = %key.name, "Failed to send graphql response: {e}");
340 }
341 *last_result = Some(json_string);
342 }
343 Err(e) => {
344 warn!(name = %key.name, "query execution failed: {e}");
345 }
346 }
347}
348
349fn cleanup_watcher(manager: &QuerySubscriptionManager, key: &SubscriptionKey) {
351 let mut watchers = manager.watchers.lock().unwrap();
352 watchers.remove(key);
353 debug!(name = %key.name, "watcher cleaned up");
354}