linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7    futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8    ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _, TryStreamExt as _};
13use linera_base::{
14    crypto::{CryptoError, CryptoHash},
15    data_types::{
16        Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17    },
18    identifiers::{
19        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20    },
21    ownership::{ChainOwnership, TimeoutConfig},
22    vm::VmRuntime,
23    BcsHexParseError,
24};
25use linera_chain::{
26    types::{ConfirmedBlock, GenericCertificate},
27    ChainStateView,
28};
29use linera_client::chain_listener::{
30    ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
31};
32use linera_core::{
33    client::chain_client::{self, ChainClient},
34    data_types::ClientOutcome,
35    wallet::Wallet as _,
36    worker::Notification,
37};
38use linera_execution::{
39    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
40    SystemOperation,
41};
42#[cfg(with_metrics)]
43use linera_metrics::monitoring_server;
44use linera_sdk::linera_base_types::BlobContent;
45use serde::{Deserialize, Serialize};
46use serde_json::json;
47use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard};
48use tokio_util::sync::CancellationToken;
49use tower_http::cors::CorsLayer;
50use tracing::{debug, error, info, instrument, trace};
51
52use crate::util;
53
54#[derive(SimpleObject, Serialize, Deserialize, Clone)]
55pub struct Chains {
56    pub list: Vec<ChainId>,
57    pub default: Option<ChainId>,
58}
59
60/// Our root GraphQL query type.
61pub struct QueryRoot<C> {
62    context: Arc<Mutex<C>>,
63    port: NonZeroU16,
64    default_chain: Option<ChainId>,
65}
66
67/// Our root GraphQL subscription type.
68pub struct SubscriptionRoot<C> {
69    context: Arc<Mutex<C>>,
70}
71
72/// Our root GraphQL mutation type.
73pub struct MutationRoot<C> {
74    context: Arc<Mutex<C>>,
75}
76
77#[derive(Debug, thiserror::Error)]
78enum NodeServiceError {
79    #[error(transparent)]
80    ChainClient(#[from] chain_client::Error),
81    #[error(transparent)]
82    BcsHex(#[from] BcsHexParseError),
83    #[error(transparent)]
84    Json(#[from] serde_json::Error),
85    #[error("malformed chain ID: {0}")]
86    InvalidChainId(CryptoError),
87    #[error(transparent)]
88    Client(#[from] linera_client::Error),
89}
90
91impl IntoResponse for NodeServiceError {
92    fn into_response(self) -> response::Response {
93        let status = if let NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) = self
94        {
95            StatusCode::BAD_REQUEST
96        } else {
97            StatusCode::INTERNAL_SERVER_ERROR
98        };
99        let body = json!({"error": self.to_string()}).to_string();
100        (status, body).into_response()
101    }
102}
103
104#[Subscription]
105impl<C> SubscriptionRoot<C>
106where
107    C: ClientContext + 'static,
108{
109    /// Subscribes to notifications from the specified chain.
110    async fn notifications(
111        &self,
112        chain_id: ChainId,
113    ) -> Result<impl Stream<Item = Notification>, Error> {
114        let client = self
115            .context
116            .lock()
117            .await
118            .make_chain_client(chain_id)
119            .await?;
120        Ok(client.subscribe()?)
121    }
122}
123
124impl<C> MutationRoot<C>
125where
126    C: ClientContext,
127{
128    async fn execute_system_operation(
129        &self,
130        system_operation: SystemOperation,
131        chain_id: ChainId,
132    ) -> Result<CryptoHash, Error> {
133        let certificate = self
134            .apply_client_command(&chain_id, move |client| {
135                let operation = Operation::system(system_operation.clone());
136                async move {
137                    let result = client
138                        .execute_operation(operation)
139                        .await
140                        .map_err(Error::from);
141                    (result, client)
142                }
143            })
144            .await?;
145        Ok(certificate.hash())
146    }
147
148    /// Applies the given function to the chain client.
149    /// Updates the wallet regardless of the outcome. As long as the function returns a round
150    /// timeout, it will wait and retry.
151    async fn apply_client_command<F, Fut, T>(
152        &self,
153        chain_id: &ChainId,
154        mut f: F,
155    ) -> Result<T, Error>
156    where
157        F: FnMut(ChainClient<C::Environment>) -> Fut,
158        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
159    {
160        loop {
161            let client = self
162                .context
163                .lock()
164                .await
165                .make_chain_client(*chain_id)
166                .await?;
167            let mut stream = client.subscribe()?;
168            let (result, client) = f(client).await;
169            self.context.lock().await.update_wallet(&client).await?;
170            let timeout = match result? {
171                ClientOutcome::Committed(t) => return Ok(t),
172                ClientOutcome::WaitForTimeout(timeout) => timeout,
173            };
174            drop(client);
175            util::wait_for_next_round(&mut stream, timeout).await;
176        }
177    }
178}
179
180#[async_graphql::Object(cache_control(no_cache))]
181impl<C> MutationRoot<C>
182where
183    C: ClientContext + 'static,
184{
185    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
186    async fn process_inbox(
187        &self,
188        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
189    ) -> Result<Vec<CryptoHash>, Error> {
190        let mut hashes = Vec::new();
191        loop {
192            let client = self
193                .context
194                .lock()
195                .await
196                .make_chain_client(chain_id)
197                .await?;
198            let result = client.process_inbox().await;
199            self.context.lock().await.update_wallet(&client).await?;
200            let (certificates, maybe_timeout) = result?;
201            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
202            match maybe_timeout {
203                None => return Ok(hashes),
204                Some(timestamp) => {
205                    let mut stream = client.subscribe()?;
206                    drop(client);
207                    util::wait_for_next_round(&mut stream, timestamp).await;
208                }
209            }
210        }
211    }
212
213    /// Synchronizes the chain with the validators. Returns the chain's length.
214    ///
215    /// This is only used for testing, to make sure that a client is up to date.
216    // TODO(#4718): Remove this mutation.
217    async fn sync(
218        &self,
219        #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
220    ) -> Result<u64, Error> {
221        let client = self
222            .context
223            .lock()
224            .await
225            .make_chain_client(chain_id)
226            .await?;
227        let info = client.synchronize_from_validators().await?;
228        self.context.lock().await.update_wallet(&client).await?;
229        Ok(info.next_block_height.0)
230    }
231
232    /// Retries the pending block that was unsuccessfully proposed earlier.
233    async fn retry_pending_block(
234        &self,
235        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
236    ) -> Result<Option<CryptoHash>, Error> {
237        let client = self
238            .context
239            .lock()
240            .await
241            .make_chain_client(chain_id)
242            .await?;
243        let outcome = client.process_pending_block().await?;
244        self.context.lock().await.update_wallet(&client).await?;
245        match outcome {
246            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
247            ClientOutcome::Committed(None) => Ok(None),
248            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
249                "Please try again at {}",
250                timeout.timestamp
251            ))),
252        }
253    }
254
255    /// Transfers `amount` units of value from the given owner's account to the recipient.
256    /// If no owner is given, try to take the units out of the chain account.
257    async fn transfer(
258        &self,
259        #[graphql(desc = "The chain which native tokens are being transferred from.")]
260        chain_id: ChainId,
261        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
262        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
263        #[graphql(desc = "The amount being transferred.")] amount: Amount,
264    ) -> Result<CryptoHash, Error> {
265        self.apply_client_command(&chain_id, move |client| async move {
266            let result = client
267                .transfer(owner, amount, recipient)
268                .await
269                .map_err(Error::from)
270                .map(|outcome| outcome.map(|certificate| certificate.hash()));
271            (result, client)
272        })
273        .await
274    }
275
276    /// Claims `amount` units of value from the given owner's account in the remote
277    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
278    /// process the message.
279    async fn claim(
280        &self,
281        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
282        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
283        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
284        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
285        #[graphql(desc = "The amount being transferred.")] amount: Amount,
286    ) -> Result<CryptoHash, Error> {
287        self.apply_client_command(&chain_id, move |client| async move {
288            let result = client
289                .claim(owner, target_id, recipient, amount)
290                .await
291                .map_err(Error::from)
292                .map(|outcome| outcome.map(|certificate| certificate.hash()));
293            (result, client)
294        })
295        .await
296    }
297
298    /// Test if a data blob is readable from a transaction in the current chain.
299    // TODO(#2490): Consider removing or renaming this.
300    async fn read_data_blob(
301        &self,
302        chain_id: ChainId,
303        hash: CryptoHash,
304    ) -> Result<CryptoHash, Error> {
305        self.apply_client_command(&chain_id, move |client| async move {
306            let result = client
307                .read_data_blob(hash)
308                .await
309                .map_err(Error::from)
310                .map(|outcome| outcome.map(|certificate| certificate.hash()));
311            (result, client)
312        })
313        .await
314    }
315
316    /// Creates a new single-owner chain.
317    async fn open_chain(
318        &self,
319        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
320        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
321        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
322        balance: Option<Amount>,
323    ) -> Result<ChainId, Error> {
324        let ownership = ChainOwnership::single(owner);
325        let balance = balance.unwrap_or(Amount::ZERO);
326        let description = self
327            .apply_client_command(&chain_id, move |client| {
328                let ownership = ownership.clone();
329                async move {
330                    let result = client
331                        .open_chain(ownership, ApplicationPermissions::default(), balance)
332                        .await
333                        .map_err(Error::from)
334                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
335                    (result, client)
336                }
337            })
338            .await?;
339        Ok(description.id())
340    }
341
342    /// Creates a new multi-owner chain.
343    #[expect(clippy::too_many_arguments)]
344    async fn open_multi_owner_chain(
345        &self,
346        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
347        #[graphql(desc = "Permissions for applications on the new chain")]
348        application_permissions: Option<ApplicationPermissions>,
349        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
350        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
351        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
352        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
353        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
354        fast_round_ms: Option<u64>,
355        #[graphql(
356            desc = "The duration of the first single-leader and all multi-leader rounds",
357            default = 10_000
358        )]
359        base_timeout_ms: u64,
360        #[graphql(
361            desc = "The number of milliseconds by which the timeout increases after each \
362                    single-leader round",
363            default = 1_000
364        )]
365        timeout_increment_ms: u64,
366        #[graphql(
367            desc = "The age of an incoming tracked or protected message after which the \
368                    validators start transitioning the chain to fallback mode, in milliseconds.",
369            default = 86_400_000
370        )]
371        fallback_duration_ms: u64,
372    ) -> Result<ChainId, Error> {
373        let owners = if let Some(weights) = weights {
374            if weights.len() != owners.len() {
375                return Err(Error::new(format!(
376                    "There are {} owners but {} weights.",
377                    owners.len(),
378                    weights.len()
379                )));
380            }
381            owners.into_iter().zip(weights).collect::<Vec<_>>()
382        } else {
383            owners
384                .into_iter()
385                .zip(iter::repeat(100))
386                .collect::<Vec<_>>()
387        };
388        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
389        let timeout_config = TimeoutConfig {
390            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
391            base_timeout: TimeDelta::from_millis(base_timeout_ms),
392            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
393            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
394        };
395        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
396        let balance = balance.unwrap_or(Amount::ZERO);
397        let description = self
398            .apply_client_command(&chain_id, move |client| {
399                let ownership = ownership.clone();
400                let application_permissions = application_permissions.clone().unwrap_or_default();
401                async move {
402                    let result = client
403                        .open_chain(ownership, application_permissions, balance)
404                        .await
405                        .map_err(Error::from)
406                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
407                    (result, client)
408                }
409            })
410            .await?;
411        Ok(description.id())
412    }
413
414    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
415    async fn close_chain(
416        &self,
417        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
418    ) -> Result<Option<CryptoHash>, Error> {
419        let maybe_cert = self
420            .apply_client_command(&chain_id, |client| async move {
421                let result = client.close_chain().await.map_err(Error::from);
422                (result, client)
423            })
424            .await?;
425        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
426    }
427
428    /// Changes the chain to a single-owner chain
429    async fn change_owner(
430        &self,
431        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
432        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
433    ) -> Result<CryptoHash, Error> {
434        let operation = SystemOperation::ChangeOwnership {
435            super_owners: vec![new_owner],
436            owners: Vec::new(),
437            first_leader: None,
438            multi_leader_rounds: 2,
439            open_multi_leader_rounds: false,
440            timeout_config: TimeoutConfig::default(),
441        };
442        self.execute_system_operation(operation, chain_id).await
443    }
444
445    /// Changes the ownership of the chain
446    #[expect(clippy::too_many_arguments)]
447    async fn change_multiple_owners(
448        &self,
449        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
450        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
451        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
452        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
453        #[graphql(
454            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
455        )]
456        open_multi_leader_rounds: bool,
457        #[graphql(desc = "The leader of the first single-leader round. \
458                          If not set, this is random like other rounds.")]
459        first_leader: Option<AccountOwner>,
460        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
461        fast_round_ms: Option<u64>,
462        #[graphql(
463            desc = "The duration of the first single-leader and all multi-leader rounds",
464            default = 10_000
465        )]
466        base_timeout_ms: u64,
467        #[graphql(
468            desc = "The number of milliseconds by which the timeout increases after each \
469                    single-leader round",
470            default = 1_000
471        )]
472        timeout_increment_ms: u64,
473        #[graphql(
474            desc = "The age of an incoming tracked or protected message after which the \
475                    validators start transitioning the chain to fallback mode, in milliseconds.",
476            default = 86_400_000
477        )]
478        fallback_duration_ms: u64,
479    ) -> Result<CryptoHash, Error> {
480        let operation = SystemOperation::ChangeOwnership {
481            super_owners: Vec::new(),
482            owners: new_owners.into_iter().zip(new_weights).collect(),
483            first_leader,
484            multi_leader_rounds,
485            open_multi_leader_rounds,
486            timeout_config: TimeoutConfig {
487                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
488                base_timeout: TimeDelta::from_millis(base_timeout_ms),
489                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
490                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
491            },
492        };
493        self.execute_system_operation(operation, chain_id).await
494    }
495
496    /// Changes the application permissions configuration on this chain.
497    #[expect(clippy::too_many_arguments)]
498    async fn change_application_permissions(
499        &self,
500        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
501        #[graphql(desc = "These applications are allowed to close the current chain.")]
502        close_chain: Vec<ApplicationId>,
503        #[graphql(
504            desc = "If this is `None`, all system operations and application operations are allowed.
505If it is `Some`, only operations from the specified applications are allowed,
506and no system operations."
507        )]
508        execute_operations: Option<Vec<ApplicationId>>,
509        #[graphql(
510            desc = "At least one operation or incoming message from each of these applications must occur in every block."
511        )]
512        mandatory_applications: Vec<ApplicationId>,
513        #[graphql(desc = "These applications are allowed to change the application permissions.")]
514        change_application_permissions: Vec<ApplicationId>,
515        #[graphql(
516            desc = "These applications are allowed to perform calls to services as oracles."
517        )]
518        call_service_as_oracle: Option<Vec<ApplicationId>>,
519        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
520        make_http_requests: Option<Vec<ApplicationId>>,
521    ) -> Result<CryptoHash, Error> {
522        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
523            execute_operations,
524            mandatory_applications,
525            close_chain,
526            change_application_permissions,
527            call_service_as_oracle,
528            make_http_requests,
529        });
530        self.execute_system_operation(operation, chain_id).await
531    }
532
533    /// (admin chain only) Registers a new committee. This will notify the subscribers of
534    /// the admin chain so that they can migrate to the new epoch (by accepting the
535    /// notification as an "incoming message" in a next block).
536    async fn create_committee(
537        &self,
538        chain_id: ChainId,
539        committee: Committee,
540    ) -> Result<CryptoHash, Error> {
541        Ok(self
542            .apply_client_command(&chain_id, move |client| {
543                let committee = committee.clone();
544                async move {
545                    let result = client
546                        .stage_new_committee(committee)
547                        .await
548                        .map_err(Error::from);
549                    (result, client)
550                }
551            })
552            .await?
553            .hash())
554    }
555
556    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
557    /// blocks from the retired epoch will not be accepted until they are followed (hence
558    /// re-certified) by a block certified by a recent committee.
559    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
560        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
561        self.execute_system_operation(operation, chain_id).await
562    }
563
564    /// Publishes a new application module.
565    async fn publish_module(
566        &self,
567        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
568        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
569        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
570        service: Bytecode,
571        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
572        vm_runtime: VmRuntime,
573    ) -> Result<ModuleId, Error> {
574        self.apply_client_command(&chain_id, move |client| {
575            let contract = contract.clone();
576            let service = service.clone();
577            async move {
578                let result = client
579                    .publish_module(contract, service, vm_runtime)
580                    .await
581                    .map_err(Error::from)
582                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
583                (result, client)
584            }
585        })
586        .await
587    }
588
589    /// Publishes a new data blob.
590    async fn publish_data_blob(
591        &self,
592        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
593        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
594    ) -> Result<CryptoHash, Error> {
595        self.apply_client_command(&chain_id, |client| {
596            let bytes = bytes.clone();
597            async move {
598                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
599                (result, client)
600            }
601        })
602        .await
603        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
604    }
605
606    /// Creates a new application.
607    async fn create_application(
608        &self,
609        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
610        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
611        #[graphql(desc = "The JSON serialization of the parameters of the application")]
612        parameters: String,
613        #[graphql(
614            desc = "The JSON serialization of the instantiation argument of the application"
615        )]
616        instantiation_argument: String,
617        #[graphql(desc = "The dependencies of the application being created")]
618        required_application_ids: Vec<ApplicationId>,
619    ) -> Result<ApplicationId, Error> {
620        self.apply_client_command(&chain_id, move |client| {
621            let parameters = parameters.as_bytes().to_vec();
622            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
623            let required_application_ids = required_application_ids.clone();
624            async move {
625                let result = client
626                    .create_application_untyped(
627                        module_id,
628                        parameters,
629                        instantiation_argument,
630                        required_application_ids,
631                    )
632                    .await
633                    .map_err(Error::from)
634                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
635                (result, client)
636            }
637        })
638        .await
639    }
640}
641
642#[async_graphql::Object(cache_control(no_cache))]
643impl<C> QueryRoot<C>
644where
645    C: ClientContext + 'static,
646{
647    async fn chain(
648        &self,
649        chain_id: ChainId,
650    ) -> Result<
651        ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
652        Error,
653    > {
654        let client = self
655            .context
656            .lock()
657            .await
658            .make_chain_client(chain_id)
659            .await?;
660        let view = client.chain_state_view().await?;
661        Ok(ChainStateExtendedView::new(view))
662    }
663
664    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
665        let client = self
666            .context
667            .lock()
668            .await
669            .make_chain_client(chain_id)
670            .await?;
671        let applications = client
672            .chain_state_view()
673            .await?
674            .execution_state
675            .list_applications()
676            .await?;
677
678        let overviews = applications
679            .into_iter()
680            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
681            .collect();
682
683        Ok(overviews)
684    }
685
686    async fn chains(&self) -> Result<Chains, Error> {
687        Ok(Chains {
688            list: self
689                .context
690                .lock()
691                .await
692                .wallet()
693                .chain_ids()
694                .try_collect()
695                .await?,
696            default: self.default_chain,
697        })
698    }
699
700    async fn block(
701        &self,
702        hash: Option<CryptoHash>,
703        chain_id: ChainId,
704    ) -> Result<Option<ConfirmedBlock>, Error> {
705        let client = self
706            .context
707            .lock()
708            .await
709            .make_chain_client(chain_id)
710            .await?;
711        let hash = match hash {
712            Some(hash) => Some(hash),
713            None => client.chain_info().await?.block_hash,
714        };
715        if let Some(hash) = hash {
716            let block = client.read_confirmed_block(hash).await?;
717            Ok(Some(block))
718        } else {
719            Ok(None)
720        }
721    }
722
723    async fn events_from_index(
724        &self,
725        chain_id: ChainId,
726        stream_id: StreamId,
727        start_index: u32,
728    ) -> Result<Vec<IndexAndEvent>, Error> {
729        Ok(self
730            .context
731            .lock()
732            .await
733            .make_chain_client(chain_id)
734            .await?
735            .events_from_index(stream_id, start_index)
736            .await?)
737    }
738
739    async fn blocks(
740        &self,
741        from: Option<CryptoHash>,
742        chain_id: ChainId,
743        limit: Option<u32>,
744    ) -> Result<Vec<ConfirmedBlock>, Error> {
745        let client = self
746            .context
747            .lock()
748            .await
749            .make_chain_client(chain_id)
750            .await?;
751        let limit = limit.unwrap_or(10);
752        let from = match from {
753            Some(from) => Some(from),
754            None => client.chain_info().await?.block_hash,
755        };
756        let Some(from) = from else {
757            return Ok(vec![]);
758        };
759        let mut hash = Some(from);
760        let mut values = Vec::new();
761        for _ in 0..limit {
762            let Some(next_hash) = hash else {
763                break;
764            };
765            let value = client.read_confirmed_block(next_hash).await?;
766            hash = value.block().header.previous_block_hash;
767            values.push(value);
768        }
769        Ok(values)
770    }
771
772    /// Returns the version information on this node service.
773    async fn version(&self) -> linera_version::VersionInfo {
774        linera_version::VersionInfo::default()
775    }
776}
777
778// What follows is a hack to add a chain_id field to `ChainStateView` based on
779// https://async-graphql.github.io/async-graphql/en/merging_objects.html
780
781struct ChainStateViewExtension(ChainId);
782
783#[async_graphql::Object(cache_control(no_cache))]
784impl ChainStateViewExtension {
785    async fn chain_id(&self) -> ChainId {
786        self.0
787    }
788}
789
790#[derive(MergedObject)]
791struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
792where
793    C: linera_views::context::Context + Clone + Send + Sync + 'static,
794    C::Extra: linera_execution::ExecutionRuntimeContext;
795
796/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an
797/// [`OwnedRwLockReadGuard`].
798pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
799where
800    C: linera_views::context::Context + Clone + Send + Sync + 'static;
801
802impl<C> ContainerType for ReadOnlyChainStateView<C>
803where
804    C: linera_views::context::Context + Clone + Send + Sync + 'static,
805{
806    async fn resolve_field(
807        &self,
808        context: &async_graphql::Context<'_>,
809    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
810        self.0.resolve_field(context).await
811    }
812}
813
814impl<C> OutputType for ReadOnlyChainStateView<C>
815where
816    C: linera_views::context::Context + Clone + Send + Sync + 'static,
817{
818    fn type_name() -> Cow<'static, str> {
819        ChainStateView::<C>::type_name()
820    }
821
822    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
823        ChainStateView::<C>::create_type_info(registry)
824    }
825
826    async fn resolve(
827        &self,
828        context: &async_graphql::ContextSelectionSet<'_>,
829        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
830    ) -> async_graphql::ServerResult<async_graphql::Value> {
831        self.0.resolve(context, field).await
832    }
833}
834
835impl<C> ChainStateExtendedView<C>
836where
837    C: linera_views::context::Context + Clone + Send + Sync + 'static,
838    C::Extra: linera_execution::ExecutionRuntimeContext,
839{
840    fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
841        Self(
842            ChainStateViewExtension(view.chain_id()),
843            ReadOnlyChainStateView(view),
844        )
845    }
846}
847
848#[derive(SimpleObject)]
849pub struct ApplicationOverview {
850    id: ApplicationId,
851    description: ApplicationDescription,
852    link: String,
853}
854
855impl ApplicationOverview {
856    fn new(
857        id: ApplicationId,
858        description: ApplicationDescription,
859        port: NonZeroU16,
860        chain_id: ChainId,
861    ) -> Self {
862        Self {
863            id,
864            description,
865            link: format!(
866                "http://localhost:{}/chains/{}/applications/{}",
867                port.get(),
868                chain_id,
869                id
870            ),
871        }
872    }
873}
874
875/// The `NodeService` is a server that exposes a web-server to the client.
876/// The node service is primarily used to explore the state of a chain in GraphQL.
877pub struct NodeService<C>
878where
879    C: ClientContext + 'static,
880{
881    config: ChainListenerConfig,
882    port: NonZeroU16,
883    #[cfg(with_metrics)]
884    metrics_port: NonZeroU16,
885    default_chain: Option<ChainId>,
886    context: Arc<Mutex<C>>,
887}
888
889impl<C> Clone for NodeService<C>
890where
891    C: ClientContext + 'static,
892{
893    fn clone(&self) -> Self {
894        Self {
895            config: self.config.clone(),
896            port: self.port,
897            #[cfg(with_metrics)]
898            metrics_port: self.metrics_port,
899            default_chain: self.default_chain,
900            context: Arc::clone(&self.context),
901        }
902    }
903}
904
905impl<C> NodeService<C>
906where
907    C: ClientContext,
908{
909    /// Creates a new instance of the node service given a client chain and a port.
910    pub fn new(
911        config: ChainListenerConfig,
912        port: NonZeroU16,
913        #[cfg(with_metrics)] metrics_port: NonZeroU16,
914        default_chain: Option<ChainId>,
915        context: Arc<Mutex<C>>,
916    ) -> Self {
917        Self {
918            config,
919            port,
920            #[cfg(with_metrics)]
921            metrics_port,
922            default_chain,
923            context,
924        }
925    }
926
927    #[cfg(with_metrics)]
928    pub fn metrics_address(&self) -> SocketAddr {
929        SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
930    }
931
932    pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
933        Schema::build(
934            QueryRoot {
935                context: Arc::clone(&self.context),
936                port: self.port,
937                default_chain: self.default_chain,
938            },
939            MutationRoot {
940                context: Arc::clone(&self.context),
941            },
942            SubscriptionRoot {
943                context: Arc::clone(&self.context),
944            },
945        )
946        .finish()
947    }
948
949    /// Runs the node service.
950    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
951    pub async fn run(
952        self,
953        cancellation_token: CancellationToken,
954        command_receiver: UnboundedReceiver<ListenerCommand>,
955    ) -> Result<(), anyhow::Error> {
956        let port = self.port.get();
957        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
958        let application_handler =
959            axum::routing::get(util::graphiql).post(Self::application_handler);
960
961        #[cfg(with_metrics)]
962        monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
963
964        let app = Router::new()
965            .route("/", index_handler)
966            .route(
967                "/chains/{chain_id}/applications/{application_id}",
968                application_handler,
969            )
970            .route("/ready", axum::routing::get(|| async { "ready!" }))
971            .route_service("/ws", GraphQLSubscription::new(self.schema()))
972            .layer(Extension(self.clone()))
973            // TODO(#551): Provide application authentication.
974            .layer(CorsLayer::permissive());
975
976        info!("GraphiQL IDE: http://localhost:{}", port);
977
978        let storage = self.context.lock().await.storage().clone();
979
980        let chain_listener = ChainListener::new(
981            self.config,
982            self.context,
983            storage,
984            cancellation_token.clone(),
985            command_receiver,
986        )
987        .run(true)
988        .await?;
989        let mut chain_listener = Box::pin(chain_listener).fuse();
990        let tcp_listener =
991            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
992        let server = axum::serve(tcp_listener, app)
993            .with_graceful_shutdown(cancellation_token.cancelled_owned())
994            .into_future();
995        futures::select! {
996            result = chain_listener => result?,
997            result = Box::pin(server).fuse() => result?,
998        };
999
1000        Ok(())
1001    }
1002
1003    /// Handles service queries for user applications (including mutations).
1004    async fn handle_service_request(
1005        &self,
1006        application_id: ApplicationId,
1007        request: Vec<u8>,
1008        chain_id: ChainId,
1009        block_hash: Option<CryptoHash>,
1010    ) -> Result<Vec<u8>, NodeServiceError> {
1011        let QueryOutcome {
1012            response,
1013            operations,
1014        } = self
1015            .query_user_application(application_id, request, chain_id, block_hash)
1016            .await?;
1017        if operations.is_empty() {
1018            return Ok(response);
1019        }
1020
1021        trace!("Query requested a new block with operations: {operations:?}");
1022        let client = self
1023            .context
1024            .lock()
1025            .await
1026            .make_chain_client(chain_id)
1027            .await?;
1028        let hash = loop {
1029            let timeout = match client
1030                .execute_operations(operations.clone(), vec![])
1031                .await?
1032            {
1033                ClientOutcome::Committed(certificate) => break certificate.hash(),
1034                ClientOutcome::WaitForTimeout(timeout) => timeout,
1035            };
1036            let mut stream = client.subscribe().map_err(|_| {
1037                chain_client::Error::InternalError("Could not subscribe to the local node.")
1038            })?;
1039            util::wait_for_next_round(&mut stream, timeout).await;
1040        };
1041        let response = async_graphql::Response::new(hash.to_value());
1042        Ok(serde_json::to_vec(&response)?)
1043    }
1044
1045    /// Queries a user application, returning the raw [`QueryOutcome`].
1046    async fn query_user_application(
1047        &self,
1048        application_id: ApplicationId,
1049        bytes: Vec<u8>,
1050        chain_id: ChainId,
1051        block_hash: Option<CryptoHash>,
1052    ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
1053        let query = Query::User {
1054            application_id,
1055            bytes,
1056        };
1057        let client = self
1058            .context
1059            .lock()
1060            .await
1061            .make_chain_client(chain_id)
1062            .await?;
1063        let QueryOutcome {
1064            response,
1065            operations,
1066        } = client.query_application(query, block_hash).await?;
1067        match response {
1068            QueryResponse::System(_) => {
1069                unreachable!("cannot get a system response for a user query")
1070            }
1071            QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1072                response: user_response_bytes,
1073                operations,
1074            }),
1075        }
1076    }
1077
1078    /// Executes a GraphQL query and generates a response for our `Schema`.
1079    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1080        service
1081            .0
1082            .schema()
1083            .execute(request.into_inner())
1084            .await
1085            .into()
1086    }
1087
1088    /// Executes a GraphQL query against an application.
1089    /// Pattern matches on the `OperationType` of the query and routes the query
1090    /// accordingly.
1091    async fn application_handler(
1092        Path((chain_id, application_id)): Path<(String, String)>,
1093        service: Extension<Self>,
1094        request: String,
1095    ) -> Result<Vec<u8>, NodeServiceError> {
1096        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1097        let application_id: ApplicationId = application_id.parse()?;
1098
1099        debug!(
1100            %chain_id,
1101            %application_id,
1102            "processing request for application:\n{:?}",
1103            &request
1104        );
1105        let response = service
1106            .0
1107            .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1108            .await?;
1109
1110        Ok(response)
1111    }
1112}