linera_service/
query_subscription.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    sync::{Arc, Mutex},
7    time::Duration,
8};
9
10use futures::StreamExt as _;
11use linera_base::identifiers::{ApplicationId, ChainId};
12use linera_client::chain_listener::ClientContext;
13use linera_core::worker::Reason;
14use linera_execution::{Query, QueryResponse};
15use tokio::sync::watch;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, warn};
18
/// A GraphQL query made available for subscriptions at startup via `--allow-subscription`.
#[derive(Debug, Clone)]
pub struct RegisteredQuery {
    /// Name under which the query is registered and later looked up.
    pub name: String,
    /// Text of the GraphQL query that is executed for this name.
    pub query: String,
}
25
26/// Parses a GraphQL query string like `query Name { ... }` and extracts the operation name.
27pub fn parse_allowed_subscription(s: &str) -> anyhow::Result<RegisteredQuery> {
28    let trimmed = s.trim();
29    let rest = trimmed
30        .strip_prefix("query")
31        .ok_or_else(|| anyhow::anyhow!("expected query to start with 'query', got: {s}"))?;
32    // The character right after "query" must be whitespace (not part of a longer word).
33    anyhow::ensure!(
34        rest.starts_with(char::is_whitespace),
35        "expected whitespace after 'query' keyword"
36    );
37    let rest = rest.trim_start();
38    // Extract the operation name: sequence of alphanumeric/underscore chars.
39    let name = rest
40        .split(|c: char| !c.is_alphanumeric() && c != '_')
41        .next()
42        .unwrap_or_default();
43    anyhow::ensure!(
44        !name.is_empty(),
45        "expected an operation name after 'query', e.g. 'query MyQuery {{ ... }}'"
46    );
47    Ok(RegisteredQuery {
48        name: name.to_string(),
49        query: trimmed.to_string(),
50    })
51}
52
/// Parses a `Name=Secs` string into a query name and TTL in seconds.
///
/// The input is split at the first `=`. Whitespace around the name and around the number
/// is ignored, so `" Foo = 30 "` yields `("Foo", 30)`.
///
/// # Errors
///
/// Returns a human-readable message if the `=` separator is missing, if the name is empty,
/// or if the seconds part is not a valid `u64`.
pub fn parse_subscription_ttl(s: &str) -> Result<(String, u64), String> {
    let (name, secs) = s
        .split_once('=')
        .ok_or_else(|| format!("expected format Name=Secs, got: {s}"))?;
    // An empty or whitespace-padded name could never match a registered query name, so the
    // TTL would be silently ignored; normalise the name and reject the empty case instead.
    let name = name.trim();
    if name.is_empty() {
        return Err(format!(
            "expected a non-empty query name before '=', got: {s}"
        ));
    }
    let secs = secs.trim();
    let secs: u64 = secs
        .parse()
        .map_err(|e| format!("invalid seconds value '{secs}': {e}"))?;
    Ok((name.to_string(), secs))
}
63
64/// Identifies a unique subscription target: a named query for a specific chain and application.
65#[derive(Clone, Debug, Hash, Eq, PartialEq)]
66pub struct SubscriptionKey {
67    pub name: String,
68    pub chain_id: ChainId,
69    pub application_id: ApplicationId,
70}
71
72/// State for an active watcher: the watch sender for the latest query result.
73/// The channel carries pre-serialized JSON strings so that cloning between
74/// subscribers is a single `memcpy` instead of a deep `serde_json::Value` clone.
75struct WatcherState {
76    sender: watch::Sender<Option<String>>,
77}
78
79/// Manages registered query names and active per-key watchers.
80pub struct QuerySubscriptionManager {
81    /// Registered queries by name.
82    queries: HashMap<String, String>,
83    /// Per-query minimum TTL for cached results.
84    ttls: HashMap<String, Duration>,
85    /// Active watchers keyed by subscription target.
86    watchers: Mutex<HashMap<SubscriptionKey, WatcherState>>,
87}
88
89impl QuerySubscriptionManager {
90    /// Creates a new manager from the registered queries and optional per-query TTLs.
91    pub fn new(registered: Vec<RegisteredQuery>, ttls: HashMap<String, Duration>) -> Self {
92        let queries = registered
93            .into_iter()
94            .map(|rq| (rq.name, rq.query))
95            .collect();
96        Self {
97            queries,
98            ttls,
99            watchers: Mutex::new(HashMap::new()),
100        }
101    }
102
103    /// Returns the GraphQL query string for a given name, if registered.
104    pub fn get_query(&self, name: &str) -> Option<&str> {
105        self.queries.get(name).map(|s| s.as_str())
106    }
107
108    /// Returns a watch receiver for the given key. Lazily spawns a watcher if needed.
109    /// The receiver initially holds `None`; the watcher populates it with `Some(value)`
110    /// after the first query. Callers should filter out `None` values from the stream.
111    pub fn subscribe<C: ClientContext + 'static>(
112        self: &Arc<Self>,
113        key: &SubscriptionKey,
114        context: Arc<futures::lock::Mutex<C>>,
115        token: CancellationToken,
116    ) -> anyhow::Result<watch::Receiver<Option<String>>> {
117        let query_string = self
118            .get_query(&key.name)
119            .ok_or_else(|| {
120                anyhow::anyhow!("no subscription query registered with name '{}'", key.name)
121            })?
122            .to_string();
123
124        let mut watchers = self.watchers.lock().unwrap();
125
126        // If a watcher already exists, reuse it.
127        if let Some(state) = watchers.get(key) {
128            return Ok(state.sender.subscribe());
129        }
130
131        // Create a new watch channel (initial value is None until the first query completes).
132        let (sender, receiver) = watch::channel(None);
133        watchers.insert(
134            key.clone(),
135            WatcherState {
136                sender: sender.clone(),
137            },
138        );
139
140        let ttl = self.ttls.get(&key.name).copied();
141        let manager = Arc::clone(self);
142        let key_clone = key.clone();
143        tokio::spawn(run_query_subscription_watcher(
144            context,
145            manager,
146            key_clone,
147            query_string,
148            sender,
149            token,
150            ttl,
151        ));
152
153        Ok(receiver)
154    }
155}
156
157/// Background task that watches for new blocks on a chain and re-executes the query.
158async fn run_query_subscription_watcher<C: ClientContext + 'static>(
159    context: Arc<futures::lock::Mutex<C>>,
160    manager: Arc<QuerySubscriptionManager>,
161    key: SubscriptionKey,
162    query_string: String,
163    sender: watch::Sender<Option<String>>,
164    token: CancellationToken,
165    ttl: Option<Duration>,
166) {
167    debug!(
168        name = %key.name,
169        chain_id = %key.chain_id,
170        application_id = %key.application_id,
171        ?ttl,
172        "starting query subscription watcher"
173    );
174
175    let notification_stream = {
176        let ctx = context.lock().await;
177        match ctx.make_chain_client(key.chain_id).await {
178            Ok(client) => match client.subscribe() {
179                Ok(stream) => stream,
180                Err(e) => {
181                    warn!("failed to subscribe to chain notifications: {e}");
182                    cleanup_watcher(&manager, &key);
183                    return;
184                }
185            },
186            Err(e) => {
187                warn!("failed to create chain client: {e}");
188                cleanup_watcher(&manager, &key);
189                return;
190            }
191        }
192    };
193
194    let mut notification_stream = Box::pin(notification_stream);
195
196    // Cache the last result as a string to deduplicate.
197    let mut last_result: Option<String> = None;
198
199    // Execute the query once immediately so the first subscriber gets a value.
200    execute_and_maybe_send(&context, &key, &query_string, &sender, &mut last_result).await;
201
202    // Track when the last execution happened, for TTL-based deferral.
203    let mut last_execution = tokio::time::Instant::now();
204    // Whether a deferred re-execution is pending (a NewBlock arrived during the TTL window).
205    let mut pending_invalidation = false;
206
207    loop {
208        // If there's a pending invalidation, compute the remaining TTL sleep.
209        let ttl_sleep = if pending_invalidation {
210            if let Some(ttl) = ttl {
211                let elapsed = last_execution.elapsed();
212                if elapsed < ttl {
213                    tokio::time::sleep(ttl - elapsed)
214                } else {
215                    tokio::time::sleep(Duration::ZERO)
216                }
217            } else {
218                // No TTL configured; should not happen since pending_invalidation
219                // is only set when ttl is Some, but handle gracefully.
220                tokio::time::sleep(Duration::ZERO)
221            }
222        } else {
223            // No pending invalidation: sleep forever (effectively disabled).
224            tokio::time::sleep(Duration::MAX)
225        };
226        tokio::pin!(ttl_sleep);
227
228        tokio::select! {
229            _ = token.cancelled() => {
230                debug!(name = %key.name, "watcher cancelled");
231                break;
232            }
233            () = &mut ttl_sleep, if pending_invalidation => {
234                pending_invalidation = false;
235                execute_and_maybe_send(
236                    &context,
237                    &key,
238                    &query_string,
239                    &sender,
240                    &mut last_result,
241                )
242                .await;
243                last_execution = tokio::time::Instant::now();
244            }
245            notification = notification_stream.next() => {
246                match notification {
247                    Some(n) => {
248                        if matches!(n.reason, Reason::NewBlock { .. }) {
249                            if ttl.is_some() {
250                                // Defer re-execution until the TTL expires.
251                                if !pending_invalidation {
252                                    debug!(name = %key.name, "deferring invalidation until TTL expires");
253                                }
254                                pending_invalidation = true;
255                            } else {
256                                execute_and_maybe_send(
257                                    &context,
258                                    &key,
259                                    &query_string,
260                                    &sender,
261                                    &mut last_result,
262                                )
263                                .await;
264                                last_execution = tokio::time::Instant::now();
265                            }
266                        }
267                    }
268                    None => {
269                        debug!(name = %key.name, "notification stream ended");
270                        break;
271                    }
272                }
273
274                // If no receivers remain, stop the watcher.
275                if sender.is_closed() {
276                    debug!(name = %key.name, "no more subscribers, stopping watcher");
277                    break;
278                }
279            }
280        }
281    }
282
283    cleanup_watcher(&manager, &key);
284}
285
286/// Executes the query against the application and updates the watch channel if the result changed.
287async fn execute_and_maybe_send<C: ClientContext + 'static>(
288    context: &Arc<futures::lock::Mutex<C>>,
289    key: &SubscriptionKey,
290    query_string: &str,
291    sender: &watch::Sender<Option<String>>,
292    last_result: &mut Option<String>,
293) {
294    // The application service expects a JSON-encoded GraphQL request.
295    let json_request = serde_json::json!({ "query": query_string });
296    let request_bytes = serde_json::to_vec(&json_request).unwrap();
297    let query = Query::User {
298        application_id: key.application_id,
299        bytes: request_bytes,
300    };
301
302    let result = {
303        let ctx = context.lock().await;
304        match ctx.make_chain_client(key.chain_id).await {
305            Ok(client) => client.query_application(query, None).await,
306            Err(e) => {
307                warn!(name = %key.name, "failed to create chain client: {e}");
308                return;
309            }
310        }
311    };
312
313    match result {
314        Ok((outcome, _height)) => {
315            let response_bytes = match outcome.response {
316                QueryResponse::User(bytes) => bytes,
317                QueryResponse::System(_) => {
318                    warn!(name = %key.name, "unexpected system response for user query");
319                    return;
320                }
321            };
322
323            // Convert to string. The bytes are already valid UTF-8 JSON
324            // from the application service.
325            let json_string = match String::from_utf8(response_bytes) {
326                Ok(s) => s,
327                Err(error) => {
328                    warn!(%error, name = %key.name, "response bytes are not valid UTF-8");
329                    return;
330                }
331            };
332
333            // Deduplicate: only send if the result changed.
334            if last_result.as_ref() == Some(&json_string) {
335                return;
336            }
337
338            if let Err(e) = sender.send(Some(json_string.clone())) {
339                debug!(name = %key.name, "Failed to send graphql response: {e}");
340            }
341            *last_result = Some(json_string);
342        }
343        Err(e) => {
344            warn!(name = %key.name, "query execution failed: {e}");
345        }
346    }
347}
348
349/// Removes the watcher entry from the manager.
350fn cleanup_watcher(manager: &QuerySubscriptionManager, key: &SubscriptionKey) {
351    let mut watchers = manager.watchers.lock().unwrap();
352    watchers.remove(key);
353    debug!(name = %key.name, "watcher cleaned up");
354}