Skip to main content

linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    future::IntoFuture,
7    iter,
8    net::SocketAddr,
9    num::NonZeroU16,
10    sync::{Arc, Mutex as StdMutex},
11};
12
13use async_graphql::{
14    futures_util::Stream,
15    registry::{MetaType, MetaTypeId, Registry},
16    resolver_utils::ContainerType,
17    EmptyMutation, Error, MergedObject, OutputType, Positioned, Request, Response, ScalarType,
18    Schema, SimpleObject, Subscription,
19};
20use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
21use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
22use futures::{lock::Mutex, Future, FutureExt as _, StreamExt as _, TryStreamExt as _};
23use linera_base::{
24    crypto::{CryptoError, CryptoHash},
25    data_types::{
26        Amount, ApplicationDescription, ApplicationPermissions, BlockHeight, Bytecode, Epoch,
27        TimeDelta,
28    },
29    identifiers::{
30        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
31    },
32    ownership::{ChainOwnership, TimeoutConfig},
33    vm::VmRuntime,
34    BcsHexParseError,
35};
36use linera_chain::{
37    types::{ConfirmedBlock, GenericCertificate},
38    ChainStateView,
39};
40use linera_client::chain_listener::{
41    ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
42};
43use linera_core::{
44    client::chain_client::{self, ChainClient},
45    data_types::ClientOutcome,
46    wallet::Wallet as _,
47    worker::{ChainStateViewReadGuard, Notification, Reason},
48};
49use linera_execution::{
50    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
51    SystemOperation,
52};
53#[cfg(with_metrics)]
54use linera_metrics::monitoring_server;
55use linera_sdk::linera_base_types::BlobContent;
56use linera_storage::Storage;
57use lru::LruCache;
58use serde::{Deserialize, Serialize};
59use serde_json::json;
60use tokio::sync::mpsc::UnboundedReceiver;
61use tokio_util::sync::CancellationToken;
62use tower_http::cors::CorsLayer;
63use tracing::{debug, info, instrument, trace};
64
65use crate::util;
66
67/// A pre-serialized JSON string that implements [`OutputType`] as the `JSON` scalar.
68///
69/// When the `raw_value` feature of `async-graphql` is enabled, the string is
70/// emitted directly into the GraphQL response without any parsing or
71/// intermediate tree construction.
72#[derive(Clone)]
73struct RawJson(String);
74
75impl OutputType for RawJson {
76    fn type_name() -> Cow<'static, str> {
77        Cow::Borrowed("JSON")
78    }
79
80    fn create_type_info(registry: &mut Registry) -> String {
81        registry.create_output_type::<Self, _>(MetaTypeId::Scalar, |_| MetaType::Scalar {
82            name: "JSON".to_string(),
83            description: Some("A scalar that can represent any JSON value.".to_string()),
84            is_valid: None,
85            visible: None,
86            inaccessible: false,
87            tags: Default::default(),
88            specified_by_url: None,
89            directive_invocations: Default::default(),
90            requires_scopes: Default::default(),
91        })
92    }
93
94    async fn resolve(
95        &self,
96        _ctx: &async_graphql::ContextSelectionSet<'_>,
97        _field: &Positioned<async_graphql::parser::types::Field>,
98    ) -> async_graphql::ServerResult<async_graphql::Value> {
99        // Wrap the raw JSON string with the magic token that async-graphql's
100        // ConstValue serializer recognises (with feature `raw_value`).
101        // When the response is serialised to JSON the raw string is emitted
102        // verbatim, avoiding any parsing or tree conversion.
103        Ok(async_graphql::Value::Object(
104            std::iter::once((
105                async_graphql::Name::new(async_graphql_value::RAW_VALUE_TOKEN),
106                async_graphql::Value::String(self.0.clone()),
107            ))
108            .collect(),
109        ))
110    }
111}
112
// GraphQL object returned by the `chains` query.
//
// NOTE: plain `//` comments are used on purpose. `///` doc comments on a
// `SimpleObject` (and on its fields) are exported as descriptions in the GraphQL
// schema, so adding them would change the schema.
#[derive(SimpleObject, Serialize, Deserialize, Clone)]
pub struct Chains {
    // Chain IDs collected from the wallet.
    pub list: Vec<ChainId>,
    // The default chain, if one is configured.
    pub default: Option<ChainId>,
}
118
/// Our root GraphQL query type.
pub struct QueryRoot<C> {
    /// Shared client context behind an async mutex; resolvers lock it to create
    /// chain clients and to read the wallet.
    context: Arc<Mutex<C>>,
    /// Port number handed to `ApplicationOverview::new` when listing applications.
    port: NonZeroU16,
    /// Value reported as `default` by the `chains` query.
    default_chain: Option<ChainId>,
}
125
/// Our root GraphQL subscription type.
pub struct SubscriptionRoot<C> {
    /// Shared client context behind an async mutex; locked to create chain clients
    /// and cloned into the query-subscription manager.
    context: Arc<Mutex<C>>,
    /// Manager for pre-registered query subscriptions. When this is `None`, the
    /// `query_result` subscription fails with "no subscription queries registered".
    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
    /// Token that is cloned and passed to the manager on every `query_result`
    /// subscription.
    cancellation_token: CancellationToken,
}
132
/// Our root GraphQL mutation type.
pub struct MutationRoot<C> {
    /// Shared client context behind an async mutex; mutations lock it to create
    /// chain clients and to update the wallet afterwards.
    context: Arc<Mutex<C>>,
}
137
/// Errors reported by the node service.
///
/// The `IntoResponse` implementation for this type maps `InvalidChainId` and
/// `BcsHex` to `400 Bad Request`, `ReadOnlyModeOperationsNotAllowed` to
/// `403 Forbidden`, and every other variant to `500 Internal Server Error`.
#[derive(Debug, thiserror::Error)]
enum NodeServiceError {
    /// An error coming from the chain client; displayed as the inner error.
    #[error(transparent)]
    ChainClient(#[from] chain_client::Error),
    /// A BCS/hex parsing error; displayed as the inner error.
    #[error(transparent)]
    BcsHex(#[from] BcsHexParseError),
    /// A `serde_json` error; displayed as the inner error.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// A chain ID could not be parsed. There is no `#[from]` conversion, so this
    /// variant has to be constructed explicitly.
    #[error("malformed chain ID: {0}")]
    InvalidChainId(CryptoError),
    /// An error coming from `linera_client`; displayed as the inner error.
    #[error(transparent)]
    Client(#[from] linera_client::Error),
    /// A query tried to schedule operations while the service is in read-only mode.
    #[error("scheduling operations from queries is disabled in read-only mode")]
    ReadOnlyModeOperationsNotAllowed,
}
153
154impl IntoResponse for NodeServiceError {
155    fn into_response(self) -> response::Response {
156        let status = match self {
157            NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
158                StatusCode::BAD_REQUEST
159            }
160            NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
161            _ => StatusCode::INTERNAL_SERVER_ERROR,
162        };
163        let body = json!({"error": self.to_string()}).to_string();
164        (status, body).into_response()
165    }
166}
167
168#[Subscription]
169impl<C> SubscriptionRoot<C>
170where
171    C: ClientContext + 'static,
172{
173    /// Subscribes to notifications from the specified chain.
174    async fn notifications(
175        &self,
176        chain_id: ChainId,
177    ) -> Result<impl Stream<Item = Notification>, Error> {
178        let client = self
179            .context
180            .lock()
181            .await
182            .make_chain_client(chain_id)
183            .await?;
184        Ok(client.subscribe()?)
185    }
186
187    /// Subscribes to the result of a pre-registered GraphQL query.
188    /// Re-executes the query on every new block and pushes changed results.
189    async fn query_result(
190        &self,
191        #[graphql(desc = "Name of the registered subscription query.")] name: String,
192        #[graphql(desc = "The chain to watch.")] chain_id: ChainId,
193        #[graphql(desc = "The application to query.")] application_id: ApplicationId,
194    ) -> Result<impl Stream<Item = RawJson>, Error> {
195        let manager = self
196            .query_subscriptions
197            .as_ref()
198            .ok_or_else(|| Error::new("no subscription queries registered"))?;
199
200        let key = crate::query_subscription::SubscriptionKey {
201            name,
202            chain_id,
203            application_id,
204        };
205
206        let receiver = manager
207            .subscribe(
208                &key,
209                Arc::clone(&self.context),
210                self.cancellation_token.clone(),
211            )
212            .map_err(|e| Error::new(e.to_string()))?;
213
214        // `sender.subscribe()` marks the current value as "already seen", so
215        // `WatchStream` would skip it and wait for the next change.  Grab the
216        // current snapshot first and prepend it to the stream so that every new
217        // subscriber gets the latest cached result immediately.
218        let current = receiver.borrow().clone();
219        let changes = tokio_stream::wrappers::WatchStream::from_changes(receiver)
220            .filter_map(|value| async move { value });
221        Ok(futures::stream::iter(current).chain(changes).map(RawJson))
222    }
223}
224
225impl<C> MutationRoot<C>
226where
227    C: ClientContext,
228{
229    async fn execute_system_operation(
230        &self,
231        system_operation: SystemOperation,
232        chain_id: ChainId,
233    ) -> Result<CryptoHash, Error> {
234        let certificate = self
235            .apply_client_command(&chain_id, move |client| {
236                let operation = Operation::system(system_operation.clone());
237                async move {
238                    let result = client
239                        .execute_operation(operation)
240                        .await
241                        .map_err(Error::from);
242                    (result, client)
243                }
244            })
245            .await?;
246        Ok(certificate.hash())
247    }
248
249    /// Applies the given function to the chain client.
250    /// Updates the wallet regardless of the outcome. As long as the function returns a round
251    /// timeout, it will wait and retry.
252    async fn apply_client_command<F, Fut, T>(
253        &self,
254        chain_id: &ChainId,
255        mut f: F,
256    ) -> Result<T, Error>
257    where
258        F: FnMut(ChainClient<C::Environment>) -> Fut,
259        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
260    {
261        loop {
262            let client = self
263                .context
264                .lock()
265                .await
266                .make_chain_client(*chain_id)
267                .await?;
268            let mut stream = client.subscribe()?;
269            let (result, client) = f(client).await;
270            self.context.lock().await.update_wallet(&client).await?;
271            let timeout = match result? {
272                ClientOutcome::Committed(t) => return Ok(t),
273                ClientOutcome::Conflict(certificate) => {
274                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
275                }
276                ClientOutcome::WaitForTimeout(timeout) => timeout,
277            };
278            drop(client);
279            util::wait_for_next_round(&mut stream, timeout).await;
280        }
281    }
282}
283
284#[async_graphql::Object(cache_control(no_cache))]
285impl<C> MutationRoot<C>
286where
287    C: ClientContext + 'static,
288{
289    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
290    async fn process_inbox(
291        &self,
292        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
293    ) -> Result<Vec<CryptoHash>, Error> {
294        let mut hashes = Vec::new();
295        loop {
296            let client = self
297                .context
298                .lock()
299                .await
300                .make_chain_client(chain_id)
301                .await?;
302            let result = client.process_inbox().await;
303            self.context.lock().await.update_wallet(&client).await?;
304            let (certificates, maybe_timeout) = result?;
305            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
306            match maybe_timeout {
307                None => return Ok(hashes),
308                Some(timestamp) => {
309                    let mut stream = client.subscribe()?;
310                    drop(client);
311                    util::wait_for_next_round(&mut stream, timestamp).await;
312                }
313            }
314        }
315    }
316
317    /// Synchronizes the chain with the validators. Returns the chain's length.
318    ///
319    /// This is only used for testing, to make sure that a client is up to date.
320    // TODO(#4718): Remove this mutation.
321    async fn sync(
322        &self,
323        #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
324    ) -> Result<u64, Error> {
325        let client = self
326            .context
327            .lock()
328            .await
329            .make_chain_client(chain_id)
330            .await?;
331        let info = client.synchronize_from_validators().await?;
332        self.context.lock().await.update_wallet(&client).await?;
333        Ok(info.next_block_height.0)
334    }
335
336    /// Retries the pending block that was unsuccessfully proposed earlier.
337    async fn retry_pending_block(
338        &self,
339        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
340    ) -> Result<Option<CryptoHash>, Error> {
341        let client = self
342            .context
343            .lock()
344            .await
345            .make_chain_client(chain_id)
346            .await?;
347        let outcome = client.process_pending_block().await?;
348        self.context.lock().await.update_wallet(&client).await?;
349        match outcome {
350            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
351            ClientOutcome::Committed(None) => Ok(None),
352            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
353                "Please try again at {}",
354                timeout.timestamp
355            ))),
356            ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
357                "A different block was committed: {}",
358                certificate.hash()
359            ))),
360        }
361    }
362
363    /// Transfers `amount` units of value from the given owner's account to the recipient.
364    /// If no owner is given, try to take the units out of the chain account.
365    async fn transfer(
366        &self,
367        #[graphql(desc = "The chain which native tokens are being transferred from.")]
368        chain_id: ChainId,
369        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
370        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
371        #[graphql(desc = "The amount being transferred.")] amount: Amount,
372    ) -> Result<CryptoHash, Error> {
373        self.apply_client_command(&chain_id, move |client| async move {
374            let result = client
375                .transfer(owner, amount, recipient)
376                .await
377                .map_err(Error::from)
378                .map(|outcome| outcome.map(|certificate| certificate.hash()));
379            (result, client)
380        })
381        .await
382    }
383
384    /// Claims `amount` units of value from the given owner's account in the remote
385    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
386    /// process the message.
387    async fn claim(
388        &self,
389        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
390        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
391        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
392        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
393        #[graphql(desc = "The amount being transferred.")] amount: Amount,
394    ) -> Result<CryptoHash, Error> {
395        self.apply_client_command(&chain_id, move |client| async move {
396            let result = client
397                .claim(owner, target_id, recipient, amount)
398                .await
399                .map_err(Error::from)
400                .map(|outcome| outcome.map(|certificate| certificate.hash()));
401            (result, client)
402        })
403        .await
404    }
405
406    /// Test if a data blob is readable from a transaction in the current chain.
407    // TODO(#2490): Consider removing or renaming this.
408    async fn read_data_blob(
409        &self,
410        chain_id: ChainId,
411        hash: CryptoHash,
412    ) -> Result<CryptoHash, Error> {
413        self.apply_client_command(&chain_id, move |client| async move {
414            let result = client
415                .read_data_blob(hash)
416                .await
417                .map_err(Error::from)
418                .map(|outcome| outcome.map(|certificate| certificate.hash()));
419            (result, client)
420        })
421        .await
422    }
423
424    /// Creates a new single-owner chain.
425    async fn open_chain(
426        &self,
427        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
428        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
429        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
430        balance: Option<Amount>,
431    ) -> Result<ChainId, Error> {
432        let ownership = ChainOwnership::single(owner);
433        let balance = balance.unwrap_or(Amount::ZERO);
434        let description = self
435            .apply_client_command(&chain_id, move |client| {
436                let ownership = ownership.clone();
437                async move {
438                    let result = client
439                        .open_chain(ownership, ApplicationPermissions::default(), balance)
440                        .await
441                        .map_err(Error::from)
442                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
443                    (result, client)
444                }
445            })
446            .await?;
447        Ok(description.id())
448    }
449
450    /// Creates a new multi-owner chain.
451    #[expect(clippy::too_many_arguments)]
452    async fn open_multi_owner_chain(
453        &self,
454        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
455        #[graphql(desc = "Permissions for applications on the new chain")]
456        application_permissions: Option<ApplicationPermissions>,
457        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
458        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
459        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
460        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
461        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
462        fast_round_ms: Option<u64>,
463        #[graphql(
464            desc = "The duration of the first single-leader and all multi-leader rounds",
465            default = 10_000
466        )]
467        base_timeout_ms: u64,
468        #[graphql(
469            desc = "The number of milliseconds by which the timeout increases after each \
470                    single-leader round",
471            default = 1_000
472        )]
473        timeout_increment_ms: u64,
474        #[graphql(
475            desc = "The age of an incoming tracked or protected message after which the \
476                    validators start transitioning the chain to fallback mode, in milliseconds.",
477            default = 86_400_000
478        )]
479        fallback_duration_ms: u64,
480    ) -> Result<ChainId, Error> {
481        let owners = if let Some(weights) = weights {
482            if weights.len() != owners.len() {
483                return Err(Error::new(format!(
484                    "There are {} owners but {} weights.",
485                    owners.len(),
486                    weights.len()
487                )));
488            }
489            owners.into_iter().zip(weights).collect::<Vec<_>>()
490        } else {
491            owners
492                .into_iter()
493                .zip(iter::repeat(100))
494                .collect::<Vec<_>>()
495        };
496        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
497        let timeout_config = TimeoutConfig {
498            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
499            base_timeout: TimeDelta::from_millis(base_timeout_ms),
500            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
501            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
502        };
503        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
504        let balance = balance.unwrap_or(Amount::ZERO);
505        let description = self
506            .apply_client_command(&chain_id, move |client| {
507                let ownership = ownership.clone();
508                let application_permissions = application_permissions.clone().unwrap_or_default();
509                async move {
510                    let result = client
511                        .open_chain(ownership, application_permissions, balance)
512                        .await
513                        .map_err(Error::from)
514                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
515                    (result, client)
516                }
517            })
518            .await?;
519        Ok(description.id())
520    }
521
522    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
523    async fn close_chain(
524        &self,
525        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
526    ) -> Result<Option<CryptoHash>, Error> {
527        let maybe_cert = self
528            .apply_client_command(&chain_id, |client| async move {
529                let result = client.close_chain().await.map_err(Error::from);
530                (result, client)
531            })
532            .await?;
533        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
534    }
535
536    /// Changes the chain to a single-owner chain
537    async fn change_owner(
538        &self,
539        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
540        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
541    ) -> Result<CryptoHash, Error> {
542        let operation = SystemOperation::ChangeOwnership {
543            super_owners: vec![new_owner],
544            owners: Vec::new(),
545            first_leader: None,
546            multi_leader_rounds: 5,
547            open_multi_leader_rounds: false,
548            timeout_config: TimeoutConfig::default(),
549        };
550        self.execute_system_operation(operation, chain_id).await
551    }
552
553    /// Changes the ownership of the chain
554    #[expect(clippy::too_many_arguments)]
555    async fn change_multiple_owners(
556        &self,
557        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
558        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
559        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
560        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
561        #[graphql(
562            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
563        )]
564        open_multi_leader_rounds: bool,
565        #[graphql(desc = "The leader of the first single-leader round. \
566                          If not set, this is random like other rounds.")]
567        first_leader: Option<AccountOwner>,
568        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
569        fast_round_ms: Option<u64>,
570        #[graphql(
571            desc = "The duration of the first single-leader and all multi-leader rounds",
572            default = 10_000
573        )]
574        base_timeout_ms: u64,
575        #[graphql(
576            desc = "The number of milliseconds by which the timeout increases after each \
577                    single-leader round",
578            default = 1_000
579        )]
580        timeout_increment_ms: u64,
581        #[graphql(
582            desc = "The age of an incoming tracked or protected message after which the \
583                    validators start transitioning the chain to fallback mode, in milliseconds.",
584            default = 86_400_000
585        )]
586        fallback_duration_ms: u64,
587    ) -> Result<CryptoHash, Error> {
588        let operation = SystemOperation::ChangeOwnership {
589            super_owners: Vec::new(),
590            owners: new_owners.into_iter().zip(new_weights).collect(),
591            first_leader,
592            multi_leader_rounds,
593            open_multi_leader_rounds,
594            timeout_config: TimeoutConfig {
595                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
596                base_timeout: TimeDelta::from_millis(base_timeout_ms),
597                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
598                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
599            },
600        };
601        self.execute_system_operation(operation, chain_id).await
602    }
603
604    /// Changes the application permissions configuration on this chain.
605    #[expect(clippy::too_many_arguments)]
606    async fn change_application_permissions(
607        &self,
608        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
609        #[graphql(
610            desc = "These applications are allowed to manage the chain: close it, change \
611                    application permissions, and change ownership."
612        )]
613        manage_chain: Vec<ApplicationId>,
614        #[graphql(
615            desc = "If this is `None`, all system operations and application operations are allowed.
616If it is `Some`, only operations from the specified applications are allowed,
617and no system operations."
618        )]
619        execute_operations: Option<Vec<ApplicationId>>,
620        #[graphql(
621            desc = "At least one operation or incoming message from each of these applications must occur in every block."
622        )]
623        mandatory_applications: Vec<ApplicationId>,
624        #[graphql(
625            desc = "These applications are allowed to perform calls to services as oracles."
626        )]
627        call_service_as_oracle: Option<Vec<ApplicationId>>,
628        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
629        make_http_requests: Option<Vec<ApplicationId>>,
630    ) -> Result<CryptoHash, Error> {
631        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
632            execute_operations,
633            mandatory_applications,
634            manage_chain,
635            call_service_as_oracle,
636            make_http_requests,
637        });
638        self.execute_system_operation(operation, chain_id).await
639    }
640
641    /// (admin chain only) Registers a new committee. This will notify the subscribers of
642    /// the admin chain so that they can migrate to the new epoch (by accepting the
643    /// notification as an "incoming message" in a next block).
644    async fn create_committee(
645        &self,
646        chain_id: ChainId,
647        committee: Committee,
648    ) -> Result<CryptoHash, Error> {
649        Ok(self
650            .apply_client_command(&chain_id, move |client| {
651                let committee = committee.clone();
652                async move {
653                    let result = client
654                        .stage_new_committee(committee)
655                        .await
656                        .map_err(Error::from);
657                    (result, client)
658                }
659            })
660            .await?
661            .hash())
662    }
663
664    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
665    /// blocks from the retired epoch will not be accepted until they are followed (hence
666    /// re-certified) by a block certified by a recent committee.
667    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
668        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
669        self.execute_system_operation(operation, chain_id).await
670    }
671
672    /// Publishes a new application module, optionally along with a JSON-encoded
673    /// `Formats` description that becomes a third blob alongside the contract
674    /// and service blobs.
675    async fn publish_module(
676        &self,
677        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
678        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
679        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
680        service: Bytecode,
681        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
682        vm_runtime: VmRuntime,
683        #[graphql(desc = "Optional JSON-encoded `Formats` description bytes")] formats: Option<
684            Vec<u8>,
685        >,
686    ) -> Result<ModuleId, Error> {
687        self.apply_client_command(&chain_id, move |client| {
688            let contract = contract.clone();
689            let service = service.clone();
690            let formats = formats.clone();
691            async move {
692                let result = client
693                    .publish_module(contract, service, vm_runtime, formats)
694                    .await
695                    .map_err(Error::from)
696                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
697                (result, client)
698            }
699        })
700        .await
701    }
702
703    /// Publishes a new data blob.
704    async fn publish_data_blob(
705        &self,
706        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
707        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
708    ) -> Result<CryptoHash, Error> {
709        self.apply_client_command(&chain_id, |client| {
710            let bytes = bytes.clone();
711            async move {
712                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
713                (result, client)
714            }
715        })
716        .await
717        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
718    }
719
720    /// Creates a new application.
721    async fn create_application(
722        &self,
723        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
724        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
725        #[graphql(desc = "The JSON serialization of the parameters of the application")]
726        parameters: String,
727        #[graphql(
728            desc = "The JSON serialization of the instantiation argument of the application"
729        )]
730        instantiation_argument: String,
731        #[graphql(desc = "The dependencies of the application being created")]
732        required_application_ids: Vec<ApplicationId>,
733    ) -> Result<ApplicationId, Error> {
734        self.apply_client_command(&chain_id, move |client| {
735            let parameters = parameters.as_bytes().to_vec();
736            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
737            let required_application_ids = required_application_ids.clone();
738            async move {
739                let result = client
740                    .create_application_untyped(
741                        module_id,
742                        parameters,
743                        instantiation_argument,
744                        required_application_ids,
745                    )
746                    .await
747                    .map_err(Error::from)
748                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
749                (result, client)
750            }
751        })
752        .await
753    }
754}
755
756#[async_graphql::Object(cache_control(no_cache))]
757impl<C> QueryRoot<C>
758where
759    C: ClientContext + 'static,
760{
761    async fn chain(
762        &self,
763        chain_id: ChainId,
764    ) -> Result<ChainStateExtendedView<<C::Environment as linera_core::Environment>::Storage>, Error>
765    {
766        let client = self
767            .context
768            .lock()
769            .await
770            .make_chain_client(chain_id)
771            .await?;
772        let view = client.chain_state_view().await?;
773        Ok(ChainStateExtendedView::new(view))
774    }
775
776    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
777        let client = self
778            .context
779            .lock()
780            .await
781            .make_chain_client(chain_id)
782            .await?;
783        let applications = client
784            .chain_state_view()
785            .await?
786            .execution_state
787            .list_applications()
788            .await?;
789
790        let overviews = applications
791            .into_iter()
792            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
793            .collect();
794
795        Ok(overviews)
796    }
797
798    async fn chains(&self) -> Result<Chains, Error> {
799        Ok(Chains {
800            list: self
801                .context
802                .lock()
803                .await
804                .wallet()
805                .chain_ids()
806                .try_collect()
807                .await?,
808            default: self.default_chain,
809        })
810    }
811
812    async fn block(
813        &self,
814        hash: Option<CryptoHash>,
815        chain_id: ChainId,
816    ) -> Result<Option<Arc<ConfirmedBlock>>, Error> {
817        let client = self
818            .context
819            .lock()
820            .await
821            .make_chain_client(chain_id)
822            .await?;
823        let hash = match hash {
824            Some(hash) => Some(hash),
825            None => client.chain_info().await?.block_hash,
826        };
827        if let Some(hash) = hash {
828            Ok(Some(client.read_confirmed_block(hash).await?))
829        } else {
830            Ok(None)
831        }
832    }
833
834    async fn events_from_index(
835        &self,
836        chain_id: ChainId,
837        stream_id: StreamId,
838        start_index: u32,
839    ) -> Result<Vec<IndexAndEvent>, Error> {
840        Ok(self
841            .context
842            .lock()
843            .await
844            .make_chain_client(chain_id)
845            .await?
846            .events_from_index(stream_id, start_index)
847            .await?)
848    }
849
850    async fn blocks(
851        &self,
852        from: Option<CryptoHash>,
853        chain_id: ChainId,
854        limit: Option<u32>,
855    ) -> Result<Vec<Arc<ConfirmedBlock>>, Error> {
856        let client = self
857            .context
858            .lock()
859            .await
860            .make_chain_client(chain_id)
861            .await?;
862        let limit = limit.unwrap_or(10);
863        let from = match from {
864            Some(from) => Some(from),
865            None => client.chain_info().await?.block_hash,
866        };
867        let Some(from) = from else {
868            return Ok(vec![]);
869        };
870        let mut hash = Some(from);
871        let mut values = Vec::new();
872        for _ in 0..limit {
873            let Some(next_hash) = hash else {
874                break;
875            };
876            let value = client.read_confirmed_block(next_hash).await?;
877            hash = value.block().header.previous_block_hash;
878            values.push(value);
879        }
880        Ok(values)
881    }
882
    /// Returns the version information on this node service.
    async fn version(&self) -> linera_version::VersionInfo {
        // Takes no input and does not touch the client context: the reported values
        // are whatever `VersionInfo::default()` yields.
        linera_version::VersionInfo::default()
    }
