linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    future::IntoFuture,
7    iter,
8    net::SocketAddr,
9    num::NonZeroU16,
10    sync::{Arc, Mutex as StdMutex},
11};
12
13use async_graphql::{
14    futures_util::Stream,
15    registry::{MetaType, MetaTypeId, Registry},
16    resolver_utils::ContainerType,
17    EmptyMutation, Error, MergedObject, OutputType, Positioned, Request, Response, ScalarType,
18    Schema, SimpleObject, Subscription,
19};
20use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
21use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
22use futures::{lock::Mutex, Future, FutureExt as _, StreamExt as _, TryStreamExt as _};
23use linera_base::{
24    crypto::{CryptoError, CryptoHash},
25    data_types::{
26        Amount, ApplicationDescription, ApplicationPermissions, BlockHeight, Bytecode, Epoch,
27        TimeDelta,
28    },
29    identifiers::{
30        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
31    },
32    ownership::{ChainOwnership, TimeoutConfig},
33    vm::VmRuntime,
34    BcsHexParseError,
35};
36use linera_chain::{
37    types::{ConfirmedBlock, GenericCertificate},
38    ChainStateView,
39};
40use linera_client::chain_listener::{
41    ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
42};
43use linera_core::{
44    client::chain_client::{self, ChainClient},
45    data_types::ClientOutcome,
46    wallet::Wallet as _,
47    worker::{ChainStateViewReadGuard, Notification, Reason},
48};
49use linera_execution::{
50    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
51    SystemOperation,
52};
53#[cfg(with_metrics)]
54use linera_metrics::monitoring_server;
55use linera_sdk::linera_base_types::BlobContent;
56use linera_storage::Storage;
57use lru::LruCache;
58use serde::{Deserialize, Serialize};
59use serde_json::json;
60use tokio::sync::mpsc::UnboundedReceiver;
61use tokio_util::sync::CancellationToken;
62use tower_http::cors::CorsLayer;
63use tracing::{debug, info, instrument, trace};
64
65use crate::util;
66
67/// A pre-serialized JSON string that implements [`OutputType`] as the `JSON` scalar.
68///
69/// When the `raw_value` feature of `async-graphql` is enabled, the string is
70/// emitted directly into the GraphQL response without any parsing or
71/// intermediate tree construction.
72#[derive(Clone)]
73struct RawJson(String);
74
75impl OutputType for RawJson {
76    fn type_name() -> Cow<'static, str> {
77        Cow::Borrowed("JSON")
78    }
79
80    fn create_type_info(registry: &mut Registry) -> String {
81        registry.create_output_type::<Self, _>(MetaTypeId::Scalar, |_| MetaType::Scalar {
82            name: "JSON".to_string(),
83            description: Some("A scalar that can represent any JSON value.".to_string()),
84            is_valid: None,
85            visible: None,
86            inaccessible: false,
87            tags: Default::default(),
88            specified_by_url: None,
89            directive_invocations: Default::default(),
90            requires_scopes: Default::default(),
91        })
92    }
93
94    async fn resolve(
95        &self,
96        _ctx: &async_graphql::ContextSelectionSet<'_>,
97        _field: &Positioned<async_graphql::parser::types::Field>,
98    ) -> async_graphql::ServerResult<async_graphql::Value> {
99        // Wrap the raw JSON string with the magic token that async-graphql's
100        // ConstValue serializer recognises (with feature `raw_value`).
101        // When the response is serialised to JSON the raw string is emitted
102        // verbatim, avoiding any parsing or tree conversion.
103        Ok(async_graphql::Value::Object(
104            std::iter::once((
105                async_graphql::Name::new(async_graphql_value::RAW_VALUE_TOKEN),
106                async_graphql::Value::String(self.0.clone()),
107            ))
108            .collect(),
109        ))
110    }
111}
112
// Result of the `chains` query: the chain IDs found in the wallet plus the default chain.
// Plain `//` comments are used on purpose: `///` comments on a `SimpleObject` would be
// exported as GraphQL descriptions and change the generated schema.
#[derive(SimpleObject, Serialize, Deserialize, Clone)]
pub struct Chains {
    // The listed chain IDs.
    pub list: Vec<ChainId>,
    // The default chain, if there is one.
    pub default: Option<ChainId>,
}
118
/// Our root GraphQL query type.
pub struct QueryRoot<C> {
    /// Shared client context; locked to create chain clients and to read the wallet.
    context: Arc<Mutex<C>>,
    /// Port forwarded to `ApplicationOverview::new` when listing applications.
    port: NonZeroU16,
    /// Chain reported as the `default` entry of the `chains` query.
    default_chain: Option<ChainId>,
}
125
/// Our root GraphQL subscription type.
pub struct SubscriptionRoot<C> {
    /// Shared client context; locked to create chain clients and handed to the
    /// query subscription manager.
    context: Arc<Mutex<C>>,
    /// Manager backing the `query_result` subscription. When this is `None`, that
    /// subscription fails with "no subscription queries registered".
    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
    /// Token cloned into every `query_result` subscription.
    // NOTE(review): presumably used to stop background work on shutdown — confirm.
    cancellation_token: CancellationToken,
}
132
/// Our root GraphQL mutation type.
pub struct MutationRoot<C> {
    /// Shared client context; locked to create chain clients and to update the
    /// wallet after each mutation.
    context: Arc<Mutex<C>>,
}
137
138#[derive(Debug, thiserror::Error)]
139enum NodeServiceError {
140    #[error(transparent)]
141    ChainClient(#[from] chain_client::Error),
142    #[error(transparent)]
143    BcsHex(#[from] BcsHexParseError),
144    #[error(transparent)]
145    Json(#[from] serde_json::Error),
146    #[error("malformed chain ID: {0}")]
147    InvalidChainId(CryptoError),
148    #[error(transparent)]
149    Client(#[from] linera_client::Error),
150    #[error("scheduling operations from queries is disabled in read-only mode")]
151    ReadOnlyModeOperationsNotAllowed,
152}
153
154impl IntoResponse for NodeServiceError {
155    fn into_response(self) -> response::Response {
156        let status = match self {
157            NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
158                StatusCode::BAD_REQUEST
159            }
160            NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
161            _ => StatusCode::INTERNAL_SERVER_ERROR,
162        };
163        let body = json!({"error": self.to_string()}).to_string();
164        (status, body).into_response()
165    }
166}
167
168#[Subscription]
169impl<C> SubscriptionRoot<C>
170where
171    C: ClientContext + 'static,
172{
173    /// Subscribes to notifications from the specified chain.
174    async fn notifications(
175        &self,
176        chain_id: ChainId,
177    ) -> Result<impl Stream<Item = Notification>, Error> {
178        let client = self
179            .context
180            .lock()
181            .await
182            .make_chain_client(chain_id)
183            .await?;
184        Ok(client.subscribe()?)
185    }
186
187    /// Subscribes to the result of a pre-registered GraphQL query.
188    /// Re-executes the query on every new block and pushes changed results.
189    async fn query_result(
190        &self,
191        #[graphql(desc = "Name of the registered subscription query.")] name: String,
192        #[graphql(desc = "The chain to watch.")] chain_id: ChainId,
193        #[graphql(desc = "The application to query.")] application_id: ApplicationId,
194    ) -> Result<impl Stream<Item = RawJson>, Error> {
195        let manager = self
196            .query_subscriptions
197            .as_ref()
198            .ok_or_else(|| Error::new("no subscription queries registered"))?;
199
200        let key = crate::query_subscription::SubscriptionKey {
201            name,
202            chain_id,
203            application_id,
204        };
205
206        let receiver = manager
207            .subscribe(
208                &key,
209                Arc::clone(&self.context),
210                self.cancellation_token.clone(),
211            )
212            .map_err(|e| Error::new(e.to_string()))?;
213
214        // `sender.subscribe()` marks the current value as "already seen", so
215        // `WatchStream` would skip it and wait for the next change.  Grab the
216        // current snapshot first and prepend it to the stream so that every new
217        // subscriber gets the latest cached result immediately.
218        let current = receiver.borrow().clone();
219        let changes = tokio_stream::wrappers::WatchStream::from_changes(receiver)
220            .filter_map(|value| async move { value });
221        Ok(futures::stream::iter(current).chain(changes).map(RawJson))
222    }
223}
224
225impl<C> MutationRoot<C>
226where
227    C: ClientContext,
228{
229    async fn execute_system_operation(
230        &self,
231        system_operation: SystemOperation,
232        chain_id: ChainId,
233    ) -> Result<CryptoHash, Error> {
234        let certificate = self
235            .apply_client_command(&chain_id, move |client| {
236                let operation = Operation::system(system_operation.clone());
237                async move {
238                    let result = client
239                        .execute_operation(operation)
240                        .await
241                        .map_err(Error::from);
242                    (result, client)
243                }
244            })
245            .await?;
246        Ok(certificate.hash())
247    }
248
249    /// Applies the given function to the chain client.
250    /// Updates the wallet regardless of the outcome. As long as the function returns a round
251    /// timeout, it will wait and retry.
252    async fn apply_client_command<F, Fut, T>(
253        &self,
254        chain_id: &ChainId,
255        mut f: F,
256    ) -> Result<T, Error>
257    where
258        F: FnMut(ChainClient<C::Environment>) -> Fut,
259        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
260    {
261        loop {
262            let client = self
263                .context
264                .lock()
265                .await
266                .make_chain_client(*chain_id)
267                .await?;
268            let mut stream = client.subscribe()?;
269            let (result, client) = f(client).await;
270            self.context.lock().await.update_wallet(&client).await?;
271            let timeout = match result? {
272                ClientOutcome::Committed(t) => return Ok(t),
273                ClientOutcome::Conflict(certificate) => {
274                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
275                }
276                ClientOutcome::WaitForTimeout(timeout) => timeout,
277            };
278            drop(client);
279            util::wait_for_next_round(&mut stream, timeout).await;
280        }
281    }
282}
283
284#[async_graphql::Object(cache_control(no_cache))]
285impl<C> MutationRoot<C>
286where
287    C: ClientContext + 'static,
288{
289    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
290    async fn process_inbox(
291        &self,
292        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
293    ) -> Result<Vec<CryptoHash>, Error> {
294        let mut hashes = Vec::new();
295        loop {
296            let client = self
297                .context
298                .lock()
299                .await
300                .make_chain_client(chain_id)
301                .await?;
302            let result = client.process_inbox().await;
303            self.context.lock().await.update_wallet(&client).await?;
304            let (certificates, maybe_timeout) = result?;
305            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
306            match maybe_timeout {
307                None => return Ok(hashes),
308                Some(timestamp) => {
309                    let mut stream = client.subscribe()?;
310                    drop(client);
311                    util::wait_for_next_round(&mut stream, timestamp).await;
312                }
313            }
314        }
315    }
316
317    /// Synchronizes the chain with the validators. Returns the chain's length.
318    ///
319    /// This is only used for testing, to make sure that a client is up to date.
320    // TODO(#4718): Remove this mutation.
321    async fn sync(
322        &self,
323        #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
324    ) -> Result<u64, Error> {
325        let client = self
326            .context
327            .lock()
328            .await
329            .make_chain_client(chain_id)
330            .await?;
331        let info = client.synchronize_from_validators().await?;
332        self.context.lock().await.update_wallet(&client).await?;
333        Ok(info.next_block_height.0)
334    }
335
336    /// Retries the pending block that was unsuccessfully proposed earlier.
337    async fn retry_pending_block(
338        &self,
339        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
340    ) -> Result<Option<CryptoHash>, Error> {
341        let client = self
342            .context
343            .lock()
344            .await
345            .make_chain_client(chain_id)
346            .await?;
347        let outcome = client.process_pending_block().await?;
348        self.context.lock().await.update_wallet(&client).await?;
349        match outcome {
350            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
351            ClientOutcome::Committed(None) => Ok(None),
352            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
353                "Please try again at {}",
354                timeout.timestamp
355            ))),
356            ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
357                "A different block was committed: {}",
358                certificate.hash()
359            ))),
360        }
361    }
362
363    /// Transfers `amount` units of value from the given owner's account to the recipient.
364    /// If no owner is given, try to take the units out of the chain account.
365    async fn transfer(
366        &self,
367        #[graphql(desc = "The chain which native tokens are being transferred from.")]
368        chain_id: ChainId,
369        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
370        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
371        #[graphql(desc = "The amount being transferred.")] amount: Amount,
372    ) -> Result<CryptoHash, Error> {
373        self.apply_client_command(&chain_id, move |client| async move {
374            let result = client
375                .transfer(owner, amount, recipient)
376                .await
377                .map_err(Error::from)
378                .map(|outcome| outcome.map(|certificate| certificate.hash()));
379            (result, client)
380        })
381        .await
382    }
383
384    /// Claims `amount` units of value from the given owner's account in the remote
385    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
386    /// process the message.
387    async fn claim(
388        &self,
389        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
390        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
391        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
392        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
393        #[graphql(desc = "The amount being transferred.")] amount: Amount,
394    ) -> Result<CryptoHash, Error> {
395        self.apply_client_command(&chain_id, move |client| async move {
396            let result = client
397                .claim(owner, target_id, recipient, amount)
398                .await
399                .map_err(Error::from)
400                .map(|outcome| outcome.map(|certificate| certificate.hash()));
401            (result, client)
402        })
403        .await
404    }
405
406    /// Test if a data blob is readable from a transaction in the current chain.
407    // TODO(#2490): Consider removing or renaming this.
408    async fn read_data_blob(
409        &self,
410        chain_id: ChainId,
411        hash: CryptoHash,
412    ) -> Result<CryptoHash, Error> {
413        self.apply_client_command(&chain_id, move |client| async move {
414            let result = client
415                .read_data_blob(hash)
416                .await
417                .map_err(Error::from)
418                .map(|outcome| outcome.map(|certificate| certificate.hash()));
419            (result, client)
420        })
421        .await
422    }
423
424    /// Creates a new single-owner chain.
425    async fn open_chain(
426        &self,
427        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
428        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
429        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
430        balance: Option<Amount>,
431    ) -> Result<ChainId, Error> {
432        let ownership = ChainOwnership::single(owner);
433        let balance = balance.unwrap_or(Amount::ZERO);
434        let description = self
435            .apply_client_command(&chain_id, move |client| {
436                let ownership = ownership.clone();
437                async move {
438                    let result = client
439                        .open_chain(ownership, ApplicationPermissions::default(), balance)
440                        .await
441                        .map_err(Error::from)
442                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
443                    (result, client)
444                }
445            })
446            .await?;
447        Ok(description.id())
448    }
449
450    /// Creates a new multi-owner chain.
451    #[expect(clippy::too_many_arguments)]
452    async fn open_multi_owner_chain(
453        &self,
454        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
455        #[graphql(desc = "Permissions for applications on the new chain")]
456        application_permissions: Option<ApplicationPermissions>,
457        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
458        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
459        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
460        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
461        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
462        fast_round_ms: Option<u64>,
463        #[graphql(
464            desc = "The duration of the first single-leader and all multi-leader rounds",
465            default = 10_000
466        )]
467        base_timeout_ms: u64,
468        #[graphql(
469            desc = "The number of milliseconds by which the timeout increases after each \
470                    single-leader round",
471            default = 1_000
472        )]
473        timeout_increment_ms: u64,
474        #[graphql(
475            desc = "The age of an incoming tracked or protected message after which the \
476                    validators start transitioning the chain to fallback mode, in milliseconds.",
477            default = 86_400_000
478        )]
479        fallback_duration_ms: u64,
480    ) -> Result<ChainId, Error> {
481        let owners = if let Some(weights) = weights {
482            if weights.len() != owners.len() {
483                return Err(Error::new(format!(
484                    "There are {} owners but {} weights.",
485                    owners.len(),
486                    weights.len()
487                )));
488            }
489            owners.into_iter().zip(weights).collect::<Vec<_>>()
490        } else {
491            owners
492                .into_iter()
493                .zip(iter::repeat(100))
494                .collect::<Vec<_>>()
495        };
496        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
497        let timeout_config = TimeoutConfig {
498            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
499            base_timeout: TimeDelta::from_millis(base_timeout_ms),
500            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
501            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
502        };
503        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
504        let balance = balance.unwrap_or(Amount::ZERO);
505        let description = self
506            .apply_client_command(&chain_id, move |client| {
507                let ownership = ownership.clone();
508                let application_permissions = application_permissions.clone().unwrap_or_default();
509                async move {
510                    let result = client
511                        .open_chain(ownership, application_permissions, balance)
512                        .await
513                        .map_err(Error::from)
514                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
515                    (result, client)
516                }
517            })
518            .await?;
519        Ok(description.id())
520    }
521
522    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
523    async fn close_chain(
524        &self,
525        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
526    ) -> Result<Option<CryptoHash>, Error> {
527        let maybe_cert = self
528            .apply_client_command(&chain_id, |client| async move {
529                let result = client.close_chain().await.map_err(Error::from);
530                (result, client)
531            })
532            .await?;
533        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
534    }
535
536    /// Changes the chain to a single-owner chain
537    async fn change_owner(
538        &self,
539        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
540        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
541    ) -> Result<CryptoHash, Error> {
542        let operation = SystemOperation::ChangeOwnership {
543            super_owners: vec![new_owner],
544            owners: Vec::new(),
545            first_leader: None,
546            multi_leader_rounds: 2,
547            open_multi_leader_rounds: false,
548            timeout_config: TimeoutConfig::default(),
549        };
550        self.execute_system_operation(operation, chain_id).await
551    }
552
553    /// Changes the ownership of the chain
554    #[expect(clippy::too_many_arguments)]
555    async fn change_multiple_owners(
556        &self,
557        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
558        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
559        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
560        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
561        #[graphql(
562            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
563        )]
564        open_multi_leader_rounds: bool,
565        #[graphql(desc = "The leader of the first single-leader round. \
566                          If not set, this is random like other rounds.")]
567        first_leader: Option<AccountOwner>,
568        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
569        fast_round_ms: Option<u64>,
570        #[graphql(
571            desc = "The duration of the first single-leader and all multi-leader rounds",
572            default = 10_000
573        )]
574        base_timeout_ms: u64,
575        #[graphql(
576            desc = "The number of milliseconds by which the timeout increases after each \
577                    single-leader round",
578            default = 1_000
579        )]
580        timeout_increment_ms: u64,
581        #[graphql(
582            desc = "The age of an incoming tracked or protected message after which the \
583                    validators start transitioning the chain to fallback mode, in milliseconds.",
584            default = 86_400_000
585        )]
586        fallback_duration_ms: u64,
587    ) -> Result<CryptoHash, Error> {
588        let operation = SystemOperation::ChangeOwnership {
589            super_owners: Vec::new(),
590            owners: new_owners.into_iter().zip(new_weights).collect(),
591            first_leader,
592            multi_leader_rounds,
593            open_multi_leader_rounds,
594            timeout_config: TimeoutConfig {
595                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
596                base_timeout: TimeDelta::from_millis(base_timeout_ms),
597                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
598                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
599            },
600        };
601        self.execute_system_operation(operation, chain_id).await
602    }
603
604    /// Changes the application permissions configuration on this chain.
605    #[expect(clippy::too_many_arguments)]
606    async fn change_application_permissions(
607        &self,
608        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
609        #[graphql(
610            desc = "These applications are allowed to manage the chain: close it, change \
611                    application permissions, and change ownership."
612        )]
613        manage_chain: Vec<ApplicationId>,
614        #[graphql(
615            desc = "If this is `None`, all system operations and application operations are allowed.
616If it is `Some`, only operations from the specified applications are allowed,
617and no system operations."
618        )]
619        execute_operations: Option<Vec<ApplicationId>>,
620        #[graphql(
621            desc = "At least one operation or incoming message from each of these applications must occur in every block."
622        )]
623        mandatory_applications: Vec<ApplicationId>,
624        #[graphql(
625            desc = "These applications are allowed to perform calls to services as oracles."
626        )]
627        call_service_as_oracle: Option<Vec<ApplicationId>>,
628        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
629        make_http_requests: Option<Vec<ApplicationId>>,
630    ) -> Result<CryptoHash, Error> {
631        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
632            execute_operations,
633            mandatory_applications,
634            manage_chain,
635            call_service_as_oracle,
636            make_http_requests,
637        });
638        self.execute_system_operation(operation, chain_id).await
639    }
640
641    /// (admin chain only) Registers a new committee. This will notify the subscribers of
642    /// the admin chain so that they can migrate to the new epoch (by accepting the
643    /// notification as an "incoming message" in a next block).
644    async fn create_committee(
645        &self,
646        chain_id: ChainId,
647        committee: Committee,
648    ) -> Result<CryptoHash, Error> {
649        Ok(self
650            .apply_client_command(&chain_id, move |client| {
651                let committee = committee.clone();
652                async move {
653                    let result = client
654                        .stage_new_committee(committee)
655                        .await
656                        .map_err(Error::from);
657                    (result, client)
658                }
659            })
660            .await?
661            .hash())
662    }
663
664    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
665    /// blocks from the retired epoch will not be accepted until they are followed (hence
666    /// re-certified) by a block certified by a recent committee.
667    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
668        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
669        self.execute_system_operation(operation, chain_id).await
670    }
671
672    /// Publishes a new application module.
673    async fn publish_module(
674        &self,
675        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
676        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
677        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
678        service: Bytecode,
679        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
680        vm_runtime: VmRuntime,
681    ) -> Result<ModuleId, Error> {
682        self.apply_client_command(&chain_id, move |client| {
683            let contract = contract.clone();
684            let service = service.clone();
685            async move {
686                let result = client
687                    .publish_module(contract, service, vm_runtime)
688                    .await
689                    .map_err(Error::from)
690                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
691                (result, client)
692            }
693        })
694        .await
695    }
696
697    /// Publishes a new data blob.
698    async fn publish_data_blob(
699        &self,
700        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
701        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
702    ) -> Result<CryptoHash, Error> {
703        self.apply_client_command(&chain_id, |client| {
704            let bytes = bytes.clone();
705            async move {
706                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
707                (result, client)
708            }
709        })
710        .await
711        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
712    }
713
714    /// Creates a new application.
715    async fn create_application(
716        &self,
717        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
718        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
719        #[graphql(desc = "The JSON serialization of the parameters of the application")]
720        parameters: String,
721        #[graphql(
722            desc = "The JSON serialization of the instantiation argument of the application"
723        )]
724        instantiation_argument: String,
725        #[graphql(desc = "The dependencies of the application being created")]
726        required_application_ids: Vec<ApplicationId>,
727    ) -> Result<ApplicationId, Error> {
728        self.apply_client_command(&chain_id, move |client| {
729            let parameters = parameters.as_bytes().to_vec();
730            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
731            let required_application_ids = required_application_ids.clone();
732            async move {
733                let result = client
734                    .create_application_untyped(
735                        module_id,
736                        parameters,
737                        instantiation_argument,
738                        required_application_ids,
739                    )
740                    .await
741                    .map_err(Error::from)
742                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
743                (result, client)
744            }
745        })
746        .await
747    }
748}
749
750#[async_graphql::Object(cache_control(no_cache))]
751impl<C> QueryRoot<C>
752where
753    C: ClientContext + 'static,
754{
755    async fn chain(
756        &self,
757        chain_id: ChainId,
758    ) -> Result<ChainStateExtendedView<<C::Environment as linera_core::Environment>::Storage>, Error>
759    {
760        let client = self
761            .context
762            .lock()
763            .await
764            .make_chain_client(chain_id)
765            .await?;
766        let view = client.chain_state_view().await?;
767        Ok(ChainStateExtendedView::new(view))
768    }
769
770    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
771        let client = self
772            .context
773            .lock()
774            .await
775            .make_chain_client(chain_id)
776            .await?;
777        let applications = client
778            .chain_state_view()
779            .await?
780            .execution_state
781            .list_applications()
782            .await?;
783
784        let overviews = applications
785            .into_iter()
786            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
787            .collect();
788
789        Ok(overviews)
790    }
791
792    async fn chains(&self) -> Result<Chains, Error> {
793        Ok(Chains {
794            list: self
795                .context
796                .lock()
797                .await
798                .wallet()
799                .chain_ids()
800                .try_collect()
801                .await?,
802            default: self.default_chain,
803        })
804    }
805
806    async fn block(
807        &self,
808        hash: Option<CryptoHash>,
809        chain_id: ChainId,
810    ) -> Result<Option<ConfirmedBlock>, Error> {
811        let client = self
812            .context
813            .lock()
814            .await
815            .make_chain_client(chain_id)
816            .await?;
817        let hash = match hash {
818            Some(hash) => Some(hash),
819            None => client.chain_info().await?.block_hash,
820        };
821        if let Some(hash) = hash {
822            let block = client.read_confirmed_block(hash).await?;
823            Ok(Some(block))
824        } else {
825            Ok(None)
826        }
827    }
828
829    async fn events_from_index(
830        &self,
831        chain_id: ChainId,
832        stream_id: StreamId,
833        start_index: u32,
834    ) -> Result<Vec<IndexAndEvent>, Error> {
835        Ok(self
836            .context
837            .lock()
838            .await
839            .make_chain_client(chain_id)
840            .await?
841            .events_from_index(stream_id, start_index)
842            .await?)
843    }
844
845    async fn blocks(
846        &self,
847        from: Option<CryptoHash>,
848        chain_id: ChainId,
849        limit: Option<u32>,
850    ) -> Result<Vec<ConfirmedBlock>, Error> {
851        let client = self
852            .context
853            .lock()
854            .await
855            .make_chain_client(chain_id)
856            .await?;
857        let limit = limit.unwrap_or(10);
858        let from = match from {
859            Some(from) => Some(from),
860            None => client.chain_info().await?.block_hash,
861        };
862        let Some(from) = from else {
863            return Ok(vec![]);
864        };
865        let mut hash = Some(from);
866        let mut values = Vec::new();
867        for _ in 0..limit {
868            let Some(next_hash) = hash else {
869                break;
870            };
871            let value = client.read_confirmed_block(next_hash).await?;
872            hash = value.block().header.previous_block_hash;
873            values.push(value);
874        }
875        Ok(values)
876    }
877
878    /// Returns the version information on this node service.
879    async fn version(&self) -> linera_version::VersionInfo {
880        linera_version::VersionInfo::default()
881    }
882}
883
884// What follows is a hack to add a chain_id field to `ChainStateView` based on
885// https://async-graphql.github.io/async-graphql/en/merging_objects.html
886
887struct ChainStateViewExtension(ChainId);
888
889#[async_graphql::Object(cache_control(no_cache))]
890impl ChainStateViewExtension {
891    async fn chain_id(&self) -> ChainId {
892        self.0
893    }
894}
895
// Merges the `chain_id` extension object with the read-only chain state view into a single
// GraphQL object.
#[derive(MergedObject)]
struct ChainStateExtendedView<S: Storage>(ChainStateViewExtension, ReadOnlyChainStateView<S>);
898
/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind
/// a read guard.
pub struct ReadOnlyChainStateView<S: Storage>(ChainStateViewReadGuard<S>);

