linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7    futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8    ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _};
13use linera_base::{
14    crypto::{CryptoError, CryptoHash},
15    data_types::{
16        Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17    },
18    identifiers::{
19        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20    },
21    ownership::{ChainOwnership, TimeoutConfig},
22    vm::VmRuntime,
23    BcsHexParseError,
24};
25use linera_chain::{
26    types::{ConfirmedBlock, GenericCertificate},
27    ChainStateView,
28};
29use linera_client::chain_listener::{ChainListener, ChainListenerConfig, ClientContext};
30use linera_core::{
31    client::{ChainClient, ChainClientError},
32    data_types::ClientOutcome,
33    worker::Notification,
34};
35use linera_execution::{
36    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
37    SystemOperation,
38};
39use linera_sdk::linera_base_types::BlobContent;
40use serde::{Deserialize, Serialize};
41use serde_json::json;
42use thiserror::Error as ThisError;
43use tokio::sync::OwnedRwLockReadGuard;
44use tokio_util::sync::CancellationToken;
45use tower_http::cors::CorsLayer;
46use tracing::{debug, error, info, instrument, trace};
47
48use crate::util;
49
50#[derive(SimpleObject, Serialize, Deserialize, Clone)]
51pub struct Chains {
52    pub list: Vec<ChainId>,
53    pub default: Option<ChainId>,
54}
55
56/// Our root GraphQL query type.
57pub struct QueryRoot<C> {
58    context: Arc<Mutex<C>>,
59    port: NonZeroU16,
60    default_chain: Option<ChainId>,
61}
62
63/// Our root GraphQL subscription type.
64pub struct SubscriptionRoot<C> {
65    context: Arc<Mutex<C>>,
66}
67
68/// Our root GraphQL mutation type.
69pub struct MutationRoot<C> {
70    context: Arc<Mutex<C>>,
71}
72
73#[derive(Debug, ThisError)]
74enum NodeServiceError {
75    #[error(transparent)]
76    ChainClientError(#[from] ChainClientError),
77    #[error(transparent)]
78    BcsHexError(#[from] BcsHexParseError),
79    #[error(transparent)]
80    JsonError(#[from] serde_json::Error),
81    #[error("malformed chain ID: {0}")]
82    InvalidChainId(CryptoError),
83}
84
85impl IntoResponse for NodeServiceError {
86    fn into_response(self) -> response::Response {
87        let tuple = match self {
88            NodeServiceError::BcsHexError(e) => (StatusCode::BAD_REQUEST, vec![e.to_string()]),
89            NodeServiceError::ChainClientError(e) => {
90                (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
91            }
92            NodeServiceError::JsonError(e) => {
93                (StatusCode::INTERNAL_SERVER_ERROR, vec![e.to_string()])
94            }
95            NodeServiceError::InvalidChainId(_) => (
96                StatusCode::BAD_REQUEST,
97                vec!["invalid chain ID".to_string()],
98            ),
99        };
100        let tuple = (tuple.0, json!({"error": tuple.1}).to_string());
101        tuple.into_response()
102    }
103}
104
105#[Subscription]
106impl<C> SubscriptionRoot<C>
107where
108    C: ClientContext + 'static,
109{
110    /// Subscribes to notifications from the specified chain.
111    async fn notifications(
112        &self,
113        chain_id: ChainId,
114    ) -> Result<impl Stream<Item = Notification>, Error> {
115        let client = self.context.lock().await.make_chain_client(chain_id);
116        Ok(client.subscribe()?)
117    }
118}
119
120impl<C> MutationRoot<C>
121where
122    C: ClientContext,
123{
124    async fn execute_system_operation(
125        &self,
126        system_operation: SystemOperation,
127        chain_id: ChainId,
128    ) -> Result<CryptoHash, Error> {
129        let certificate = self
130            .apply_client_command(&chain_id, move |client| {
131                let operation = Operation::system(system_operation.clone());
132                async move {
133                    let result = client
134                        .execute_operation(operation)
135                        .await
136                        .map_err(Error::from);
137                    (result, client)
138                }
139            })
140            .await?;
141        Ok(certificate.hash())
142    }
143
144    /// Applies the given function to the chain client.
145    /// Updates the wallet regardless of the outcome. As long as the function returns a round
146    /// timeout, it will wait and retry.
147    async fn apply_client_command<F, Fut, T>(
148        &self,
149        chain_id: &ChainId,
150        mut f: F,
151    ) -> Result<T, Error>
152    where
153        F: FnMut(ChainClient<C::Environment>) -> Fut,
154        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
155    {
156        loop {
157            let client = self.context.lock().await.make_chain_client(*chain_id);
158            let mut stream = client.subscribe()?;
159            let (result, client) = f(client).await;
160            self.context.lock().await.update_wallet(&client).await?;
161            let timeout = match result? {
162                ClientOutcome::Committed(t) => return Ok(t),
163                ClientOutcome::WaitForTimeout(timeout) => timeout,
164            };
165            drop(client);
166            util::wait_for_next_round(&mut stream, timeout).await;
167        }
168    }
169}
170
171#[async_graphql::Object(cache_control(no_cache))]
172impl<C> MutationRoot<C>
173where
174    C: ClientContext + 'static,
175{
176    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
177    async fn process_inbox(
178        &self,
179        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
180    ) -> Result<Vec<CryptoHash>, Error> {
181        let mut hashes = Vec::new();
182        loop {
183            let client = self.context.lock().await.make_chain_client(chain_id);
184            client.synchronize_from_validators().await?;
185            let result = client.process_inbox_without_prepare().await;
186            self.context.lock().await.update_wallet(&client).await?;
187            let (certificates, maybe_timeout) = result?;
188            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
189            match maybe_timeout {
190                None => return Ok(hashes),
191                Some(timestamp) => {
192                    let mut stream = client.subscribe()?;
193                    drop(client);
194                    util::wait_for_next_round(&mut stream, timestamp).await;
195                }
196            }
197        }
198    }
199
200    /// Retries the pending block that was unsuccessfully proposed earlier.
201    async fn retry_pending_block(
202        &self,
203        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
204    ) -> Result<Option<CryptoHash>, Error> {
205        let client = self.context.lock().await.make_chain_client(chain_id);
206        let outcome = client.process_pending_block().await?;
207        self.context.lock().await.update_wallet(&client).await?;
208        match outcome {
209            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
210            ClientOutcome::Committed(None) => Ok(None),
211            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
212                "Please try again at {}",
213                timeout.timestamp
214            ))),
215        }
216    }
217
218    /// Transfers `amount` units of value from the given owner's account to the recipient.
219    /// If no owner is given, try to take the units out of the chain account.
220    async fn transfer(
221        &self,
222        #[graphql(desc = "The chain which native tokens are being transferred from.")]
223        chain_id: ChainId,
224        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
225        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
226        #[graphql(desc = "The amount being transferred.")] amount: Amount,
227    ) -> Result<CryptoHash, Error> {
228        self.apply_client_command(&chain_id, move |client| async move {
229            let result = client
230                .transfer(owner, amount, recipient)
231                .await
232                .map_err(Error::from)
233                .map(|outcome| outcome.map(|certificate| certificate.hash()));
234            (result, client)
235        })
236        .await
237    }
238
239    /// Claims `amount` units of value from the given owner's account in the remote
240    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
241    /// process the message.
242    async fn claim(
243        &self,
244        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
245        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
246        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
247        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
248        #[graphql(desc = "The amount being transferred.")] amount: Amount,
249    ) -> Result<CryptoHash, Error> {
250        self.apply_client_command(&chain_id, move |client| async move {
251            let result = client
252                .claim(owner, target_id, recipient, amount)
253                .await
254                .map_err(Error::from)
255                .map(|outcome| outcome.map(|certificate| certificate.hash()));
256            (result, client)
257        })
258        .await
259    }
260
261    /// Test if a data blob is readable from a transaction in the current chain.
262    // TODO(#2490): Consider removing or renaming this.
263    async fn read_data_blob(
264        &self,
265        chain_id: ChainId,
266        hash: CryptoHash,
267    ) -> Result<CryptoHash, Error> {
268        self.apply_client_command(&chain_id, move |client| async move {
269            let result = client
270                .read_data_blob(hash)
271                .await
272                .map_err(Error::from)
273                .map(|outcome| outcome.map(|certificate| certificate.hash()));
274            (result, client)
275        })
276        .await
277    }
278
279    /// Creates a new single-owner chain.
280    async fn open_chain(
281        &self,
282        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
283        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
284        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
285        balance: Option<Amount>,
286    ) -> Result<ChainId, Error> {
287        let ownership = ChainOwnership::single(owner);
288        let balance = balance.unwrap_or(Amount::ZERO);
289        let description = self
290            .apply_client_command(&chain_id, move |client| {
291                let ownership = ownership.clone();
292                async move {
293                    let result = client
294                        .open_chain(ownership, ApplicationPermissions::default(), balance)
295                        .await
296                        .map_err(Error::from)
297                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
298                    (result, client)
299                }
300            })
301            .await?;
302        Ok(description.id())
303    }
304
305    /// Creates a new multi-owner chain.
306    #[expect(clippy::too_many_arguments)]
307    async fn open_multi_owner_chain(
308        &self,
309        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
310        #[graphql(desc = "Permissions for applications on the new chain")]
311        application_permissions: Option<ApplicationPermissions>,
312        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
313        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
314        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
315        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
316        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
317        fast_round_ms: Option<u64>,
318        #[graphql(
319            desc = "The duration of the first single-leader and all multi-leader rounds",
320            default = 10_000
321        )]
322        base_timeout_ms: u64,
323        #[graphql(
324            desc = "The number of milliseconds by which the timeout increases after each \
325                    single-leader round",
326            default = 1_000
327        )]
328        timeout_increment_ms: u64,
329        #[graphql(
330            desc = "The age of an incoming tracked or protected message after which the \
331                    validators start transitioning the chain to fallback mode, in milliseconds.",
332            default = 86_400_000
333        )]
334        fallback_duration_ms: u64,
335    ) -> Result<ChainId, Error> {
336        let owners = if let Some(weights) = weights {
337            if weights.len() != owners.len() {
338                return Err(Error::new(format!(
339                    "There are {} owners but {} weights.",
340                    owners.len(),
341                    weights.len()
342                )));
343            }
344            owners.into_iter().zip(weights).collect::<Vec<_>>()
345        } else {
346            owners
347                .into_iter()
348                .zip(iter::repeat(100))
349                .collect::<Vec<_>>()
350        };
351        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
352        let timeout_config = TimeoutConfig {
353            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
354            base_timeout: TimeDelta::from_millis(base_timeout_ms),
355            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
356            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
357        };
358        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
359        let balance = balance.unwrap_or(Amount::ZERO);
360        let description = self
361            .apply_client_command(&chain_id, move |client| {
362                let ownership = ownership.clone();
363                let application_permissions = application_permissions.clone().unwrap_or_default();
364                async move {
365                    let result = client
366                        .open_chain(ownership, application_permissions, balance)
367                        .await
368                        .map_err(Error::from)
369                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
370                    (result, client)
371                }
372            })
373            .await?;
374        Ok(description.id())
375    }
376
377    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
378    async fn close_chain(
379        &self,
380        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
381    ) -> Result<Option<CryptoHash>, Error> {
382        let maybe_cert = self
383            .apply_client_command(&chain_id, |client| async move {
384                let result = client.close_chain().await.map_err(Error::from);
385                (result, client)
386            })
387            .await?;
388        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
389    }
390
391    /// Changes the chain to a single-owner chain
392    async fn change_owner(
393        &self,
394        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
395        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
396    ) -> Result<CryptoHash, Error> {
397        let operation = SystemOperation::ChangeOwnership {
398            super_owners: vec![new_owner],
399            owners: Vec::new(),
400            multi_leader_rounds: 2,
401            open_multi_leader_rounds: false,
402            timeout_config: TimeoutConfig::default(),
403        };
404        self.execute_system_operation(operation, chain_id).await
405    }
406
407    /// Changes the ownership of the chain
408    #[expect(clippy::too_many_arguments)]
409    async fn change_multiple_owners(
410        &self,
411        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
412        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
413        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
414        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
415        #[graphql(
416            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
417        )]
418        open_multi_leader_rounds: bool,
419        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
420        fast_round_ms: Option<u64>,
421        #[graphql(
422            desc = "The duration of the first single-leader and all multi-leader rounds",
423            default = 10_000
424        )]
425        base_timeout_ms: u64,
426        #[graphql(
427            desc = "The number of milliseconds by which the timeout increases after each \
428                    single-leader round",
429            default = 1_000
430        )]
431        timeout_increment_ms: u64,
432        #[graphql(
433            desc = "The age of an incoming tracked or protected message after which the \
434                    validators start transitioning the chain to fallback mode, in milliseconds.",
435            default = 86_400_000
436        )]
437        fallback_duration_ms: u64,
438    ) -> Result<CryptoHash, Error> {
439        let operation = SystemOperation::ChangeOwnership {
440            super_owners: Vec::new(),
441            owners: new_owners.into_iter().zip(new_weights).collect(),
442            multi_leader_rounds,
443            open_multi_leader_rounds,
444            timeout_config: TimeoutConfig {
445                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
446                base_timeout: TimeDelta::from_millis(base_timeout_ms),
447                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
448                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
449            },
450        };
451        self.execute_system_operation(operation, chain_id).await
452    }
453
454    /// Changes the application permissions configuration on this chain.
455    #[expect(clippy::too_many_arguments)]
456    async fn change_application_permissions(
457        &self,
458        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
459        #[graphql(desc = "These applications are allowed to close the current chain.")]
460        close_chain: Vec<ApplicationId>,
461        #[graphql(
462            desc = "If this is `None`, all system operations and application operations are allowed.
463If it is `Some`, only operations from the specified applications are allowed,
464and no system operations."
465        )]
466        execute_operations: Option<Vec<ApplicationId>>,
467        #[graphql(
468            desc = "At least one operation or incoming message from each of these applications must occur in every block."
469        )]
470        mandatory_applications: Vec<ApplicationId>,
471        #[graphql(desc = "These applications are allowed to change the application permissions.")]
472        change_application_permissions: Vec<ApplicationId>,
473        #[graphql(
474            desc = "These applications are allowed to perform calls to services as oracles."
475        )]
476        call_service_as_oracle: Option<Vec<ApplicationId>>,
477        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
478        make_http_requests: Option<Vec<ApplicationId>>,
479    ) -> Result<CryptoHash, Error> {
480        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
481            execute_operations,
482            mandatory_applications,
483            close_chain,
484            change_application_permissions,
485            call_service_as_oracle,
486            make_http_requests,
487        });
488        self.execute_system_operation(operation, chain_id).await
489    }
490
491    /// (admin chain only) Registers a new committee. This will notify the subscribers of
492    /// the admin chain so that they can migrate to the new epoch (by accepting the
493    /// notification as an "incoming message" in a next block).
494    async fn create_committee(
495        &self,
496        chain_id: ChainId,
497        committee: Committee,
498    ) -> Result<CryptoHash, Error> {
499        Ok(self
500            .apply_client_command(&chain_id, move |client| {
501                let committee = committee.clone();
502                async move {
503                    let result = client
504                        .stage_new_committee(committee)
505                        .await
506                        .map_err(Error::from);
507                    (result, client)
508                }
509            })
510            .await?
511            .hash())
512    }
513
514    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
515    /// blocks from the retired epoch will not be accepted until they are followed (hence
516    /// re-certified) by a block certified by a recent committee.
517    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
518        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
519        self.execute_system_operation(operation, chain_id).await
520    }
521
522    /// Publishes a new application module.
523    async fn publish_module(
524        &self,
525        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
526        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
527        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
528        service: Bytecode,
529        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
530        vm_runtime: VmRuntime,
531    ) -> Result<ModuleId, Error> {
532        self.apply_client_command(&chain_id, move |client| {
533            let contract = contract.clone();
534            let service = service.clone();
535            async move {
536                let result = client
537                    .publish_module(contract, service, vm_runtime)
538                    .await
539                    .map_err(Error::from)
540                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
541                (result, client)
542            }
543        })
544        .await
545    }
546
547    /// Publishes a new data blob.
548    async fn publish_data_blob(
549        &self,
550        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
551        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
552    ) -> Result<CryptoHash, Error> {
553        self.apply_client_command(&chain_id, |client| {
554            let bytes = bytes.clone();
555            async move {
556                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
557                (result, client)
558            }
559        })
560        .await
561        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
562    }
563
564    /// Creates a new application.
565    async fn create_application(
566        &self,
567        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
568        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
569        #[graphql(desc = "The JSON serialization of the parameters of the application")]
570        parameters: String,
571        #[graphql(
572            desc = "The JSON serialization of the instantiation argument of the application"
573        )]
574        instantiation_argument: String,
575        #[graphql(desc = "The dependencies of the application being created")]
576        required_application_ids: Vec<ApplicationId>,
577    ) -> Result<ApplicationId, Error> {
578        self.apply_client_command(&chain_id, move |client| {
579            let parameters = parameters.as_bytes().to_vec();
580            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
581            let required_application_ids = required_application_ids.clone();
582            async move {
583                let result = client
584                    .create_application_untyped(
585                        module_id,
586                        parameters,
587                        instantiation_argument,
588                        required_application_ids,
589                    )
590                    .await
591                    .map_err(Error::from)
592                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
593                (result, client)
594            }
595        })
596        .await
597    }
598}
599
600#[async_graphql::Object(cache_control(no_cache))]
601impl<C> QueryRoot<C>
602where
603    C: ClientContext + 'static,
604{
605    async fn chain(
606        &self,
607        chain_id: ChainId,
608    ) -> Result<
609        ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
610        Error,
611    > {
612        let client = self.context.lock().await.make_chain_client(chain_id);
613        let view = client.chain_state_view().await?;
614        Ok(ChainStateExtendedView::new(view))
615    }
616
617    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
618        let client = self.context.lock().await.make_chain_client(chain_id);
619        let applications = client
620            .chain_state_view()
621            .await?
622            .execution_state
623            .list_applications()
624            .await?;
625
626        let overviews = applications
627            .into_iter()
628            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
629            .collect();
630
631        Ok(overviews)
632    }
633
634    async fn chains(&self) -> Result<Chains, Error> {
635        Ok(Chains {
636            list: self.context.lock().await.wallet().chain_ids(),
637            default: self.default_chain,
638        })
639    }
640
641    async fn block(
642        &self,
643        hash: Option<CryptoHash>,
644        chain_id: ChainId,
645    ) -> Result<Option<ConfirmedBlock>, Error> {
646        let client = self.context.lock().await.make_chain_client(chain_id);
647        let hash = match hash {
648            Some(hash) => Some(hash),
649            None => {
650                let view = client.chain_state_view().await?;
651                view.tip_state.get().block_hash
652            }
653        };
654        if let Some(hash) = hash {
655            let block = client.read_confirmed_block(hash).await?;
656            Ok(Some(block))
657        } else {
658            Ok(None)
659        }
660    }
661
662    async fn events_from_index(
663        &self,
664        chain_id: ChainId,
665        stream_id: StreamId,
666        start_index: u32,
667    ) -> Result<Vec<IndexAndEvent>, Error> {
668        Ok(self
669            .context
670            .lock()
671            .await
672            .make_chain_client(chain_id)
673            .events_from_index(stream_id, start_index)
674            .await?)
675    }
676
677    async fn blocks(
678        &self,
679        from: Option<CryptoHash>,
680        chain_id: ChainId,
681        limit: Option<u32>,
682    ) -> Result<Vec<ConfirmedBlock>, Error> {
683        let client = self.context.lock().await.make_chain_client(chain_id);
684        let limit = limit.unwrap_or(10);
685        let from = match from {
686            Some(from) => Some(from),
687            None => {
688                let view = client.chain_state_view().await?;
689                view.tip_state.get().block_hash
690            }
691        };
692        let Some(from) = from else {
693            return Ok(vec![]);
694        };
695        let mut hash = Some(from);
696        let mut values = Vec::new();
697        for _ in 0..limit {
698            let Some(next_hash) = hash else {
699                break;
700            };
701            let value = client.read_confirmed_block(next_hash).await?;
702            hash = value.block().header.previous_block_hash;
703            values.push(value);
704        }
705        Ok(values)
706    }
707
708    /// Returns the version information on this node service.
709    async fn version(&self) -> linera_version::VersionInfo {
710        linera_version::VersionInfo::default()
711    }
712}
713
714// What follows is a hack to add a chain_id field to `ChainStateView` based on
715// https://async-graphql.github.io/async-graphql/en/merging_objects.html
716
717struct ChainStateViewExtension(ChainId);
718
719#[async_graphql::Object(cache_control(no_cache))]
720impl ChainStateViewExtension {
721    async fn chain_id(&self) -> ChainId {
722        self.0
723    }
724}
725
726#[derive(MergedObject)]
727struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
728where
729    C: linera_views::context::Context + Clone + Send + Sync + 'static,
730    C::Extra: linera_execution::ExecutionRuntimeContext;
731
732/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an
733/// [`OwnedRwLockReadGuard`].
734pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
735where
736    C: linera_views::context::Context + Clone + Send + Sync + 'static;
737
738impl<C> ContainerType for ReadOnlyChainStateView<C>
739where
740    C: linera_views::context::Context + Clone + Send + Sync + 'static,
741{
742    async fn resolve_field(
743        &self,
744        context: &async_graphql::Context<'_>,
745    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
746        self.0.resolve_field(context).await
747    }
748}
749
750impl<C> OutputType for ReadOnlyChainStateView<C>
751where
752    C: linera_views::context::Context + Clone + Send + Sync + 'static,
753{
754    fn type_name() -> Cow<'static, str> {
755        ChainStateView::<C>::type_name()
756    }
757
758    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
759        ChainStateView::<C>::create_type_info(registry)
760    }
761
762    async fn resolve(
763        &self,
764        context: &async_graphql::ContextSelectionSet<'_>,
765        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
766    ) -> async_graphql::ServerResult<async_graphql::Value> {
767        self.0.resolve(context, field).await
768    }
769}
770
771impl<C> ChainStateExtendedView<C>
772where
773    C: linera_views::context::Context + Clone + Send + Sync + 'static,
774    C::Extra: linera_execution::ExecutionRuntimeContext,
775{
776    fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
777        Self(
778            ChainStateViewExtension(view.chain_id()),
779            ReadOnlyChainStateView(view),
780        )
781    }
782}
783
784#[derive(SimpleObject)]
785pub struct ApplicationOverview {
786    id: ApplicationId,
787    description: ApplicationDescription,
788    link: String,
789}
790
791impl ApplicationOverview {
792    fn new(
793        id: ApplicationId,
794        description: ApplicationDescription,
795        port: NonZeroU16,
796        chain_id: ChainId,
797    ) -> Self {
798        Self {
799            id,
800            description,
801            link: format!(
802                "http://localhost:{}/chains/{}/applications/{}",
803                port.get(),
804                chain_id,
805                id
806            ),
807        }
808    }
809}
810
811/// The `NodeService` is a server that exposes a web-server to the client.
812/// The node service is primarily used to explore the state of a chain in GraphQL.
813pub struct NodeService<C>
814where
815    C: ClientContext + 'static,
816{
817    config: ChainListenerConfig,
818    port: NonZeroU16,
819    default_chain: Option<ChainId>,
820    context: Arc<Mutex<C>>,
821}
822
823impl<C> Clone for NodeService<C>
824where
825    C: ClientContext + 'static,
826{
827    fn clone(&self) -> Self {
828        Self {
829            config: self.config.clone(),
830            port: self.port,
831            default_chain: self.default_chain,
832            context: Arc::clone(&self.context),
833        }
834    }
835}
836
837impl<C> NodeService<C>
838where
839    C: ClientContext,
840{
841    /// Creates a new instance of the node service given a client chain and a port.
842    pub async fn new(
843        config: ChainListenerConfig,
844        port: NonZeroU16,
845        default_chain: Option<ChainId>,
846        context: C,
847    ) -> Self {
848        Self {
849            config,
850            port,
851            default_chain,
852            context: Arc::new(Mutex::new(context)),
853        }
854    }
855
856    pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
857        Schema::build(
858            QueryRoot {
859                context: Arc::clone(&self.context),
860                port: self.port,
861                default_chain: self.default_chain,
862            },
863            MutationRoot {
864                context: Arc::clone(&self.context),
865            },
866            SubscriptionRoot {
867                context: Arc::clone(&self.context),
868            },
869        )
870        .finish()
871    }
872
873    /// Runs the node service.
874    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
875    pub async fn run(self, cancellation_token: CancellationToken) -> Result<(), anyhow::Error> {
876        let port = self.port.get();
877        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
878        let application_handler =
879            axum::routing::get(util::graphiql).post(Self::application_handler);
880
881        let app = Router::new()
882            .route("/", index_handler)
883            .route(
884                "/chains/{chain_id}/applications/{application_id}",
885                application_handler,
886            )
887            .route("/ready", axum::routing::get(|| async { "ready!" }))
888            .route_service("/ws", GraphQLSubscription::new(self.schema()))
889            .layer(Extension(self.clone()))
890            // TODO(#551): Provide application authentication.
891            .layer(CorsLayer::permissive());
892
893        info!("GraphiQL IDE: http://localhost:{}", port);
894
895        let storage = self.context.lock().await.storage().clone();
896
897        let chain_listener =
898            ChainListener::new(self.config, self.context, storage, cancellation_token)
899                .run()
900                .await?;
901        let mut chain_listener = Box::pin(chain_listener).fuse();
902        let tcp_listener =
903            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
904        let server = axum::serve(tcp_listener, app).into_future();
905        futures::select! {
906            result = chain_listener => result?,
907            result = Box::pin(server).fuse() => result?,
908        };
909
910        Ok(())
911    }
912
913    /// Handles service queries for user applications (including mutations).
914    async fn handle_service_request(
915        &self,
916        application_id: ApplicationId,
917        request: Vec<u8>,
918        chain_id: ChainId,
919    ) -> Result<Vec<u8>, NodeServiceError> {
920        let QueryOutcome {
921            response,
922            operations,
923        } = self
924            .query_user_application(application_id, request, chain_id)
925            .await?;
926        if operations.is_empty() {
927            return Ok(response);
928        }
929
930        trace!("Query requested a new block with operations: {operations:?}");
931        let client = self.context.lock().await.make_chain_client(chain_id);
932        let hash = loop {
933            let timeout = match client
934                .execute_operations(operations.clone(), vec![])
935                .await?
936            {
937                ClientOutcome::Committed(certificate) => break certificate.hash(),
938                ClientOutcome::WaitForTimeout(timeout) => timeout,
939            };
940            let mut stream = client.subscribe().map_err(|_| {
941                ChainClientError::InternalError("Could not subscribe to the local node.")
942            })?;
943            util::wait_for_next_round(&mut stream, timeout).await;
944        };
945        let response = async_graphql::Response::new(hash.to_value());
946        Ok(serde_json::to_vec(&response)?)
947    }
948
949    /// Queries a user application, returning the raw [`QueryOutcome`].
950    async fn query_user_application(
951        &self,
952        application_id: ApplicationId,
953        bytes: Vec<u8>,
954        chain_id: ChainId,
955    ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
956        let query = Query::User {
957            application_id,
958            bytes,
959        };
960        let client = self.context.lock().await.make_chain_client(chain_id);
961        let QueryOutcome {
962            response,
963            operations,
964        } = client.query_application(query).await?;
965        match response {
966            QueryResponse::System(_) => {
967                unreachable!("cannot get a system response for a user query")
968            }
969            QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
970                response: user_response_bytes,
971                operations,
972            }),
973        }
974    }
975
976    /// Executes a GraphQL query and generates a response for our `Schema`.
977    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
978        service
979            .0
980            .schema()
981            .execute(request.into_inner())
982            .await
983            .into()
984    }
985
986    /// Executes a GraphQL query against an application.
987    /// Pattern matches on the `OperationType` of the query and routes the query
988    /// accordingly.
989    async fn application_handler(
990        Path((chain_id, application_id)): Path<(String, String)>,
991        service: Extension<Self>,
992        request: String,
993    ) -> Result<Vec<u8>, NodeServiceError> {
994        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
995        let application_id: ApplicationId = application_id.parse()?;
996
997        debug!(
998            "Processing request for application {application_id} on chain {chain_id}:\n{:?}",
999            &request
1000        );
1001        let response = service
1002            .0
1003            .handle_service_request(application_id, request.into_bytes(), chain_id)
1004            .await?;
1005
1006        Ok(response)
1007    }
1008}