linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7    futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8    ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _};
13use linera_base::{
14    crypto::{CryptoError, CryptoHash},
15    data_types::{
16        Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17    },
18    identifiers::{
19        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20    },
21    ownership::{ChainOwnership, TimeoutConfig},
22    vm::VmRuntime,
23    BcsHexParseError,
24};
25use linera_chain::{
26    types::{ConfirmedBlock, GenericCertificate},
27    ChainStateView,
28};
29use linera_client::chain_listener::{ChainListener, ChainListenerConfig, ClientContext};
30use linera_core::{
31    client::chain_client::{self, ChainClient},
32    data_types::ClientOutcome,
33    worker::Notification,
34};
35use linera_execution::{
36    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
37    SystemOperation,
38};
39#[cfg(with_metrics)]
40use linera_metrics::monitoring_server;
41use linera_sdk::linera_base_types::BlobContent;
42use serde::{Deserialize, Serialize};
43use serde_json::json;
44use thiserror::Error as ThisError;
45use tokio::sync::OwnedRwLockReadGuard;
46use tokio_util::sync::CancellationToken;
47use tower_http::cors::CorsLayer;
48use tracing::{debug, error, info, instrument, trace};
49
50use crate::util;
51
// The chain IDs reported by the `chains` query.
#[derive(SimpleObject, Serialize, Deserialize, Clone)]
pub struct Chains {
    // The chain IDs; the `chains` query fills this from the wallet's `chain_ids()`.
    pub list: Vec<ChainId>,
    // The default chain, if any; the `chains` query copies it from `QueryRoot::default_chain`.
    pub default: Option<ChainId>,
}
57
/// Our root GraphQL query type.
pub struct QueryRoot<C> {
    /// The shared client context, locked to create chain clients and to read the wallet.
    context: Arc<Mutex<C>>,
    /// The port embedded in the application links built by the `applications` query.
    port: NonZeroU16,
    /// The chain returned as `default` by the `chains` query, if any.
    default_chain: Option<ChainId>,
}
64
/// Our root GraphQL subscription type.
pub struct SubscriptionRoot<C> {
    /// The shared client context, locked to create the chain client that is subscribed to.
    context: Arc<Mutex<C>>,
}
69
/// Our root GraphQL mutation type.
pub struct MutationRoot<C> {
    /// The shared client context, locked to create chain clients and to update the wallet
    /// after a client has been used.
    context: Arc<Mutex<C>>,
}
74
/// Errors of the node service that can be turned into HTTP responses (see the
/// `IntoResponse` implementation for the status code used for each variant).
#[derive(Debug, ThisError)]
enum NodeServiceError {
    /// An error returned by the chain client; displayed as the wrapped error.
    #[error(transparent)]
    ChainClientError(#[from] chain_client::Error),
    /// A BCS/hex parsing error; displayed as the wrapped error.
    #[error(transparent)]
    BcsHexError(#[from] BcsHexParseError),
    /// A JSON (de)serialization error; displayed as the wrapped error.
    #[error(transparent)]
    JsonError(#[from] serde_json::Error),
    /// A chain ID that could not be parsed. Note that there is no `#[from]` conversion for
    /// this variant: it has to be constructed explicitly.
    #[error("malformed chain ID: {0}")]
    InvalidChainId(CryptoError),
}
86
87impl IntoResponse for NodeServiceError {
88    fn into_response(self) -> response::Response {
89        let tuple = match self {
90            NodeServiceError::BcsHexError(e) => (StatusCode::BAD_REQUEST, vec![e.to_string()]),
91            NodeServiceError::ChainClientError(e) => {
92                (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
93            }
94            NodeServiceError::JsonError(e) => {
95                (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
96            }
97            NodeServiceError::InvalidChainId(_) => (
98                StatusCode::BAD_REQUEST,
99                vec!["invalid chain ID".to_string()],
100            ),
101        };
102        let tuple = (tuple.0, json!({"error": tuple.1}).to_string());
103        tuple.into_response()
104    }
105}
106
107#[Subscription]
108impl<C> SubscriptionRoot<C>
109where
110    C: ClientContext + 'static,
111{
112    /// Subscribes to notifications from the specified chain.
113    async fn notifications(
114        &self,
115        chain_id: ChainId,
116    ) -> Result<impl Stream<Item = Notification>, Error> {
117        let client = self.context.lock().await.make_chain_client(chain_id);
118        Ok(client.subscribe()?)
119    }
120}
121
122impl<C> MutationRoot<C>
123where
124    C: ClientContext,
125{
126    async fn execute_system_operation(
127        &self,
128        system_operation: SystemOperation,
129        chain_id: ChainId,
130    ) -> Result<CryptoHash, Error> {
131        let certificate = self
132            .apply_client_command(&chain_id, move |client| {
133                let operation = Operation::system(system_operation.clone());
134                async move {
135                    let result = client
136                        .execute_operation(operation)
137                        .await
138                        .map_err(Error::from);
139                    (result, client)
140                }
141            })
142            .await?;
143        Ok(certificate.hash())
144    }
145
146    /// Applies the given function to the chain client.
147    /// Updates the wallet regardless of the outcome. As long as the function returns a round
148    /// timeout, it will wait and retry.
149    async fn apply_client_command<F, Fut, T>(
150        &self,
151        chain_id: &ChainId,
152        mut f: F,
153    ) -> Result<T, Error>
154    where
155        F: FnMut(ChainClient<C::Environment>) -> Fut,
156        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
157    {
158        loop {
159            let client = self.context.lock().await.make_chain_client(*chain_id);
160            let mut stream = client.subscribe()?;
161            let (result, client) = f(client).await;
162            self.context.lock().await.update_wallet(&client).await?;
163            let timeout = match result? {
164                ClientOutcome::Committed(t) => return Ok(t),
165                ClientOutcome::WaitForTimeout(timeout) => timeout,
166            };
167            drop(client);
168            util::wait_for_next_round(&mut stream, timeout).await;
169        }
170    }
171}
172
173#[async_graphql::Object(cache_control(no_cache))]
174impl<C> MutationRoot<C>
175where
176    C: ClientContext + 'static,
177{
178    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
179    async fn process_inbox(
180        &self,
181        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
182    ) -> Result<Vec<CryptoHash>, Error> {
183        let mut hashes = Vec::new();
184        loop {
185            let client = self.context.lock().await.make_chain_client(chain_id);
186            let result = client.process_inbox().await;
187            self.context.lock().await.update_wallet(&client).await?;
188            let (certificates, maybe_timeout) = result?;
189            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
190            match maybe_timeout {
191                None => return Ok(hashes),
192                Some(timestamp) => {
193                    let mut stream = client.subscribe()?;
194                    drop(client);
195                    util::wait_for_next_round(&mut stream, timestamp).await;
196                }
197            }
198        }
199    }
200
201    /// Synchronizes the chain with the validators. Returns the chain's length.
202    ///
203    /// This is only used for testing, to make sure that a client is up to date.
204    // TODO(#4718): Remove this mutation.
205    async fn sync(
206        &self,
207        #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
208    ) -> Result<u64, Error> {
209        let client = self.context.lock().await.make_chain_client(chain_id);
210        let info = client.synchronize_from_validators().await?;
211        self.context.lock().await.update_wallet(&client).await?;
212        Ok(info.next_block_height.0)
213    }
214
215    /// Retries the pending block that was unsuccessfully proposed earlier.
216    async fn retry_pending_block(
217        &self,
218        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
219    ) -> Result<Option<CryptoHash>, Error> {
220        let client = self.context.lock().await.make_chain_client(chain_id);
221        let outcome = client.process_pending_block().await?;
222        self.context.lock().await.update_wallet(&client).await?;
223        match outcome {
224            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
225            ClientOutcome::Committed(None) => Ok(None),
226            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
227                "Please try again at {}",
228                timeout.timestamp
229            ))),
230        }
231    }
232
233    /// Transfers `amount` units of value from the given owner's account to the recipient.
234    /// If no owner is given, try to take the units out of the chain account.
235    async fn transfer(
236        &self,
237        #[graphql(desc = "The chain which native tokens are being transferred from.")]
238        chain_id: ChainId,
239        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
240        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
241        #[graphql(desc = "The amount being transferred.")] amount: Amount,
242    ) -> Result<CryptoHash, Error> {
243        self.apply_client_command(&chain_id, move |client| async move {
244            let result = client
245                .transfer(owner, amount, recipient)
246                .await
247                .map_err(Error::from)
248                .map(|outcome| outcome.map(|certificate| certificate.hash()));
249            (result, client)
250        })
251        .await
252    }
253
254    /// Claims `amount` units of value from the given owner's account in the remote
255    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
256    /// process the message.
257    async fn claim(
258        &self,
259        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
260        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
261        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
262        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
263        #[graphql(desc = "The amount being transferred.")] amount: Amount,
264    ) -> Result<CryptoHash, Error> {
265        self.apply_client_command(&chain_id, move |client| async move {
266            let result = client
267                .claim(owner, target_id, recipient, amount)
268                .await
269                .map_err(Error::from)
270                .map(|outcome| outcome.map(|certificate| certificate.hash()));
271            (result, client)
272        })
273        .await
274    }
275
276    /// Test if a data blob is readable from a transaction in the current chain.
277    // TODO(#2490): Consider removing or renaming this.
278    async fn read_data_blob(
279        &self,
280        chain_id: ChainId,
281        hash: CryptoHash,
282    ) -> Result<CryptoHash, Error> {
283        self.apply_client_command(&chain_id, move |client| async move {
284            let result = client
285                .read_data_blob(hash)
286                .await
287                .map_err(Error::from)
288                .map(|outcome| outcome.map(|certificate| certificate.hash()));
289            (result, client)
290        })
291        .await
292    }
293
294    /// Creates a new single-owner chain.
295    async fn open_chain(
296        &self,
297        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
298        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
299        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
300        balance: Option<Amount>,
301    ) -> Result<ChainId, Error> {
302        let ownership = ChainOwnership::single(owner);
303        let balance = balance.unwrap_or(Amount::ZERO);
304        let description = self
305            .apply_client_command(&chain_id, move |client| {
306                let ownership = ownership.clone();
307                async move {
308                    let result = client
309                        .open_chain(ownership, ApplicationPermissions::default(), balance)
310                        .await
311                        .map_err(Error::from)
312                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
313                    (result, client)
314                }
315            })
316            .await?;
317        Ok(description.id())
318    }
319
320    /// Creates a new multi-owner chain.
321    #[expect(clippy::too_many_arguments)]
322    async fn open_multi_owner_chain(
323        &self,
324        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
325        #[graphql(desc = "Permissions for applications on the new chain")]
326        application_permissions: Option<ApplicationPermissions>,
327        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
328        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
329        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
330        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
331        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
332        fast_round_ms: Option<u64>,
333        #[graphql(
334            desc = "The duration of the first single-leader and all multi-leader rounds",
335            default = 10_000
336        )]
337        base_timeout_ms: u64,
338        #[graphql(
339            desc = "The number of milliseconds by which the timeout increases after each \
340                    single-leader round",
341            default = 1_000
342        )]
343        timeout_increment_ms: u64,
344        #[graphql(
345            desc = "The age of an incoming tracked or protected message after which the \
346                    validators start transitioning the chain to fallback mode, in milliseconds.",
347            default = 86_400_000
348        )]
349        fallback_duration_ms: u64,
350    ) -> Result<ChainId, Error> {
351        let owners = if let Some(weights) = weights {
352            if weights.len() != owners.len() {
353                return Err(Error::new(format!(
354                    "There are {} owners but {} weights.",
355                    owners.len(),
356                    weights.len()
357                )));
358            }
359            owners.into_iter().zip(weights).collect::<Vec<_>>()
360        } else {
361            owners
362                .into_iter()
363                .zip(iter::repeat(100))
364                .collect::<Vec<_>>()
365        };
366        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
367        let timeout_config = TimeoutConfig {
368            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
369            base_timeout: TimeDelta::from_millis(base_timeout_ms),
370            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
371            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
372        };
373        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
374        let balance = balance.unwrap_or(Amount::ZERO);
375        let description = self
376            .apply_client_command(&chain_id, move |client| {
377                let ownership = ownership.clone();
378                let application_permissions = application_permissions.clone().unwrap_or_default();
379                async move {
380                    let result = client
381                        .open_chain(ownership, application_permissions, balance)
382                        .await
383                        .map_err(Error::from)
384                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
385                    (result, client)
386                }
387            })
388            .await?;
389        Ok(description.id())
390    }
391
392    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
393    async fn close_chain(
394        &self,
395        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
396    ) -> Result<Option<CryptoHash>, Error> {
397        let maybe_cert = self
398            .apply_client_command(&chain_id, |client| async move {
399                let result = client.close_chain().await.map_err(Error::from);
400                (result, client)
401            })
402            .await?;
403        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
404    }
405
406    /// Changes the chain to a single-owner chain
407    async fn change_owner(
408        &self,
409        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
410        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
411    ) -> Result<CryptoHash, Error> {
412        let operation = SystemOperation::ChangeOwnership {
413            super_owners: vec![new_owner],
414            owners: Vec::new(),
415            multi_leader_rounds: 2,
416            open_multi_leader_rounds: false,
417            timeout_config: TimeoutConfig::default(),
418        };
419        self.execute_system_operation(operation, chain_id).await
420    }
421
422    /// Changes the ownership of the chain
423    #[expect(clippy::too_many_arguments)]
424    async fn change_multiple_owners(
425        &self,
426        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
427        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
428        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
429        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
430        #[graphql(
431            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
432        )]
433        open_multi_leader_rounds: bool,
434        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
435        fast_round_ms: Option<u64>,
436        #[graphql(
437            desc = "The duration of the first single-leader and all multi-leader rounds",
438            default = 10_000
439        )]
440        base_timeout_ms: u64,
441        #[graphql(
442            desc = "The number of milliseconds by which the timeout increases after each \
443                    single-leader round",
444            default = 1_000
445        )]
446        timeout_increment_ms: u64,
447        #[graphql(
448            desc = "The age of an incoming tracked or protected message after which the \
449                    validators start transitioning the chain to fallback mode, in milliseconds.",
450            default = 86_400_000
451        )]
452        fallback_duration_ms: u64,
453    ) -> Result<CryptoHash, Error> {
454        let operation = SystemOperation::ChangeOwnership {
455            super_owners: Vec::new(),
456            owners: new_owners.into_iter().zip(new_weights).collect(),
457            multi_leader_rounds,
458            open_multi_leader_rounds,
459            timeout_config: TimeoutConfig {
460                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
461                base_timeout: TimeDelta::from_millis(base_timeout_ms),
462                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
463                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
464            },
465        };
466        self.execute_system_operation(operation, chain_id).await
467    }
468
469    /// Changes the application permissions configuration on this chain.
470    #[expect(clippy::too_many_arguments)]
471    async fn change_application_permissions(
472        &self,
473        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
474        #[graphql(desc = "These applications are allowed to close the current chain.")]
475        close_chain: Vec<ApplicationId>,
476        #[graphql(
477            desc = "If this is `None`, all system operations and application operations are allowed.
478If it is `Some`, only operations from the specified applications are allowed,
479and no system operations."
480        )]
481        execute_operations: Option<Vec<ApplicationId>>,
482        #[graphql(
483            desc = "At least one operation or incoming message from each of these applications must occur in every block."
484        )]
485        mandatory_applications: Vec<ApplicationId>,
486        #[graphql(desc = "These applications are allowed to change the application permissions.")]
487        change_application_permissions: Vec<ApplicationId>,
488        #[graphql(
489            desc = "These applications are allowed to perform calls to services as oracles."
490        )]
491        call_service_as_oracle: Option<Vec<ApplicationId>>,
492        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
493        make_http_requests: Option<Vec<ApplicationId>>,
494    ) -> Result<CryptoHash, Error> {
495        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
496            execute_operations,
497            mandatory_applications,
498            close_chain,
499            change_application_permissions,
500            call_service_as_oracle,
501            make_http_requests,
502        });
503        self.execute_system_operation(operation, chain_id).await
504    }
505
506    /// (admin chain only) Registers a new committee. This will notify the subscribers of
507    /// the admin chain so that they can migrate to the new epoch (by accepting the
508    /// notification as an "incoming message" in a next block).
509    async fn create_committee(
510        &self,
511        chain_id: ChainId,
512        committee: Committee,
513    ) -> Result<CryptoHash, Error> {
514        Ok(self
515            .apply_client_command(&chain_id, move |client| {
516                let committee = committee.clone();
517                async move {
518                    let result = client
519                        .stage_new_committee(committee)
520                        .await
521                        .map_err(Error::from);
522                    (result, client)
523                }
524            })
525            .await?
526            .hash())
527    }
528
529    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
530    /// blocks from the retired epoch will not be accepted until they are followed (hence
531    /// re-certified) by a block certified by a recent committee.
532    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
533        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
534        self.execute_system_operation(operation, chain_id).await
535    }
536
537    /// Publishes a new application module.
538    async fn publish_module(
539        &self,
540        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
541        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
542        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
543        service: Bytecode,
544        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
545        vm_runtime: VmRuntime,
546    ) -> Result<ModuleId, Error> {
547        self.apply_client_command(&chain_id, move |client| {
548            let contract = contract.clone();
549            let service = service.clone();
550            async move {
551                let result = client
552                    .publish_module(contract, service, vm_runtime)
553                    .await
554                    .map_err(Error::from)
555                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
556                (result, client)
557            }
558        })
559        .await
560    }
561
562    /// Publishes a new data blob.
563    async fn publish_data_blob(
564        &self,
565        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
566        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
567    ) -> Result<CryptoHash, Error> {
568        self.apply_client_command(&chain_id, |client| {
569            let bytes = bytes.clone();
570            async move {
571                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
572                (result, client)
573            }
574        })
575        .await
576        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
577    }
578
579    /// Creates a new application.
580    async fn create_application(
581        &self,
582        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
583        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
584        #[graphql(desc = "The JSON serialization of the parameters of the application")]
585        parameters: String,
586        #[graphql(
587            desc = "The JSON serialization of the instantiation argument of the application"
588        )]
589        instantiation_argument: String,
590        #[graphql(desc = "The dependencies of the application being created")]
591        required_application_ids: Vec<ApplicationId>,
592    ) -> Result<ApplicationId, Error> {
593        self.apply_client_command(&chain_id, move |client| {
594            let parameters = parameters.as_bytes().to_vec();
595            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
596            let required_application_ids = required_application_ids.clone();
597            async move {
598                let result = client
599                    .create_application_untyped(
600                        module_id,
601                        parameters,
602                        instantiation_argument,
603                        required_application_ids,
604                    )
605                    .await
606                    .map_err(Error::from)
607                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
608                (result, client)
609            }
610        })
611        .await
612    }
613}
614
615#[async_graphql::Object(cache_control(no_cache))]
616impl<C> QueryRoot<C>
617where
618    C: ClientContext + 'static,
619{
620    async fn chain(
621        &self,
622        chain_id: ChainId,
623    ) -> Result<
624        ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
625        Error,
626    > {
627        let client = self.context.lock().await.make_chain_client(chain_id);
628        let view = client.chain_state_view().await?;
629        Ok(ChainStateExtendedView::new(view))
630    }
631
632    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
633        let client = self.context.lock().await.make_chain_client(chain_id);
634        let applications = client
635            .chain_state_view()
636            .await?
637            .execution_state
638            .list_applications()
639            .await?;
640
641        let overviews = applications
642            .into_iter()
643            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
644            .collect();
645
646        Ok(overviews)
647    }
648
649    async fn chains(&self) -> Result<Chains, Error> {
650        Ok(Chains {
651            list: self.context.lock().await.wallet().chain_ids(),
652            default: self.default_chain,
653        })
654    }
655
656    async fn block(
657        &self,
658        hash: Option<CryptoHash>,
659        chain_id: ChainId,
660    ) -> Result<Option<ConfirmedBlock>, Error> {
661        let client = self.context.lock().await.make_chain_client(chain_id);
662        let hash = match hash {
663            Some(hash) => Some(hash),
664            None => client.chain_info().await?.block_hash,
665        };
666        if let Some(hash) = hash {
667            let block = client.read_confirmed_block(hash).await?;
668            Ok(Some(block))
669        } else {
670            Ok(None)
671        }
672    }
673
674    async fn events_from_index(
675        &self,
676        chain_id: ChainId,
677        stream_id: StreamId,
678        start_index: u32,
679    ) -> Result<Vec<IndexAndEvent>, Error> {
680        Ok(self
681            .context
682            .lock()
683            .await
684            .make_chain_client(chain_id)
685            .events_from_index(stream_id, start_index)
686            .await?)
687    }
688
689    async fn blocks(
690        &self,
691        from: Option<CryptoHash>,
692        chain_id: ChainId,
693        limit: Option<u32>,
694    ) -> Result<Vec<ConfirmedBlock>, Error> {
695        let client = self.context.lock().await.make_chain_client(chain_id);
696        let limit = limit.unwrap_or(10);
697        let from = match from {
698            Some(from) => Some(from),
699            None => client.chain_info().await?.block_hash,
700        };
701        let Some(from) = from else {
702            return Ok(vec![]);
703        };
704        let mut hash = Some(from);
705        let mut values = Vec::new();
706        for _ in 0..limit {
707            let Some(next_hash) = hash else {
708                break;
709            };
710            let value = client.read_confirmed_block(next_hash).await?;
711            hash = value.block().header.previous_block_hash;
712            values.push(value);
713        }
714        Ok(values)
715    }
716
717    /// Returns the version information on this node service.
718    async fn version(&self) -> linera_version::VersionInfo {
719        linera_version::VersionInfo::default()
720    }
721}
722
723// What follows is a hack to add a chain_id field to `ChainStateView` based on
724// https://async-graphql.github.io/async-graphql/en/merging_objects.html
725
/// Wraps a chain ID so that it can be exposed as an extra GraphQL field next to the fields
/// of the chain state (see `ChainStateExtendedView`).
struct ChainStateViewExtension(ChainId);
727
728#[async_graphql::Object(cache_control(no_cache))]
729impl ChainStateViewExtension {
730    async fn chain_id(&self) -> ChainId {
731        self.0
732    }
733}
734
// Merges the `chain_id` resolver of `ChainStateViewExtension` with the resolvers of the
// (read-only) chain state view into a single GraphQL object.
#[derive(MergedObject)]
struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
where
    C: linera_views::context::Context + Clone + Send + Sync + 'static,
    C::Extra: linera_execution::ExecutionRuntimeContext;
