linera_service/
query_subscription.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    sync::{Arc, Mutex},
7};
8
9use futures::StreamExt as _;
10use linera_base::identifiers::{ApplicationId, ChainId};
11use linera_client::chain_listener::ClientContext;
12use linera_core::worker::Reason;
13use linera_execution::{Query, QueryResponse};
14use serde_json::Value;
15use tokio::sync::broadcast;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, warn};
18
/// A GraphQL query that clients may subscribe to, identified by its operation name.
///
/// Queries are registered at startup via `--allow-subscription`.
#[derive(Debug, Clone)]
pub struct RegisteredQuery {
    /// Operation name under which the query is registered.
    pub name: String,
    /// Full text of the GraphQL query.
    pub query: String,
}
25
26/// Parses a GraphQL query string like `query Name { ... }` and extracts the operation name.
27pub fn parse_allowed_subscription(s: &str) -> anyhow::Result<RegisteredQuery> {
28    let trimmed = s.trim();
29    let rest = trimmed
30        .strip_prefix("query")
31        .ok_or_else(|| anyhow::anyhow!("expected query to start with 'query', got: {s}"))?;
32    // The character right after "query" must be whitespace (not part of a longer word).
33    anyhow::ensure!(
34        rest.starts_with(char::is_whitespace),
35        "expected whitespace after 'query' keyword"
36    );
37    let rest = rest.trim_start();
38    // Extract the operation name: sequence of alphanumeric/underscore chars.
39    let name = rest
40        .split(|c: char| !c.is_alphanumeric() && c != '_')
41        .next()
42        .unwrap_or_default();
43    anyhow::ensure!(
44        !name.is_empty(),
45        "expected an operation name after 'query', e.g. 'query MyQuery {{ ... }}'"
46    );
47    Ok(RegisteredQuery {
48        name: name.to_string(),
49        query: trimmed.to_string(),
50    })
51}
52
53/// Identifies a unique subscription target: a named query for a specific chain and application.
54#[derive(Clone, Debug, Hash, Eq, PartialEq)]
55pub struct SubscriptionKey {
56    pub name: String,
57    pub chain_id: ChainId,
58    pub application_id: ApplicationId,
59}
60
61/// State for an active watcher: the broadcast sender and a handle for cleanup detection.
62struct WatcherState {
63    sender: broadcast::Sender<Value>,
64}
65
66/// Manages registered query names and active per-key watchers.
67pub struct QuerySubscriptionManager {
68    /// Registered queries by name.
69    queries: HashMap<String, String>,
70    /// Active watchers keyed by subscription target.
71    watchers: Mutex<HashMap<SubscriptionKey, WatcherState>>,
72}
73
74impl QuerySubscriptionManager {
75    /// Creates a new manager from the registered queries.
76    pub fn new(registered: Vec<RegisteredQuery>) -> Self {
77        let queries = registered
78            .into_iter()
79            .map(|rq| (rq.name, rq.query))
80            .collect();
81        Self {
82            queries,
83            watchers: Mutex::new(HashMap::new()),
84        }
85    }
86
87    /// Returns the GraphQL query string for a given name, if registered.
88    pub fn get_query(&self, name: &str) -> Option<&str> {
89        self.queries.get(name).map(|s| s.as_str())
90    }
91
92    /// Returns a broadcast receiver for the given key. Lazily spawns a watcher if needed.
93    pub fn subscribe<C: ClientContext + 'static>(
94        self: &Arc<Self>,
95        key: &SubscriptionKey,
96        context: Arc<futures::lock::Mutex<C>>,
97        token: CancellationToken,
98    ) -> anyhow::Result<broadcast::Receiver<Value>> {
99        let query_string = self
100            .get_query(&key.name)
101            .ok_or_else(|| {
102                anyhow::anyhow!("no subscription query registered with name '{}'", key.name)
103            })?
104            .to_string();
105
106        let mut watchers = self.watchers.lock().unwrap();
107
108        // If a watcher already exists, reuse it.
109        if let Some(state) = watchers.get(key) {
110            return Ok(state.sender.subscribe());
111        }
112
113        // Create a new broadcast channel and spawn a watcher.
114        let (sender, receiver) = broadcast::channel(64);
115        watchers.insert(
116            key.clone(),
117            WatcherState {
118                sender: sender.clone(),
119            },
120        );
121
122        let manager = Arc::clone(self);
123        let key_clone = key.clone();
124        tokio::spawn(run_query_subscription_watcher(
125            context,
126            manager,
127            key_clone,
128            query_string,
129            sender,
130            token,
131        ));
132
133        Ok(receiver)
134    }
135}
136
137/// Background task that watches for new blocks on a chain and re-executes the query.
138async fn run_query_subscription_watcher<C: ClientContext + 'static>(
139    context: Arc<futures::lock::Mutex<C>>,
140    manager: Arc<QuerySubscriptionManager>,
141    key: SubscriptionKey,
142    query_string: String,
143    sender: broadcast::Sender<Value>,
144    token: CancellationToken,
145) {
146    debug!(
147        name = %key.name,
148        chain_id = %key.chain_id,
149        application_id = %key.application_id,
150        "starting query subscription watcher"
151    );
152
153    let notification_stream = {
154        let ctx = context.lock().await;
155        match ctx.make_chain_client(key.chain_id).await {
156            Ok(client) => match client.subscribe() {
157                Ok(stream) => stream,
158                Err(e) => {
159                    warn!("failed to subscribe to chain notifications: {e}");
160                    cleanup_watcher(&manager, &key);
161                    return;
162                }
163            },
164            Err(e) => {
165                warn!("failed to create chain client: {e}");
166                cleanup_watcher(&manager, &key);
167                return;
168            }
169        }
170    };
171
172    let mut notification_stream = Box::pin(notification_stream);
173
174    // Cache the last result to deduplicate.
175    let mut last_result: Option<Vec<u8>> = None;
176
177    // Execute the query once immediately so the first subscriber gets a value.
178    execute_and_maybe_broadcast(&context, &key, &query_string, &sender, &mut last_result).await;
179
180    loop {
181        tokio::select! {
182            _ = token.cancelled() => {
183                debug!(name = %key.name, "watcher cancelled");
184                break;
185            }
186            notification = notification_stream.next() => {
187                match notification {
188                    Some(n) => {
189                        if matches!(n.reason, Reason::NewBlock { .. }) {
190                            execute_and_maybe_broadcast(
191                                &context,
192                                &key,
193                                &query_string,
194                                &sender,
195                                &mut last_result,
196                            )
197                            .await;
198                        }
199                    }
200                    None => {
201                        debug!(name = %key.name, "notification stream ended");
202                        break;
203                    }
204                }
205
206                // If no receivers remain, stop the watcher.
207                if sender.receiver_count() == 0 {
208                    debug!(name = %key.name, "no more subscribers, stopping watcher");
209                    break;
210                }
211            }
212        }
213    }
214
215    cleanup_watcher(&manager, &key);
216}
217
218/// Executes the query against the application and broadcasts if the result changed.
219async fn execute_and_maybe_broadcast<C: ClientContext + 'static>(
220    context: &Arc<futures::lock::Mutex<C>>,
221    key: &SubscriptionKey,
222    query_string: &str,
223    sender: &broadcast::Sender<Value>,
224    last_result: &mut Option<Vec<u8>>,
225) {
226    // The application service expects a JSON-encoded GraphQL request.
227    let json_request = serde_json::json!({ "query": query_string });
228    let request_bytes = serde_json::to_vec(&json_request).unwrap();
229    let query = Query::User {
230        application_id: key.application_id,
231        bytes: request_bytes,
232    };
233
234    let result = {
235        let ctx = context.lock().await;
236        match ctx.make_chain_client(key.chain_id).await {
237            Ok(client) => client.query_application(query, None).await,
238            Err(e) => {
239                warn!(name = %key.name, "failed to create chain client: {e}");
240                return;
241            }
242        }
243    };
244
245    match result {
246        Ok((outcome, _height)) => {
247            let response_bytes = match outcome.response {
248                QueryResponse::User(bytes) => bytes,
249                QueryResponse::System(_) => {
250                    warn!(name = %key.name, "unexpected system response for user query");
251                    return;
252                }
253            };
254
255            // Deduplicate: only broadcast if the result changed.
256            if last_result.as_ref() == Some(&response_bytes) {
257                return;
258            }
259
260            *last_result = Some(response_bytes.clone());
261
262            // Parse the response as JSON and broadcast.
263            match serde_json::from_slice::<Value>(&response_bytes) {
264                Ok(value) => {
265                    // Ignore send errors (no receivers).
266                    if let Err(e) = sender.send(value) {
267                        debug!(name = %key.name, "Failed to send graphql response: {e}");
268                    }
269                }
270                Err(e) => {
271                    warn!(name = %key.name, "failed to parse query response as JSON: {e}");
272                }
273            }
274        }
275        Err(e) => {
276            warn!(name = %key.name, "query execution failed: {e}");
277        }
278    }
279}
280
281/// Removes the watcher entry from the manager.
282fn cleanup_watcher(manager: &QuerySubscriptionManager, key: &SubscriptionKey) {
283    let mut watchers = manager.watchers.lock().unwrap();
284    watchers.remove(key);
285    debug!(name = %key.name, "watcher cleaned up");
286}