linera_service/
node_service.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7    futures_util::Stream, resolver_utils::ContainerType, EmptyMutation, Error, MergedObject,
8    OutputType, Request, Response, ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _, TryStreamExt as _};
13use linera_base::{
14    crypto::{CryptoError, CryptoHash},
15    data_types::{
16        Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17    },
18    identifiers::{
19        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20    },
21    ownership::{ChainOwnership, TimeoutConfig},
22    vm::VmRuntime,
23    BcsHexParseError,
24};
25use linera_chain::{
26    types::{ConfirmedBlock, GenericCertificate},
27    ChainStateView,
28};
29use linera_client::chain_listener::{
30    ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
31};
32use linera_core::{
33    client::chain_client::{self, ChainClient},
34    data_types::ClientOutcome,
35    wallet::Wallet as _,
36    worker::Notification,
37};
38use linera_execution::{
39    committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
40    SystemOperation,
41};
42#[cfg(with_metrics)]
43use linera_metrics::monitoring_server;
44use linera_sdk::linera_base_types::BlobContent;
45use serde::{Deserialize, Serialize};
46use serde_json::json;
47use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard};
48use tokio_util::sync::CancellationToken;
49use tower_http::cors::CorsLayer;
50use tracing::{debug, info, instrument, trace};
51
52use crate::util;
53
54#[derive(SimpleObject, Serialize, Deserialize, Clone)]
55pub struct Chains {
56    pub list: Vec<ChainId>,
57    pub default: Option<ChainId>,
58}
59
60/// Our root GraphQL query type.
61pub struct QueryRoot<C> {
62    context: Arc<Mutex<C>>,
63    port: NonZeroU16,
64    default_chain: Option<ChainId>,
65}
66
67/// Our root GraphQL subscription type.
68pub struct SubscriptionRoot<C> {
69    context: Arc<Mutex<C>>,
70}
71
72/// Our root GraphQL mutation type.
73pub struct MutationRoot<C> {
74    context: Arc<Mutex<C>>,
75}
76
77#[derive(Debug, thiserror::Error)]
78enum NodeServiceError {
79    #[error(transparent)]
80    ChainClient(#[from] chain_client::Error),
81    #[error(transparent)]
82    BcsHex(#[from] BcsHexParseError),
83    #[error(transparent)]
84    Json(#[from] serde_json::Error),
85    #[error("malformed chain ID: {0}")]
86    InvalidChainId(CryptoError),
87    #[error(transparent)]
88    Client(#[from] linera_client::Error),
89    #[error("scheduling operations from queries is disabled in read-only mode")]
90    ReadOnlyModeOperationsNotAllowed,
91}
92
93impl IntoResponse for NodeServiceError {
94    fn into_response(self) -> response::Response {
95        let status = match self {
96            NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
97                StatusCode::BAD_REQUEST
98            }
99            NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
100            _ => StatusCode::INTERNAL_SERVER_ERROR,
101        };
102        let body = json!({"error": self.to_string()}).to_string();
103        (status, body).into_response()
104    }
105}
106
107#[Subscription]
108impl<C> SubscriptionRoot<C>
109where
110    C: ClientContext + 'static,
111{
112    /// Subscribes to notifications from the specified chain.
113    async fn notifications(
114        &self,
115        chain_id: ChainId,
116    ) -> Result<impl Stream<Item = Notification>, Error> {
117        let client = self
118            .context
119            .lock()
120            .await
121            .make_chain_client(chain_id)
122            .await?;
123        Ok(client.subscribe()?)
124    }
125}
126
127impl<C> MutationRoot<C>
128where
129    C: ClientContext,
130{
131    async fn execute_system_operation(
132        &self,
133        system_operation: SystemOperation,
134        chain_id: ChainId,
135    ) -> Result<CryptoHash, Error> {
136        let certificate = self
137            .apply_client_command(&chain_id, move |client| {
138                let operation = Operation::system(system_operation.clone());
139                async move {
140                    let result = client
141                        .execute_operation(operation)
142                        .await
143                        .map_err(Error::from);
144                    (result, client)
145                }
146            })
147            .await?;
148        Ok(certificate.hash())
149    }
150
151    /// Applies the given function to the chain client.
152    /// Updates the wallet regardless of the outcome. As long as the function returns a round
153    /// timeout, it will wait and retry.
154    async fn apply_client_command<F, Fut, T>(
155        &self,
156        chain_id: &ChainId,
157        mut f: F,
158    ) -> Result<T, Error>
159    where
160        F: FnMut(ChainClient<C::Environment>) -> Fut,
161        Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
162    {
163        loop {
164            let client = self
165                .context
166                .lock()
167                .await
168                .make_chain_client(*chain_id)
169                .await?;
170            let mut stream = client.subscribe()?;
171            let (result, client) = f(client).await;
172            self.context.lock().await.update_wallet(&client).await?;
173            let timeout = match result? {
174                ClientOutcome::Committed(t) => return Ok(t),
175                ClientOutcome::Conflict(certificate) => {
176                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
177                }
178                ClientOutcome::WaitForTimeout(timeout) => timeout,
179            };
180            drop(client);
181            util::wait_for_next_round(&mut stream, timeout).await;
182        }
183    }
184}
185
186#[async_graphql::Object(cache_control(no_cache))]
187impl<C> MutationRoot<C>
188where
189    C: ClientContext + 'static,
190{
191    /// Processes the inbox and returns the lists of certificate hashes that were created, if any.
192    async fn process_inbox(
193        &self,
194        #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
195    ) -> Result<Vec<CryptoHash>, Error> {
196        let mut hashes = Vec::new();
197        loop {
198            let client = self
199                .context
200                .lock()
201                .await
202                .make_chain_client(chain_id)
203                .await?;
204            let result = client.process_inbox().await;
205            self.context.lock().await.update_wallet(&client).await?;
206            let (certificates, maybe_timeout) = result?;
207            hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
208            match maybe_timeout {
209                None => return Ok(hashes),
210                Some(timestamp) => {
211                    let mut stream = client.subscribe()?;
212                    drop(client);
213                    util::wait_for_next_round(&mut stream, timestamp).await;
214                }
215            }
216        }
217    }
218
219    /// Synchronizes the chain with the validators. Returns the chain's length.
220    ///
221    /// This is only used for testing, to make sure that a client is up to date.
222    // TODO(#4718): Remove this mutation.
223    async fn sync(
224        &self,
225        #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
226    ) -> Result<u64, Error> {
227        let client = self
228            .context
229            .lock()
230            .await
231            .make_chain_client(chain_id)
232            .await?;
233        let info = client.synchronize_from_validators().await?;
234        self.context.lock().await.update_wallet(&client).await?;
235        Ok(info.next_block_height.0)
236    }
237
238    /// Retries the pending block that was unsuccessfully proposed earlier.
239    async fn retry_pending_block(
240        &self,
241        #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
242    ) -> Result<Option<CryptoHash>, Error> {
243        let client = self
244            .context
245            .lock()
246            .await
247            .make_chain_client(chain_id)
248            .await?;
249        let outcome = client.process_pending_block().await?;
250        self.context.lock().await.update_wallet(&client).await?;
251        match outcome {
252            ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
253            ClientOutcome::Committed(None) => Ok(None),
254            ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
255                "Please try again at {}",
256                timeout.timestamp
257            ))),
258            ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
259                "A different block was committed: {}",
260                certificate.hash()
261            ))),
262        }
263    }
264
265    /// Transfers `amount` units of value from the given owner's account to the recipient.
266    /// If no owner is given, try to take the units out of the chain account.
267    async fn transfer(
268        &self,
269        #[graphql(desc = "The chain which native tokens are being transferred from.")]
270        chain_id: ChainId,
271        #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
272        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
273        #[graphql(desc = "The amount being transferred.")] amount: Amount,
274    ) -> Result<CryptoHash, Error> {
275        self.apply_client_command(&chain_id, move |client| async move {
276            let result = client
277                .transfer(owner, amount, recipient)
278                .await
279                .map_err(Error::from)
280                .map(|outcome| outcome.map(|certificate| certificate.hash()));
281            (result, client)
282        })
283        .await
284    }
285
286    /// Claims `amount` units of value from the given owner's account in the remote
287    /// `target` chain. Depending on its configuration, the `target` chain may refuse to
288    /// process the message.
289    async fn claim(
290        &self,
291        #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
292        #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
293        #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
294        #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
295        #[graphql(desc = "The amount being transferred.")] amount: Amount,
296    ) -> Result<CryptoHash, Error> {
297        self.apply_client_command(&chain_id, move |client| async move {
298            let result = client
299                .claim(owner, target_id, recipient, amount)
300                .await
301                .map_err(Error::from)
302                .map(|outcome| outcome.map(|certificate| certificate.hash()));
303            (result, client)
304        })
305        .await
306    }
307
308    /// Test if a data blob is readable from a transaction in the current chain.
309    // TODO(#2490): Consider removing or renaming this.
310    async fn read_data_blob(
311        &self,
312        chain_id: ChainId,
313        hash: CryptoHash,
314    ) -> Result<CryptoHash, Error> {
315        self.apply_client_command(&chain_id, move |client| async move {
316            let result = client
317                .read_data_blob(hash)
318                .await
319                .map_err(Error::from)
320                .map(|outcome| outcome.map(|certificate| certificate.hash()));
321            (result, client)
322        })
323        .await
324    }
325
326    /// Creates a new single-owner chain.
327    async fn open_chain(
328        &self,
329        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
330        #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
331        #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
332        balance: Option<Amount>,
333    ) -> Result<ChainId, Error> {
334        let ownership = ChainOwnership::single(owner);
335        let balance = balance.unwrap_or(Amount::ZERO);
336        let description = self
337            .apply_client_command(&chain_id, move |client| {
338                let ownership = ownership.clone();
339                async move {
340                    let result = client
341                        .open_chain(ownership, ApplicationPermissions::default(), balance)
342                        .await
343                        .map_err(Error::from)
344                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
345                    (result, client)
346                }
347            })
348            .await?;
349        Ok(description.id())
350    }
351
352    /// Creates a new multi-owner chain.
353    #[expect(clippy::too_many_arguments)]
354    async fn open_multi_owner_chain(
355        &self,
356        #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
357        #[graphql(desc = "Permissions for applications on the new chain")]
358        application_permissions: Option<ApplicationPermissions>,
359        #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
360        #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
361        #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
362        #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
363        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
364        fast_round_ms: Option<u64>,
365        #[graphql(
366            desc = "The duration of the first single-leader and all multi-leader rounds",
367            default = 10_000
368        )]
369        base_timeout_ms: u64,
370        #[graphql(
371            desc = "The number of milliseconds by which the timeout increases after each \
372                    single-leader round",
373            default = 1_000
374        )]
375        timeout_increment_ms: u64,
376        #[graphql(
377            desc = "The age of an incoming tracked or protected message after which the \
378                    validators start transitioning the chain to fallback mode, in milliseconds.",
379            default = 86_400_000
380        )]
381        fallback_duration_ms: u64,
382    ) -> Result<ChainId, Error> {
383        let owners = if let Some(weights) = weights {
384            if weights.len() != owners.len() {
385                return Err(Error::new(format!(
386                    "There are {} owners but {} weights.",
387                    owners.len(),
388                    weights.len()
389                )));
390            }
391            owners.into_iter().zip(weights).collect::<Vec<_>>()
392        } else {
393            owners
394                .into_iter()
395                .zip(iter::repeat(100))
396                .collect::<Vec<_>>()
397        };
398        let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
399        let timeout_config = TimeoutConfig {
400            fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
401            base_timeout: TimeDelta::from_millis(base_timeout_ms),
402            timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
403            fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
404        };
405        let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
406        let balance = balance.unwrap_or(Amount::ZERO);
407        let description = self
408            .apply_client_command(&chain_id, move |client| {
409                let ownership = ownership.clone();
410                let application_permissions = application_permissions.clone().unwrap_or_default();
411                async move {
412                    let result = client
413                        .open_chain(ownership, application_permissions, balance)
414                        .await
415                        .map_err(Error::from)
416                        .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
417                    (result, client)
418                }
419            })
420            .await?;
421        Ok(description.id())
422    }
423
424    /// Closes the chain. Returns the new block hash if successful or `None` if it was already closed.
425    async fn close_chain(
426        &self,
427        #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
428    ) -> Result<Option<CryptoHash>, Error> {
429        let maybe_cert = self
430            .apply_client_command(&chain_id, |client| async move {
431                let result = client.close_chain().await.map_err(Error::from);
432                (result, client)
433            })
434            .await?;
435        Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
436    }
437
438    /// Changes the chain to a single-owner chain
439    async fn change_owner(
440        &self,
441        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
442        #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
443    ) -> Result<CryptoHash, Error> {
444        let operation = SystemOperation::ChangeOwnership {
445            super_owners: vec![new_owner],
446            owners: Vec::new(),
447            first_leader: None,
448            multi_leader_rounds: 2,
449            open_multi_leader_rounds: false,
450            timeout_config: TimeoutConfig::default(),
451        };
452        self.execute_system_operation(operation, chain_id).await
453    }
454
455    /// Changes the ownership of the chain
456    #[expect(clippy::too_many_arguments)]
457    async fn change_multiple_owners(
458        &self,
459        #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
460        #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
461        #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
462        #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
463        #[graphql(
464            desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
465        )]
466        open_multi_leader_rounds: bool,
467        #[graphql(desc = "The leader of the first single-leader round. \
468                          If not set, this is random like other rounds.")]
469        first_leader: Option<AccountOwner>,
470        #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
471        fast_round_ms: Option<u64>,
472        #[graphql(
473            desc = "The duration of the first single-leader and all multi-leader rounds",
474            default = 10_000
475        )]
476        base_timeout_ms: u64,
477        #[graphql(
478            desc = "The number of milliseconds by which the timeout increases after each \
479                    single-leader round",
480            default = 1_000
481        )]
482        timeout_increment_ms: u64,
483        #[graphql(
484            desc = "The age of an incoming tracked or protected message after which the \
485                    validators start transitioning the chain to fallback mode, in milliseconds.",
486            default = 86_400_000
487        )]
488        fallback_duration_ms: u64,
489    ) -> Result<CryptoHash, Error> {
490        let operation = SystemOperation::ChangeOwnership {
491            super_owners: Vec::new(),
492            owners: new_owners.into_iter().zip(new_weights).collect(),
493            first_leader,
494            multi_leader_rounds,
495            open_multi_leader_rounds,
496            timeout_config: TimeoutConfig {
497                fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
498                base_timeout: TimeDelta::from_millis(base_timeout_ms),
499                timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
500                fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
501            },
502        };
503        self.execute_system_operation(operation, chain_id).await
504    }
505
506    /// Changes the application permissions configuration on this chain.
507    #[expect(clippy::too_many_arguments)]
508    async fn change_application_permissions(
509        &self,
510        #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
511        #[graphql(
512            desc = "These applications are allowed to manage the chain: close it, change \
513                    application permissions, and change ownership."
514        )]
515        manage_chain: Vec<ApplicationId>,
516        #[graphql(
517            desc = "If this is `None`, all system operations and application operations are allowed.
518If it is `Some`, only operations from the specified applications are allowed,
519and no system operations."
520        )]
521        execute_operations: Option<Vec<ApplicationId>>,
522        #[graphql(
523            desc = "At least one operation or incoming message from each of these applications must occur in every block."
524        )]
525        mandatory_applications: Vec<ApplicationId>,
526        #[graphql(
527            desc = "These applications are allowed to perform calls to services as oracles."
528        )]
529        call_service_as_oracle: Option<Vec<ApplicationId>>,
530        #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
531        make_http_requests: Option<Vec<ApplicationId>>,
532    ) -> Result<CryptoHash, Error> {
533        let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
534            execute_operations,
535            mandatory_applications,
536            manage_chain,
537            call_service_as_oracle,
538            make_http_requests,
539        });
540        self.execute_system_operation(operation, chain_id).await
541    }
542
543    /// (admin chain only) Registers a new committee. This will notify the subscribers of
544    /// the admin chain so that they can migrate to the new epoch (by accepting the
545    /// notification as an "incoming message" in a next block).
546    async fn create_committee(
547        &self,
548        chain_id: ChainId,
549        committee: Committee,
550    ) -> Result<CryptoHash, Error> {
551        Ok(self
552            .apply_client_command(&chain_id, move |client| {
553                let committee = committee.clone();
554                async move {
555                    let result = client
556                        .stage_new_committee(committee)
557                        .await
558                        .map_err(Error::from);
559                    (result, client)
560                }
561            })
562            .await?
563            .hash())
564    }
565
566    /// (admin chain only) Removes a committee. Once this message is accepted by a chain,
567    /// blocks from the retired epoch will not be accepted until they are followed (hence
568    /// re-certified) by a block certified by a recent committee.
569    async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
570        let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
571        self.execute_system_operation(operation, chain_id).await
572    }
573
574    /// Publishes a new application module.
575    async fn publish_module(
576        &self,
577        #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
578        #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
579        #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
580        service: Bytecode,
581        #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
582        vm_runtime: VmRuntime,
583    ) -> Result<ModuleId, Error> {
584        self.apply_client_command(&chain_id, move |client| {
585            let contract = contract.clone();
586            let service = service.clone();
587            async move {
588                let result = client
589                    .publish_module(contract, service, vm_runtime)
590                    .await
591                    .map_err(Error::from)
592                    .map(|outcome| outcome.map(|(module_id, _)| module_id));
593                (result, client)
594            }
595        })
596        .await
597    }
598
599    /// Publishes a new data blob.
600    async fn publish_data_blob(
601        &self,
602        #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
603        #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
604    ) -> Result<CryptoHash, Error> {
605        self.apply_client_command(&chain_id, |client| {
606            let bytes = bytes.clone();
607            async move {
608                let result = client.publish_data_blob(bytes).await.map_err(Error::from);
609                (result, client)
610            }
611        })
612        .await
613        .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
614    }
615
616    /// Creates a new application.
617    async fn create_application(
618        &self,
619        #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
620        #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
621        #[graphql(desc = "The JSON serialization of the parameters of the application")]
622        parameters: String,
623        #[graphql(
624            desc = "The JSON serialization of the instantiation argument of the application"
625        )]
626        instantiation_argument: String,
627        #[graphql(desc = "The dependencies of the application being created")]
628        required_application_ids: Vec<ApplicationId>,
629    ) -> Result<ApplicationId, Error> {
630        self.apply_client_command(&chain_id, move |client| {
631            let parameters = parameters.as_bytes().to_vec();
632            let instantiation_argument = instantiation_argument.as_bytes().to_vec();
633            let required_application_ids = required_application_ids.clone();
634            async move {
635                let result = client
636                    .create_application_untyped(
637                        module_id,
638                        parameters,
639                        instantiation_argument,
640                        required_application_ids,
641                    )
642                    .await
643                    .map_err(Error::from)
644                    .map(|outcome| outcome.map(|(application_id, _)| application_id));
645                (result, client)
646            }
647        })
648        .await
649    }
650}
651
652#[async_graphql::Object(cache_control(no_cache))]
653impl<C> QueryRoot<C>
654where
655    C: ClientContext + 'static,
656{
657    async fn chain(
658        &self,
659        chain_id: ChainId,
660    ) -> Result<
661        ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
662        Error,
663    > {
664        let client = self
665            .context
666            .lock()
667            .await
668            .make_chain_client(chain_id)
669            .await?;
670        let view = client.chain_state_view().await?;
671        Ok(ChainStateExtendedView::new(view))
672    }
673
674    async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
675        let client = self
676            .context
677            .lock()
678            .await
679            .make_chain_client(chain_id)
680            .await?;
681        let applications = client
682            .chain_state_view()
683            .await?
684            .execution_state
685            .list_applications()
686            .await?;
687
688        let overviews = applications
689            .into_iter()
690            .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
691            .collect();
692
693        Ok(overviews)
694    }
695
696    async fn chains(&self) -> Result<Chains, Error> {
697        Ok(Chains {
698            list: self
699                .context
700                .lock()
701                .await
702                .wallet()
703                .chain_ids()
704                .try_collect()
705                .await?,
706            default: self.default_chain,
707        })
708    }
709
710    async fn block(
711        &self,
712        hash: Option<CryptoHash>,
713        chain_id: ChainId,
714    ) -> Result<Option<ConfirmedBlock>, Error> {
715        let client = self
716            .context
717            .lock()
718            .await
719            .make_chain_client(chain_id)
720            .await?;
721        let hash = match hash {
722            Some(hash) => Some(hash),
723            None => client.chain_info().await?.block_hash,
724        };
725        if let Some(hash) = hash {
726            let block = client.read_confirmed_block(hash).await?;
727            Ok(Some(block))
728        } else {
729            Ok(None)
730        }
731    }
732
733    async fn events_from_index(
734        &self,
735        chain_id: ChainId,
736        stream_id: StreamId,
737        start_index: u32,
738    ) -> Result<Vec<IndexAndEvent>, Error> {
739        Ok(self
740            .context
741            .lock()
742            .await
743            .make_chain_client(chain_id)
744            .await?
745            .events_from_index(stream_id, start_index)
746            .await?)
747    }
748
749    async fn blocks(
750        &self,
751        from: Option<CryptoHash>,
752        chain_id: ChainId,
753        limit: Option<u32>,
754    ) -> Result<Vec<ConfirmedBlock>, Error> {
755        let client = self
756            .context
757            .lock()
758            .await
759            .make_chain_client(chain_id)
760            .await?;
761        let limit = limit.unwrap_or(10);
762        let from = match from {
763            Some(from) => Some(from),
764            None => client.chain_info().await?.block_hash,
765        };
766        let Some(from) = from else {
767            return Ok(vec![]);
768        };
769        let mut hash = Some(from);
770        let mut values = Vec::new();
771        for _ in 0..limit {
772            let Some(next_hash) = hash else {
773                break;
774            };
775            let value = client.read_confirmed_block(next_hash).await?;
776            hash = value.block().header.previous_block_hash;
777            values.push(value);
778        }
779        Ok(values)
780    }
781
782    /// Returns the version information on this node service.
783    async fn version(&self) -> linera_version::VersionInfo {
784        linera_version::VersionInfo::default()
785    }
786}
787
788// What follows is a hack to add a chain_id field to `ChainStateView` based on
789// https://async-graphql.github.io/async-graphql/en/merging_objects.html
790
791struct ChainStateViewExtension(ChainId);
792
793#[async_graphql::Object(cache_control(no_cache))]
794impl ChainStateViewExtension {
795    async fn chain_id(&self) -> ChainId {
796        self.0
797    }
798}
799
800#[derive(MergedObject)]
801struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
802where
803    C: linera_views::context::Context + Clone + Send + Sync + 'static,
804    C::Extra: linera_execution::ExecutionRuntimeContext;
805
806/// A wrapper type that allows proxying GraphQL queries to a [`ChainStateView`] that's behind an
807/// [`OwnedRwLockReadGuard`].
808pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
809where
810    C: linera_views::context::Context + Clone + Send + Sync + 'static;
811
812impl<C> ContainerType for ReadOnlyChainStateView<C>
813where
814    C: linera_views::context::Context + Clone + Send + Sync + 'static,
815{
816    async fn resolve_field(
817        &self,
818        context: &async_graphql::Context<'_>,
819    ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
820        self.0.resolve_field(context).await
821    }
822}
823
824impl<C> OutputType for ReadOnlyChainStateView<C>
825where
826    C: linera_views::context::Context + Clone + Send + Sync + 'static,
827{
828    fn type_name() -> Cow<'static, str> {
829        ChainStateView::<C>::type_name()
830    }
831
832    fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
833        ChainStateView::<C>::create_type_info(registry)
834    }
835
836    async fn resolve(
837        &self,
838        context: &async_graphql::ContextSelectionSet<'_>,
839        field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
840    ) -> async_graphql::ServerResult<async_graphql::Value> {
841        self.0.resolve(context, field).await
842    }
843}
844
845impl<C> ChainStateExtendedView<C>
846where
847    C: linera_views::context::Context + Clone + Send + Sync + 'static,
848    C::Extra: linera_execution::ExecutionRuntimeContext,
849{
850    fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
851        Self(
852            ChainStateViewExtension(view.chain_id()),
853            ReadOnlyChainStateView(view),
854        )
855    }
856}
857
858#[derive(SimpleObject)]
859pub struct ApplicationOverview {
860    id: ApplicationId,
861    description: ApplicationDescription,
862    link: String,
863}
864
865impl ApplicationOverview {
866    fn new(
867        id: ApplicationId,
868        description: ApplicationDescription,
869        port: NonZeroU16,
870        chain_id: ChainId,
871    ) -> Self {
872        Self {
873            id,
874            description,
875            link: format!(
876                "http://localhost:{}/chains/{}/applications/{}",
877                port.get(),
878                chain_id,
879                id
880            ),
881        }
882    }
883}
884
885/// Schema type that can be either full (with mutations) or read-only.
886pub enum NodeServiceSchema<C>
887where
888    C: ClientContext + 'static,
889{
890    /// Full schema with mutations enabled.
891    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
892    /// Read-only schema with mutations disabled.
893    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
894}
895
896impl<C> NodeServiceSchema<C>
897where
898    C: ClientContext,
899{
900    /// Executes a GraphQL request.
901    pub async fn execute(&self, request: impl Into<Request>) -> Response {
902        match self {
903            Self::Full(schema) => schema.execute(request).await,
904            Self::ReadOnly(schema) => schema.execute(request).await,
905        }
906    }
907
908    /// Returns the SDL (Schema Definition Language) representation.
909    pub fn sdl(&self) -> String {
910        match self {
911            Self::Full(schema) => schema.sdl(),
912            Self::ReadOnly(schema) => schema.sdl(),
913        }
914    }
915}
916
917impl<C> Clone for NodeServiceSchema<C>
918where
919    C: ClientContext,
920{
921    fn clone(&self) -> Self {
922        match self {
923            Self::Full(schema) => Self::Full(schema.clone()),
924            Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
925        }
926    }
927}
928
929/// The `NodeService` is a server that exposes a web-server to the client.
930/// The node service is primarily used to explore the state of a chain in GraphQL.
931pub struct NodeService<C>
932where
933    C: ClientContext + 'static,
934{
935    config: ChainListenerConfig,
936    port: NonZeroU16,
937    #[cfg(with_metrics)]
938    metrics_port: NonZeroU16,
939    default_chain: Option<ChainId>,
940    context: Arc<Mutex<C>>,
941    /// If true, disallow mutations and prevent queries from scheduling operations.
942    read_only: bool,
943}
944
945impl<C> Clone for NodeService<C>
946where
947    C: ClientContext + 'static,
948{
949    fn clone(&self) -> Self {
950        Self {
951            config: self.config.clone(),
952            port: self.port,
953            #[cfg(with_metrics)]
954            metrics_port: self.metrics_port,
955            default_chain: self.default_chain,
956            context: Arc::clone(&self.context),
957            read_only: self.read_only,
958        }
959    }
960}
961
962impl<C> NodeService<C>
963where
964    C: ClientContext,
965{
966    /// Creates a new instance of the node service given a client chain and a port.
967    pub fn new(
968        config: ChainListenerConfig,
969        port: NonZeroU16,
970        #[cfg(with_metrics)] metrics_port: NonZeroU16,
971        default_chain: Option<ChainId>,
972        context: Arc<Mutex<C>>,
973        read_only: bool,
974    ) -> Self {
975        Self {
976            config,
977            port,
978            #[cfg(with_metrics)]
979            metrics_port,
980            default_chain,
981            context,
982            read_only,
983        }
984    }
985
986    #[cfg(with_metrics)]
987    pub fn metrics_address(&self) -> SocketAddr {
988        SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
989    }
990
991    pub fn schema(&self) -> NodeServiceSchema<C> {
992        let query = QueryRoot {
993            context: Arc::clone(&self.context),
994            port: self.port,
995            default_chain: self.default_chain,
996        };
997        let subscription = SubscriptionRoot {
998            context: Arc::clone(&self.context),
999        };
1000
1001        if self.read_only {
1002            NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1003        } else {
1004            NodeServiceSchema::Full(
1005                Schema::build(
1006                    query,
1007                    MutationRoot {
1008                        context: Arc::clone(&self.context),
1009                    },
1010                    subscription,
1011                )
1012                .finish(),
1013            )
1014        }
1015    }
1016
1017    /// Runs the node service.
1018    #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
1019    pub async fn run(
1020        self,
1021        cancellation_token: CancellationToken,
1022        command_receiver: UnboundedReceiver<ListenerCommand>,
1023    ) -> Result<(), anyhow::Error> {
1024        let port = self.port.get();
1025        let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
1026        let application_handler =
1027            axum::routing::get(util::graphiql).post(Self::application_handler);
1028
1029        #[cfg(with_metrics)]
1030        monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
1031
1032        let base_router = Router::new()
1033            .route("/", index_handler)
1034            .route(
1035                "/chains/{chain_id}/applications/{application_id}",
1036                application_handler,
1037            )
1038            .route("/ready", axum::routing::get(|| async { "ready!" }));
1039
1040        // Create router with appropriate schema for WebSocket subscriptions.
1041        let app = match self.schema() {
1042            NodeServiceSchema::Full(schema) => {
1043                base_router.route_service("/ws", GraphQLSubscription::new(schema))
1044            }
1045            NodeServiceSchema::ReadOnly(schema) => {
1046                base_router.route_service("/ws", GraphQLSubscription::new(schema))
1047            }
1048        }
1049        .layer(Extension(self.clone()))
1050        // TODO(#551): Provide application authentication.
1051        .layer(CorsLayer::permissive());
1052
1053        info!("GraphiQL IDE: http://localhost:{}", port);
1054
1055        let storage = self.context.lock().await.storage().clone();
1056
1057        let chain_listener = ChainListener::new(
1058            self.config,
1059            self.context,
1060            storage,
1061            cancellation_token.clone(),
1062            command_receiver,
1063            true,
1064        )
1065        .run()
1066        .await?;
1067        let mut chain_listener = Box::pin(chain_listener).fuse();
1068        let tcp_listener =
1069            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1070        let server = axum::serve(tcp_listener, app)
1071            .with_graceful_shutdown(cancellation_token.cancelled_owned())
1072            .into_future();
1073        futures::select! {
1074            result = chain_listener => result?,
1075            result = Box::pin(server).fuse() => result?,
1076        };
1077
1078        Ok(())
1079    }
1080
1081    /// Handles service queries for user applications (including mutations).
1082    async fn handle_service_request(
1083        &self,
1084        application_id: ApplicationId,
1085        request: Vec<u8>,
1086        chain_id: ChainId,
1087        block_hash: Option<CryptoHash>,
1088    ) -> Result<Vec<u8>, NodeServiceError> {
1089        let QueryOutcome {
1090            response,
1091            operations,
1092        } = self
1093            .query_user_application(application_id, request, chain_id, block_hash)
1094            .await?;
1095        if operations.is_empty() {
1096            return Ok(response);
1097        }
1098
1099        if self.read_only {
1100            return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1101        }
1102
1103        trace!("Query requested a new block with operations: {operations:?}");
1104        let client = self
1105            .context
1106            .lock()
1107            .await
1108            .make_chain_client(chain_id)
1109            .await?;
1110        let hash = loop {
1111            let timeout = match client
1112                .execute_operations(operations.clone(), vec![])
1113                .await?
1114            {
1115                ClientOutcome::Committed(certificate) => break certificate.hash(),
1116                ClientOutcome::Conflict(certificate) => {
1117                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
1118                }
1119                ClientOutcome::WaitForTimeout(timeout) => timeout,
1120            };
1121            let mut stream = client.subscribe().map_err(|_| {
1122                chain_client::Error::InternalError("Could not subscribe to the local node.")
1123            })?;
1124            util::wait_for_next_round(&mut stream, timeout).await;
1125        };
1126        let response = async_graphql::Response::new(hash.to_value());
1127        Ok(serde_json::to_vec(&response)?)
1128    }
1129
1130    /// Queries a user application, returning the raw [`QueryOutcome`].
1131    async fn query_user_application(
1132        &self,
1133        application_id: ApplicationId,
1134        bytes: Vec<u8>,
1135        chain_id: ChainId,
1136        block_hash: Option<CryptoHash>,
1137    ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
1138        let query = Query::User {
1139            application_id,
1140            bytes,
1141        };
1142        let client = self
1143            .context
1144            .lock()
1145            .await
1146            .make_chain_client(chain_id)
1147            .await?;
1148        let QueryOutcome {
1149            response,
1150            operations,
1151        } = client.query_application(query, block_hash).await?;
1152        match response {
1153            QueryResponse::System(_) => {
1154                unreachable!("cannot get a system response for a user query")
1155            }
1156            QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1157                response: user_response_bytes,
1158                operations,
1159            }),
1160        }
1161    }
1162
1163    /// Executes a GraphQL query and generates a response for our `Schema`.
1164    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1165        service
1166            .0
1167            .schema()
1168            .execute(request.into_inner())
1169            .await
1170            .into()
1171    }
1172
1173    /// Executes a GraphQL query against an application.
1174    /// Pattern matches on the `OperationType` of the query and routes the query
1175    /// accordingly.
1176    async fn application_handler(
1177        Path((chain_id, application_id)): Path<(String, String)>,
1178        service: Extension<Self>,
1179        request: String,
1180    ) -> Result<Vec<u8>, NodeServiceError> {
1181        let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1182        let application_id: ApplicationId = application_id.parse()?;
1183
1184        debug!(
1185            %chain_id,
1186            %application_id,
1187            "processing request for application:\n{:?}",
1188            &request
1189        );
1190        let response = service
1191            .0
1192            .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1193            .await?;
1194
1195        Ok(response)
1196    }
1197}