Skip to main content

linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeSet,
7    future::IntoFuture,
8    iter,
9    net::SocketAddr,
10    num::NonZeroU16,
11    sync::{Arc, Mutex as StdMutex},
12};
13
14use async_graphql::{
15    futures_util::Stream,
16    registry::{MetaType, MetaTypeId, Registry},
17    resolver_utils::ContainerType,
18    EmptyMutation, Error, MergedObject, OutputType, Positioned, Request, Response, ScalarType,
19    Schema, SimpleObject, Subscription,
20};
21use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
22use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
23use futures::{lock::Mutex, Future, FutureExt as _, StreamExt as _, TryStreamExt as _};
24use linera_base::{
25    crypto::{CryptoError, CryptoHash},
26    data_types::{
27        Amount, ApplicationDescription, ApplicationPermissions, BlockHeight, Bytecode, Epoch,
28        TimeDelta,
29    },
30    identifiers::{
31        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
32    },
33    ownership::{ChainOwnership, TimeoutConfig},
34    vm::VmRuntime,
35    BcsHexParseError,
36};
37use linera_chain::{
38    types::{ConfirmedBlock, GenericCertificate},
39    ChainStateView,
40};
41use linera_client::chain_listener::{
42    ChainListener, ChainListenerConfig, ClientContext, ClientContextExt as _, ListenerCommand,
43};
44use linera_core::{
45    client::chain_client::{self, ChainClient},
46    data_types::ClientOutcome,
47    wallet::Wallet as _,
48    worker::{ChainStateViewReadGuard, Notification, Reason},
49};
50use linera_execution::{
51    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
52    SystemOperation,
53};
54#[cfg(with_metrics)]
55use linera_metrics::monitoring_server;
56use linera_sdk::linera_base_types::BlobContent;
57use linera_storage::Storage;
58use lru::LruCache;
59use serde::{Deserialize, Serialize};
60use serde_json::json;
61use tokio::sync::mpsc::UnboundedReceiver;
62use tokio_util::sync::CancellationToken;
63use tower_http::cors::CorsLayer;
64use tracing::{debug, info, instrument, trace};
65
66use crate::util;
67
/// Newtype around a `String` that already holds serialized JSON.
///
/// Its [`OutputType`] implementation exposes it as the GraphQL scalar named `JSON` and
/// hands the text to `async-graphql` wrapped in the raw-value marker. With the
/// `raw_value` feature of `async-graphql` enabled, the text is then written into the
/// response as-is, without being parsed or converted into a value tree first.
#[derive(Clone)]
struct RawJson(String);
75
76impl OutputType for RawJson {
77    fn type_name() -> Cow<'static, str> {
78        Cow::Borrowed("JSON")
79    }
80
81    fn create_type_info(registry: &mut Registry) -> String {
82        registry.create_output_type::<Self, _>(MetaTypeId::Scalar, |_| MetaType::Scalar {
83            name: "JSON".to_string(),
84            description: Some("A scalar that can represent any JSON value.".to_string()),
85            is_valid: None,
86            visible: None,
87            inaccessible: false,
88            tags: Default::default(),
89            specified_by_url: None,
90            directive_invocations: Default::default(),
91            requires_scopes: Default::default(),
92        })
93    }
94
95    async fn resolve(
96        &self,
97        _ctx: &async_graphql::ContextSelectionSet<'_>,
98        _field: &Positioned<async_graphql::parser::types::Field>,
99    ) -> async_graphql::ServerResult<async_graphql::Value> {
100        // Wrap the raw JSON string with the magic token that async-graphql's
101        // ConstValue serializer recognises (with feature `raw_value`).
102        // When the response is serialised to JSON the raw string is emitted
103        // verbatim, avoiding any parsing or tree conversion.
104        Ok(async_graphql::Value::Object(
105            std::iter::once((
106                async_graphql::Name::new(async_graphql_value::RAW_VALUE_TOKEN),
107                async_graphql::Value::String(self.0.clone()),
108            ))
109            .collect(),
110        ))
111    }
112}
113
114/// The set of chains tracked by the wallet.
115#[derive(SimpleObject, Serialize, Deserialize, Clone)]
116pub struct Chains {
117    /// The IDs of the tracked chains.
118    pub list: Vec<ChainId>,
119    /// The default chain of the wallet, if one is set.
120    pub default: Option<ChainId>,
121}
122
123/// Our root GraphQL query type.
124pub struct QueryRoot<C> {
125    context: Arc<Mutex<C>>,
126    port: NonZeroU16,
127    default_chain: Option<ChainId>,
128}
129
130/// Our root GraphQL subscription type.
131pub struct SubscriptionRoot<C> {
132    context: Arc<Mutex<C>>,
133    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
134    cancellation_token: CancellationToken,
135}
136
137/// Our root GraphQL mutation type.
138pub struct MutationRoot<C> {
139    context: Arc<Mutex<C>>,
140}
141
142#[derive(Debug, thiserror::Error)]
143enum NodeServiceError {
144    #[error(transparent)]
145    ChainClient(#[from] chain_client::Error),
146    #[error(transparent)]
147    BcsHex(#[from] BcsHexParseError),
148    #[error(transparent)]
149    Json(#[from] serde_json::Error),
150    #[error("malformed chain ID: {0}")]
151    InvalidChainId(CryptoError),
152    #[error(transparent)]
153    Client(#[from] linera_client::Error),
154    #[error("scheduling operations from queries is disabled in read-only mode")]
155    ReadOnlyModeOperationsNotAllowed,
156}
157
158impl IntoResponse for NodeServiceError {
159    fn into_response(self) -> response::Response {
160        let status = match self {
161            NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
162                StatusCode::BAD_REQUEST
163            }
164            NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
165            _ => StatusCode::INTERNAL_SERVER_ERROR,
166        };
167        let body = json!({"error": self.to_string()}).to_string();
168        (status, body).into_response()
169    }
170}
171
172#[Subscription]
173impl<C> SubscriptionRoot<C>
174where
175    C: ClientContext + 'static,
176{
177    /// Subscribes to notifications from the specified chain.
178    async fn notifications(
179        &self,
180        chain_id: ChainId,
181    ) -> Result<impl Stream<Item = Notification>, Error> {
182        let client = self
183            .context
184            .lock()
185            .await
186            .make_chain_client(chain_id)
187            .await?;
188        Ok(client.subscribe()?)
189    }
190
191    /// Subscribes to the result of a pre-registered GraphQL query.
192    /// Re-executes the query on every new block and pushes changed results.
193    async fn query_result(
194        &self,
195        #[graphql(desc = "Name of the registered subscription query.")] name: String,
196        #[graphql(desc = "The chain to watch.")] chain_id: ChainId,
197        #[graphql(desc = "The application to query.")] application_id: ApplicationId,
198    ) -> Result<impl Stream<Item = RawJson>, Error> {
199        let manager = self
200            .query_subscriptions
201            .as_ref()
202            .ok_or_else(|| Error::new("no subscription queries registered"))?;
203
204        let key = crate::query_subscription::SubscriptionKey {
205            name,
206            chain_id,
207            application_id,
208        };
209
210        let receiver = manager
211            .subscribe(
212                &key,
213                Arc::clone(&self.context),
214                self.cancellation_token.clone(),
215            )
216            .map_err(|e| Error::new(e.to_string()))?;
217
218        // `sender.subscribe()` marks the current value as "already seen", so
219        // `WatchStream` would skip it and wait for the next change.  Grab the
220        // current snapshot first and prepend it to the stream so that every new
221        // subscriber gets the latest cached result immediately.
222        let current = receiver.borrow().clone();
223        let changes = tokio_stream::wrappers::WatchStream::from_changes(receiver)
224            .filter_map(|value| async move { value });
225        Ok(futures::stream::iter(current).chain(changes).map(RawJson))
226    }
227}
228
229impl<C> MutationRoot<C>
230where
231    C: ClientContext,
232{
233    async fn execute_system_operation(
234        &self,
235        system_operation: SystemOperation,
236        chain_id: ChainId,
237    ) -> Result<CryptoHash, Error> {
238        let certificate = self
239            .apply_client_command(&chain_id, move |client| {
240                let operation = Operation::system(system_operation.clone());
241                async move {
242                    let result = client
243                        .execute_operation(operation)
244                        .await
245                        .map_err(Error::from);
246                    (result, client)
247                }
248            })
249            .await?;
250        Ok(certificate.hash())
251    }
252
253    /// Sets the preferred owner of the chain if the current one is no longer in the owner
254    /// set and we have a key pair for exactly one of the new owners.
255    async fn maybe_auto_assign_preferred_owner(
256        &self,
257        chain_id: ChainId,
258        new_ownership: &ChainOwnership,
259    ) -> Result<(), Error> {
260        let context = self.context.lock().await;
261        let mut chain_client = context.make_chain_client(chain_id).await?;
262        context
263            .maybe_auto_assign_preferred_owner(&mut chain_client, new_ownership)
264            .await?;
265        Ok(())
266    }
267
268    /// Applies the given function to the chain client.
269    /// Updates the wallet regardless of the outcome. As long as the function returns a round
270    /// timeout, it will wait and retry.
271    async fn apply_client_command<F, Fut, T>(
272        &self,
273        chain_id: &ChainId,
274        mut f: F,
275    ) -> Result<T, Error>
276    where
277        F: FnMut(ChainClient<C::Environment>) -> Fut,
278        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
279    {
280        loop {
281            let client = self
282                .context
283                .lock()
284                .await
285                .make_chain_client(*chain_id)
286                .await?;
287            let mut stream = client.subscribe()?;
288            let (result, client) = f(client).await;
289            self.context.lock().await.update_wallet(&client).await?;
290            let timeout = match result? {
291                ClientOutcome::Committed(t) => return Ok(t),
292                ClientOutcome::Conflict(certificate) => {
293                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
294                }
295                ClientOutcome::WaitForTimeout(timeout) => timeout,
296            };
297            drop(client);
298            util::wait_for_next_round(&mut stream, timeout).await;
299        }
300    }
301}
302
303#[async_graphql::Object(cache_control(no_cache))]
304impl<C> MutationRoot<C>
305where
306    C: ClientContext + 'static,
307{
308    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
309    async fn process_inbox(
310        &self,
311        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
312    ) -> Result<Vec<CryptoHash>, Error> {
313        let mut hashes = Vec::new();
314        loop {
315            let client = self
316                .context
317                .lock()
318                .await
319                .make_chain_client(chain_id)
320                .await?;
321            let result = client.process_inbox().await;
322            self.context.lock().await.update_wallet(&client).await?;
323            let (certificates, maybe_timeout) = result?;
324            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
325            match maybe_timeout {
326                None => return Ok(hashes),
327                Some(timestamp) => {
328                    let mut stream = client.subscribe()?;
329                    drop(client);
330                    util::wait_for_next_round(&mut stream, timestamp).await;
331                }
332            }
333        }
334    }
335
336    /// Synchronizes the chain with the validators. Returns the chain's length.
337    ///
338    /// This is only used for testing, to make sure that a client is up to date.
339    // TODO(#4718): Remove this mutation.
340    async fn sync(
341        &self,
342        #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
343    ) -> Result<u64, Error> {
344        let client = self
345            .context
346            .lock()
347            .await
348            .make_chain_client(chain_id)
349            .await?;
350        let info = client.synchronize_from_validators().await?;
351        self.context.lock().await.update_wallet(&client).await?;
352        Ok(info.next_block_height.0)
353    }
354
355    /// Retries the pending block that was unsuccessfully proposed earlier.
356    async fn retry_pending_block(
357        &self,
358        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
359    ) -> Result<Option<CryptoHash>, Error> {
360        let client = self
361            .context
362            .lock()
363            .await
364            .make_chain_client(chain_id)
365            .await?;
366        let outcome = client.process_pending_block().await?;
367        self.context.lock().await.update_wallet(&client).await?;
368        match outcome {
369            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
370            ClientOutcome::Committed(None) => Ok(None),
371            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
372                "Please try again at {}",
373                timeout.timestamp
374            ))),
375            ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
376                "A different block was committed: {}",
377                certificate.hash()
378            ))),
379        }
380    }
381
382    /// Transfers `amount` units of value from the given owner's account to the recipient.
383    /// If no owner is given, try to take the units out of the chain account.
384    async fn transfer(
385        &self,
386        #[graphql(desc = "The chain which native tokens are being transferred from.")]
387        chain_id: ChainId,
388        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
389        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
390        #[graphql(desc = "The amount being transferred.")] amount: Amount,
391    ) -> Result<CryptoHash, Error> {
392        self.apply_client_command(&chain_id, move |client| async move {
393            let result = client
394                .transfer(owner, amount, recipient)
395                .await
396                .map_err(Error::from)
397                .map(|outcome| outcome.map(|certificate| certificate.hash()));
398            (result, client)
399        })
400        .await
401    }
402
403    /// Claims `amount` units of value from the given owner's account in the remote
404    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
405    /// process the message.
406    async fn claim(
407        &self,
408        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
409        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
410        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
411        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
412        #[graphql(desc = "The amount being transferred.")] amount: Amount,
413    ) -> Result<CryptoHash, Error> {
414        self.apply_client_command(&chain_id, move |client| async move {
415            let result = client
416                .claim(owner, target_id, recipient, amount)
417                .await
418                .map_err(Error::from)
419                .map(|outcome| outcome.map(|certificate| certificate.hash()));
420            (result, client)
421        })
422        .await
423    }
424
425    /// Test if a data blob is readable from a transaction in the current chain.
426    // TODO(#2490): Consider removing or renaming this.
427    async fn read_data_blob(
428        &self,
429        chain_id: ChainId,
430        hash: CryptoHash,
431    ) -> Result<CryptoHash, Error> {
432        self.apply_client_command(&chain_id, move |client| async move {
433            let result = client
434                .read_data_blob(hash)
435                .await
436                .map_err(Error::from)
437                .map(|outcome| outcome.map(|certificate| certificate.hash()));
438            (result, client)
439        })
440        .await
441    }
442
443    /// Creates a new single-owner chain.
444    async fn open_chain(
445        &self,
446        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
447        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
448        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
449        balance: Option<Amount>,
450    ) -> Result<ChainId, Error> {
451        let ownership = ChainOwnership::single(owner);
452        let balance = balance.unwrap_or(Amount::ZERO);
453        let description = self
454            .apply_client_command(&chain_id, move |client| {
455                let ownership = ownership.clone();
456                async move {
457                    let result = client
458                        .open_chain(ownership, ApplicationPermissions::default(), balance)
459                        .await
460                        .map_err(Error::from)
461                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
462                    (result, client)
463                }
464            })
465            .await?;
466        Ok(description.id())
467    }
468
469    /// Creates a new multi-owner chain.
470    #[expect(clippy::too_many_arguments)]
471    async fn open_multi_owner_chain(
472        &self,
473        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
474        #[graphql(desc = "Permissions for applications on the new chain")]
475        application_permissions: Option<ApplicationPermissions>,
476        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
477        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
478        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
479        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
480        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
481        fast_round_ms: Option<u64>,
482        #[graphql(
483            desc = "The duration of the first single-leader and all multi-leader rounds",
484            default = 10_000
485        )]
486        base_timeout_ms: u64,
487        #[graphql(
488            desc = "The number of milliseconds by which the timeout increases after each \
489                    single-leader round",
490            default = 1_000
491        )]
492        timeout_increment_ms: u64,
493        #[graphql(
494            desc = "The age of an incoming tracked or protected message after which the \
495                    validators start transitioning the chain to fallback mode, in milliseconds.",
496            default = 86_400_000
497        )]
498        fallback_duration_ms: u64,
499    ) -> Result<ChainId, Error> {
500        let owners = if let Some(weights) = weights {
501            if weights.len() != owners.len() {
502                return Err(Error::new(format!(
503                    "There are {} owners but {} weights.",
504                    owners.len(),
505                    weights.len()
506                )));
507            }
508            owners.into_iter().zip(weights).collect::<Vec<_>>()
509        } else {
510            owners
511                .into_iter()
512                .zip(iter::repeat(100))
513                .collect::<Vec<_>>()
514        };
515        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
516        let timeout_config = TimeoutConfig {
517            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
518            base_timeout: TimeDelta::from_millis(base_timeout_ms),
519            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
520            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
521        };
522        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
523        let balance = balance.unwrap_or(Amount::ZERO);
524        let description = self
525            .apply_client_command(&chain_id, move |client| {
526                let ownership = ownership.clone();
527                let application_permissions = application_permissions.clone().unwrap_or_default();
528                async move {
529                    let result = client
530                        .open_chain(ownership, application_permissions, balance)
531                        .await
532                        .map_err(Error::from)
533                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
534                    (result, client)
535                }
536            })
537            .await?;
538        Ok(description.id())
539    }
540
541    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
542    async fn close_chain(
543        &self,
544        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
545    ) -> Result<Option<CryptoHash>, Error> {
546        let maybe_cert = self
547            .apply_client_command(&chain_id, |client| async move {
548                let result = client.close_chain().await.map_err(Error::from);
549                (result, client)
550            })
551            .await?;
552        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
553    }
554
555    /// Changes the chain to a single-owner chain
556    async fn change_owner(
557        &self,
558        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
559        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
560    ) -> Result<CryptoHash, Error> {
561        let new_ownership = ChainOwnership::single_super(new_owner);
562        let operation = SystemOperation::ChangeOwnership {
563            super_owners: vec![new_owner],
564            owners: Vec::new(),
565            first_leader: None,
566            multi_leader_rounds: 5,
567            open_multi_leader_rounds: false,
568            timeout_config: TimeoutConfig::default(),
569        };
570        let hash = self.execute_system_operation(operation, chain_id).await?;
571        self.maybe_auto_assign_preferred_owner(chain_id, &new_ownership)
572            .await?;
573        Ok(hash)
574    }
575
576    /// Changes the ownership of the chain
577    #[expect(clippy::too_many_arguments)]
578    async fn change_multiple_owners(
579        &self,
580        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
581        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
582        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
583        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
584        #[graphql(
585            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
586        )]
587        open_multi_leader_rounds: bool,
588        #[graphql(desc = "The leader of the first single-leader round. \
589                          If not set, this is random like other rounds.")]
590        first_leader: Option<AccountOwner>,
591        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
592        fast_round_ms: Option<u64>,
593        #[graphql(
594            desc = "The duration of the first single-leader and all multi-leader rounds",
595            default = 10_000
596        )]
597        base_timeout_ms: u64,
598        #[graphql(
599            desc = "The number of milliseconds by which the timeout increases after each \
600                    single-leader round",
601            default = 1_000
602        )]
603        timeout_increment_ms: u64,
604        #[graphql(
605            desc = "The age of an incoming tracked or protected message after which the \
606                    validators start transitioning the chain to fallback mode, in milliseconds.",
607            default = 86_400_000
608        )]
609        fallback_duration_ms: u64,
610    ) -> Result<CryptoHash, Error> {
611        let timeout_config = TimeoutConfig {
612            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
613            base_timeout: TimeDelta::from_millis(base_timeout_ms),
614            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
615            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
616        };
617        let owners = new_owners.into_iter().zip(new_weights).collect::<Vec<_>>();
618        let new_ownership = ChainOwnership {
619            super_owners: BTreeSet::new(),
620            owners: owners.iter().cloned().collect(),
621            first_leader,
622            multi_leader_rounds,
623            open_multi_leader_rounds,
624            timeout_config: timeout_config.clone(),
625        };
626        let operation = SystemOperation::ChangeOwnership {
627            super_owners: Vec::new(),
628            owners,
629            first_leader,
630            multi_leader_rounds,
631            open_multi_leader_rounds,
632            timeout_config,
633        };
634        let hash = self.execute_system_operation(operation, chain_id).await?;
635        self.maybe_auto_assign_preferred_owner(chain_id, &new_ownership)
636            .await?;
637        Ok(hash)
638    }
639
640    /// Changes the application permissions configuration on this chain.
641    #[expect(clippy::too_many_arguments)]
642    async fn change_application_permissions(
643        &self,
644        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
645        #[graphql(
646            desc = "These applications are allowed to manage the chain: close it, change \
647                    application permissions, and change ownership."
648        )]
649        manage_chain: Vec<ApplicationId>,
650        #[graphql(
651            desc = "If this is `None`, all system operations and application operations are allowed.
652If it is `Some`, only operations from the specified applications are allowed,
653and no system operations."
654        )]
655        execute_operations: Option<Vec<ApplicationId>>,
656        #[graphql(
657            desc = "At least one operation or incoming message from each of these applications must occur in every block."
658        )]
659        mandatory_applications: Vec<ApplicationId>,
660        #[graphql(
661            desc = "These applications are allowed to perform calls to services as oracles."
662        )]
663        call_service_as_oracle: Option<Vec<ApplicationId>>,
664        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
665        make_http_requests: Option<Vec<ApplicationId>>,
666    ) -> Result<CryptoHash, Error> {
667        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
668            execute_operations,
669            mandatory_applications,
670            manage_chain,
671            call_service_as_oracle,
672            make_http_requests,
673        });
674        self.execute_system_operation(operation, chain_id).await
675    }
676
677    /// (admin chain only) Registers a new committee. This will notify the subscribers of
678    /// the admin chain so that they can migrate to the new epoch (by accepting the
679    /// notification as an "incoming message" in a next block).
680    async fn create_committee(
681        &self,
682        chain_id: ChainId,
683        committee: Committee,
684    ) -> Result<CryptoHash, Error> {
685        Ok(self
686            .apply_client_command(&chain_id, move |client| {
687                let committee = committee.clone();
688                async move {
689                    let result = client
690                        .stage_new_committee(committee)
691                        .await
692                        .map_err(Error::from);
693                    (result, client)
694                }
695            })
696            .await?
697            .hash())
698    }
699
700    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
701    /// blocks from the retired epoch will not be accepted until they are followed (hence
702    /// re-certified) by a block certified by a recent committee.
703    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
704        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
705        self.execute_system_operation(operation, chain_id).await
706    }
707
708    /// Publishes a new application module, optionally along with a JSON-encoded
709    /// `Formats` description that becomes a third blob alongside the contract
710    /// and service blobs.
711    async fn publish_module(
712        &self,
713        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
714        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
715        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
716        service: Bytecode,
717        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
718        vm_runtime: VmRuntime,
719        #[graphql(desc = "Optional JSON-encoded `Formats` description bytes")] formats: Option<
720            Vec<u8>,
721        >,
722    ) -> Result<ModuleId, Error> {
723        self.apply_client_command(&chain_id, move |client| {
724            let contract = contract.clone();
725            let service = service.clone();
726            let formats = formats.clone();
727            async move {
728                let result = client
729                    .publish_module(contract, service, vm_runtime, formats)
730                    .await
731                    .map_err(Error::from)
732                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
733                (result, client)
734            }
735        })
736        .await
737    }
738
739    /// Publishes a new data blob.
740    async fn publish_data_blob(
741        &self,
742        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
743        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
744    ) -> Result<CryptoHash, Error> {
745        self.apply_client_command(&chain_id, |client| {
746            let bytes = bytes.clone();
747            async move {
748                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
749                (result, client)
750            }
751        })
752        .await
753        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
754    }
755
756    /// Creates a new application.
757    async fn create_application(
758        &self,
759        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
760        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
761        #[graphql(desc = "The JSON serialization of the parameters of the application")]
762        parameters: String,
763        #[graphql(
764            desc = "The JSON serialization of the instantiation argument of the application"
765        )]
766        instantiation_argument: String,
767        #[graphql(desc = "The dependencies of the application being created")]
768        required_application_ids: Vec<ApplicationId>,
769    ) -> Result<ApplicationId, Error> {
770        self.apply_client_command(&chain_id, move |client| {
771            let parameters = parameters.as_bytes().to_vec();
772            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
773            let required_application_ids = required_application_ids.clone();
774            async move {
775                let result = client
776                    .create_application_untyped(
777                        module_id,
778                        parameters,
779                        instantiation_argument,
780                        required_application_ids,
781                    )
782                    .await
783                    .map_err(Error::from)
784                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
785                (result, client)
786            }
787        })
788        .await
789    }
790}
791
792#[async_graphql::Object(cache_control(no_cache))]
793impl<C> QueryRoot<C>
794where
795    C: ClientContext + 'static,
796{
797    async fn chain(
798        &self,
799        chain_id: ChainId,
800    ) -> Result<ChainStateExtendedView<<C::Environment as linera_core::Environment>::Storage>, Error>
801    {
802        let client = self
803            .context
804            .lock()
805            .await
806            .make_chain_client(chain_id)
807            .await?;
808        let view = client.chain_state_view().await?;
809        Ok(ChainStateExtendedView::new(view))
810    }
811
812    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
813        let client = self
814            .context
815            .lock()
816            .await
817            .make_chain_client(chain_id)
818            .await?;
819        let applications = client
820            .chain_state_view()
821            .await?
822            .execution_state
823            .list_applications()
824            .await?;
825
826        let overviews = applications
827            .into_iter()
828            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
829            .collect();
830
831        Ok(overviews)
832    }
833
834    async fn chains(&self) -> Result<Chains, Error> {
835        Ok(Chains {
836            list: self
837                .context
838                .lock()
839                .await
840                .wallet()
841                .chain_ids()
842                .try_collect()
843                .await?,
844            default: self.default_chain,
845        })
846    }
847
848    async fn block(
849        &self,
850        hash: Option<CryptoHash>,
851        chain_id: ChainId,
852    ) -> Result<Option<Arc<ConfirmedBlock>>, Error> {
853        let client = self
854            .context
855            .lock()
856            .await
857            .make_chain_client(chain_id)
858            .await?;
859        let hash = match hash {
860            Some(hash) => Some(hash),
861            None => client.chain_info().await?.block_hash,
862        };
863        if let Some(hash) = hash {
864            Ok(Some(client.read_confirmed_block(hash).await?))
865        } else {
866            Ok(None)
867        }
868    }
869
870    async fn events_from_index(
871        &self,
872        chain_id: ChainId,
873        stream_id: StreamId,
874        start_index: u32,
875    ) -> Result<Vec<IndexAndEvent>, Error> {
876        Ok(self
877            .context
878            .lock()
879            .await
880            .make_chain_client(chain_id)
881            .await?
882            .events_from_index(stream_id, start_index)
883            .await?)
884    }
885
886    async fn blocks(
887        &self,
888        from: Option<CryptoHash>,
889        chain_id: ChainId,
890        limit: Option<u32>,
891    ) -> Result<Vec<Arc<ConfirmedBlock>>, Error> {
892        let client = self
893            .context
894            .lock()
895            .await
896            .make_chain_client(chain_id)
897            .await?;
898        let limit = limit.unwrap_or(10);
899        let from = match from {
900            Some(from) => Some(from),
901            None => client.chain_info().await?.block_hash,
902        };
903        let Some(from) = from else {
904            return Ok(vec![]);
905        };
906        let mut hash = Some(from);
907        let mut values = Vec::new();
908        for _ in 0..limit {
909            let Some(next_hash) = hash else {
910                break;
911            };
912            let value = client.read_confirmed_block(next_hash).await?;
913            hash = value.block().header.previous_block_hash;
914            values.push(value);
915        }
916        Ok(values)
917    }
918
919    /// Returns the version information on this node service.
920    async fn version(&self) -> linera_version::VersionInfo {
921        linera_version::VersionInfo::default()
922    }
923
924    /// Returns the bytes of an application formats blob (JSON-encoded `Formats`)
925    /// stored in the local node, given the formats blob hash carried by a
926    /// `ModuleId`. Returns `None` if the blob is not present locally.
927    async fn application_formats(
928        &self,
929        chain_id: ChainId,
930        formats_blob_hash: CryptoHash,
931    ) -> Result<Option<Vec<u8>>, Error> {
932        let client = self
933            .context
934            .lock()
935            .await
936            .make_chain_client(chain_id)
937            .await?;
938        let blob_id = linera_base::identifiers::BlobId::new(
939            formats_blob_hash,
940            linera_base::identifiers::BlobType::ApplicationFormats,
941        );
942        let blob = client.storage_client().read_blob(blob_id).await?;
943        Ok(blob.map(|b| b.bytes().to_vec()))
944    }
945}
946
947// What follows is a hack to add a chain_id field to `ChainStateView` based on
948// https://async-graphql.github.io/async-graphql/en/merging_objects.html
949
950struct ChainStateViewExtension(ChainId);
951
952#[async_graphql::Object(cache_control(no_cache))]
953impl ChainStateViewExtension {
954    async fn chain_id(&self) -> ChainId {
955        self.0
956    }
957}
958
959#[derive(MergedObject)]
960struct ChainStateExtendedView<S: Storage>(ChainStateViewExtension, ReadOnlyChainStateView<S>);
961
962/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind
963/// a read guard.
964pub struct ReadOnlyChainStateView<S: Storage>(ChainStateViewReadGuard<S>);
965
966impl<S: Storage> ContainerType for ReadOnlyChainStateView<S>
967where
968    ChainStateView<S::Context>: ContainerType,
969{
970    async fn resolve_field(
971        &self,
972        context: &async_graphql::Context<'_>,
973    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
974        self.0.resolve_field(context).await
975    }
976}
977
978impl<S: Storage> OutputType for ReadOnlyChainStateView<S>
979where
980    ChainStateView<S::Context>: OutputType,
981{
982    fn type_name() -> Cow<'static, str> {
983        ChainStateView::<S::Context>::type_name()
984    }
985
986    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
987        ChainStateView::<S::Context>::create_type_info(registry)
988    }
989
990    async fn resolve(
991        &self,
992        context: &async_graphql::ContextSelectionSet<'_>,
993        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
994    ) -> async_graphql::ServerResult<async_graphql::Value> {
995        self.0.resolve(context, field).await
996    }
997}
998
999impl<S: Storage> ChainStateExtendedView<S> {
1000    fn new(view: ChainStateViewReadGuard<S>) -> Self {
1001        Self(
1002            ChainStateViewExtension(view.chain_id()),
1003            ReadOnlyChainStateView(view),
1004        )
1005    }
1006}
1007
1008/// A summary of an application registered on a chain.
1009#[derive(SimpleObject)]
1010pub struct ApplicationOverview {
1011    id: ApplicationId,
1012    description: ApplicationDescription,
1013    link: String,
1014}
1015
1016impl ApplicationOverview {
1017    fn new(
1018        id: ApplicationId,
1019        description: ApplicationDescription,
1020        port: NonZeroU16,
1021        chain_id: ChainId,
1022    ) -> Self {
1023        Self {
1024            id,
1025            description,
1026            link: format!(
1027                "http://localhost:{}/chains/{}/applications/{}",
1028                port.get(),
1029                chain_id,
1030                id
1031            ),
1032        }
1033    }
1034}
1035
1036/// Schema type that can be either full (with mutations) or read-only.
1037pub enum NodeServiceSchema<C>
1038where
1039    C: ClientContext + 'static,
1040{
1041    /// Full schema with mutations enabled.
1042    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
1043    /// Read-only schema with mutations disabled.
1044    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
1045}
1046
1047impl<C> NodeServiceSchema<C>
1048where
1049    C: ClientContext,
1050{
1051    /// Executes a GraphQL request.
1052    pub async fn execute(&self, request: impl Into<Request>) -> Response {
1053        match self {
1054            Self::Full(schema) => schema.execute(request).await,
1055            Self::ReadOnly(schema) => schema.execute(request).await,
1056        }
1057    }
1058
1059    /// Returns the SDL (Schema Definition Language) representation.
1060    pub fn sdl(&self) -> String {
1061        match self {
1062            Self::Full(schema) => schema.sdl(),
1063            Self::ReadOnly(schema) => schema.sdl(),
1064        }
1065    }
1066}
1067
1068impl<C> Clone for NodeServiceSchema<C>
1069where
1070    C: ClientContext,
1071{
1072    fn clone(&self) -> Self {
1073        match self {
1074            Self::Full(schema) => Self::Full(schema.clone()),
1075            Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
1076        }
1077    }
1078}
1079
/// Prometheus metrics describing the query response cache. Only compiled when metrics
/// are enabled.
#[cfg(with_metrics)]
mod query_cache_metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge};
    use prometheus::{IntCounterVec, IntGauge};

    /// Counts lookups that were answered from the cache.
    pub static QUERY_CACHE_HIT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let name = "query_response_cache_hit";
        let description = "Query response cache hits";
        register_int_counter_vec(name, description, &[])
    });

    /// Counts lookups that found nothing in the cache.
    pub static QUERY_CACHE_MISS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let name = "query_response_cache_miss";
        let description = "Query response cache misses";
        register_int_counter_vec(name, description, &[])
    });

    /// Counts how often a chain's cache was cleared.
    pub static QUERY_CACHE_INVALIDATION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        let name = "query_response_cache_invalidation";
        let description = "Query response cache invalidations (per chain)";
        register_int_counter_vec(name, description, &[])
    });

    /// Tracks the number of responses currently cached, summed over all chains.
    pub static QUERY_CACHE_ENTRIES: LazyLock<IntGauge> = LazyLock::new(|| {
        let name = "query_response_cache_entries";
        let description = "Current number of cached query responses across all chains";
        register_int_gauge(name, description)
    });
}
1114
1115/// Per-chain cache state: an LRU map plus the `next_block_height` at the time the
1116/// cache was last invalidated. Both are behind the same mutex.
1117struct PerChainCache {
1118    lru: LruCache<(ApplicationId, Vec<u8>), Vec<u8>>,
1119    next_block_height: BlockHeight,
1120}
1121
1122/// An LRU cache for application query responses, keyed per chain.
1123///
1124/// Caches serialized response bytes keyed on `(chain_id, application_id, request_bytes)`.
1125/// The entire per-chain cache is invalidated when a `NewBlock` notification arrives.
1126///
1127/// To prevent a race where a slow query inserts stale data *after* an invalidation,
1128/// each insert carries the chain's `next_block_height` at query time.
1129/// If a newer block has since been processed, the insert is silently dropped.
1130struct QueryResponseCache {
1131    chains: papaya::HashMap<ChainId, StdMutex<PerChainCache>>,
1132    /// Chains for which we have registered a notification subscription.
1133    subscribed: papaya::HashSet<ChainId>,
1134    /// Sender half of the notification channel, used to subscribe new chains lazily.
1135    notification_sender: StdMutex<Option<tokio::sync::mpsc::UnboundedSender<Notification>>>,
1136    capacity_per_chain: std::num::NonZeroUsize,
1137}
1138
1139impl QueryResponseCache {
1140    fn new(capacity_per_chain: usize) -> Self {
1141        Self {
1142            chains: papaya::HashMap::new(),
1143            subscribed: papaya::HashSet::new(),
1144            notification_sender: StdMutex::new(None),
1145            capacity_per_chain: std::num::NonZeroUsize::new(capacity_per_chain)
1146                .expect("capacity must be > 0"),
1147        }
1148    }
1149
1150    /// Stores the notification sender (called once during startup).
1151    fn set_notification_sender(&self, sender: tokio::sync::mpsc::UnboundedSender<Notification>) {
1152        *self
1153            .notification_sender
1154            .lock()
1155            .expect("sender mutex poisoned") = Some(sender);
1156    }
1157
1158    /// Returns the notification sender, if set.
1159    fn notification_sender(&self) -> Option<tokio::sync::mpsc::UnboundedSender<Notification>> {
1160        self.notification_sender
1161            .lock()
1162            .expect("sender mutex poisoned")
1163            .clone()
1164    }
1165
1166    /// Marks a chain as subscribed to notifications.
1167    fn mark_subscribed(&self, chain_id: ChainId) {
1168        self.subscribed.pin().insert(chain_id);
1169    }
1170
1171    /// Returns `true` if the chain is not yet subscribed to notifications.
1172    fn needs_subscription(&self, chain_id: &ChainId) -> bool {
1173        !self.subscribed.pin().contains(chain_id)
1174    }
1175
1176    /// Marks initial chains as subscribed (called during startup).
1177    fn mark_all_subscribed(&self, chain_ids: &[ChainId]) {
1178        let pinned = self.subscribed.pin();
1179        for &chain_id in chain_ids {
1180            pinned.insert(chain_id);
1181        }
1182    }
1183
1184    /// Looks up a cached response. Returns `Some(bytes)` on hit, `None` on miss
1185    /// (including when the chain has no cache entry yet).
1186    fn get(&self, chain_id: ChainId, app_id: &ApplicationId, request: &[u8]) -> Option<Vec<u8>> {
1187        let pinned = self.chains.pin();
1188        let result = pinned.get(&chain_id).and_then(|mutex| {
1189            mutex
1190                .lock()
1191                .expect("LRU mutex poisoned")
1192                .lru
1193                .get(&(*app_id, request.to_vec()))
1194                .cloned()
1195        });
1196        #[cfg(with_metrics)]
1197        {
1198            let metric = if result.is_some() {
1199                &query_cache_metrics::QUERY_CACHE_HIT
1200            } else {
1201                &query_cache_metrics::QUERY_CACHE_MISS
1202            };
1203            metric.with_label_values(&[]).inc();
1204        }
1205        result
1206    }
1207
1208    /// Inserts a response into the cache, unless the chain's `next_block_height` has
1209    /// advanced past the caller's snapshot (which would mean a new block arrived and
1210    /// this response is potentially stale).
1211    fn insert(
1212        &self,
1213        chain_id: ChainId,
1214        app_id: ApplicationId,
1215        request: Vec<u8>,
1216        response: Vec<u8>,
1217        next_block_height: BlockHeight,
1218    ) {
1219        let pinned = self.chains.pin();
1220        let capacity = self.capacity_per_chain;
1221        let mutex = pinned.get_or_insert_with(chain_id, || {
1222            StdMutex::new(PerChainCache {
1223                lru: LruCache::new(capacity),
1224                next_block_height,
1225            })
1226        });
1227        let mut cache = mutex.lock().expect("LRU mutex poisoned");
1228        if next_block_height < cache.next_block_height {
1229            return; // A new block arrived since this query started; discard stale response.
1230        }
1231        // If the chain has advanced since the last cache update, also clear stale entries.
1232        // Note: This should not happen if notifications are timely. Also, this only
1233        // works when we have a cache miss.
1234        if next_block_height > cache.next_block_height {
1235            debug!(
1236                "Unexpected query cache invalidation for chain {chain_id}:\
1237                 {next_block_height} > {}",
1238                cache.next_block_height
1239            );
1240            #[cfg(with_metrics)]
1241            {
1242                #[expect(
1243                    clippy::cast_possible_wrap,
1244                    reason = "LRU cache size fits in i64 for any realistic cache"
1245                )]
1246                let cache_len = cache.lru.len() as i64;
1247                query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1248                query_cache_metrics::QUERY_CACHE_INVALIDATION
1249                    .with_label_values(&[])
1250                    .inc();
1251            }
1252            cache.lru.clear();
1253            cache.next_block_height = next_block_height;
1254        }
1255        #[cfg(with_metrics)]
1256        let prev_len = cache.lru.len();
1257        cache.lru.put((app_id, request), response);
1258        #[cfg(with_metrics)]
1259        if cache.lru.len() != prev_len {
1260            query_cache_metrics::QUERY_CACHE_ENTRIES.inc();
1261        }
1262    }
1263
1264    /// Called when a `NewBlock` notification arrives. Records the new
1265    /// `next_block_height` and clears all cached responses for the chain.
1266    fn invalidate_chain(&self, chain_id: &ChainId, next_block_height: BlockHeight) {
1267        let pinned = self.chains.pin();
1268        let capacity = self.capacity_per_chain;
1269        let mutex = pinned.get_or_insert_with(*chain_id, || {
1270            StdMutex::new(PerChainCache {
1271                lru: LruCache::new(capacity),
1272                next_block_height,
1273            })
1274        });
1275        let mut cache = mutex.lock().expect("LRU mutex poisoned");
1276        if next_block_height > cache.next_block_height {
1277            #[cfg(with_metrics)]
1278            {
1279                #[expect(
1280                    clippy::cast_possible_wrap,
1281                    reason = "LRU cache size fits in i64 for any realistic cache"
1282                )]
1283                let cache_len = cache.lru.len() as i64;
1284                query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1285                query_cache_metrics::QUERY_CACHE_INVALIDATION
1286                    .with_label_values(&[])
1287                    .inc();
1288            }
1289            cache.lru.clear();
1290            cache.next_block_height = next_block_height;
1291        } else {
1292            debug!(
1293                "Query cache for chain {chain_id} was already invalidated:\
1294                 {next_block_height} <= {}",
1295                cache.next_block_height
1296            );
1297        }
1298    }
1299}
1300
1301/// The `NodeService` is a server that exposes a web-server to the client.
1302/// The node service is primarily used to explore the state of a chain in GraphQL.
1303pub struct NodeService<C>
1304where
1305    C: ClientContext + 'static,
1306{
1307    config: ChainListenerConfig,
1308    port: NonZeroU16,
1309    #[cfg(with_metrics)]
1310    metrics_port: NonZeroU16,
1311    default_chain: Option<ChainId>,
1312    context: Arc<Mutex<C>>,
1313    /// If true, disallow mutations and prevent queries from scheduling operations.
1314    read_only: bool,
1315    /// Optional LRU cache for application query responses. `None` when caching is disabled.
1316    query_cache: Option<Arc<QueryResponseCache>>,
1317    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1318    cancellation_token: CancellationToken,
1319    enable_memory_profiling: bool,
1320    /// If true, do not start the chain listener; serve queries from local state only.
1321    pause: bool,
1322}
1323
1324impl<C> Clone for NodeService<C>
1325where
1326    C: ClientContext + 'static,
1327{
1328    fn clone(&self) -> Self {
1329        Self {
1330            config: self.config.clone(),
1331            port: self.port,
1332            #[cfg(with_metrics)]
1333            metrics_port: self.metrics_port,
1334            default_chain: self.default_chain,
1335            context: Arc::clone(&self.context),
1336            read_only: self.read_only,
1337            query_cache: self.query_cache.clone(),
1338            query_subscriptions: self.query_subscriptions.clone(),
1339            cancellation_token: self.cancellation_token.clone(),
1340            enable_memory_profiling: self.enable_memory_profiling,
1341            pause: self.pause,
1342        }
1343    }
1344}
1345
1346impl<C> NodeService<C>
1347where
1348    C: ClientContext,
1349{
1350    /// Creates a new instance of the node service given a client chain and a port.
1351    ///
1352    /// `query_cache_size` controls the per-chain LRU cache capacity for application query
1353    /// responses. Pass `None` to disable the cache (the default). Enable with
1354    /// `--query-cache-size <N>`. Incompatible with `--long-lived-services`.
1355    #[expect(clippy::too_many_arguments)]
1356    pub fn new(
1357        config: ChainListenerConfig,
1358        port: NonZeroU16,
1359        #[cfg(with_metrics)] metrics_port: NonZeroU16,
1360        default_chain: Option<ChainId>,
1361        context: Arc<Mutex<C>>,
1362        read_only: bool,
1363        query_cache_size: Option<usize>,
1364        query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1365        cancellation_token: CancellationToken,
1366        enable_memory_profiling: bool,
1367        pause: bool,
1368    ) -> Self {
1369        let query_cache = query_cache_size.map(|size| Arc::new(QueryResponseCache::new(size)));
1370        Self {
1371            config,
1372            port,
1373            #[cfg(with_metrics)]
1374            metrics_port,
1375            default_chain,
1376            context,
1377            read_only,
1378            query_cache,
1379            query_subscriptions,
1380            cancellation_token,
1381            enable_memory_profiling,
1382            pause,
1383        }
1384    }
1385
1386    /// Returns the socket address on which the metrics endpoint is served.
1387    #[cfg(with_metrics)]
1388    pub fn metrics_address(&self) -> SocketAddr {
1389        SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
1390    }
1391
1392    /// Builds the GraphQL schema served by the node service.
1393    pub fn schema(&self) -> NodeServiceSchema<C> {
1394        let query = QueryRoot {
1395            context: Arc::clone(&self.context),
1396            port: self.port,
1397            default_chain: self.default_chain,
1398        };
1399        let subscription = SubscriptionRoot {
1400            context: Arc::clone(&self.context),
1401            query_subscriptions: self.query_subscriptions.clone(),
1402            cancellation_token: self.cancellation_token.clone(),
1403        };
1404
1405        if self.read_only {
1406            NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1407        } else {
1408            NodeServiceSchema::Full(
1409                Schema::build(
1410                    query,
1411                    MutationRoot {
1412                        context: Arc::clone(&self.context),
1413                    },
1414                    subscription,
1415                )
1416                .finish(),
1417            )
1418        }
1419    }
1420
1421    /// Runs the node service.
1422    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
1423    pub async fn run(
1424        self,
1425        cancellation_token: CancellationToken,
1426        command_receiver: UnboundedReceiver<ListenerCommand>,
1427    ) -> Result<(), anyhow::Error> {
1428        let port = self.port.get();
1429        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
1430        let application_handler =
1431            axum::routing::get(util::graphiql).post(Self::application_handler);
1432
1433        #[cfg(with_metrics)]
1434        monitoring_server::start_metrics_with_profiling(
1435            self.metrics_address(),
1436            cancellation_token.clone(),
1437            self.enable_memory_profiling,
1438        )
1439        .await;
1440
1441        let base_router = Router::new()
1442            .route("/", index_handler)
1443            .route(
1444                "/chains/{chain_id}/applications/{application_id}",
1445                application_handler,
1446            )
1447            .route("/ready", axum::routing::get(|| async { "ready!" }));
1448
1449        // Create router with appropriate schema for WebSocket subscriptions.
1450        let app = match self.schema() {
1451            NodeServiceSchema::Full(schema) => {
1452                base_router.route_service("/ws", GraphQLSubscription::new(schema))
1453            }
1454            NodeServiceSchema::ReadOnly(schema) => {
1455                base_router.route_service("/ws", GraphQLSubscription::new(schema))
1456            }
1457        }
1458        .layer(Extension(self.clone()))
1459        // TODO(#551): Provide application authentication.
1460        .layer(CorsLayer::permissive());
1461
1462        info!("GraphiQL IDE: http://localhost:{}", port);
1463
1464        // Spawn the cache invalidation listener if caching is enabled.
1465        if let Some(cache) = &self.query_cache {
1466            let guard = self.context.lock().await;
1467            let chain_ids: Vec<ChainId> = guard.wallet().chain_ids().try_collect().await?;
1468            let (tx, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1469            guard.client().subscribe_extra(chain_ids.clone(), &tx);
1470            cache.mark_all_subscribed(&chain_ids);
1471            cache.set_notification_sender(tx);
1472            drop(guard);
1473            let cache = Arc::clone(cache);
1474            tokio::spawn(async move {
1475                while let Some(notification) = receiver.recv().await {
1476                    if let Reason::NewBlock { height, .. } = notification.reason {
1477                        let next_block_height = height
1478                            .try_add_one()
1479                            .expect("block height should not overflow");
1480                        cache.invalidate_chain(&notification.chain_id, next_block_height);
1481                    }
1482                }
1483            });
1484        }
1485
1486        let tcp_listener =
1487            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1488        let server = axum::serve(tcp_listener, app)
1489            .with_graceful_shutdown(cancellation_token.clone().cancelled_owned())
1490            .into_future();
1491
1492        if self.pause {
1493            info!("Running in paused mode: chain synchronization is disabled");
1494            server.await?;
1495        } else {
1496            let storage = self.context.lock().await.storage().clone();
1497            let chain_listener = ChainListener::new(
1498                self.config,
1499                self.context,
1500                storage,
1501                cancellation_token.clone(),
1502                command_receiver,
1503                true,
1504            )
1505            .run()
1506            .await?;
1507            let mut chain_listener = Box::pin(chain_listener).fuse();
1508            futures::select! {
1509                result = chain_listener => result?,
1510                result = Box::pin(server).fuse() => result?,
1511            };
1512        }
1513
1514        Ok(())
1515    }
1516
1517    /// Handles service queries for user applications (including mutations).
1518    async fn handle_service_request(
1519        &self,
1520        application_id: ApplicationId,
1521        request: Vec<u8>,
1522        chain_id: ChainId,
1523        block_hash: Option<CryptoHash>,
1524    ) -> Result<Vec<u8>, NodeServiceError> {
1525        // Only cache read-only queries against the latest state (block_hash == None).
1526        let cache = block_hash
1527            .is_none()
1528            .then_some(self.query_cache.as_ref())
1529            .flatten();
1530
1531        // Return immediately on cache hit.
1532        if let Some(cache) = cache {
1533            if let Some(cached) = cache.get(chain_id, &application_id, &request) {
1534                return Ok(cached);
1535            }
1536        }
1537
1538        let (
1539            QueryOutcome {
1540                response,
1541                operations,
1542            },
1543            block_height,
1544        ) = self
1545            .query_user_application(application_id, request.clone(), chain_id, block_hash)
1546            .await?;
1547        if operations.is_empty() {
1548            if let Some(cache) = cache {
1549                // Lazily subscribe to notifications for chains discovered after startup.
1550                if cache.needs_subscription(&chain_id) {
1551                    if let Some(sender) = cache.notification_sender() {
1552                        self.context
1553                            .lock()
1554                            .await
1555                            .client()
1556                            .subscribe_extra(vec![chain_id], &sender);
1557                        cache.mark_subscribed(chain_id);
1558                    }
1559                }
1560                cache.insert(
1561                    chain_id,
1562                    application_id,
1563                    request,
1564                    response.clone(),
1565                    block_height,
1566                );
1567            }
1568            return Ok(response);
1569        }
1570
1571        if self.read_only {
1572            return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1573        }
1574
1575        trace!("Query requested a new block with operations: {operations:?}");
1576        let client = self
1577            .context
1578            .lock()
1579            .await
1580            .make_chain_client(chain_id)
1581            .await?;
1582        let hash = loop {
1583            let timeout = match client
1584                .execute_operations(operations.clone(), vec![])
1585                .await?
1586            {
1587                ClientOutcome::Committed(certificate) => break certificate.hash(),
1588                ClientOutcome::Conflict(certificate) => {
1589                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
1590                }
1591                ClientOutcome::WaitForTimeout(timeout) => timeout,
1592            };
1593            let mut stream = client.subscribe().map_err(|_| {
1594                chain_client::Error::InternalError("Could not subscribe to the local node.")
1595            })?;
1596            util::wait_for_next_round(&mut stream, timeout).await;
1597        };
1598        let response = async_graphql::Response::new(hash.to_value());
1599        Ok(serde_json::to_vec(&response)?)
1600    }
1601
1602    /// Queries a user application, returning the raw [`QueryOutcome`] and the height of the
1603    /// chain's latest block at the time of the query (used for cache staleness detection).
1604    async fn query_user_application(
1605        &self,
1606        application_id: ApplicationId,
1607        bytes: Vec<u8>,
1608        chain_id: ChainId,
1609        block_hash: Option<CryptoHash>,
1610    ) -> Result<(QueryOutcome<Vec<u8>>, BlockHeight), NodeServiceError> {
1611        let query = Query::User {
1612            application_id,
1613            bytes,
1614        };
1615        let client = self
1616            .context
1617            .lock()
1618            .await
1619            .make_chain_client(chain_id)
1620            .await?;
1621        let (
1622            QueryOutcome {
1623                response,
1624                operations,
1625            },
1626            next_block_height,
1627        ) = client.query_application(query, block_hash).await?;
1628        match response {
1629            QueryResponse::System(_) => {
1630                unreachable!("cannot get a system response for a user query")
1631            }
1632            QueryResponse::User(user_response_bytes) => Ok((
1633                QueryOutcome {
1634                    response: user_response_bytes,
1635                    operations,
1636                },
1637                next_block_height,
1638            )),
1639        }
1640    }
1641
1642    /// Executes a GraphQL query and generates a response for our `Schema`.
1643    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1644        service
1645            .0
1646            .schema()
1647            .execute(request.into_inner())
1648            .await
1649            .into()
1650    }
1651
1652    /// Executes a GraphQL query against an application.
1653    /// Pattern matches on the `OperationType` of the query and routes the query
1654    /// accordingly.
1655    async fn application_handler(
1656        Path((chain_id, application_id)): Path<(String, String)>,
1657        service: Extension<Self>,
1658        request: String,
1659    ) -> Result<Vec<u8>, NodeServiceError> {
1660        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1661        let application_id: ApplicationId = application_id.parse()?;
1662
1663        debug!(
1664            %chain_id,
1665            %application_id,
1666            "processing request for application:\n{:?}",
1667            &request
1668        );
1669        let response = service
1670            .0
1671            .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1672            .await?;
1673
1674        Ok(response)
1675    }
1676}
1677
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        data_types::BlockHeight,
        identifiers::{ApplicationId, ChainId},
    };

    use super::QueryResponseCache;

    /// Builds a deterministic chain ID from a small index.
    fn test_chain(n: u64) -> ChainId {
        let seed = format!("chain-{n}");
        ChainId(CryptoHash::test_hash(seed))
    }

    /// Builds a deterministic application ID from a small index.
    fn test_app(n: u64) -> ApplicationId {
        let seed = format!("app-{n}");
        ApplicationId::new(CryptoHash::test_hash(seed))
    }

    /// A lookup misses before the entry is inserted and hits afterwards.
    #[test]
    fn cache_hit_and_miss() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));
        let query = b"query { balance }".to_vec();
        let answer = b"{ \"balance\": 42 }".to_vec();

        // Nothing has been stored for this chain yet.
        assert!(cache.get(chain, &app, &query).is_none());

        // The first insert for a chain also creates its per-chain entry.
        cache.insert(chain, app, query.clone(), answer.clone(), BlockHeight(1));

        // The stored response is now returned.
        let cached = cache.get(chain, &app, &query);
        assert_eq!(cached, Some(answer));
    }

    /// Invalidating one chain leaves the entries of another chain untouched.
    #[test]
    fn per_chain_isolation() {
        let cache = QueryResponseCache::new(100);
        let (chain_a, chain_b) = (test_chain(0), test_chain(1));
        let app = test_app(0);
        let query = b"q".to_vec();
        let answer = b"r".to_vec();

        cache.insert(chain_a, app, query.clone(), answer.clone(), BlockHeight(1));

        // Only chain B is invalidated; chain A must still serve its entry.
        cache.invalidate_chain(&chain_b, BlockHeight(1));
        let cached = cache.get(chain_a, &app, &query);
        assert_eq!(cached, Some(answer));
    }

    /// Invalidating a chain drops every cached entry of that chain.
    #[test]
    fn invalidation_clears_all_entries() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));

        for (query, answer) in [(b"q1", b"r1"), (b"q2", b"r2")] {
            cache.insert(chain, app, query.to_vec(), answer.to_vec(), BlockHeight(1));
        }

        cache.invalidate_chain(&chain, BlockHeight(2));
        for query in [b"q1", b"q2"] {
            assert!(cache.get(chain, &app, query).is_none());
        }
    }

    /// With capacity for two entries, a third insert evicts the least recently used one.
    #[test]
    fn lru_eviction() {
        let cache = QueryResponseCache::new(2);
        let (chain, app) = (test_chain(0), test_app(0));

        // The third insert pushes out q1, the least recently used entry.
        for (query, answer) in [(b"q1", b"r1"), (b"q2", b"r2"), (b"q3", b"r3")] {
            cache.insert(chain, app, query.to_vec(), answer.to_vec(), BlockHeight(1));
        }

        assert!(cache.get(chain, &app, b"q1").is_none());
        for query in [b"q2", b"q3"] {
            assert!(cache.get(chain, &app, query).is_some());
        }
    }

    /// An insert carrying a height older than the latest invalidation is dropped.
    #[test]
    fn stale_insert_rejected_after_invalidation() {
        let cache = QueryResponseCache::new(100);
        let (chain, app) = (test_chain(0), test_app(0));

        // The chain is at block 3 when a slow query snapshots the height.
        let stale_height = BlockHeight(3);
        cache.insert(chain, app, b"q0".to_vec(), b"r0".to_vec(), BlockHeight(3));

        // Block 4 lands while that query is still running.
        cache.invalidate_chain(&chain, BlockHeight(4));

        // The slow query completes and tries to store its result with the old height.
        cache.insert(chain, app, b"q".to_vec(), b"stale".to_vec(), stale_height);

        // The outdated result must not have been stored.
        assert!(cache.get(chain, &app, b"q").is_none());
    }
}