// Field resolution is forwarded unchanged to the value behind the guard.
impl<S: Storage> ContainerType for ReadOnlyChainStateView<S>
where
    ChainStateView<S::Context>: ContainerType,
{
    // Delegates the lookup of the requested field to the wrapped guard.
    async fn resolve_field(
        &self,
        context: &async_graphql::Context<'_>,
    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
        self.0.resolve_field(context).await
    }
}

// The wrapper reuses the GraphQL type name and type info of `ChainStateView`, so it does not
// introduce a type of its own into the schema.
impl<S: Storage> OutputType for ReadOnlyChainStateView<S>
where
    ChainStateView<S::Context>: OutputType,
{
    // Same type name as the underlying `ChainStateView`.
    fn type_name() -> Cow<'static, str> {
        ChainStateView::<S::Context>::type_name()
    }

    // Registers the type info of the underlying `ChainStateView`.
    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
        ChainStateView::<S::Context>::create_type_info(registry)
    }

    // Delegates resolution of the selection set to the wrapped guard.
    async fn resolve(
        &self,
        context: &async_graphql::ContextSelectionSet<'_>,
        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
    ) -> async_graphql::ServerResult<async_graphql::Value> {
        self.0.resolve(context, field).await
    }
}
935
936impl<S: Storage> ChainStateExtendedView<S> {
937    fn new(view: ChainStateViewReadGuard<S>) -> Self {
938        Self(
939            ChainStateViewExtension(view.chain_id()),
940            ReadOnlyChainStateView(view),
941        )
942    }
943}
944
945#[derive(SimpleObject)]
946pub struct ApplicationOverview {
947    id: ApplicationId,
948    description: ApplicationDescription,
949    link: String,
950}
951
952impl ApplicationOverview {
953    fn new(
954        id: ApplicationId,
955        description: ApplicationDescription,
956        port: NonZeroU16,
957        chain_id: ChainId,
958    ) -> Self {
959        Self {
960            id,
961            description,
962            link: format!(
963                "http://localhost:{}/chains/{}/applications/{}",
964                port.get(),
965                chain_id,
966                id
967            ),
968        }
969    }
970}
971
972/// Schema type that can be either full (with mutations) or read-only.
973pub enum NodeServiceSchema<C>
974where
975    C: ClientContext + 'static,
976{
977    /// Full schema with mutations enabled.
978    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
979    /// Read-only schema with mutations disabled.
980    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
981}
982
983impl<C> NodeServiceSchema<C>
984where
985    C: ClientContext,
986{
987    /// Executes a GraphQL request.
988    pub async fn execute(&self, request: impl Into<Request>) -> Response {
989        match self {
990            Self::Full(schema) => schema.execute(request).await,
991            Self::ReadOnly(schema) => schema.execute(request).await,
992        }
993    }
994
995    /// Returns the SDL (Schema Definition Language) representation.
996    pub fn sdl(&self) -> String {
997        match self {
998            Self::Full(schema) => schema.sdl(),
999            Self::ReadOnly(schema) => schema.sdl(),
1000        }
1001    }
1002}
1003
1004impl<C> Clone for NodeServiceSchema<C>
1005where
1006    C: ClientContext,
1007{
1008    fn clone(&self) -> Self {
1009        match self {
1010            Self::Full(schema) => Self::Full(schema.clone()),
1011            Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
1012        }
1013    }
1014}
1015
// Metrics for the application query response cache. The counters are registered with an
// empty label list, so each one is updated through `with_label_values(&[])`.
#[cfg(with_metrics)]
mod query_cache_metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge};
    use prometheus::{IntCounterVec, IntGauge};

    /// Counts lookups that found a cached response.
    pub static QUERY_CACHE_HIT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("query_response_cache_hit", "Query response cache hits", &[])
    });

    /// Counts lookups that found no cached response.
    pub static QUERY_CACHE_MISS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_miss",
            "Query response cache misses",
            &[],
        )
    });

    /// Counts how many times a chain's cache was cleared.
    pub static QUERY_CACHE_INVALIDATION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_invalidation",
            "Query response cache invalidations (per chain)",
            &[],
        )
    });

    /// Tracks the number of entries currently held, summed over all chains.
    pub static QUERY_CACHE_ENTRIES: LazyLock<IntGauge> = LazyLock::new(|| {
        register_int_gauge(
            "query_response_cache_entries",
            "Current number of cached query responses across all chains",
        )
    });
}
1050
/// Per-chain cache state: an LRU map plus the `next_block_height` at the time the
/// cache was last invalidated. Both are behind the same mutex.
struct PerChainCache {
    /// Maps `(application ID, request bytes)` to the cached response bytes.
    lru: LruCache<(ApplicationId, Vec<u8>), Vec<u8>>,
    /// The `next_block_height` the current entries belong to; inserts carrying a lower
    /// value are discarded and inserts carrying a higher one clear the map first.
    next_block_height: BlockHeight,
}
1057
/// An LRU cache for application query responses, keyed per chain.
///
/// Caches serialized response bytes keyed on `(chain_id, application_id, request_bytes)`.
/// The entire per-chain cache is invalidated when a `NewBlock` notification arrives.
///
/// To prevent a race where a slow query inserts stale data *after* an invalidation,
/// each insert carries the chain's `next_block_height` at query time.
/// If a newer block has since been processed, the insert is silently dropped.
struct QueryResponseCache {
    /// One independently locked cache per chain, created on first insert or invalidation.
    chains: papaya::HashMap<ChainId, StdMutex<PerChainCache>>,
    /// Chains for which we have registered a notification subscription.
    subscribed: papaya::HashSet<ChainId>,
    /// Sender half of the notification channel, used to subscribe new chains lazily.
    notification_sender: StdMutex<Option<tokio::sync::mpsc::UnboundedSender<Notification>>>,
    /// Maximum number of entries held by each per-chain LRU map.
    capacity_per_chain: std::num::NonZeroUsize,
}
1074
1075impl QueryResponseCache {
1076    fn new(capacity_per_chain: usize) -> Self {
1077        Self {
1078            chains: papaya::HashMap::new(),
1079            subscribed: papaya::HashSet::new(),
1080            notification_sender: StdMutex::new(None),
1081            capacity_per_chain: std::num::NonZeroUsize::new(capacity_per_chain)
1082                .expect("capacity must be > 0"),
1083        }
1084    }
1085
1086    /// Stores the notification sender (called once during startup).
1087    fn set_notification_sender(&self, sender: tokio::sync::mpsc::UnboundedSender<Notification>) {
1088        *self
1089            .notification_sender
1090            .lock()
1091            .expect("sender mutex poisoned") = Some(sender);
1092    }
1093
1094    /// Returns the notification sender, if set.
1095    fn notification_sender(&self) -> Option<tokio::sync::mpsc::UnboundedSender<Notification>> {
1096        self.notification_sender
1097            .lock()
1098            .expect("sender mutex poisoned")
1099            .clone()
1100    }
1101
1102    /// Marks a chain as subscribed to notifications.
1103    fn mark_subscribed(&self, chain_id: ChainId) {
1104        self.subscribed.pin().insert(chain_id);
1105    }
1106
1107    /// Returns `true` if the chain is not yet subscribed to notifications.
1108    fn needs_subscription(&self, chain_id: &ChainId) -> bool {
1109        !self.subscribed.pin().contains(chain_id)
1110    }
1111
1112    /// Marks initial chains as subscribed (called during startup).
1113    fn mark_all_subscribed(&self, chain_ids: &[ChainId]) {
1114        let pinned = self.subscribed.pin();
1115        for &chain_id in chain_ids {
1116            pinned.insert(chain_id);
1117        }
1118    }
1119
1120    /// Looks up a cached response. Returns `Some(bytes)` on hit, `None` on miss
1121    /// (including when the chain has no cache entry yet).
1122    #[allow(clippy::question_mark)]
1123    fn get(&self, chain_id: ChainId, app_id: &ApplicationId, request: &[u8]) -> Option<Vec<u8>> {
1124        let pinned = self.chains.pin();
1125        let Some(mutex) = pinned.get(&chain_id) else {
1126            #[cfg(with_metrics)]
1127            query_cache_metrics::QUERY_CACHE_MISS
1128                .with_label_values(&[])
1129                .inc();
1130            return None;
1131        };
1132        let mut cache = mutex.lock().expect("LRU mutex poisoned");
1133        let key = (*app_id, request.to_vec());
1134        let result = cache.lru.get(&key).cloned();
1135        #[cfg(with_metrics)]
1136        {
1137            if result.is_some() {
1138                query_cache_metrics::QUERY_CACHE_HIT
1139                    .with_label_values(&[])
1140                    .inc();
1141            } else {
1142                query_cache_metrics::QUERY_CACHE_MISS
1143                    .with_label_values(&[])
1144                    .inc();
1145            }
1146        }
1147        result
1148    }
1149
1150    /// Inserts a response into the cache, unless the chain's `next_block_height` has
1151    /// advanced past the caller's snapshot (which would mean a new block arrived and
1152    /// this response is potentially stale).
1153    fn insert(
1154        &self,
1155        chain_id: ChainId,
1156        app_id: ApplicationId,
1157        request: Vec<u8>,
1158        response: Vec<u8>,
1159        next_block_height: BlockHeight,
1160    ) {
1161        let pinned = self.chains.pin();
1162        let capacity = self.capacity_per_chain;
1163        let mutex = pinned.get_or_insert_with(chain_id, || {
1164            StdMutex::new(PerChainCache {
1165                lru: LruCache::new(capacity),
1166                next_block_height,
1167            })
1168        });
1169        let mut cache = mutex.lock().expect("LRU mutex poisoned");
1170        if next_block_height < cache.next_block_height {
1171            return; // A new block arrived since this query started; discard stale response.
1172        }
1173        // If the chain has advanced since the last cache update, also clear stale entries.
1174        // Note: This should not happen if notifications are timely. Also, this only
1175        // works when we have a cache miss.
1176        if next_block_height > cache.next_block_height {
1177            debug!(
1178                "Unexpected query cache invalidation for chain {chain_id}:\
1179                 {next_block_height} > {}",
1180                cache.next_block_height
1181            );
1182            #[cfg(with_metrics)]
1183            {
1184                query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache.lru.len() as i64);
1185                query_cache_metrics::QUERY_CACHE_INVALIDATION
1186                    .with_label_values(&[])
1187                    .inc();
1188            }
1189            cache.lru.clear();
1190            cache.next_block_height = next_block_height;
1191        }
1192        #[cfg(with_metrics)]
1193        let prev_len = cache.lru.len();
1194        cache.lru.put((app_id, request), response);
1195        #[cfg(with_metrics)]
1196        if cache.lru.len() != prev_len {
1197            query_cache_metrics::QUERY_CACHE_ENTRIES.inc();
1198        }
1199    }
1200
1201    /// Called when a `NewBlock` notification arrives. Records the new
1202    /// `next_block_height` and clears all cached responses for the chain.
1203    fn invalidate_chain(&self, chain_id: &ChainId, next_block_height: BlockHeight) {
1204        let pinned = self.chains.pin();
1205        let capacity = self.capacity_per_chain;
1206        let mutex = pinned.get_or_insert_with(*chain_id, || {
1207            StdMutex::new(PerChainCache {
1208                lru: LruCache::new(capacity),
1209                next_block_height,
1210            })
1211        });
1212        let mut cache = mutex.lock().expect("LRU mutex poisoned");
1213        if next_block_height > cache.next_block_height {
1214            #[cfg(with_metrics)]
1215            {
1216                query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache.lru.len() as i64);
1217                query_cache_metrics::QUERY_CACHE_INVALIDATION
1218                    .with_label_values(&[])
1219                    .inc();
1220            }
1221            cache.lru.clear();
1222            cache.next_block_height = next_block_height;
1223        } else {
1224            debug!(
1225                "Query cache for chain {chain_id} was already invalidated:\
1226                 {next_block_height} <= {}",
1227                cache.next_block_height
1228            );
1229        }
1230    }
1231}
1232
1233/// The `NodeService` is a server that exposes a web-server to the client.
1234/// The node service is primarily used to explore the state of a chain in GraphQL.
1235pub struct NodeService<C>
1236where
1237    C: ClientContext + 'static,
1238{
1239    config: ChainListenerConfig,
1240    port: NonZeroU16,
1241    #[cfg(with_metrics)]
1242    metrics_port: NonZeroU16,
1243    default_chain: Option<ChainId>,
1244    context: Arc<Mutex<C>>,
1245    /// If true, disallow mutations and prevent queries from scheduling operations.
1246    read_only: bool,
1247    /// Optional LRU cache for application query responses. `None` when caching is disabled.
1248    query_cache: Option<Arc<QueryResponseCache>>,
1249    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1250    cancellation_token: CancellationToken,
1251}
1252
1253impl<C> Clone for NodeService<C>
1254where
1255    C: ClientContext + 'static,
1256{
1257    fn clone(&self) -> Self {
1258        Self {
1259            config: self.config.clone(),
1260            port: self.port,
1261            #[cfg(with_metrics)]
1262            metrics_port: self.metrics_port,
1263            default_chain: self.default_chain,
1264            context: Arc::clone(&self.context),
1265            read_only: self.read_only,
1266            query_cache: self.query_cache.clone(),
1267            query_subscriptions: self.query_subscriptions.clone(),
1268            cancellation_token: self.cancellation_token.clone(),
1269        }
1270    }
1271}
1272
1273impl<C> NodeService<C>
1274where
1275    C: ClientContext,
1276{
1277    /// Creates a new instance of the node service given a client chain and a port.
1278    ///
1279    /// `query_cache_size` controls the per-chain LRU cache capacity for application query
1280    /// responses. Pass `None` to disable the cache (the default). Enable with
1281    /// `--query-cache-size <N>`. Incompatible with `--long-lived-services`.
1282    #[allow(clippy::too_many_arguments)]
1283    pub fn new(
1284        config: ChainListenerConfig,
1285        port: NonZeroU16,
1286        #[cfg(with_metrics)] metrics_port: NonZeroU16,
1287        default_chain: Option<ChainId>,
1288        context: Arc<Mutex<C>>,
1289        read_only: bool,
1290        query_cache_size: Option<usize>,
1291        query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1292        cancellation_token: CancellationToken,
1293    ) -> Self {
1294        let query_cache = query_cache_size.map(|size| Arc::new(QueryResponseCache::new(size)));
1295        Self {
1296            config,
1297            port,
1298            #[cfg(with_metrics)]
1299            metrics_port,
1300            default_chain,
1301            context,
1302            read_only,
1303            query_cache,
1304            query_subscriptions,
1305            cancellation_token,
1306        }
1307    }
1308
1309    #[cfg(with_metrics)]
1310    pub fn metrics_address(&self) -> SocketAddr {
1311        SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
1312    }
1313
1314    pub fn schema(&self) -> NodeServiceSchema<C> {
1315        let query = QueryRoot {
1316            context: Arc::clone(&self.context),
1317            port: self.port,
1318            default_chain: self.default_chain,
1319        };
1320        let subscription = SubscriptionRoot {
1321            context: Arc::clone(&self.context),
1322            query_subscriptions: self.query_subscriptions.clone(),
1323            cancellation_token: self.cancellation_token.clone(),
1324        };
1325
1326        if self.read_only {
1327            NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1328        } else {
1329            NodeServiceSchema::Full(
1330                Schema::build(
1331                    query,
1332                    MutationRoot {
1333                        context: Arc::clone(&self.context),
1334                    },
1335                    subscription,
1336                )
1337                .finish(),
1338            )
1339        }
1340    }
1341
    /// Runs the node service.
    ///
    /// Sets up the HTTP routes (`/`, `/chains/{chain_id}/applications/{application_id}`,
    /// `/ready` and the `/ws` subscription endpoint), starts the cache invalidation task when
    /// the query cache is enabled, then drives the chain listener and the HTTP server
    /// together. Returns as soon as either of the two finishes, propagating its error.
    ///
    /// The `cancellation_token` argument is the one used for the metrics server, the chain
    /// listener and the graceful shutdown of the HTTP server.
    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
    pub async fn run(
        self,
        cancellation_token: CancellationToken,
        command_receiver: UnboundedReceiver<ListenerCommand>,
    ) -> Result<(), anyhow::Error> {
        let port = self.port.get();
        // Both endpoints serve the GraphiQL page on GET and execute queries on POST.
        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
        let application_handler =
            axum::routing::get(util::graphiql).post(Self::application_handler);

        #[cfg(with_metrics)]
        monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());

        let base_router = Router::new()
            .route("/", index_handler)
            .route(
                "/chains/{chain_id}/applications/{application_id}",
                application_handler,
            )
            .route("/ready", axum::routing::get(|| async { "ready!" }));

        // Create router with appropriate schema for WebSocket subscriptions.
        let app = match self.schema() {
            NodeServiceSchema::Full(schema) => {
                base_router.route_service("/ws", GraphQLSubscription::new(schema))
            }
            NodeServiceSchema::ReadOnly(schema) => {
                base_router.route_service("/ws", GraphQLSubscription::new(schema))
            }
        }
        // The handlers receive a clone of this service through the `Extension` layer.
        .layer(Extension(self.clone()))
        // TODO(#551): Provide application authentication.
        .layer(CorsLayer::permissive());

        info!("GraphiQL IDE: http://localhost:{}", port);

        // Spawn the cache invalidation listener if caching is enabled.
        if let Some(cache) = &self.query_cache {
            // The context lock is held while the wallet's chains are subscribed and recorded,
            // and released explicitly before the listener task is spawned.
            let guard = self.context.lock().await;
            let chain_ids: Vec<ChainId> = guard.wallet().chain_ids().try_collect().await?;
            let (tx, mut receiver) = tokio::sync::mpsc::unbounded_channel();
            guard.client().subscribe_extra(chain_ids.clone(), &tx);
            cache.mark_all_subscribed(&chain_ids);
            // The sender is kept so that further chains can be subscribed lazily later.
            cache.set_notification_sender(tx);
            drop(guard);
            let cache = Arc::clone(cache);
            tokio::spawn(async move {
                while let Some(notification) = receiver.recv().await {
                    // Only `NewBlock` notifications invalidate the cache; the height passed
                    // on is the one following the new block.
                    if let Reason::NewBlock { height, .. } = notification.reason {
                        let next_block_height = height
                            .try_add_one()
                            .expect("block height should not overflow");
                        cache.invalidate_chain(&notification.chain_id, next_block_height);
                    }
                }
            });
        }

        let storage = self.context.lock().await.storage().clone();

        // `self.config` and `self.context` are moved into the listener here, so `self` is
        // not used past this point.
        let chain_listener = ChainListener::new(
            self.config,
            self.context,
            storage,
            cancellation_token.clone(),
            command_receiver,
            true,
        )
        .run()
        .await?;
        let mut chain_listener = Box::pin(chain_listener).fuse();
        // Listen on all IPv4 interfaces.
        let tcp_listener =
            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
        let server = axum::serve(tcp_listener, app)
            .with_graceful_shutdown(cancellation_token.cancelled_owned())
            .into_future();
        // Whichever of the two futures completes first decides the result.
        futures::select! {
            result = chain_listener => result?,
            result = Box::pin(server).fuse() => result?,
        };

        Ok(())
    }
