1use std::{
5 collections::HashMap,
6 sync::{Arc, Mutex},
7};
8
9use futures::StreamExt as _;
10use linera_base::identifiers::{ApplicationId, ChainId};
11use linera_client::chain_listener::ClientContext;
12use linera_core::worker::Reason;
13use linera_execution::{Query, QueryResponse};
14use serde_json::Value;
15use tokio::sync::broadcast;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, warn};
18
/// A named query that clients are allowed to subscribe to.
///
/// Values of this type are produced by [`parse_allowed_subscription`] and consumed by
/// [`QuerySubscriptionManager::new`].
#[derive(Clone, Debug)]
pub struct RegisteredQuery {
    /// Operation name extracted from the query text; used as the lookup key.
    pub name: String,
    /// The full (trimmed) query text that is sent to the application.
    pub query: String,
}
25
26pub fn parse_allowed_subscription(s: &str) -> anyhow::Result<RegisteredQuery> {
28 let trimmed = s.trim();
29 let rest = trimmed
30 .strip_prefix("query")
31 .ok_or_else(|| anyhow::anyhow!("expected query to start with 'query', got: {s}"))?;
32 anyhow::ensure!(
34 rest.starts_with(char::is_whitespace),
35 "expected whitespace after 'query' keyword"
36 );
37 let rest = rest.trim_start();
38 let name = rest
40 .split(|c: char| !c.is_alphanumeric() && c != '_')
41 .next()
42 .unwrap_or_default();
43 anyhow::ensure!(
44 !name.is_empty(),
45 "expected an operation name after 'query', e.g. 'query MyQuery {{ ... }}'"
46 );
47 Ok(RegisteredQuery {
48 name: name.to_string(),
49 query: trimmed.to_string(),
50 })
51}
52
/// Identifies one query subscription: a registered query evaluated against one application
/// on one chain.
///
/// Used as the key of the manager's watcher map, so subscribers with an equal key share a
/// single watcher.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct SubscriptionKey {
    /// Name of the registered query to run.
    pub name: String,
    /// Chain whose notifications trigger re-evaluation and on which the query is executed.
    pub chain_id: ChainId,
    /// Application that receives the query.
    pub application_id: ApplicationId,
}
60
/// Bookkeeping kept for a watcher task while its key is present in the watcher map.
struct WatcherState {
    /// Sending half of the watcher's broadcast channel; new subscribers obtain their
    /// receiver from it.
    sender: broadcast::Sender<Value>,
}
65
/// Holds the registered subscription queries and the watchers currently serving them.
pub struct QuerySubscriptionManager {
    /// Registered queries, mapping operation name to query text.
    queries: HashMap<String, String>,
    /// Watchers that are currently registered, keyed by subscription; guarded by a
    /// synchronous mutex.
    watchers: Mutex<HashMap<SubscriptionKey, WatcherState>>,
}
73
74impl QuerySubscriptionManager {
75 pub fn new(registered: Vec<RegisteredQuery>) -> Self {
77 let queries = registered
78 .into_iter()
79 .map(|rq| (rq.name, rq.query))
80 .collect();
81 Self {
82 queries,
83 watchers: Mutex::new(HashMap::new()),
84 }
85 }
86
87 pub fn get_query(&self, name: &str) -> Option<&str> {
89 self.queries.get(name).map(|s| s.as_str())
90 }
91
92 pub fn subscribe<C: ClientContext + 'static>(
94 self: &Arc<Self>,
95 key: &SubscriptionKey,
96 context: Arc<futures::lock::Mutex<C>>,
97 token: CancellationToken,
98 ) -> anyhow::Result<broadcast::Receiver<Value>> {
99 let query_string = self
100 .get_query(&key.name)
101 .ok_or_else(|| {
102 anyhow::anyhow!("no subscription query registered with name '{}'", key.name)
103 })?
104 .to_string();
105
106 let mut watchers = self.watchers.lock().unwrap();
107
108 if let Some(state) = watchers.get(key) {
110 return Ok(state.sender.subscribe());
111 }
112
113 let (sender, receiver) = broadcast::channel(64);
115 watchers.insert(
116 key.clone(),
117 WatcherState {
118 sender: sender.clone(),
119 },
120 );
121
122 let manager = Arc::clone(self);
123 let key_clone = key.clone();
124 tokio::spawn(run_query_subscription_watcher(
125 context,
126 manager,
127 key_clone,
128 query_string,
129 sender,
130 token,
131 ));
132
133 Ok(receiver)
134 }
135}
136
137async fn run_query_subscription_watcher<C: ClientContext + 'static>(
139 context: Arc<futures::lock::Mutex<C>>,
140 manager: Arc<QuerySubscriptionManager>,
141 key: SubscriptionKey,
142 query_string: String,
143 sender: broadcast::Sender<Value>,
144 token: CancellationToken,
145) {
146 debug!(
147 name = %key.name,
148 chain_id = %key.chain_id,
149 application_id = %key.application_id,
150 "starting query subscription watcher"
151 );
152
153 let notification_stream = {
154 let ctx = context.lock().await;
155 match ctx.make_chain_client(key.chain_id).await {
156 Ok(client) => match client.subscribe() {
157 Ok(stream) => stream,
158 Err(e) => {
159 warn!("failed to subscribe to chain notifications: {e}");
160 cleanup_watcher(&manager, &key);
161 return;
162 }
163 },
164 Err(e) => {
165 warn!("failed to create chain client: {e}");
166 cleanup_watcher(&manager, &key);
167 return;
168 }
169 }
170 };
171
172 let mut notification_stream = Box::pin(notification_stream);
173
174 let mut last_result: Option<Vec<u8>> = None;
176
177 execute_and_maybe_broadcast(&context, &key, &query_string, &sender, &mut last_result).await;
179
180 loop {
181 tokio::select! {
182 _ = token.cancelled() => {
183 debug!(name = %key.name, "watcher cancelled");
184 break;
185 }
186 notification = notification_stream.next() => {
187 match notification {
188 Some(n) => {
189 if matches!(n.reason, Reason::NewBlock { .. }) {
190 execute_and_maybe_broadcast(
191 &context,
192 &key,
193 &query_string,
194 &sender,
195 &mut last_result,
196 )
197 .await;
198 }
199 }
200 None => {
201 debug!(name = %key.name, "notification stream ended");
202 break;
203 }
204 }
205
206 if sender.receiver_count() == 0 {
208 debug!(name = %key.name, "no more subscribers, stopping watcher");
209 break;
210 }
211 }
212 }
213 }
214
215 cleanup_watcher(&manager, &key);
216}
217
218async fn execute_and_maybe_broadcast<C: ClientContext + 'static>(
220 context: &Arc<futures::lock::Mutex<C>>,
221 key: &SubscriptionKey,
222 query_string: &str,
223 sender: &broadcast::Sender<Value>,
224 last_result: &mut Option<Vec<u8>>,
225) {
226 let json_request = serde_json::json!({ "query": query_string });
228 let request_bytes = serde_json::to_vec(&json_request).unwrap();
229 let query = Query::User {
230 application_id: key.application_id,
231 bytes: request_bytes,
232 };
233
234 let result = {
235 let ctx = context.lock().await;
236 match ctx.make_chain_client(key.chain_id).await {
237 Ok(client) => client.query_application(query, None).await,
238 Err(e) => {
239 warn!(name = %key.name, "failed to create chain client: {e}");
240 return;
241 }
242 }
243 };
244
245 match result {
246 Ok((outcome, _height)) => {
247 let response_bytes = match outcome.response {
248 QueryResponse::User(bytes) => bytes,
249 QueryResponse::System(_) => {
250 warn!(name = %key.name, "unexpected system response for user query");
251 return;
252 }
253 };
254
255 if last_result.as_ref() == Some(&response_bytes) {
257 return;
258 }
259
260 *last_result = Some(response_bytes.clone());
261
262 match serde_json::from_slice::<Value>(&response_bytes) {
264 Ok(value) => {
265 if let Err(e) = sender.send(value) {
267 debug!(name = %key.name, "Failed to send graphql response: {e}");
268 }
269 }
270 Err(e) => {
271 warn!(name = %key.name, "failed to parse query response as JSON: {e}");
272 }
273 }
274 }
275 Err(e) => {
276 warn!(name = %key.name, "query execution failed: {e}");
277 }
278 }
279}
280
281fn cleanup_watcher(manager: &QuerySubscriptionManager, key: &SubscriptionKey) {
283 let mut watchers = manager.watchers.lock().unwrap();
284 watchers.remove(key);
285 debug!(name = %key.name, "watcher cleaned up");
286}