linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7    futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8    ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _};
13use linera_base::{
14    crypto::{CryptoError, CryptoHash},
15    data_types::{
16        Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17    },
18    identifiers::{AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId},
19    ownership::{ChainOwnership, TimeoutConfig},
20    vm::VmRuntime,
21    BcsHexParseError,
22};
23use linera_chain::{
24    types::{ConfirmedBlock, GenericCertificate},
25    ChainStateView,
26};
27use linera_client::chain_listener::{ChainListener, ChainListenerConfig, ClientContext};
28use linera_core::{
29    client::{ChainClient, ChainClientError},
30    data_types::ClientOutcome,
31    worker::Notification,
32};
33use linera_execution::{
34    committee::Committee,
35    system::{AdminOperation, Recipient},
36    Operation, Query, QueryOutcome, QueryResponse, SystemOperation,
37};
38use linera_sdk::linera_base_types::BlobContent;
39use serde::{Deserialize, Serialize};
40use serde_json::json;
41use thiserror::Error as ThisError;
42use tokio::sync::OwnedRwLockReadGuard;
43use tokio_util::sync::CancellationToken;
44use tower_http::cors::CorsLayer;
45use tracing::{debug, error, info, instrument, trace};
46
47use crate::util;
48
49#[derive(SimpleObject, Serialize, Deserialize, Clone)]
50pub struct Chains {
51    pub list: Vec<ChainId>,
52    pub default: Option<ChainId>,
53}
54
55/// Our root GraphQL query type.
56pub struct QueryRoot<C> {
57    context: Arc<Mutex<C>>,
58    port: NonZeroU16,
59    default_chain: Option<ChainId>,
60}
61
62/// Our root GraphQL subscription type.
63pub struct SubscriptionRoot<C> {
64    context: Arc<Mutex<C>>,
65}
66
67/// Our root GraphQL mutation type.
68pub struct MutationRoot<C> {
69    context: Arc<Mutex<C>>,
70}
71
72#[derive(Debug, ThisError)]
73enum NodeServiceError {
74    #[error(transparent)]
75    ChainClientError(#[from] ChainClientError),
76    #[error(transparent)]
77    BcsHexError(#[from] BcsHexParseError),
78    #[error(transparent)]
79    JsonError(#[from] serde_json::Error),
80    #[error("malformed chain ID: {0}")]
81    InvalidChainId(CryptoError),
82}
83
84impl IntoResponse for NodeServiceError {
85    fn into_response(self) -> response::Response {
86        let tuple = match self {
87            NodeServiceError::BcsHexError(e) => (StatusCode::BAD_REQUEST, vec![e.to_string()]),
88            NodeServiceError::ChainClientError(e) => {
89                (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
90            }
91            NodeServiceError::JsonError(e) => {
92                (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
93            }
94            NodeServiceError::InvalidChainId(_) => (
95                StatusCode::BAD_REQUEST,
96                vec!["invalid chain ID".to_string()],
97            ),
98        };
99        let tuple = (tuple.0, json!({"error": tuple.1}).to_string());
100        tuple.into_response()
101    }
102}
103
104#[Subscription]
105impl<C> SubscriptionRoot<C>
106where
107    C: ClientContext + 'static,
108{
109    /// Subscribes to notifications from the specified chain.
110    async fn notifications(
111        &self,
112        chain_id: ChainId,
113    ) -> Result<impl Stream<Item = Notification>, Error> {
114        let client = self.context.lock().await.make_chain_client(chain_id);
115        Ok(client.subscribe().await?)
116    }
117}
118
119impl<C> MutationRoot<C>
120where
121    C: ClientContext,
122{
123    async fn execute_system_operation(
124        &self,
125        system_operation: SystemOperation,
126        chain_id: ChainId,
127    ) -> Result<CryptoHash, Error> {
128        let certificate = self
129            .apply_client_command(&chain_id, move |client| {
130                let operation = Operation::system(system_operation.clone());
131                async move {
132                    let result = client
133                        .execute_operation(operation)
134                        .await
135                        .map_err(Error::from);
136                    (result, client)
137                }
138            })
139            .await?;
140        Ok(certificate.hash())
141    }
142
143    /// Applies the given function to the chain client.
144    /// Updates the wallet regardless of the outcome. As long as the function returns a round
145    /// timeout, it will wait and retry.
146    async fn apply_client_command<F, Fut, T>(
147        &self,
148        chain_id: &ChainId,
149        mut f: F,
150    ) -> Result<T, Error>
151    where
152        F: FnMut(ChainClient<C::Environment>) -> Fut,
153        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
154    {
155        loop {
156            let client = self.context.lock().await.make_chain_client(*chain_id);
157            let mut stream = client.subscribe().await?;
158            let (result, client) = f(client).await;
159            self.context.lock().await.update_wallet(&client).await?;
160            let timeout = match result? {
161                ClientOutcome::Committed(t) => return Ok(t),
162                ClientOutcome::WaitForTimeout(timeout) => timeout,
163            };
164            drop(client);
165            util::wait_for_next_round(&mut stream, timeout).await;
166        }
167    }
168}
169
170#[async_graphql::Object(cache_control(no_cache))]
171impl<C> MutationRoot<C>
172where
173    C: ClientContext + 'static,
174{
175    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
176    async fn process_inbox(&self, chain_id: ChainId) -> Result<Vec<CryptoHash>, Error> {
177        let mut hashes = Vec::new();
178        loop {
179            let client = self.context.lock().await.make_chain_client(chain_id);
180            client.synchronize_from_validators().await?;
181            let result = client.process_inbox_without_prepare().await;
182            self.context.lock().await.update_wallet(&client).await?;
183            let (certificates, maybe_timeout) = result?;
184            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
185            match maybe_timeout {
186                None => return Ok(hashes),
187                Some(timestamp) => {
188                    let mut stream = client.subscribe().await?;
189                    drop(client);
190                    util::wait_for_next_round(&mut stream, timestamp).await;
191                }
192            }
193        }
194    }
195
196    /// Retries the pending block that was unsuccessfully proposed earlier.
197    async fn retry_pending_block(&self, chain_id: ChainId) -> Result<Option<CryptoHash>, Error> {
198        let client = self.context.lock().await.make_chain_client(chain_id);
199        let outcome = client.process_pending_block().await?;
200        self.context.lock().await.update_wallet(&client).await?;
201        match outcome {
202            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
203            ClientOutcome::Committed(None) => Ok(None),
204            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
205                "Please try again at {}",
206                timeout.timestamp
207            ))),
208        }
209    }
210
211    /// Transfers `amount` units of value from the given owner's account to the recipient.
212    /// If no owner is given, try to take the units out of the chain account.
213    async fn transfer(
214        &self,
215        chain_id: ChainId,
216        owner: AccountOwner,
217        recipient: Recipient,
218        amount: Amount,
219    ) -> Result<CryptoHash, Error> {
220        self.apply_client_command(&chain_id, move |client| async move {
221            let result = client
222                .transfer(owner, amount, recipient)
223                .await
224                .map_err(Error::from)
225                .map(|outcome| outcome.map(|certificate| certificate.hash()));
226            (result, client)
227        })
228        .await
229    }
230
231    /// Claims `amount` units of value from the given owner's account in the remote
232    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
233    /// process the message.
234    async fn claim(
235        &self,
236        chain_id: ChainId,
237        owner: AccountOwner,
238        target_id: ChainId,
239        recipient: Recipient,
240        amount: Amount,
241    ) -> Result<CryptoHash, Error> {
242        self.apply_client_command(&chain_id, move |client| async move {
243            let result = client
244                .claim(owner, target_id, recipient, amount)
245                .await
246                .map_err(Error::from)
247                .map(|outcome| outcome.map(|certificate| certificate.hash()));
248            (result, client)
249        })
250        .await
251    }
252
253    /// Test if a data blob is readable from a transaction in the current chain.
254    // TODO(#2490): Consider removing or renaming this.
255    async fn read_data_blob(
256        &self,
257        chain_id: ChainId,
258        hash: CryptoHash,
259    ) -> Result<CryptoHash, Error> {
260        self.apply_client_command(&chain_id, move |client| async move {
261            let result = client
262                .read_data_blob(hash)
263                .await
264                .map_err(Error::from)
265                .map(|outcome| outcome.map(|certificate| certificate.hash()));
266            (result, client)
267        })
268        .await
269    }
270
271    /// Creates (or activates) a new chain with the given owner.
272    /// This will automatically subscribe to the future committees created by `admin_id`.
273    async fn open_chain(
274        &self,
275        chain_id: ChainId,
276        owner: AccountOwner,
277        balance: Option<Amount>,
278    ) -> Result<ChainId, Error> {
279        let ownership = ChainOwnership::single(owner);
280        let balance = balance.unwrap_or(Amount::ZERO);
281        let description = self
282            .apply_client_command(&chain_id, move |client| {
283                let ownership = ownership.clone();
284                async move {
285                    let result = client
286                        .open_chain(ownership, ApplicationPermissions::default(), balance)
287                        .await
288                        .map_err(Error::from)
289                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
290                    (result, client)
291                }
292            })
293            .await?;
294        Ok(description.id())
295    }
296
297    /// Creates (or activates) a new chain by installing the given authentication keys.
298    /// This will automatically subscribe to the future committees created by `admin_id`.
299    #[expect(clippy::too_many_arguments)]
300    async fn open_multi_owner_chain(
301        &self,
302        chain_id: ChainId,
303        application_permissions: Option<ApplicationPermissions>,
304        owners: Vec<AccountOwner>,
305        weights: Option<Vec<u64>>,
306        multi_leader_rounds: Option<u32>,
307        balance: Option<Amount>,
308        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
309        fast_round_ms: Option<u64>,
310        #[graphql(
311            desc = "The duration of the first single-leader and all multi-leader rounds",
312            default = 10_000
313        )]
314        base_timeout_ms: u64,
315        #[graphql(
316            desc = "The number of milliseconds by which the timeout increases after each \
317                    single-leader round",
318            default = 1_000
319        )]
320        timeout_increment_ms: u64,
321        #[graphql(
322            desc = "The age of an incoming tracked or protected message after which the \
323                    validators start transitioning the chain to fallback mode, in milliseconds.",
324            default = 86_400_000
325        )]
326        fallback_duration_ms: u64,
327    ) -> Result<ChainId, Error> {
328        let owners = if let Some(weights) = weights {
329            if weights.len() != owners.len() {
330                return Err(Error::new(format!(
331                    "There are {} owners but {} weights.",
332                    owners.len(),
333                    weights.len()
334                )));
335            }
336            owners.into_iter().zip(weights).collect::<Vec<_>>()
337        } else {
338            owners
339                .into_iter()
340                .zip(iter::repeat(100))
341                .collect::<Vec<_>>()
342        };
343        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
344        let timeout_config = TimeoutConfig {
345            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
346            base_timeout: TimeDelta::from_millis(base_timeout_ms),
347            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
348            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
349        };
350        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
351        let balance = balance.unwrap_or(Amount::ZERO);
352        let description = self
353            .apply_client_command(&chain_id, move |client| {
354                let ownership = ownership.clone();
355                let application_permissions = application_permissions.clone().unwrap_or_default();
356                async move {
357                    let result = client
358                        .open_chain(ownership, application_permissions, balance)
359                        .await
360                        .map_err(Error::from)
361                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
362                    (result, client)
363                }
364            })
365            .await?;
366        Ok(description.id())
367    }
368
369    /// Closes the chain. Returns `None` if it was already closed.
370    async fn close_chain(&self, chain_id: ChainId) -> Result<Option<CryptoHash>, Error> {
371        let maybe_cert = self
372            .apply_client_command(&chain_id, |client| async move {
373                let result = client.close_chain().await.map_err(Error::from);
374                (result, client)
375            })
376            .await?;
377        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
378    }
379
380    /// Changes the authentication key of the chain.
381    async fn change_owner(
382        &self,
383        chain_id: ChainId,
384        new_owner: AccountOwner,
385    ) -> Result<CryptoHash, Error> {
386        let operation = SystemOperation::ChangeOwnership {
387            super_owners: vec![new_owner],
388            owners: Vec::new(),
389            multi_leader_rounds: 2,
390            open_multi_leader_rounds: false,
391            timeout_config: TimeoutConfig::default(),
392        };
393        self.execute_system_operation(operation, chain_id).await
394    }
395
396    /// Changes the authentication key of the chain.
397    #[expect(clippy::too_many_arguments)]
398    async fn change_multiple_owners(
399        &self,
400        chain_id: ChainId,
401        new_owners: Vec<AccountOwner>,
402        new_weights: Vec<u64>,
403        multi_leader_rounds: u32,
404        open_multi_leader_rounds: bool,
405        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
406        fast_round_ms: Option<u64>,
407        #[graphql(
408            desc = "The duration of the first single-leader and all multi-leader rounds",
409            default = 10_000
410        )]
411        base_timeout_ms: u64,
412        #[graphql(
413            desc = "The number of milliseconds by which the timeout increases after each \
414                    single-leader round",
415            default = 1_000
416        )]
417        timeout_increment_ms: u64,
418        #[graphql(
419            desc = "The age of an incoming tracked or protected message after which the \
420                    validators start transitioning the chain to fallback mode, in milliseconds.",
421            default = 86_400_000
422        )]
423        fallback_duration_ms: u64,
424    ) -> Result<CryptoHash, Error> {
425        let operation = SystemOperation::ChangeOwnership {
426            super_owners: Vec::new(),
427            owners: new_owners.into_iter().zip(new_weights).collect(),
428            multi_leader_rounds,
429            open_multi_leader_rounds,
430            timeout_config: TimeoutConfig {
431                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
432                base_timeout: TimeDelta::from_millis(base_timeout_ms),
433                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
434                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
435            },
436        };
437        self.execute_system_operation(operation, chain_id).await
438    }
439
440    /// Changes the application permissions configuration on this chain.
441    #[expect(clippy::too_many_arguments)]
442    async fn change_application_permissions(
443        &self,
444        chain_id: ChainId,
445        close_chain: Vec<ApplicationId>,
446        execute_operations: Option<Vec<ApplicationId>>,
447        mandatory_applications: Vec<ApplicationId>,
448        change_application_permissions: Vec<ApplicationId>,
449        call_service_as_oracle: Option<Vec<ApplicationId>>,
450        make_http_requests: Option<Vec<ApplicationId>>,
451    ) -> Result<CryptoHash, Error> {
452        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
453            execute_operations,
454            mandatory_applications,
455            close_chain,
456            change_application_permissions,
457            call_service_as_oracle,
458            make_http_requests,
459        });
460        self.execute_system_operation(operation, chain_id).await
461    }
462
463    /// (admin chain only) Registers a new committee. This will notify the subscribers of
464    /// the admin chain so that they can migrate to the new epoch (by accepting the
465    /// notification as an "incoming message" in a next block).
466    async fn create_committee(
467        &self,
468        chain_id: ChainId,
469        committee: Committee,
470    ) -> Result<CryptoHash, Error> {
471        Ok(self
472            .apply_client_command(&chain_id, move |client| {
473                let committee = committee.clone();
474                async move {
475                    let result = client
476                        .stage_new_committee(committee)
477                        .await
478                        .map_err(Error::from);
479                    (result, client)
480                }
481            })
482            .await?
483            .hash())
484    }
485
486    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
487    /// blocks from the retired epoch will not be accepted until they are followed (hence
488    /// re-certified) by a block certified by a recent committee.
489    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
490        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
491        self.execute_system_operation(operation, chain_id).await
492    }
493
494    /// Publishes a new application module.
495    async fn publish_module(
496        &self,
497        chain_id: ChainId,
498        contract: Bytecode,
499        service: Bytecode,
500        vm_runtime: VmRuntime,
501    ) -> Result<ModuleId, Error> {
502        self.apply_client_command(&chain_id, move |client| {
503            let contract = contract.clone();
504            let service = service.clone();
505            async move {
506                let result = client
507                    .publish_module(contract, service, vm_runtime)
508                    .await
509                    .map_err(Error::from)
510                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
511                (result, client)
512            }
513        })
514        .await
515    }
516
517    /// Publishes a new data blob.
518    async fn publish_data_blob(
519        &self,
520        chain_id: ChainId,
521        bytes: Vec<u8>,
522    ) -> Result<CryptoHash, Error> {
523        self.apply_client_command(&chain_id, |client| {
524            let bytes = bytes.clone();
525            async move {
526                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
527                (result, client)
528            }
529        })
530        .await
531        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
532    }
533
534    /// Creates a new application.
535    async fn create_application(
536        &self,
537        chain_id: ChainId,
538        module_id: ModuleId,
539        parameters: String,
540        instantiation_argument: String,
541        required_application_ids: Vec<ApplicationId>,
542    ) -> Result<ApplicationId, Error> {
543        self.apply_client_command(&chain_id, move |client| {
544            let parameters = parameters.as_bytes().to_vec();
545            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
546            let required_application_ids = required_application_ids.clone();
547            async move {
548                let result = client
549                    .create_application_untyped(
550                        module_id,
551                        parameters,
552                        instantiation_argument,
553                        required_application_ids,
554                    )
555                    .await
556                    .map_err(Error::from)
557                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
558                (result, client)
559            }
560        })
561        .await
562    }
563}
564
565#[async_graphql::Object(cache_control(no_cache))]
566impl<C> QueryRoot<C>
567where
568    C: ClientContext + 'static,
569{
570    async fn chain(
571        &self,
572        chain_id: ChainId,
573    ) -> Result<
574        ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
575        Error,
576    > {
577        let client = self.context.lock().await.make_chain_client(chain_id);
578        let view = client.chain_state_view().await?;
579        Ok(ChainStateExtendedView::new(view))
580    }
581
582    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
583        let client = self.context.lock().await.make_chain_client(chain_id);
584        let applications = client
585            .chain_state_view()
586            .await?
587            .execution_state
588            .list_applications()
589            .await?;
590
591        let overviews = applications
592            .into_iter()
593            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
594            .collect();
595
596        Ok(overviews)
597    }
598
599    async fn chains(&self) -> Result<Chains, Error> {
600        Ok(Chains {
601            list: self.context.lock().await.wallet().chain_ids(),
602            default: self.default_chain,
603        })
604    }
605
606    async fn block(
607        &self,
608        hash: Option<CryptoHash>,
609        chain_id: ChainId,
610    ) -> Result<Option<ConfirmedBlock>, Error> {
611        let client = self.context.lock().await.make_chain_client(chain_id);
612        let hash = match hash {
613            Some(hash) => Some(hash),
614            None => {
615                let view = client.chain_state_view().await?;
616                view.tip_state.get().block_hash
617            }
618        };
619        if let Some(hash) = hash {
620            let block = client.read_confirmed_block(hash).await?;
621            Ok(Some(block))
622        } else {
623            Ok(None)
624        }
625    }
626
627    async fn events_from_index(
628        &self,
629        chain_id: ChainId,
630        stream_id: StreamId,
631        start_index: u32,
632    ) -> Result<Vec<IndexAndEvent>, Error> {
633        Ok(self
634            .context
635            .lock()
636            .await
637            .make_chain_client(chain_id)
638            .events_from_index(stream_id, start_index)
639            .await?)
640    }
641
642    async fn blocks(
643        &self,
644        from: Option<CryptoHash>,
645        chain_id: ChainId,
646        limit: Option<u32>,
647    ) -> Result<Vec<ConfirmedBlock>, Error> {
648        let client = self.context.lock().await.make_chain_client(chain_id);
649        let limit = limit.unwrap_or(10);
650        let from = match from {
651            Some(from) => Some(from),
652            None => {
653                let view = client.chain_state_view().await?;
654                view.tip_state.get().block_hash
655            }
656        };
657        let Some(from) = from else {
658            return Ok(vec![]);
659        };
660        let mut hash = Some(from);
661        let mut values = Vec::new();
662        for _ in 0..limit {
663            let Some(next_hash) = hash else {
664                break;
665            };
666            let value = client.read_confirmed_block(next_hash).await?;
667            hash = value.block().header.previous_block_hash;
668            values.push(value);
669        }
670        Ok(values)
671    }
672
673    /// Returns the version information on this node service.
674    async fn version(&self) -> linera_version::VersionInfo {
675        linera_version::VersionInfo::default()
676    }
677}
678
679// What follows is a hack to add a chain_id field to `ChainStateView` based on
680// https://async-graphql.github.io/async-graphql/en/merging_objects.html
681
682struct ChainStateViewExtension(ChainId);
683
684#[async_graphql::Object(cache_control(no_cache))]
685impl ChainStateViewExtension {
686    async fn chain_id(&self) -> ChainId {
687        self.0
688    }
689}
690
691#[derive(MergedObject)]
692struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
693where
694    C: linera_views::context::Context + Clone + Send + Sync + 'static,
695    C::Extra: linera_execution::ExecutionRuntimeContext;
696
697/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an
698/// [`OwnedRwLockReadGuard`].
699pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
700where
701    C: linera_views::context::Context + Clone + Send + Sync + 'static;
702
703impl<C> ContainerType for ReadOnlyChainStateView<C>
704where
705    C: linera_views::context::Context + Clone + Send + Sync + 'static,
706{
707    async fn resolve_field(
708        &self,
709        context: &async_graphql::Context<'_>,
710    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
711        self.0.resolve_field(context).await
712    }
713}
714
715impl<C> OutputType for ReadOnlyChainStateView<C>
716where
717    C: linera_views::context::Context + Clone + Send + Sync + 'static,
718{
719    fn type_name() -> Cow<'static, str> {
720        ChainStateView::<C>::type_name()
721    }
722
723    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
724        ChainStateView::<C>::create_type_info(registry)
725    }
726
727    async fn resolve(
728        &self,
729        context: &async_graphql::ContextSelectionSet<'_>,
730        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
731    ) -> async_graphql::ServerResult<async_graphql::Value> {
732        self.0.resolve(context, field).await
733    }
734}
735
736impl<C> ChainStateExtendedView<C>
737where
738    C: linera_views::context::Context + Clone + Send + Sync + 'static,
739    C::Extra: linera_execution::ExecutionRuntimeContext,
740{
741    fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
742        Self(
743            ChainStateViewExtension(view.chain_id()),
744            ReadOnlyChainStateView(view),
745        )
746    }
747}
748
749#[derive(SimpleObject)]
750pub struct ApplicationOverview {
751    id: ApplicationId,
752    description: ApplicationDescription,
753    link: String,
754}
755
756impl ApplicationOverview {
757    fn new(
758        id: ApplicationId,
759        description: ApplicationDescription,
760        port: NonZeroU16,
761        chain_id: ChainId,
762    ) -> Self {
763        Self {
764            id,
765            description,
766            link: format!(
767                "http://localhost:{}/chains/{}/applications/{}",
768                port.get(),
769                chain_id,
770                id
771            ),
772        }
773    }
774}
775
776/// The `NodeService` is a server that exposes a web-server to the client.
777/// The node service is primarily used to explore the state of a chain in GraphQL.
778pub struct NodeService<C>
779where
780    C: ClientContext + 'static,
781{
782    config: ChainListenerConfig,
783    port: NonZeroU16,
784    default_chain: Option<ChainId>,
785    context: Arc<Mutex<C>>,
786}
787
788impl<C> Clone for NodeService<C>
789where
790    C: ClientContext + 'static,
791{
792    fn clone(&self) -> Self {
793        Self {
794            config: self.config.clone(),
795            port: self.port,
796            default_chain: self.default_chain,
797            context: Arc::clone(&self.context),
798        }
799    }
800}
801
802impl<C> NodeService<C>
803where
804    C: ClientContext,
805{
806    /// Creates a new instance of the node service given a client chain and a port.
807    pub async fn new(
808        config: ChainListenerConfig,
809        port: NonZeroU16,
810        default_chain: Option<ChainId>,
811        context: C,
812    ) -> Self {
813        Self {
814            config,
815            port,
816            default_chain,
817            context: Arc::new(Mutex::new(context)),
818        }
819    }
820
821    pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
822        Schema::build(
823            QueryRoot {
824                context: Arc::clone(&self.context),
825                port: self.port,
826                default_chain: self.default_chain,
827            },
828            MutationRoot {
829                context: Arc::clone(&self.context),
830            },
831            SubscriptionRoot {
832                context: Arc::clone(&self.context),
833            },
834        )
835        .finish()
836    }
837
838    /// Runs the node service.
839    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
840    pub async fn run(self, cancellation_token: CancellationToken) -> Result<(), anyhow::Error> {
841        let port = self.port.get();
842        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
843        let application_handler =
844            axum::routing::get(util::graphiql).post(Self::application_handler);
845
846        let app = Router::new()
847            .route("/", index_handler)
848            .route(
849                "/chains/{chain_id}/applications/{application_id}",
850                application_handler,
851            )
852            .route("/ready", axum::routing::get(|| async { "ready!" }))
853            .route_service("/ws", GraphQLSubscription::new(self.schema()))
854            .layer(Extension(self.clone()))
855            // TODO(#551): Provide application authentication.
856            .layer(CorsLayer::permissive());
857
858        info!("GraphiQL IDE: http://localhost:{}", port);
859
860        let storage = self.context.lock().await.storage().clone();
861
862        let chain_listener =
863            ChainListener::new(self.config, self.context, storage, cancellation_token).run();
864        let mut chain_listener = Box::pin(chain_listener).fuse();
865        let tcp_listener =
866            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
867        let server = axum::serve(tcp_listener, app).into_future();
868        futures::select! {
869            result = chain_listener => result?,
870            result = Box::pin(server).fuse() => result?,
871        };
872
873        Ok(())
874    }
875
876    /// Handles service queries for user applications (including mutations).
877    async fn handle_service_request(
878        &self,
879        application_id: ApplicationId,
880        request: Vec<u8>,
881        chain_id: ChainId,
882    ) -> Result<Vec<u8>, NodeServiceError> {
883        let QueryOutcome {
884            response,
885            operations,
886        } = self
887            .query_user_application(application_id, request, chain_id)
888            .await?;
889        if operations.is_empty() {
890            return Ok(response);
891        }
892
893        trace!("Query requested a new block with operations: {operations:?}");
894        let client = self.context.lock().await.make_chain_client(chain_id);
895        let hash = loop {
896            let timeout = match client
897                .execute_operations(operations.clone(), vec![])
898                .await?
899            {
900                ClientOutcome::Committed(certificate) => break certificate.hash(),
901                ClientOutcome::WaitForTimeout(timeout) => timeout,
902            };
903            let mut stream = client.subscribe().await.map_err(|_| {
904                ChainClientError::InternalError("Could not subscribe to the local node.")
905            })?;
906            util::wait_for_next_round(&mut stream, timeout).await;
907        };
908        let response = async_graphql::Response::new(hash.to_value());
909        Ok(serde_json::to_vec(&response)?)
910    }
911
912    /// Queries a user application, returning the raw [`QueryOutcome`].
913    async fn query_user_application(
914        &self,
915        application_id: ApplicationId,
916        bytes: Vec<u8>,
917        chain_id: ChainId,
918    ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
919        let query = Query::User {
920            application_id,
921            bytes,
922        };
923        let client = self.context.lock().await.make_chain_client(chain_id);
924        let QueryOutcome {
925            response,
926            operations,
927        } = client.query_application(query).await?;
928        match response {
929            QueryResponse::System(_) => {
930                unreachable!("cannot get a system response for a user query")
931            }
932            QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
933                response: user_response_bytes,
934                operations,
935            }),
936        }
937    }
938
939    /// Executes a GraphQL query and generates a response for our `Schema`.
940    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
941        service
942            .0
943            .schema()
944            .execute(request.into_inner())
945            .await
946            .into()
947    }
948
949    /// Executes a GraphQL query against an application.
950    /// Pattern matches on the `OperationType` of the query and routes the query
951    /// accordingly.
952    async fn application_handler(
953        Path((chain_id, application_id)): Path<(String, String)>,
954        service: Extension<Self>,
955        request: String,
956    ) -> Result<Vec<u8>, NodeServiceError> {
957        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
958        let application_id: ApplicationId = application_id.parse()?;
959
960        debug!(
961            "Processing request for application {application_id} on chain {chain_id}:\n{:?}",
962            &request
963        );
964        let response = service
965            .0
966            .handle_service_request(application_id, request.into_bytes(), chain_id)
967            .await?;
968
969        Ok(response)
970    }
971}