887
888    /// Returns the bytes of an application formats blob (JSON-encoded `Formats`)
889    /// stored in the local node, given the formats blob hash carried by a
890    /// `ModuleId`. Returns `None` if the blob is not present locally.
891    async fn application_formats(
892        &self,
893        chain_id: ChainId,
894        formats_blob_hash: CryptoHash,
895    ) -> Result<Option<Vec<u8>>, Error> {
896        let client = self
897            .context
898            .lock()
899            .await
900            .make_chain_client(chain_id)
901            .await?;
902        let blob_id = linera_base::identifiers::BlobId::new(
903            formats_blob_hash,
904            linera_base::identifiers::BlobType::ApplicationFormats,
905        );
906        let blob = client.storage_client().read_blob(blob_id).await?;
907        Ok(blob.map(|b| b.bytes().to_vec()))
908    }
909}
910
911// What follows is a hack to add a chain_id field to `ChainStateView` based on
912// https://async-graphql.github.io/async-graphql/en/merging_objects.html
913
914struct ChainStateViewExtension(ChainId);
915
916#[async_graphql::Object(cache_control(no_cache))]
917impl ChainStateViewExtension {
918    async fn chain_id(&self) -> ChainId {
919        self.0
920    }
921}
922
// Merges the `chain_id` field of `ChainStateViewExtension` with the fields proxied by
// `ReadOnlyChainStateView` into a single GraphQL object.
#[derive(MergedObject)]
struct ChainStateExtendedView<S: Storage>(ChainStateViewExtension, ReadOnlyChainStateView<S>);
925
926/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind
927/// a read guard.
928pub struct ReadOnlyChainStateView<S: Storage>(ChainStateViewReadGuard<S>);
929
930impl<S: Storage> ContainerType for ReadOnlyChainStateView<S>
931where
932    ChainStateView<S::Context>: ContainerType,
933{
934    async fn resolve_field(
935        &self,
936        context: &async_graphql::Context<'_>,
937    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
938        self.0.resolve_field(context).await
939    }
940}
941
942impl<S: Storage> OutputType for ReadOnlyChainStateView<S>
943where
944    ChainStateView<S::Context>: OutputType,
945{
946    fn type_name() -> Cow<'static, str> {
947        ChainStateView::<S::Context>::type_name()
948    }
949
950    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
951        ChainStateView::<S::Context>::create_type_info(registry)
952    }
953
954    async fn resolve(
955        &self,
956        context: &async_graphql::ContextSelectionSet<'_>,
957        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
958    ) -> async_graphql::ServerResult<async_graphql::Value> {
959        self.0.resolve(context, field).await
960    }
961}
962
963impl<S: Storage> ChainStateExtendedView<S> {
964    fn new(view: ChainStateViewReadGuard<S>) -> Self {
965        Self(
966            ChainStateViewExtension(view.chain_id()),
967            ReadOnlyChainStateView(view),
968        )
969    }
970}
971
972#[derive(SimpleObject)]
973pub struct ApplicationOverview {
974    id: ApplicationId,
975    description: ApplicationDescription,
976    link: String,
977}
978
979impl ApplicationOverview {
980    fn new(
981        id: ApplicationId,
982        description: ApplicationDescription,
983        port: NonZeroU16,
984        chain_id: ChainId,
985    ) -> Self {
986        Self {
987            id,
988            description,
989            link: format!(
990                "http://localhost:{}/chains/{}/applications/{}",
991                port.get(),
992                chain_id,
993                id
994            ),
995        }
996    }
997}
998
999/// Schema type that can be either full (with mutations) or read-only.
1000pub enum NodeServiceSchema<C>
1001where
1002    C: ClientContext + 'static,
1003{
1004    /// Full schema with mutations enabled.
1005    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
1006    /// Read-only schema with mutations disabled.
1007    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
1008}
1009
1010impl<C> NodeServiceSchema<C>
1011where
1012    C: ClientContext,
1013{
1014    /// Executes a GraphQL request.
1015    pub async fn execute(&self, request: impl Into<Request>) -> Response {
1016        match self {
1017            Self::Full(schema) => schema.execute(request).await,
1018            Self::ReadOnly(schema) => schema.execute(request).await,
1019        }
1020    }
1021
1022    /// Returns the SDL (Schema Definition Language) representation.
1023    pub fn sdl(&self) -> String {
1024        match self {
1025            Self::Full(schema) => schema.sdl(),
1026            Self::ReadOnly(schema) => schema.sdl(),
1027        }
1028    }
1029}
1030
1031impl<C> Clone for NodeServiceSchema<C>
1032where
1033    C: ClientContext,
1034{
1035    fn clone(&self) -> Self {
1036        match self {
1037            Self::Full(schema) => Self::Full(schema.clone()),
1038            Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
1039        }
1040    }
1041}
1042
// Metrics describing the query response cache; only compiled with `with_metrics`.
#[cfg(with_metrics)]
mod query_cache_metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge};
    use prometheus::{IntCounterVec, IntGauge};

    /// Counts cache lookups that found a stored response.
    pub static QUERY_CACHE_HIT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("query_response_cache_hit", "Query response cache hits", &[])
    });

    /// Counts cache lookups that found nothing.
    pub static QUERY_CACHE_MISS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_miss",
            "Query response cache misses",
            &[],
        )
    });

    /// Counts how many times a chain's cached responses were cleared.
    pub static QUERY_CACHE_INVALIDATION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_invalidation",
            "Query response cache invalidations (per chain)",
            &[],
        )
    });

    /// Tracks the number of responses currently cached, summed over all chains.
    pub static QUERY_CACHE_ENTRIES: LazyLock<IntGauge> = LazyLock::new(|| {
        register_int_gauge(
            "query_response_cache_entries",
            "Current number of cached query responses across all chains",
        )
    });
}
1077
/// Per-chain cache state: an LRU map plus the `next_block_height` at the time the
/// cache was last invalidated. Both are behind the same mutex.
struct PerChainCache {
    /// Cached response bytes, keyed by `(application ID, request bytes)`.
    lru: LruCache<(ApplicationId, Vec<u8>), Vec<u8>>,
    /// The chain's `next_block_height` recorded when this cache was created or last
    /// cleared; inserts that carry a lower height are dropped as stale.
    next_block_height: BlockHeight,
}
1084
1085/// An LRU cache for application query responses, keyed per chain.
1086///
1087/// Caches serialized response bytes keyed on `(chain_id, application_id, request_bytes)`.
1088/// The entire per-chain cache is invalidated when a `NewBlock` notification arrives.
1089///
1090/// To prevent a race where a slow query inserts stale data *after* an invalidation,
1091/// each insert carries the chain's `next_block_height` at query time.
1092/// If a newer block has since been processed, the insert is silently dropped.
1093struct QueryResponseCache {
1094    chains: papaya::HashMap<ChainId, StdMutex<PerChainCache>>,
1095    /// Chains for which we have registered a notification subscription.
1096    subscribed: papaya::HashSet<ChainId>,
1097    /// Sender half of the notification channel, used to subscribe new chains lazily.
1098    notification_sender: StdMutex<Option<tokio::sync::mpsc::UnboundedSender<Notification>>>,
1099    capacity_per_chain: std::num::NonZeroUsize,
1100}
1101
1102impl QueryResponseCache {
1103    fn new(capacity_per_chain: usize) -> Self {
1104        Self {
1105            chains: papaya::HashMap::new(),
1106            subscribed: papaya::HashSet::new(),
1107            notification_sender: StdMutex::new(None),
1108            capacity_per_chain: std::num::NonZeroUsize::new(capacity_per_chain)
1109                .expect("capacity must be > 0"),
1110        }
1111    }
1112
1113    /// Stores the notification sender (called once during startup).
1114    fn set_notification_sender(&self, sender: tokio::sync::mpsc::UnboundedSender<Notification>) {
1115        *self
1116            .notification_sender
1117            .lock()
1118            .expect("sender mutex poisoned") = Some(sender);
1119    }
1120
1121    /// Returns the notification sender, if set.
1122    fn notification_sender(&self) -> Option<tokio::sync::mpsc::UnboundedSender<Notification>> {
1123        self.notification_sender
1124            .lock()
1125            .expect("sender mutex poisoned")
1126            .clone()
1127    }
1128
1129    /// Marks a chain as subscribed to notifications.
1130    fn mark_subscribed(&self, chain_id: ChainId) {
1131        self.subscribed.pin().insert(chain_id);
1132    }
1133
1134    /// Returns `true` if the chain is not yet subscribed to notifications.
1135    fn needs_subscription(&self, chain_id: &ChainId) -> bool {
1136        !self.subscribed.pin().contains(chain_id)
1137    }
1138
1139    /// Marks initial chains as subscribed (called during startup).
1140    fn mark_all_subscribed(&self, chain_ids: &[ChainId]) {
1141        let pinned = self.subscribed.pin();
1142        for &chain_id in chain_ids {
1143            pinned.insert(chain_id);
1144        }
1145    }
1146
1147    /// Looks up a cached response. Returns `Some(bytes)` on hit, `None` on miss
1148    /// (including when the chain has no cache entry yet).
1149    fn get(&self, chain_id: ChainId, app_id: &ApplicationId, request: &[u8]) -> Option<Vec<u8>> {
1150        let pinned = self.chains.pin();
1151        let result = pinned.get(&chain_id).and_then(|mutex| {
1152            mutex
1153                .lock()
1154                .expect("LRU mutex poisoned")
1155                .lru
1156                .get(&(*app_id, request.to_vec()))
1157                .cloned()
1158        });
1159        #[cfg(with_metrics)]
1160        {
1161            let metric = if result.is_some() {
1162                &query_cache_metrics::QUERY_CACHE_HIT
1163            } else {
1164                &query_cache_metrics::QUERY_CACHE_MISS
1165            };
1166            metric.with_label_values(&[]).inc();
1167        }
1168        result
1169    }
1170
1171    /// Inserts a response into the cache, unless the chain's `next_block_height` has
1172    /// advanced past the caller's snapshot (which would mean a new block arrived and
1173    /// this response is potentially stale).
1174    fn insert(
1175        &self,
1176        chain_id: ChainId,
1177        app_id: ApplicationId,
1178        request: Vec<u8>,
1179        response: Vec<u8>,
1180        next_block_height: BlockHeight,
1181    ) {
1182        let pinned = self.chains.pin();
1183        let capacity = self.capacity_per_chain;
1184        let mutex = pinned.get_or_insert_with(chain_id, || {
1185            StdMutex::new(PerChainCache {
1186                lru: LruCache::new(capacity),
1187                next_block_height,
1188            })
1189        });
1190        let mut cache = mutex.lock().expect("LRU mutex poisoned");
1191        if next_block_height < cache.next_block_height {
1192            return; // A new block arrived since this query started; discard stale response.
1193        }
1194        // If the chain has advanced since the last cache update, also clear stale entries.
1195        // Note: This should not happen if notifications are timely. Also, this only
1196        // works when we have a cache miss.
1197        if next_block_height > cache.next_block_height {
1198            debug!(
1199                "Unexpected query cache invalidation for chain {chain_id}:\
1200                 {next_block_height} > {}",
1201                cache.next_block_height
1202            );
1203            #[cfg(with_metrics)]
1204            {
1205                #[expect(
1206                    clippy::cast_possible_wrap,
1207                    reason = "LRU cache size fits in i64 for any realistic cache"
1208                )]
1209                let cache_len = cache.lru.len() as i64;
1210                query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1211                query_cache_metrics::QUERY_CACHE_INVALIDATION
1212                    .with_label_values(&[])
1213                    .inc();
1214            }
1215            cache.lru.clear();
1216            cache.next_block_height = next_block_height;
1217        }
1218        #[cfg(with_metrics)]
1219        let prev_len = cache.lru.len();
1220        cache.lru.put((app_id, request), response);
1221        #[cfg(with_metrics)]
1222        if cache.lru.len() != prev_len {
1223            query_cache_metrics::QUERY_CACHE_ENTRIES.inc();
1224        }
1225    }
1226
1227    /// Called when a `NewBlock` notification arrives. Records the new
1228    /// `next_block_height` and clears all cached responses for the chain.
1229    fn invalidate_chain(&self, chain_id: &ChainId, next_block_height: BlockHeight) {
1230        let pinned = self.chains.pin();
1231        let capacity = self.capacity_per_chain;
1232        let mutex = pinned.get_or_insert_with(*chain_id, || {
1233            StdMutex::new(PerChainCache {
1234                lru: LruCache::new(capacity),
1235                next_block_height,
1236            })
1237        });
1238        let mut cache = mutex.lock().expect("LRU mutex poisoned");
1239        if next_block_height > cache.next_block_height {
1240            #[cfg(with_metrics)]
1241            {
1242                #[expect(
1243                    clippy::cast_possible_wrap,
1244                    reason = "LRU cache size fits in i64 for any realistic cache"
1245                )]
1246                let cache_len = cache.lru.len() as i64;
1247                query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1248                query_cache_metrics::QUERY_CACHE_INVALIDATION
1249                    .with_label_values(&[])
1250                    .inc();
1251            }
1252            cache.lru.clear();
1253            cache.next_block_height = next_block_height;
1254        } else {
1255            debug!(
1256                "Query cache for chain {chain_id} was already invalidated:\
1257                 {next_block_height} <= {}",
1258                cache.next_block_height
1259            );
1260        }
1261    }
1262}
1263
1264/// The `NodeService` is a server that exposes a web-server to the client.
1265/// The node service is primarily used to explore the state of a chain in GraphQL.
1266pub struct NodeService<C>
1267where
1268    C: ClientContext + 'static,
1269{
1270    config: ChainListenerConfig,
1271    port: NonZeroU16,
1272    #[cfg(with_metrics)]
1273    metrics_port: NonZeroU16,
1274    default_chain: Option<ChainId>,
1275    context: Arc<Mutex<C>>,
1276    /// If true, disallow mutations and prevent queries from scheduling operations.
1277    read_only: bool,
1278    /// Optional LRU cache for application query responses. `None` when caching is disabled.
1279    query_cache: Option<Arc<QueryResponseCache>>,
1280    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1281    cancellation_token: CancellationToken,
1282    enable_memory_profiling: bool,
1283    /// If true, do not start the chain listener; serve queries from local state only.
1284    pause: bool,
1285}
1286
1287impl<C> Clone for NodeService<C>
1288where
1289    C: ClientContext + 'static,
1290{
1291    fn clone(&self) -> Self {
1292        Self {
1293            config: self.config.clone(),
1294            port: self.port,
1295            #[cfg(with_metrics)]
1296            metrics_port: self.metrics_port,
1297            default_chain: self.default_chain,
1298            context: Arc::clone(&self.context),
1299            read_only: self.read_only,
1300            query_cache: self.query_cache.clone(),
1301            query_subscriptions: self.query_subscriptions.clone(),
1302            cancellation_token: self.cancellation_token.clone(),
1303            enable_memory_profiling: self.enable_memory_profiling,
1304            pause: self.pause,
1305        }
1306    }
1307}
1308
1309impl<C> NodeService<C>
1310where
1311    C: ClientContext,
1312{
1313    /// Creates a new instance of the node service given a client chain and a port.
1314    ///
1315    /// `query_cache_size` controls the per-chain LRU cache capacity for application query
1316    /// responses. Pass `None` to disable the cache (the default). Enable with
1317    /// `--query-cache-size <N>`. Incompatible with `--long-lived-services`.
1318    #[expect(clippy::too_many_arguments)]
1319    pub fn new(
1320        config: ChainListenerConfig,
1321        port: NonZeroU16,
1322        #[cfg(with_metrics)] metrics_port: NonZeroU16,
1323        default_chain: Option<ChainId>,
1324        context: Arc<Mutex<C>>,
1325        read_only: bool,
1326        query_cache_size: Option<usize>,
1327        query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1328        cancellation_token: CancellationToken,
1329        enable_memory_profiling: bool,
1330        pause: bool,
1331    ) -> Self {
1332        let query_cache = query_cache_size.map(|size| Arc::new(QueryResponseCache::new(size)));
1333        Self {
1334            config,
1335            port,
1336            #[cfg(with_metrics)]
1337            metrics_port,
1338            default_chain,
1339            context,
1340            read_only,
1341            query_cache,
1342            query_subscriptions,
1343            cancellation_token,
1344            enable_memory_profiling,
1345            pause,
1346        }
1347    }
1348
1349    #[cfg(with_metrics)]
1350    pub fn metrics_address(&self) -> SocketAddr {
1351        SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
1352    }
1353
1354    pub fn schema(&self) -> NodeServiceSchema<C> {
1355        let query = QueryRoot {
1356            context: Arc::clone(&self.context),
1357            port: self.port,
1358            default_chain: self.default_chain,
1359        };
1360        let subscription = SubscriptionRoot {
1361            context: Arc::clone(&self.context),
1362            query_subscriptions: self.query_subscriptions.clone(),
1363            cancellation_token: self.cancellation_token.clone(),
1364        };
1365
1366        if self.read_only {
1367            NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1368        } else {
1369            NodeServiceSchema::Full(
1370                Schema::build(
1371                    query,
1372                    MutationRoot {
1373                        context: Arc::clone(&self.context),
1374                    },
1375                    subscription,
1376                )
1377                .finish(),
1378            )
1379        }
1380    }
1381
    /// Runs the node service.
    ///
    /// Serves the following routes on `0.0.0.0:<port>`:
    /// - `/`: GraphiQL on `GET`, the node service schema on `POST`;
    /// - `/chains/{chain_id}/applications/{application_id}`: GraphiQL on `GET`,
    ///   application queries on `POST`;
    /// - `/ready`: a fixed readiness response;
    /// - `/ws`: GraphQL subscriptions.
    ///
    /// When the query cache is enabled, a background task is spawned that clears a
    /// chain's cached responses on every `NewBlock` notification. Unless the service is
    /// paused, the chain listener runs alongside the server and this method returns as
    /// soon as either of them finishes.
    ///
    /// # Errors
    ///
    /// Returns an error if the wallet's chain IDs cannot be collected, the port cannot
    /// be bound, the chain listener fails to start or run, or the server fails.
    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
    pub async fn run(
        self,
        cancellation_token: CancellationToken,
        command_receiver: UnboundedReceiver<ListenerCommand>,
    ) -> Result<(), anyhow::Error> {
        let port = self.port.get();
        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
        let application_handler =
            axum::routing::get(util::graphiql).post(Self::application_handler);

        #[cfg(with_metrics)]
        monitoring_server::start_metrics_with_profiling(
            self.metrics_address(),
            cancellation_token.clone(),
            self.enable_memory_profiling,
        )
        .await;

        let base_router = Router::new()
            .route("/", index_handler)
            .route(
                "/chains/{chain_id}/applications/{application_id}",
                application_handler,
            )
            .route("/ready", axum::routing::get(|| async { "ready!" }));

        // Create router with appropriate schema for WebSocket subscriptions.
        let app = match self.schema() {
            NodeServiceSchema::Full(schema) => {
                base_router.route_service("/ws", GraphQLSubscription::new(schema))
            }
            NodeServiceSchema::ReadOnly(schema) => {
                base_router.route_service("/ws", GraphQLSubscription::new(schema))
            }
        }
        // The handlers receive a clone of this service through the `Extension` extractor.
        .layer(Extension(self.clone()))
        // TODO(#551): Provide application authentication.
        .layer(CorsLayer::permissive());

        info!("GraphiQL IDE: http://localhost:{}", port);

        // Spawn the cache invalidation listener if caching is enabled.
        if let Some(cache) = &self.query_cache {
            // The context stays locked while the wallet's chains are subscribed and
            // recorded in the cache; the guard is dropped before the task is spawned.
            let guard = self.context.lock().await;
            let chain_ids: Vec<ChainId> = guard.wallet().chain_ids().try_collect().await?;
            let (tx, mut receiver) = tokio::sync::mpsc::unbounded_channel();
            guard.client().subscribe_extra(chain_ids.clone(), &tx);
            cache.mark_all_subscribed(&chain_ids);
            // The cache keeps the sender so that further chains can be subscribed later.
            cache.set_notification_sender(tx);
            drop(guard);
            let cache = Arc::clone(cache);
            tokio::spawn(async move {
                while let Some(notification) = receiver.recv().await {
                    if let Reason::NewBlock { height, .. } = notification.reason {
                        // The cache is keyed on the height of the *next* block.
                        let next_block_height = height
                            .try_add_one()
                            .expect("block height should not overflow");
                        cache.invalidate_chain(&notification.chain_id, next_block_height);
                    }
                }
            });
        }

        let tcp_listener =
            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
        // The server shuts down gracefully once the cancellation token is cancelled.
        let server = axum::serve(tcp_listener, app)
            .with_graceful_shutdown(cancellation_token.clone().cancelled_owned())
            .into_future();

        if self.pause {
            info!("Running in paused mode: chain synchronization is disabled");
            server.await?;
        } else {
            let storage = self.context.lock().await.storage().clone();
            let chain_listener = ChainListener::new(
                self.config,
                self.context,
                storage,
                cancellation_token.clone(),
                command_receiver,
                true,
            )
            .run()
            .await?;
            let mut chain_listener = Box::pin(chain_listener).fuse();
            // Whichever of the two finishes first ends the service.
            futures::select! {
                result = chain_listener => result?,
                result = Box::pin(server).fuse() => result?,
            };
        }

        Ok(())
    }
1477
1478    /// Handles service queries for user applications (including mutations).
1479    async fn handle_service_request(
1480        &self,
1481        application_id: ApplicationId,
1482        request: Vec<u8>,
1483        chain_id: ChainId,
1484        block_hash: Option<CryptoHash>,
1485    ) -> Result<Vec<u8>, NodeServiceError> {
1486        // Only cache read-only queries against the latest state (block_hash == None).
1487        let cache = block_hash
1488            .is_none()
1489            .then_some(self.query_cache.as_ref())
1490            .flatten();
1491
1492        // Return immediately on cache hit.
1493        if let Some(cache) = cache {
1494            if let Some(cached) = cache.get(chain_id, &application_id, &request) {
1495                return Ok(cached);
1496            }
1497        }
1498
1499        let (
1500            QueryOutcome {
1501                response,
1502                operations,
1503            },
1504            block_height,
1505        ) = self
1506            .query_user_application(application_id, request.clone(), chain_id, block_hash)
1507            .await?;
1508        if operations.is_empty() {
1509            if let Some(cache) = cache {
1510                // Lazily subscribe to notifications for chains discovered after startup.
1511                if cache.needs_subscription(&chain_id) {
1512                    if let Some(sender) = cache.notification_sender() {
1513                        self.context
1514                            .lock()
1515                            .await
1516                            .client()
1517                            .subscribe_extra(vec![chain_id], &sender);
1518                        cache.mark_subscribed(chain_id);
1519                    }
1520                }
1521                cache.insert(
1522                    chain_id,
1523                    application_id,
1524                    request,
1525                    response.clone(),
1526                    block_height,
1527                );
1528            }
1529            return Ok(response);
1530        }
1531
1532        if self.read_only {
1533            return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1534        }
1535
1536        trace!("Query requested a new block with operations: {operations:?}");
1537        let client = self
1538            .context
1539            .lock()
1540            .await
1541            .make_chain_client(chain_id)
1542            .await?;
1543        let hash = loop {
1544            let timeout = match client
1545                .execute_operations(operations.clone(), vec![])
1546                .await?
1547            {
1548                ClientOutcome::Committed(certificate) => break certificate.hash(),
1549                ClientOutcome::Conflict(certificate) => {
1550                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
1551                }
1552                ClientOutcome::WaitForTimeout(timeout) => timeout,
1553            };
1554            let mut stream = client.subscribe().map_err(|_| {
1555                chain_client::Error::InternalError("Could not subscribe to the local node.")
1556            })?;
1557            util::wait_for_next_round(&mut stream, timeout).await;
1558        };
1559        let response = async_graphql::Response::new(hash.to_value());
1560        Ok(serde_json::to_vec(&response)?)
1561    }
1562
1563    /// Queries a user application, returning the raw [`QueryOutcome`] and the height of the
1564    /// chain's latest block at the time of the query (used for cache staleness detection).
1565    async fn query_user_application(
1566        &self,
1567        application_id: ApplicationId,
1568        bytes: Vec<u8>,
1569        chain_id: ChainId,
1570        block_hash: Option<CryptoHash>,
1571    ) -> Result<(QueryOutcome<Vec<u8>>, BlockHeight), NodeServiceError> {
1572        let query = Query::User {
1573            application_id,
1574            bytes,
1575        };
1576        let client = self
1577            .context
1578            .lock()
1579            .await
1580            .make_chain_client(chain_id)
1581            .await?;
1582        let (
1583            QueryOutcome {
1584                response,
1585                operations,
1586            },
1587            next_block_height,
1588        ) = client.query_application(query, block_hash).await?;
1589        match response {
1590            QueryResponse::System(_) => {
1591                unreachable!("cannot get a system response for a user query")
1592            }
1593            QueryResponse::User(user_response_bytes) => Ok((
1594                QueryOutcome {
1595                    response: user_response_bytes,
1596                    operations,
1597                },
1598                next_block_height,
1599            )),
1600        }
1601    }
1602
1603    /// Executes a GraphQL query and generates a response for our `Schema`.
1604    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1605        service
1606            .0
1607            .schema()
1608            .execute(request.into_inner())
1609            .await
1610            .into()
1611    }
1612
1613    /// Executes a GraphQL query against an application.
1614    /// Pattern matches on the `OperationType` of the query and routes the query
1615    /// accordingly.
1616    async fn application_handler(
1617        Path((chain_id, application_id)): Path<(String, String)>,
1618        service: Extension<Self>,
1619        request: String,
1620    ) -> Result<Vec<u8>, NodeServiceError> {
1621        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1622        let application_id: ApplicationId = application_id.parse()?;
1623
1624        debug!(
1625            %chain_id,
1626            %application_id,
1627            "processing request for application:\n{:?}",
1628            &request
1629        );
1630        let response = service
1631            .0
1632            .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1633            .await?;
1634
1635        Ok(response)
1636    }
1637}
1638
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        data_types::BlockHeight,
        identifiers::{ApplicationId, ChainId},
    };

    use super::QueryResponseCache;

    /// Builds a chain ID for tests from the test hash of the label `chain-{n}`.
    fn test_chain(n: u64) -> ChainId {
        let hash = CryptoHash::test_hash(format!("chain-{n}"));
        ChainId(hash)
    }

    /// Builds an application ID for tests from the test hash of the label `app-{n}`.
    fn test_app(n: u64) -> ApplicationId {
        let hash = CryptoHash::test_hash(format!("app-{n}"));
        ApplicationId::new(hash)
    }

    /// A lookup misses before anything is stored and hits once the response is inserted.
    #[test]
    fn cache_hit_and_miss() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));
        let request = b"query { balance }".to_vec();
        let response = b"{ \"balance\": 42 }".to_vec();

        // The chain is not known to the cache yet, so the lookup must miss.
        assert!(cache.get(chain, &app, &request).is_none());

        // Inserting sets up the entry for this chain.
        cache.insert(
            chain,
            app,
            request.clone(),
            response.clone(),
            BlockHeight(1),
        );

        // The very same lookup now returns the stored response.
        assert_eq!(cache.get(chain, &app, &request), Some(response));
    }

    /// Invalidating one chain leaves the entries of another chain untouched.
    #[test]
    fn per_chain_isolation() {
        let cache = QueryResponseCache::new(100);
        let (chain_a, chain_b) = (test_chain(0), test_chain(1));
        let app = test_app(0);
        let request = b"q".to_vec();
        let response = b"r".to_vec();

        // Only chain A gets an entry.
        cache.insert(
            chain_a,
            app,
            request.clone(),
            response.clone(),
            BlockHeight(1),
        );

        // Chain B is invalidated; chain A's entry must survive.
        cache.invalidate_chain(&chain_b, BlockHeight(1));
        assert_eq!(cache.get(chain_a, &app, &request), Some(response));
    }

    /// Invalidating a chain drops every response cached for it.
    #[test]
    fn invalidation_clears_all_entries() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));

        for (query, reply) in [(b"q1", b"r1"), (b"q2", b"r2")] {
            cache.insert(chain, app, query.to_vec(), reply.to_vec(), BlockHeight(1));
        }

        cache.invalidate_chain(&chain, BlockHeight(2));

        for query in [b"q1", b"q2"] {
            assert!(cache.get(chain, &app, query).is_none());
        }
    }

    /// With a capacity of two, a third insert pushes out the least recently used entry.
    #[test]
    fn lru_eviction() {
        let cache = QueryResponseCache::new(2);
        let (chain, app) = (test_chain(0), test_app(0));

        // Inserting q3 last is what evicts q1, the least recently used entry.
        for (query, reply) in [(b"q1", b"r1"), (b"q2", b"r2"), (b"q3", b"r3")] {
            cache.insert(chain, app, query.to_vec(), reply.to_vec(), BlockHeight(1));
        }

        assert!(cache.get(chain, &app, b"q1").is_none());
        for query in [b"q2", b"q3"] {
            assert!(cache.get(chain, &app, query).is_some());
        }
    }

    /// An insert tagged with a height from before an invalidation must not be stored.
    #[test]
    fn stale_insert_rejected_after_invalidation() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));

        // The chain sits at block 3; a query begins and records that height.
        cache.insert(chain, app, b"q0".to_vec(), b"r0".to_vec(), BlockHeight(3));
        let stale_height = BlockHeight(3);

        // While that query is still running, block 4 arrives.
        cache.invalidate_chain(&chain, BlockHeight(4));

        // The slow query completes and attempts an insert at the outdated height.
        cache.insert(chain, app, b"q".to_vec(), b"stale".to_vec(), stale_height);

        // The cache must have refused the outdated insert.
        assert!(cache.get(chain, &app, b"q").is_none());
    }
}