740
/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an
/// [`OwnedRwLockReadGuard`].
///
/// Its `ContainerType` and `OutputType` implementations forward every call to the wrapped
/// [`ChainStateView`], including the GraphQL type name and type information.
pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
where
    C: linera_views::context::Context + Clone + Send + Sync + 'static;
746
747impl<C> ContainerType for ReadOnlyChainStateView<C>
748where
749    C: linera_views::context::Context + Clone + Send + Sync + 'static,
750{
751    async fn resolve_field(
752        &self,
753        context: &async_graphql::Context<'_>,
754    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
755        self.0.resolve_field(context).await
756    }
757}
758
759impl<C> OutputType for ReadOnlyChainStateView<C>
760where
761    C: linera_views::context::Context + Clone + Send + Sync + 'static,
762{
763    fn type_name() -> Cow<'static, str> {
764        ChainStateView::<C>::type_name()
765    }
766
767    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
768        ChainStateView::<C>::create_type_info(registry)
769    }
770
771    async fn resolve(
772        &self,
773        context: &async_graphql::ContextSelectionSet<'_>,
774        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
775    ) -> async_graphql::ServerResult<async_graphql::Value> {
776        self.0.resolve(context, field).await
777    }
778}
779
780impl<C> ChainStateExtendedView<C>
781where
782    C: linera_views::context::Context + Clone + Send + Sync + 'static,
783    C::Extra: linera_execution::ExecutionRuntimeContext,
784{
785    fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
786        Self(
787            ChainStateViewExtension(view.chain_id()),
788            ReadOnlyChainStateView(view),
789        )
790    }
791}
792
793#[derive(SimpleObject)]
794pub struct ApplicationOverview {
795    id: ApplicationId,
796    description: ApplicationDescription,
797    link: String,
798}
799
800impl ApplicationOverview {
801    fn new(
802        id: ApplicationId,
803        description: ApplicationDescription,
804        port: NonZeroU16,
805        chain_id: ChainId,
806    ) -> Self {
807        Self {
808            id,
809            description,
810            link: format!(
811                "http://localhost:{}/chains/{}/applications/{}",
812                port.get(),
813                chain_id,
814                id
815            ),
816        }
817    }
818}
819
820/// The `NodeService` is a server that exposes a web-server to the client.
821/// The node service is primarily used to explore the state of a chain in GraphQL.
822pub struct NodeService<C>
823where
824    C: ClientContext + 'static,
825{
826    config: ChainListenerConfig,
827    port: NonZeroU16,
828    #[cfg(with_metrics)]
829    metrics_port: NonZeroU16,
830    default_chain: Option<ChainId>,
831    context: Arc<Mutex<C>>,
832}
833
834impl<C> Clone for NodeService<C>
835where
836    C: ClientContext + 'static,
837{
838    fn clone(&self) -> Self {
839        Self {
840            config: self.config.clone(),
841            port: self.port,
842            #[cfg(with_metrics)]
843            metrics_port: self.metrics_port,
844            default_chain: self.default_chain,
845            context: Arc::clone(&self.context),
846        }
847    }
848}
849
850impl<C> NodeService<C>
851where
852    C: ClientContext,
853{
854    /// Creates a new instance of the node service given a client chain and a port.
855    pub fn new(
856        config: ChainListenerConfig,
857        port: NonZeroU16,
858        #[cfg(with_metrics)] metrics_port: NonZeroU16,
859        default_chain: Option<ChainId>,
860        context: C,
861    ) -> Self {
862        Self {
863            config,
864            port,
865            #[cfg(with_metrics)]
866            metrics_port,
867            default_chain,
868            context: Arc::new(Mutex::new(context)),
869        }
870    }
871
872    #[cfg(with_metrics)]
873    pub fn metrics_address(&self) -> SocketAddr {
874        SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
875    }
876
877    pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
878        Schema::build(
879            QueryRoot {
880                context: Arc::clone(&self.context),
881                port: self.port,
882                default_chain: self.default_chain,
883            },
884            MutationRoot {
885                context: Arc::clone(&self.context),
886            },
887            SubscriptionRoot {
888                context: Arc::clone(&self.context),
889            },
890        )
891        .finish()
892    }
893
894    /// Runs the node service.
895    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
896    pub async fn run(self, cancellation_token: CancellationToken) -> Result<(), anyhow::Error> {
897        let port = self.port.get();
898        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
899        let application_handler =
900            axum::routing::get(util::graphiql).post(Self::application_handler);
901
902        #[cfg(with_metrics)]
903        monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
904
905        let app = Router::new()
906            .route("/", index_handler)
907            .route(
908                "/chains/{chain_id}/applications/{application_id}",
909                application_handler,
910            )
911            .route("/ready", axum::routing::get(|| async { "ready!" }))
912            .route_service("/ws", GraphQLSubscription::new(self.schema()))
913            .layer(Extension(self.clone()))
914            // TODO(#551): Provide application authentication.
915            .layer(CorsLayer::permissive());
916
917        info!("GraphiQL IDE: http://localhost:{}", port);
918
919        let storage = self.context.lock().await.storage().clone();
920
921        let chain_listener = ChainListener::new(
922            self.config,
923            self.context,
924            storage,
925            cancellation_token.clone(),
926        )
927        .run(true)
928        .await?;
929        let mut chain_listener = Box::pin(chain_listener).fuse();
930        let tcp_listener =
931            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
932        let server = axum::serve(tcp_listener, app)
933            .with_graceful_shutdown(cancellation_token.cancelled_owned())
934            .into_future();
935        futures::select! {
936            result = chain_listener => result?,
937            result = Box::pin(server).fuse() => result?,
938        };
939
940        Ok(())
941    }
942
943    /// Handles service queries for user applications (including mutations).
944    async fn handle_service_request(
945        &self,
946        application_id: ApplicationId,
947        request: Vec<u8>,
948        chain_id: ChainId,
949        block_hash: Option<CryptoHash>,
950    ) -> Result<Vec<u8>, NodeServiceError> {
951        let QueryOutcome {
952            response,
953            operations,
954        } = self
955            .query_user_application(application_id, request, chain_id, block_hash)
956            .await?;
957        if operations.is_empty() {
958            return Ok(response);
959        }
960
961        trace!("Query requested a new block with operations: {operations:?}");
962        let client = self.context.lock().await.make_chain_client(chain_id);
963        let hash = loop {
964            let timeout = match client
965                .execute_operations(operations.clone(), vec![])
966                .await?
967            {
968                ClientOutcome::Committed(certificate) => break certificate.hash(),
969                ClientOutcome::WaitForTimeout(timeout) => timeout,
970            };
971            let mut stream = client.subscribe().map_err(|_| {
972                chain_client::Error::InternalError("Could not subscribe to the local node.")
973            })?;
974            util::wait_for_next_round(&mut stream, timeout).await;
975        };
976        let response = async_graphql::Response::new(hash.to_value());
977        Ok(serde_json::to_vec(&response)?)
978    }
979
980    /// Queries a user application, returning the raw [`QueryOutcome`].
981    async fn query_user_application(
982        &self,
983        application_id: ApplicationId,
984        bytes: Vec<u8>,
985        chain_id: ChainId,
986        block_hash: Option<CryptoHash>,
987    ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
988        let query = Query::User {
989            application_id,
990            bytes,
991        };
992        let client = self.context.lock().await.make_chain_client(chain_id);
993        let QueryOutcome {
994            response,
995            operations,
996        } = client.query_application(query, block_hash).await?;
997        match response {
998            QueryResponse::System(_) => {
999                unreachable!("cannot get a system response for a user query")
1000            }
1001            QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1002                response: user_response_bytes,
1003                operations,
1004            }),
1005        }
1006    }
1007
1008    /// Executes a GraphQL query and generates a response for our `Schema`.
1009    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1010        service
1011            .0
1012            .schema()
1013            .execute(request.into_inner())
1014            .await
1015            .into()
1016    }
1017
1018    /// Executes a GraphQL query against an application.
1019    /// Pattern matches on the `OperationType` of the query and routes the query
1020    /// accordingly.
1021    async fn application_handler(
1022        Path((chain_id, application_id)): Path<(String, String)>,
1023        service: Extension<Self>,
1024        request: String,
1025    ) -> Result<Vec<u8>, NodeServiceError> {
1026        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1027        let application_id: ApplicationId = application_id.parse()?;
1028
1029        debug!(
1030            %chain_id,
1031            %application_id,
1032            "processing request for application:\n{:?}",
1033            &request
1034        );
1035        let response = service
1036            .0
1037            .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1038            .await?;
1039
1040        Ok(response)
1041    }
1042}