linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7    futures_util::Stream, resolver_utils::ContainerType, EmptyMutation, Error, MergedObject,
8    OutputType, Request, Response, ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _, TryStreamExt as _};
13use linera_base::{
14    crypto::{CryptoError, CryptoHash},
15    data_types::{
16        Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17    },
18    identifiers::{
19        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20    },
21    ownership::{ChainOwnership, TimeoutConfig},
22    vm::VmRuntime,
23    BcsHexParseError,
24};
25use linera_chain::{
26    types::{ConfirmedBlock, GenericCertificate},
27    ChainStateView,
28};
29use linera_client::chain_listener::{
30    ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
31};
32use linera_core::{
33    client::chain_client::{self, ChainClient},
34    data_types::ClientOutcome,
35    wallet::Wallet as _,
36    worker::Notification,
37};
38use linera_execution::{
39    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
40    SystemOperation,
41};
42#[cfg(with_metrics)]
43use linera_metrics::monitoring_server;
44use linera_sdk::linera_base_types::BlobContent;
45use serde::{Deserialize, Serialize};
46use serde_json::json;
47use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard};
48use tokio_util::sync::CancellationToken;
49use tower_http::cors::CorsLayer;
50use tracing::{debug, error, info, instrument, trace};
51
52use crate::util;
53
// The payload of the `chains` query: the chain IDs known to the wallet plus the default chain.
//
// NOTE(review): plain `//` comments are used on purpose. With `#[derive(SimpleObject)]`,
// `///` doc comments are presumably exported as GraphQL descriptions, which would change the
// published schema — confirm before converting these to doc comments.
#[derive(SimpleObject, Serialize, Deserialize, Clone)]
pub struct Chains {
    // Every chain ID reported by the wallet.
    pub list: Vec<ChainId>,
    // The default chain of the node service, if one is configured.
    pub default: Option<ChainId>,
}
59
/// Our root GraphQL query type.
pub struct QueryRoot<C> {
    /// The client context, shared with the other GraphQL roots behind an async mutex.
    context: Arc<Mutex<C>>,
    /// The port passed along when building application overviews.
    port: NonZeroU16,
    /// The chain reported as `default` by the `chains` query.
    default_chain: Option<ChainId>,
}
66
/// Our root GraphQL subscription type.
pub struct SubscriptionRoot<C> {
    /// The client context used to create the chain client whose notifications are streamed.
    context: Arc<Mutex<C>>,
}
71
/// Our root GraphQL mutation type.
pub struct MutationRoot<C> {
    /// The client context used to create chain clients and to update the wallet after each
    /// mutation.
    context: Arc<Mutex<C>>,
}
76
/// Errors of the node service that can be turned into an HTTP response.
///
/// Variants marked `transparent` forward the message of the wrapped error unchanged.
#[derive(Debug, thiserror::Error)]
enum NodeServiceError {
    /// An error reported by the chain client.
    #[error(transparent)]
    ChainClient(#[from] chain_client::Error),
    /// A BCS/hex parsing error.
    #[error(transparent)]
    BcsHex(#[from] BcsHexParseError),
    /// A JSON (de)serialization error.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// The chain ID could not be parsed.
    #[error("malformed chain ID: {0}")]
    InvalidChainId(CryptoError),
    /// An error reported by the `linera_client` crate.
    #[error(transparent)]
    Client(#[from] linera_client::Error),
    /// A query tried to schedule operations while the service runs in read-only mode.
    #[error("scheduling operations from queries is disabled in read-only mode")]
    ReadOnlyModeOperationsNotAllowed,
}
92
93impl IntoResponse for NodeServiceError {
94    fn into_response(self) -> response::Response {
95        let status = match self {
96            NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
97                StatusCode::BAD_REQUEST
98            }
99            NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
100            _ => StatusCode::INTERNAL_SERVER_ERROR,
101        };
102        let body = json!({"error": self.to_string()}).to_string();
103        (status, body).into_response()
104    }
105}
106
107#[Subscription]
108impl<C> SubscriptionRoot<C>
109where
110    C: ClientContext + 'static,
111{
112    /// Subscribes to notifications from the specified chain.
113    async fn notifications(
114        &self,
115        chain_id: ChainId,
116    ) -> Result<impl Stream<Item = Notification>, Error> {
117        let client = self
118            .context
119            .lock()
120            .await
121            .make_chain_client(chain_id)
122            .await?;
123        Ok(client.subscribe()?)
124    }
125}
126
127impl<C> MutationRoot<C>
128where
129    C: ClientContext,
130{
131    async fn execute_system_operation(
132        &self,
133        system_operation: SystemOperation,
134        chain_id: ChainId,
135    ) -> Result<CryptoHash, Error> {
136        let certificate = self
137            .apply_client_command(&chain_id, move |client| {
138                let operation = Operation::system(system_operation.clone());
139                async move {
140                    let result = client
141                        .execute_operation(operation)
142                        .await
143                        .map_err(Error::from);
144                    (result, client)
145                }
146            })
147            .await?;
148        Ok(certificate.hash())
149    }
150
151    /// Applies the given function to the chain client.
152    /// Updates the wallet regardless of the outcome. As long as the function returns a round
153    /// timeout, it will wait and retry.
154    async fn apply_client_command<F, Fut, T>(
155        &self,
156        chain_id: &ChainId,
157        mut f: F,
158    ) -> Result<T, Error>
159    where
160        F: FnMut(ChainClient<C::Environment>) -> Fut,
161        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
162    {
163        loop {
164            let client = self
165                .context
166                .lock()
167                .await
168                .make_chain_client(*chain_id)
169                .await?;
170            let mut stream = client.subscribe()?;
171            let (result, client) = f(client).await;
172            self.context.lock().await.update_wallet(&client).await?;
173            let timeout = match result? {
174                ClientOutcome::Committed(t) => return Ok(t),
175                ClientOutcome::WaitForTimeout(timeout) => timeout,
176            };
177            drop(client);
178            util::wait_for_next_round(&mut stream, timeout).await;
179        }
180    }
181}
182
183#[async_graphql::Object(cache_control(no_cache))]
184impl<C> MutationRoot<C>
185where
186    C: ClientContext + 'static,
187{
188    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
189    async fn process_inbox(
190        &self,
191        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
192    ) -> Result<Vec<CryptoHash>, Error> {
193        let mut hashes = Vec::new();
194        loop {
195            let client = self
196                .context
197                .lock()
198                .await
199                .make_chain_client(chain_id)
200                .await?;
201            let result = client.process_inbox().await;
202            self.context.lock().await.update_wallet(&client).await?;
203            let (certificates, maybe_timeout) = result?;
204            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
205            match maybe_timeout {
206                None => return Ok(hashes),
207                Some(timestamp) => {
208                    let mut stream = client.subscribe()?;
209                    drop(client);
210                    util::wait_for_next_round(&mut stream, timestamp).await;
211                }
212            }
213        }
214    }
215
216    /// Synchronizes the chain with the validators. Returns the chain's length.
217    ///
218    /// This is only used for testing, to make sure that a client is up to date.
219    // TODO(#4718): Remove this mutation.
220    async fn sync(
221        &self,
222        #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
223    ) -> Result<u64, Error> {
224        let client = self
225            .context
226            .lock()
227            .await
228            .make_chain_client(chain_id)
229            .await?;
230        let info = client.synchronize_from_validators().await?;
231        self.context.lock().await.update_wallet(&client).await?;
232        Ok(info.next_block_height.0)
233    }
234
235    /// Retries the pending block that was unsuccessfully proposed earlier.
236    async fn retry_pending_block(
237        &self,
238        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
239    ) -> Result<Option<CryptoHash>, Error> {
240        let client = self
241            .context
242            .lock()
243            .await
244            .make_chain_client(chain_id)
245            .await?;
246        let outcome = client.process_pending_block().await?;
247        self.context.lock().await.update_wallet(&client).await?;
248        match outcome {
249            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
250            ClientOutcome::Committed(None) => Ok(None),
251            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
252                "Please try again at {}",
253                timeout.timestamp
254            ))),
255        }
256    }
257
258    /// Transfers `amount` units of value from the given owner's account to the recipient.
259    /// If no owner is given, try to take the units out of the chain account.
260    async fn transfer(
261        &self,
262        #[graphql(desc = "The chain which native tokens are being transferred from.")]
263        chain_id: ChainId,
264        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
265        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
266        #[graphql(desc = "The amount being transferred.")] amount: Amount,
267    ) -> Result<CryptoHash, Error> {
268        self.apply_client_command(&chain_id, move |client| async move {
269            let result = client
270                .transfer(owner, amount, recipient)
271                .await
272                .map_err(Error::from)
273                .map(|outcome| outcome.map(|certificate| certificate.hash()));
274            (result, client)
275        })
276        .await
277    }
278
279    /// Claims `amount` units of value from the given owner's account in the remote
280    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
281    /// process the message.
282    async fn claim(
283        &self,
284        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
285        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
286        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
287        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
288        #[graphql(desc = "The amount being transferred.")] amount: Amount,
289    ) -> Result<CryptoHash, Error> {
290        self.apply_client_command(&chain_id, move |client| async move {
291            let result = client
292                .claim(owner, target_id, recipient, amount)
293                .await
294                .map_err(Error::from)
295                .map(|outcome| outcome.map(|certificate| certificate.hash()));
296            (result, client)
297        })
298        .await
299    }
300
301    /// Test if a data blob is readable from a transaction in the current chain.
302    // TODO(#2490): Consider removing or renaming this.
303    async fn read_data_blob(
304        &self,
305        chain_id: ChainId,
306        hash: CryptoHash,
307    ) -> Result<CryptoHash, Error> {
308        self.apply_client_command(&chain_id, move |client| async move {
309            let result = client
310                .read_data_blob(hash)
311                .await
312                .map_err(Error::from)
313                .map(|outcome| outcome.map(|certificate| certificate.hash()));
314            (result, client)
315        })
316        .await
317    }
318
319    /// Creates a new single-owner chain.
320    async fn open_chain(
321        &self,
322        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
323        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
324        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
325        balance: Option<Amount>,
326    ) -> Result<ChainId, Error> {
327        let ownership = ChainOwnership::single(owner);
328        let balance = balance.unwrap_or(Amount::ZERO);
329        let description = self
330            .apply_client_command(&chain_id, move |client| {
331                let ownership = ownership.clone();
332                async move {
333                    let result = client
334                        .open_chain(ownership, ApplicationPermissions::default(), balance)
335                        .await
336                        .map_err(Error::from)
337                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
338                    (result, client)
339                }
340            })
341            .await?;
342        Ok(description.id())
343    }
344
345    /// Creates a new multi-owner chain.
346    #[expect(clippy::too_many_arguments)]
347    async fn open_multi_owner_chain(
348        &self,
349        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
350        #[graphql(desc = "Permissions for applications on the new chain")]
351        application_permissions: Option<ApplicationPermissions>,
352        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
353        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
354        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
355        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
356        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
357        fast_round_ms: Option<u64>,
358        #[graphql(
359            desc = "The duration of the first single-leader and all multi-leader rounds",
360            default = 10_000
361        )]
362        base_timeout_ms: u64,
363        #[graphql(
364            desc = "The number of milliseconds by which the timeout increases after each \
365                    single-leader round",
366            default = 1_000
367        )]
368        timeout_increment_ms: u64,
369        #[graphql(
370            desc = "The age of an incoming tracked or protected message after which the \
371                    validators start transitioning the chain to fallback mode, in milliseconds.",
372            default = 86_400_000
373        )]
374        fallback_duration_ms: u64,
375    ) -> Result<ChainId, Error> {
376        let owners = if let Some(weights) = weights {
377            if weights.len() != owners.len() {
378                return Err(Error::new(format!(
379                    "There are {} owners but {} weights.",
380                    owners.len(),
381                    weights.len()
382                )));
383            }
384            owners.into_iter().zip(weights).collect::<Vec<_>>()
385        } else {
386            owners
387                .into_iter()
388                .zip(iter::repeat(100))
389                .collect::<Vec<_>>()
390        };
391        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
392        let timeout_config = TimeoutConfig {
393            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
394            base_timeout: TimeDelta::from_millis(base_timeout_ms),
395            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
396            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
397        };
398        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
399        let balance = balance.unwrap_or(Amount::ZERO);
400        let description = self
401            .apply_client_command(&chain_id, move |client| {
402                let ownership = ownership.clone();
403                let application_permissions = application_permissions.clone().unwrap_or_default();
404                async move {
405                    let result = client
406                        .open_chain(ownership, application_permissions, balance)
407                        .await
408                        .map_err(Error::from)
409                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
410                    (result, client)
411                }
412            })
413            .await?;
414        Ok(description.id())
415    }
416
417    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
418    async fn close_chain(
419        &self,
420        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
421    ) -> Result<Option<CryptoHash>, Error> {
422        let maybe_cert = self
423            .apply_client_command(&chain_id, |client| async move {
424                let result = client.close_chain().await.map_err(Error::from);
425                (result, client)
426            })
427            .await?;
428        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
429    }
430
431    /// Changes the chain to a single-owner chain
432    async fn change_owner(
433        &self,
434        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
435        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
436    ) -> Result<CryptoHash, Error> {
437        let operation = SystemOperation::ChangeOwnership {
438            super_owners: vec![new_owner],
439            owners: Vec::new(),
440            first_leader: None,
441            multi_leader_rounds: 2,
442            open_multi_leader_rounds: false,
443            timeout_config: TimeoutConfig::default(),
444        };
445        self.execute_system_operation(operation, chain_id).await
446    }
447
448    /// Changes the ownership of the chain
449    #[expect(clippy::too_many_arguments)]
450    async fn change_multiple_owners(
451        &self,
452        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
453        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
454        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
455        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
456        #[graphql(
457            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
458        )]
459        open_multi_leader_rounds: bool,
460        #[graphql(desc = "The leader of the first single-leader round. \
461                          If not set, this is random like other rounds.")]
462        first_leader: Option<AccountOwner>,
463        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
464        fast_round_ms: Option<u64>,
465        #[graphql(
466            desc = "The duration of the first single-leader and all multi-leader rounds",
467            default = 10_000
468        )]
469        base_timeout_ms: u64,
470        #[graphql(
471            desc = "The number of milliseconds by which the timeout increases after each \
472                    single-leader round",
473            default = 1_000
474        )]
475        timeout_increment_ms: u64,
476        #[graphql(
477            desc = "The age of an incoming tracked or protected message after which the \
478                    validators start transitioning the chain to fallback mode, in milliseconds.",
479            default = 86_400_000
480        )]
481        fallback_duration_ms: u64,
482    ) -> Result<CryptoHash, Error> {
483        let operation = SystemOperation::ChangeOwnership {
484            super_owners: Vec::new(),
485            owners: new_owners.into_iter().zip(new_weights).collect(),
486            first_leader,
487            multi_leader_rounds,
488            open_multi_leader_rounds,
489            timeout_config: TimeoutConfig {
490                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
491                base_timeout: TimeDelta::from_millis(base_timeout_ms),
492                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
493                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
494            },
495        };
496        self.execute_system_operation(operation, chain_id).await
497    }
498
499    /// Changes the application permissions configuration on this chain.
500    #[expect(clippy::too_many_arguments)]
501    async fn change_application_permissions(
502        &self,
503        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
504        #[graphql(
505            desc = "These applications are allowed to manage the chain: close it, change \
506                    application permissions, and change ownership."
507        )]
508        manage_chain: Vec<ApplicationId>,
509        #[graphql(
510            desc = "If this is `None`, all system operations and application operations are allowed.
511If it is `Some`, only operations from the specified applications are allowed,
512and no system operations."
513        )]
514        execute_operations: Option<Vec<ApplicationId>>,
515        #[graphql(
516            desc = "At least one operation or incoming message from each of these applications must occur in every block."
517        )]
518        mandatory_applications: Vec<ApplicationId>,
519        #[graphql(
520            desc = "These applications are allowed to perform calls to services as oracles."
521        )]
522        call_service_as_oracle: Option<Vec<ApplicationId>>,
523        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
524        make_http_requests: Option<Vec<ApplicationId>>,
525    ) -> Result<CryptoHash, Error> {
526        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
527            execute_operations,
528            mandatory_applications,
529            manage_chain,
530            call_service_as_oracle,
531            make_http_requests,
532        });
533        self.execute_system_operation(operation, chain_id).await
534    }
535
536    /// (admin chain only) Registers a new committee. This will notify the subscribers of
537    /// the admin chain so that they can migrate to the new epoch (by accepting the
538    /// notification as an "incoming message" in a next block).
539    async fn create_committee(
540        &self,
541        chain_id: ChainId,
542        committee: Committee,
543    ) -> Result<CryptoHash, Error> {
544        Ok(self
545            .apply_client_command(&chain_id, move |client| {
546                let committee = committee.clone();
547                async move {
548                    let result = client
549                        .stage_new_committee(committee)
550                        .await
551                        .map_err(Error::from);
552                    (result, client)
553                }
554            })
555            .await?
556            .hash())
557    }
558
559    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
560    /// blocks from the retired epoch will not be accepted until they are followed (hence
561    /// re-certified) by a block certified by a recent committee.
562    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
563        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
564        self.execute_system_operation(operation, chain_id).await
565    }
566
567    /// Publishes a new application module.
568    async fn publish_module(
569        &self,
570        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
571        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
572        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
573        service: Bytecode,
574        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
575        vm_runtime: VmRuntime,
576    ) -> Result<ModuleId, Error> {
577        self.apply_client_command(&chain_id, move |client| {
578            let contract = contract.clone();
579            let service = service.clone();
580            async move {
581                let result = client
582                    .publish_module(contract, service, vm_runtime)
583                    .await
584                    .map_err(Error::from)
585                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
586                (result, client)
587            }
588        })
589        .await
590    }
591
592    /// Publishes a new data blob.
593    async fn publish_data_blob(
594        &self,
595        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
596        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
597    ) -> Result<CryptoHash, Error> {
598        self.apply_client_command(&chain_id, |client| {
599            let bytes = bytes.clone();
600            async move {
601                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
602                (result, client)
603            }
604        })
605        .await
606        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
607    }
608
609    /// Creates a new application.
610    async fn create_application(
611        &self,
612        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
613        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
614        #[graphql(desc = "The JSON serialization of the parameters of the application")]
615        parameters: String,
616        #[graphql(
617            desc = "The JSON serialization of the instantiation argument of the application"
618        )]
619        instantiation_argument: String,
620        #[graphql(desc = "The dependencies of the application being created")]
621        required_application_ids: Vec<ApplicationId>,
622    ) -> Result<ApplicationId, Error> {
623        self.apply_client_command(&chain_id, move |client| {
624            let parameters = parameters.as_bytes().to_vec();
625            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
626            let required_application_ids = required_application_ids.clone();
627            async move {
628                let result = client
629                    .create_application_untyped(
630                        module_id,
631                        parameters,
632                        instantiation_argument,
633                        required_application_ids,
634                    )
635                    .await
636                    .map_err(Error::from)
637                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
638                (result, client)
639            }
640        })
641        .await
642    }
643}
644
645#[async_graphql::Object(cache_control(no_cache))]
646impl<C> QueryRoot<C>
647where
648    C: ClientContext + 'static,
649{
650    async fn chain(
651        &self,
652        chain_id: ChainId,
653    ) -> Result<
654        ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
655        Error,
656    > {
657        let client = self
658            .context
659            .lock()
660            .await
661            .make_chain_client(chain_id)
662            .await?;
663        let view = client.chain_state_view().await?;
664        Ok(ChainStateExtendedView::new(view))
665    }
666
667    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
668        let client = self
669            .context
670            .lock()
671            .await
672            .make_chain_client(chain_id)
673            .await?;
674        let applications = client
675            .chain_state_view()
676            .await?
677            .execution_state
678            .list_applications()
679            .await?;
680
681        let overviews = applications
682            .into_iter()
683            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
684            .collect();
685
686        Ok(overviews)
687    }
688
689    async fn chains(&self) -> Result<Chains, Error> {
690        Ok(Chains {
691            list: self
692                .context
693                .lock()
694                .await
695                .wallet()
696                .chain_ids()
697                .try_collect()
698                .await?,
699            default: self.default_chain,
700        })
701    }
702
703    async fn block(
704        &self,
705        hash: Option<CryptoHash>,
706        chain_id: ChainId,
707    ) -> Result<Option<ConfirmedBlock>, Error> {
708        let client = self
709            .context
710            .lock()
711            .await
712            .make_chain_client(chain_id)
713            .await?;
714        let hash = match hash {
715            Some(hash) => Some(hash),
716            None => client.chain_info().await?.block_hash,
717        };
718        if let Some(hash) = hash {
719            let block = client.read_confirmed_block(hash).await?;
720            Ok(Some(block))
721        } else {
722            Ok(None)
723        }
724    }
725
726    async fn events_from_index(
727        &self,
728        chain_id: ChainId,
729        stream_id: StreamId,
730        start_index: u32,
731    ) -> Result<Vec<IndexAndEvent>, Error> {
732        Ok(self
733            .context
734            .lock()
735            .await
736            .make_chain_client(chain_id)
737            .await?
738            .events_from_index(stream_id, start_index)
739            .await?)
740    }
741
742    async fn blocks(
743        &self,
744        from: Option<CryptoHash>,
745        chain_id: ChainId,
746        limit: Option<u32>,
747    ) -> Result<Vec<ConfirmedBlock>, Error> {
748        let client = self
749            .context
750            .lock()
751            .await
752            .make_chain_client(chain_id)
753            .await?;
754        let limit = limit.unwrap_or(10);
755        let from = match from {
756            Some(from) => Some(from),
757            None => client.chain_info().await?.block_hash,
758        };
759        let Some(from) = from else {
760            return Ok(vec![]);
761        };
762        let mut hash = Some(from);
763        let mut values = Vec::new();
764        for _ in 0..limit {
765            let Some(next_hash) = hash else {
766                break;
767            };
768            let value = client.read_confirmed_block(next_hash).await?;
769            hash = value.block().header.previous_block_hash;
770            values.push(value);
771        }
772        Ok(values)
773    }
774
775    /// Returns the version information on this node service.
776    async fn version(&self) -> linera_version::VersionInfo {
777        linera_version::VersionInfo::default()
778    }
779}
780
781// What follows is a hack to add a chain_id field to `ChainStateView` based on
782// https://async-graphql.github.io/async-graphql/en/merging_objects.html
783
784struct ChainStateViewExtension(ChainId);
785
786#[async_graphql::Object(cache_control(no_cache))]
787impl ChainStateViewExtension {
788    async fn chain_id(&self) -> ChainId {
789        self.0
790    }
791}
792
// Merges the `chain_id` extension with the read-only chain state view into a single GraphQL
// object.
//
// NOTE(review): plain `//` comments are used on purpose. With `#[derive(MergedObject)]`,
// `///` doc comments are presumably exported as the GraphQL description, which would change
// the published schema — confirm before converting these to doc comments.
#[derive(MergedObject)]
struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
where
    C: linera_views::context::Context + Clone + Send + Sync + 'static,
    C::Extra: linera_execution::ExecutionRuntimeContext;