1427
    /// Handles service queries for user applications (including mutations).
    ///
    /// The flow is:
    /// 1. With caching enabled and no `block_hash`, a cached response is returned directly.
    /// 2. Otherwise the application is queried. If the query schedules no operations, the
    ///    response is returned (and cached when the cache applies).
    /// 3. If the query schedules operations, they are executed in a new block, unless the
    ///    service is read-only, in which case
    ///    [`NodeServiceError::ReadOnlyModeOperationsNotAllowed`] is returned. On success the
    ///    result is a JSON-serialized GraphQL response carrying the hash of the committed
    ///    certificate.
    async fn handle_service_request(
        &self,
        application_id: ApplicationId,
        request: Vec<u8>,
        chain_id: ChainId,
        block_hash: Option<CryptoHash>,
    ) -> Result<Vec<u8>, NodeServiceError> {
        // Only cache read-only queries against the latest state (block_hash == None).
        let cache = block_hash
            .is_none()
            .then_some(self.query_cache.as_ref())
            .flatten();

        // Return immediately on cache hit.
        if let Some(cache) = cache {
            if let Some(cached) = cache.get(chain_id, &application_id, &request) {
                return Ok(cached);
            }
        }

        // `block_height` is the height returned alongside the query outcome; it is passed
        // to the cache as the staleness snapshot for this response.
        let (
            QueryOutcome {
                response,
                operations,
            },
            block_height,
        ) = self
            .query_user_application(application_id, request.clone(), chain_id, block_hash)
            .await?;
        if operations.is_empty() {
            if let Some(cache) = cache {
                // Lazily subscribe to notifications for chains discovered after startup.
                if cache.needs_subscription(&chain_id) {
                    if let Some(sender) = cache.notification_sender() {
                        self.context
                            .lock()
                            .await
                            .client()
                            .subscribe_extra(vec![chain_id], &sender);
                        cache.mark_subscribed(chain_id);
                    }
                }
                cache.insert(
                    chain_id,
                    application_id,
                    request,
                    response.clone(),
                    block_height,
                );
            }
            return Ok(response);
        }

        // From here on the query asked for operations to be executed.
        if self.read_only {
            return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
        }

        trace!("Query requested a new block with operations: {operations:?}");
        let client = self
            .context
            .lock()
            .await
            .make_chain_client(chain_id)
            .await?;
        // Retry until the operations are committed: a conflict aborts with an error, while
        // a timeout outcome waits for the next round before trying again.
        let hash = loop {
            let timeout = match client
                .execute_operations(operations.clone(), vec![])
                .await?
            {
                ClientOutcome::Committed(certificate) => break certificate.hash(),
                ClientOutcome::Conflict(certificate) => {
                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
                }
                ClientOutcome::WaitForTimeout(timeout) => timeout,
            };
            let mut stream = client.subscribe().map_err(|_| {
                chain_client::Error::InternalError("Could not subscribe to the local node.")
            })?;
            util::wait_for_next_round(&mut stream, timeout).await;
        };
        let response = async_graphql::Response::new(hash.to_value());
        Ok(serde_json::to_vec(&response)?)
    }