798
/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an
/// [`OwnedRwLockReadGuard`].
///
/// The wrapper owns the read guard; its `ContainerType` and `OutputType` implementations
/// forward every call to the guarded view.
pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
where
    C: linera_views::context::Context + Clone + Send + Sync + 'static;
804
805impl<C> ContainerType for ReadOnlyChainStateView<C>
806where
807    C: linera_views::context::Context + Clone + Send + Sync + 'static,
808{
809    async fn resolve_field(
810        &self,
811        context: &async_graphql::Context<'_>,
812    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
813        self.0.resolve_field(context).await
814    }
815}
816
817impl<C> OutputType for ReadOnlyChainStateView<C>
818where
819    C: linera_views::context::Context + Clone + Send + Sync + 'static,
820{
821    fn type_name() -> Cow<'static, str> {
822        ChainStateView::<C>::type_name()
823    }
824
825    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
826        ChainStateView::<C>::create_type_info(registry)
827    }
828
829    async fn resolve(
830        &self,
831        context: &async_graphql::ContextSelectionSet<'_>,
832        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
833    ) -> async_graphql::ServerResult<async_graphql::Value> {
834        self.0.resolve(context, field).await
835    }
836}
837
838impl<C> ChainStateExtendedView<C>
839where
840    C: linera_views::context::Context + Clone + Send + Sync + 'static,
841    C::Extra: linera_execution::ExecutionRuntimeContext,
842{
843    fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
844        Self(
845            ChainStateViewExtension(view.chain_id()),
846            ReadOnlyChainStateView(view),
847        )
848    }
849}
850
851#[derive(SimpleObject)]
852pub struct ApplicationOverview {
853    id: ApplicationId,
854    description: ApplicationDescription,
855    link: String,
856}
857
858impl ApplicationOverview {
859    fn new(
860        id: ApplicationId,
861        description: ApplicationDescription,
862        port: NonZeroU16,
863        chain_id: ChainId,
864    ) -> Self {
865        Self {
866            id,
867            description,
868            link: format!(
869                "http://localhost:{}/chains/{}/applications/{}",
870                port.get(),
871                chain_id,
872                id
873            ),
874        }
875    }
876}
877
/// Schema type that can be either full (with mutations) or read-only.
///
/// The two variants wrap schemas that differ only in their mutation type parameter
/// (`MutationRoot<C>` versus `EmptyMutation`); the query and subscription roots are the same.
pub enum NodeServiceSchema<C>
where
    C: ClientContext + 'static,
{
    /// Full schema with mutations enabled.
    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
    /// Read-only schema with mutations disabled.
    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
}
888
889impl<C> NodeServiceSchema<C>
890where
891    C: ClientContext,
892{
893    /// Executes a GraphQL request.
894    pub async fn execute(&self, request: impl Into<Request>) -> Response {
895        match self {
896            Self::Full(schema) => schema.execute(request).await,
897            Self::ReadOnly(schema) => schema.execute(request).await,
898        }
899    }
900
901    /// Returns the SDL (Schema Definition Language) representation.
902    pub fn sdl(&self) -> String {
903        match self {
904            Self::Full(schema) => schema.sdl(),
905            Self::ReadOnly(schema) => schema.sdl(),
906        }
907    }
908}
909
910impl<C> Clone for NodeServiceSchema<C>
911where
912    C: ClientContext,
913{
914    fn clone(&self) -> Self {
915        match self {
916            Self::Full(schema) => Self::Full(schema.clone()),
917            Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
918        }
919    }
920}
921
/// The `NodeService` is a server that exposes a web-server to the client.
/// The node service is primarily used to explore the state of a chain in GraphQL.
pub struct NodeService<C>
where
    C: ClientContext + 'static,
{
    /// Configuration handed to the `ChainListener` that `run` starts.
    config: ChainListenerConfig,
    /// Port the HTTP server binds to in `run`.
    port: NonZeroU16,
    /// Port used by `metrics_address` to build the metrics server address.
    #[cfg(with_metrics)]
    metrics_port: NonZeroU16,
    /// Optional chain passed on to the GraphQL query root.
    default_chain: Option<ChainId>,
    /// Client context, shared behind a mutex with the GraphQL roots and the chain listener.
    context: Arc<Mutex<C>>,
    /// If true, disallow mutations and prevent queries from scheduling operations.
    read_only: bool,
}
937
938impl<C> Clone for NodeService<C>
939where
940    C: ClientContext + 'static,
941{
942    fn clone(&self) -> Self {
943        Self {
944            config: self.config.clone(),
945            port: self.port,
946            #[cfg(with_metrics)]
947            metrics_port: self.metrics_port,
948            default_chain: self.default_chain,
949            context: Arc::clone(&self.context),
950            read_only: self.read_only,
951        }
952    }
953}
954
955impl<C> NodeService<C>
956where
957    C: ClientContext,
958{
959    /// Creates a new instance of the node service given a client chain and a port.
960    pub fn new(
961        config: ChainListenerConfig,
962        port: NonZeroU16,
963        #[cfg(with_metrics)] metrics_port: NonZeroU16,
964        default_chain: Option<ChainId>,
965        context: Arc<Mutex<C>>,
966        read_only: bool,
967    ) -> Self {
968        Self {
969            config,
970            port,
971            #[cfg(with_metrics)]
972            metrics_port,
973            default_chain,
974            context,
975            read_only,
976        }
977    }
978
979    #[cfg(with_metrics)]
980    pub fn metrics_address(&self) -> SocketAddr {
981        SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
982    }
983
984    pub fn schema(&self) -> NodeServiceSchema<C> {
985        let query = QueryRoot {
986            context: Arc::clone(&self.context),
987            port: self.port,
988            default_chain: self.default_chain,
989        };
990        let subscription = SubscriptionRoot {
991            context: Arc::clone(&self.context),
992        };
993
994        if self.read_only {
995            NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
996        } else {
997            NodeServiceSchema::Full(
998                Schema::build(
999                    query,
1000                    MutationRoot {
1001                        context: Arc::clone(&self.context),
1002                    },
1003                    subscription,
1004                )
1005                .finish(),
1006            )
1007        }
1008    }
1009
1010    /// Runs the node service.
1011    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
1012    pub async fn run(
1013        self,
1014        cancellation_token: CancellationToken,
1015        command_receiver: UnboundedReceiver<ListenerCommand>,
1016    ) -> Result<(), anyhow::Error> {
1017        let port = self.port.get();
1018        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
1019        let application_handler =
1020            axum::routing::get(util::graphiql).post(Self::application_handler);
1021
1022        #[cfg(with_metrics)]
1023        monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
1024
1025        let base_router = Router::new()
1026            .route("/", index_handler)
1027            .route(
1028                "/chains/{chain_id}/applications/{application_id}",
1029                application_handler,
1030            )
1031            .route("/ready", axum::routing::get(|| async { "ready!" }));
1032
1033        // Create router with appropriate schema for WebSocket subscriptions.
1034        let app = match self.schema() {
1035            NodeServiceSchema::Full(schema) => {
1036                base_router.route_service("/ws", GraphQLSubscription::new(schema))
1037            }
1038            NodeServiceSchema::ReadOnly(schema) => {
1039                base_router.route_service("/ws", GraphQLSubscription::new(schema))
1040            }
1041        }
1042        .layer(Extension(self.clone()))
1043        // TODO(#551): Provide application authentication.
1044        .layer(CorsLayer::permissive());
1045
1046        info!("GraphiQL IDE: http://localhost:{}", port);
1047
1048        let storage = self.context.lock().await.storage().clone();
1049
1050        let chain_listener = ChainListener::new(
1051            self.config,
1052            self.context,
1053            storage,
1054            cancellation_token.clone(),
1055            command_receiver,
1056            true,
1057        )
1058        .run()
1059        .await?;
1060        let mut chain_listener = Box::pin(chain_listener).fuse();
1061        let tcp_listener =
1062            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1063        let server = axum::serve(tcp_listener, app)
1064            .with_graceful_shutdown(cancellation_token.cancelled_owned())
1065            .into_future();
1066        futures::select! {
1067            result = chain_listener => result?,
1068            result = Box::pin(server).fuse() => result?,
1069        };
1070
1071        Ok(())
1072    }
1073
1074    /// Handles service queries for user applications (including mutations).
1075    async fn handle_service_request(
1076        &self,
1077        application_id: ApplicationId,
1078        request: Vec<u8>,
1079        chain_id: ChainId,
1080        block_hash: Option<CryptoHash>,
1081    ) -> Result<Vec<u8>, NodeServiceError> {
1082        let QueryOutcome {
1083            response,
1084            operations,
1085        } = self
1086            .query_user_application(application_id, request, chain_id, block_hash)
1087            .await?;
1088        if operations.is_empty() {
1089            return Ok(response);
1090        }
1091
1092        if self.read_only {
1093            return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1094        }
1095
1096        trace!("Query requested a new block with operations: {operations:?}");
1097        let client = self
1098            .context
1099            .lock()
1100            .await
1101            .make_chain_client(chain_id)
1102            .await?;
1103        let hash = loop {
1104            let timeout = match client
1105                .execute_operations(operations.clone(), vec![])
1106                .await?
1107            {
1108                ClientOutcome::Committed(certificate) => break certificate.hash(),
1109                ClientOutcome::WaitForTimeout(timeout) => timeout,
1110            };
1111            let mut stream = client.subscribe().map_err(|_| {
1112                chain_client::Error::InternalError("Could not subscribe to the local node.")
1113            })?;
1114            util::wait_for_next_round(&mut stream, timeout).await;
1115        };
1116        let response = async_graphql::Response::new(hash.to_value());
1117        Ok(serde_json::to_vec(&response)?)
1118    }
1119
1120    /// Queries a user application, returning the raw [`QueryOutcome`].
1121    async fn query_user_application(
1122        &self,
1123        application_id: ApplicationId,
1124        bytes: Vec<u8>,
1125        chain_id: ChainId,
1126        block_hash: Option<CryptoHash>,
1127    ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
1128        let query = Query::User {
1129            application_id,
1130            bytes,
1131        };
1132        let client = self
1133            .context
1134            .lock()
1135            .await
1136            .make_chain_client(chain_id)
1137            .await?;
1138        let QueryOutcome {
1139            response,
1140            operations,
1141        } = client.query_application(query, block_hash).await?;
1142        match response {
1143            QueryResponse::System(_) => {
1144                unreachable!("cannot get a system response for a user query")
1145            }
1146            QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1147                response: user_response_bytes,
1148                operations,
1149            }),
1150        }
1151    }
1152
1153    /// Executes a GraphQL query and generates a response for our `Schema`.
1154    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1155        service
1156            .0
1157            .schema()
1158            .execute(request.into_inner())
1159            .await
1160            .into()
1161    }
1162
1163    /// Executes a GraphQL query against an application.
1164    /// Pattern matches on the `OperationType` of the query and routes the query
1165    /// accordingly.
1166    async fn application_handler(
1167        Path((chain_id, application_id)): Path<(String, String)>,
1168        service: Extension<Self>,
1169        request: String,
1170    ) -> Result<Vec<u8>, NodeServiceError> {
1171        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1172        let application_id: ApplicationId = application_id.parse()?;
1173
1174        debug!(
1175            %chain_id,
1176            %application_id,
1177            "processing request for application:\n{:?}",
1178            &request
1179        );
1180        let response = service
1181            .0
1182            .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1183            .await?;
1184
1185        Ok(response)
1186    }
1187}