1512
1513    /// Queries a user application, returning the raw [`QueryOutcome`] and the height of the
1514    /// chain's latest block at the time of the query (used for cache staleness detection).
1515    async fn query_user_application(
1516        &self,
1517        application_id: ApplicationId,
1518        bytes: Vec<u8>,
1519        chain_id: ChainId,
1520        block_hash: Option<CryptoHash>,
1521    ) -> Result<(QueryOutcome<Vec<u8>>, BlockHeight), NodeServiceError> {
1522        let query = Query::User {
1523            application_id,
1524            bytes,
1525        };
1526        let client = self
1527            .context
1528            .lock()
1529            .await
1530            .make_chain_client(chain_id)
1531            .await?;
1532        let (
1533            QueryOutcome {
1534                response,
1535                operations,
1536            },
1537            next_block_height,
1538        ) = client.query_application(query, block_hash).await?;
1539        match response {
1540            QueryResponse::System(_) => {
1541                unreachable!("cannot get a system response for a user query")
1542            }
1543            QueryResponse::User(user_response_bytes) => Ok((
1544                QueryOutcome {
1545                    response: user_response_bytes,
1546                    operations,
1547                },
1548                next_block_height,
1549            )),
1550        }
1551    }
1552
1553    /// Executes a GraphQL query and generates a response for our `Schema`.
1554    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1555        service
1556            .0
1557            .schema()
1558            .execute(request.into_inner())
1559            .await
1560            .into()
1561    }
1562
1563    /// Executes a GraphQL query against an application.
1564    /// Pattern matches on the `OperationType` of the query and routes the query
1565    /// accordingly.
1566    async fn application_handler(
1567        Path((chain_id, application_id)): Path<(String, String)>,
1568        service: Extension<Self>,
1569        request: String,
1570    ) -> Result<Vec<u8>, NodeServiceError> {
1571        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1572        let application_id: ApplicationId = application_id.parse()?;
1573
1574        debug!(
1575            %chain_id,
1576            %application_id,
1577            "processing request for application:\n{:?}",
1578            &request
1579        );
1580        let response = service
1581            .0
1582            .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1583            .await?;
1584
1585        Ok(response)
1586    }
1587}
1588
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        data_types::BlockHeight,
        identifiers::{ApplicationId, ChainId},
    };

    use super::QueryResponseCache;

    /// Builds a deterministic chain ID from a small index.
    fn test_chain(n: u64) -> ChainId {
        let seed = format!("chain-{n}");
        ChainId(CryptoHash::test_hash(seed))
    }

    /// Builds a deterministic application ID from a small index.
    fn test_app(n: u64) -> ApplicationId {
        let seed = format!("app-{n}");
        ApplicationId::new(CryptoHash::test_hash(seed))
    }

    /// A lookup misses before anything is stored and hits after the insert.
    #[test]
    fn cache_hit_and_miss() {
        let (chain, app) = (test_chain(0), test_app(0));
        let cache = QueryResponseCache::new(100);
        let query = b"query { balance }".to_vec();
        let answer = b"{ \"balance\": 42 }".to_vec();

        // Nothing has been stored for this chain yet, so the lookup misses.
        let before_insert = cache.get(chain, &app, &query);
        assert!(before_insert.is_none());

        // Store one response for the chain.
        cache.insert(chain, app, query.clone(), answer.clone(), BlockHeight(1));

        // The same lookup now returns the stored response.
        let after_insert = cache.get(chain, &app, &query);
        assert_eq!(after_insert, Some(answer));
    }

    /// Invalidating one chain leaves another chain's entry readable.
    #[test]
    fn per_chain_isolation() {
        let (chain_a, chain_b) = (test_chain(0), test_chain(1));
        let app = test_app(0);
        let cache = QueryResponseCache::new(100);
        let query = b"q".to_vec();
        let answer = b"r".to_vec();

        cache.insert(chain_a, app, query.clone(), answer.clone(), BlockHeight(1));

        // Only chain B is invalidated; chain A's entry must survive.
        cache.invalidate_chain(&chain_b, BlockHeight(1));

        let surviving = cache.get(chain_a, &app, &query);
        assert_eq!(surviving, Some(answer));
    }

    /// Invalidating a chain drops every entry stored for it.
    #[test]
    fn invalidation_clears_all_entries() {
        let (chain, app) = (test_chain(0), test_app(0));
        let cache = QueryResponseCache::new(100);
        let store = |query: &[u8], reply: &[u8]| {
            cache.insert(chain, app, query.to_vec(), reply.to_vec(), BlockHeight(1));
        };

        store(b"q1", b"r1");
        store(b"q2", b"r2");

        cache.invalidate_chain(&chain, BlockHeight(2));

        for query in [b"q1", b"q2"] {
            assert!(cache.get(chain, &app, query).is_none());
        }
    }

    /// With room for two entries, a third insert pushes out the oldest one.
    #[test]
    fn lru_eviction() {
        let (chain, app) = (test_chain(0), test_app(0));
        let cache = QueryResponseCache::new(2);
        let store = |query: &[u8], reply: &[u8]| {
            cache.insert(chain, app, query.to_vec(), reply.to_vec(), BlockHeight(1));
        };

        store(b"q1", b"r1");
        store(b"q2", b"r2");
        // The cache is full, so this insert is expected to evict q1.
        store(b"q3", b"r3");

        let evicted = cache.get(chain, &app, b"q1");
        assert!(evicted.is_none());

        let second = cache.get(chain, &app, b"q2");
        assert!(second.is_some());

        let third = cache.get(chain, &app, b"q3");
        assert!(third.is_some());
    }

    /// An insert carrying a height from before an invalidation is not stored.
    #[test]
    fn stale_insert_rejected_after_invalidation() {
        let (chain, app) = (test_chain(0), test_app(0));
        let cache = QueryResponseCache::new(100);

        // The chain sits at block 3; a query begins and remembers that height.
        cache.insert(chain, app, b"q0".to_vec(), b"r0".to_vec(), BlockHeight(3));
        let snapshot_height = BlockHeight(3);

        // Block 4 lands before the query completes.
        cache.invalidate_chain(&chain, BlockHeight(4));

        // The slow query finishes and offers its result at the old height.
        let late_response = b"stale".to_vec();
        cache.insert(chain, app, b"q".to_vec(), late_response, snapshot_height);

        // That late insert must not be visible.
        let lookup = cache.get(chain, &app, b"q");
        assert!(lookup.is_none());
    }
}