1use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7 futures_util::Stream, resolver_utils::ContainerType, EmptyMutation, Error, MergedObject,
8 OutputType, Request, Response, ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _, TryStreamExt as _};
13use linera_base::{
14 crypto::{CryptoError, CryptoHash},
15 data_types::{
16 Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17 },
18 identifiers::{
19 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20 },
21 ownership::{ChainOwnership, TimeoutConfig},
22 vm::VmRuntime,
23 BcsHexParseError,
24};
25use linera_chain::{
26 types::{ConfirmedBlock, GenericCertificate},
27 ChainStateView,
28};
29use linera_client::chain_listener::{
30 ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
31};
32use linera_core::{
33 client::chain_client::{self, ChainClient},
34 data_types::ClientOutcome,
35 wallet::Wallet as _,
36 worker::Notification,
37};
38use linera_execution::{
39 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
40 SystemOperation,
41};
42#[cfg(with_metrics)]
43use linera_metrics::monitoring_server;
44use linera_sdk::linera_base_types::BlobContent;
45use serde::{Deserialize, Serialize};
46use serde_json::json;
47use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard};
48use tokio_util::sync::CancellationToken;
49use tower_http::cors::CorsLayer;
50use tracing::{debug, info, instrument, trace};
51
52use crate::util;
53
54#[derive(SimpleObject, Serialize, Deserialize, Clone)]
55pub struct Chains {
56 pub list: Vec<ChainId>,
57 pub default: Option<ChainId>,
58}
59
60pub struct QueryRoot<C> {
62 context: Arc<Mutex<C>>,
63 port: NonZeroU16,
64 default_chain: Option<ChainId>,
65}
66
67pub struct SubscriptionRoot<C> {
69 context: Arc<Mutex<C>>,
70}
71
72pub struct MutationRoot<C> {
74 context: Arc<Mutex<C>>,
75}
76
77#[derive(Debug, thiserror::Error)]
78enum NodeServiceError {
79 #[error(transparent)]
80 ChainClient(#[from] chain_client::Error),
81 #[error(transparent)]
82 BcsHex(#[from] BcsHexParseError),
83 #[error(transparent)]
84 Json(#[from] serde_json::Error),
85 #[error("malformed chain ID: {0}")]
86 InvalidChainId(CryptoError),
87 #[error(transparent)]
88 Client(#[from] linera_client::Error),
89 #[error("scheduling operations from queries is disabled in read-only mode")]
90 ReadOnlyModeOperationsNotAllowed,
91}
92
93impl IntoResponse for NodeServiceError {
94 fn into_response(self) -> response::Response {
95 let status = match self {
96 NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
97 StatusCode::BAD_REQUEST
98 }
99 NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
100 _ => StatusCode::INTERNAL_SERVER_ERROR,
101 };
102 let body = json!({"error": self.to_string()}).to_string();
103 (status, body).into_response()
104 }
105}
106
107#[Subscription]
108impl<C> SubscriptionRoot<C>
109where
110 C: ClientContext + 'static,
111{
112 async fn notifications(
114 &self,
115 chain_id: ChainId,
116 ) -> Result<impl Stream<Item = Notification>, Error> {
117 let client = self
118 .context
119 .lock()
120 .await
121 .make_chain_client(chain_id)
122 .await?;
123 Ok(client.subscribe()?)
124 }
125}
126
127impl<C> MutationRoot<C>
128where
129 C: ClientContext,
130{
131 async fn execute_system_operation(
132 &self,
133 system_operation: SystemOperation,
134 chain_id: ChainId,
135 ) -> Result<CryptoHash, Error> {
136 let certificate = self
137 .apply_client_command(&chain_id, move |client| {
138 let operation = Operation::system(system_operation.clone());
139 async move {
140 let result = client
141 .execute_operation(operation)
142 .await
143 .map_err(Error::from);
144 (result, client)
145 }
146 })
147 .await?;
148 Ok(certificate.hash())
149 }
150
151 async fn apply_client_command<F, Fut, T>(
155 &self,
156 chain_id: &ChainId,
157 mut f: F,
158 ) -> Result<T, Error>
159 where
160 F: FnMut(ChainClient<C::Environment>) -> Fut,
161 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
162 {
163 loop {
164 let client = self
165 .context
166 .lock()
167 .await
168 .make_chain_client(*chain_id)
169 .await?;
170 let mut stream = client.subscribe()?;
171 let (result, client) = f(client).await;
172 self.context.lock().await.update_wallet(&client).await?;
173 let timeout = match result? {
174 ClientOutcome::Committed(t) => return Ok(t),
175 ClientOutcome::Conflict(certificate) => {
176 return Err(chain_client::Error::Conflict(certificate.hash()).into());
177 }
178 ClientOutcome::WaitForTimeout(timeout) => timeout,
179 };
180 drop(client);
181 util::wait_for_next_round(&mut stream, timeout).await;
182 }
183 }
184}
185
186#[async_graphql::Object(cache_control(no_cache))]
187impl<C> MutationRoot<C>
188where
189 C: ClientContext + 'static,
190{
191 async fn process_inbox(
193 &self,
194 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
195 ) -> Result<Vec<CryptoHash>, Error> {
196 let mut hashes = Vec::new();
197 loop {
198 let client = self
199 .context
200 .lock()
201 .await
202 .make_chain_client(chain_id)
203 .await?;
204 let result = client.process_inbox().await;
205 self.context.lock().await.update_wallet(&client).await?;
206 let (certificates, maybe_timeout) = result?;
207 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
208 match maybe_timeout {
209 None => return Ok(hashes),
210 Some(timestamp) => {
211 let mut stream = client.subscribe()?;
212 drop(client);
213 util::wait_for_next_round(&mut stream, timestamp).await;
214 }
215 }
216 }
217 }
218
219 async fn sync(
224 &self,
225 #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
226 ) -> Result<u64, Error> {
227 let client = self
228 .context
229 .lock()
230 .await
231 .make_chain_client(chain_id)
232 .await?;
233 let info = client.synchronize_from_validators().await?;
234 self.context.lock().await.update_wallet(&client).await?;
235 Ok(info.next_block_height.0)
236 }
237
238 async fn retry_pending_block(
240 &self,
241 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
242 ) -> Result<Option<CryptoHash>, Error> {
243 let client = self
244 .context
245 .lock()
246 .await
247 .make_chain_client(chain_id)
248 .await?;
249 let outcome = client.process_pending_block().await?;
250 self.context.lock().await.update_wallet(&client).await?;
251 match outcome {
252 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
253 ClientOutcome::Committed(None) => Ok(None),
254 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
255 "Please try again at {}",
256 timeout.timestamp
257 ))),
258 ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
259 "A different block was committed: {}",
260 certificate.hash()
261 ))),
262 }
263 }
264
265 async fn transfer(
268 &self,
269 #[graphql(desc = "The chain which native tokens are being transferred from.")]
270 chain_id: ChainId,
271 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
272 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
273 #[graphql(desc = "The amount being transferred.")] amount: Amount,
274 ) -> Result<CryptoHash, Error> {
275 self.apply_client_command(&chain_id, move |client| async move {
276 let result = client
277 .transfer(owner, amount, recipient)
278 .await
279 .map_err(Error::from)
280 .map(|outcome| outcome.map(|certificate| certificate.hash()));
281 (result, client)
282 })
283 .await
284 }
285
286 async fn claim(
290 &self,
291 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
292 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
293 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
294 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
295 #[graphql(desc = "The amount being transferred.")] amount: Amount,
296 ) -> Result<CryptoHash, Error> {
297 self.apply_client_command(&chain_id, move |client| async move {
298 let result = client
299 .claim(owner, target_id, recipient, amount)
300 .await
301 .map_err(Error::from)
302 .map(|outcome| outcome.map(|certificate| certificate.hash()));
303 (result, client)
304 })
305 .await
306 }
307
308 async fn read_data_blob(
311 &self,
312 chain_id: ChainId,
313 hash: CryptoHash,
314 ) -> Result<CryptoHash, Error> {
315 self.apply_client_command(&chain_id, move |client| async move {
316 let result = client
317 .read_data_blob(hash)
318 .await
319 .map_err(Error::from)
320 .map(|outcome| outcome.map(|certificate| certificate.hash()));
321 (result, client)
322 })
323 .await
324 }
325
326 async fn open_chain(
328 &self,
329 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
330 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
331 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
332 balance: Option<Amount>,
333 ) -> Result<ChainId, Error> {
334 let ownership = ChainOwnership::single(owner);
335 let balance = balance.unwrap_or(Amount::ZERO);
336 let description = self
337 .apply_client_command(&chain_id, move |client| {
338 let ownership = ownership.clone();
339 async move {
340 let result = client
341 .open_chain(ownership, ApplicationPermissions::default(), balance)
342 .await
343 .map_err(Error::from)
344 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
345 (result, client)
346 }
347 })
348 .await?;
349 Ok(description.id())
350 }
351
352 #[expect(clippy::too_many_arguments)]
354 async fn open_multi_owner_chain(
355 &self,
356 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
357 #[graphql(desc = "Permissions for applications on the new chain")]
358 application_permissions: Option<ApplicationPermissions>,
359 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
360 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
361 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
362 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
363 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
364 fast_round_ms: Option<u64>,
365 #[graphql(
366 desc = "The duration of the first single-leader and all multi-leader rounds",
367 default = 10_000
368 )]
369 base_timeout_ms: u64,
370 #[graphql(
371 desc = "The number of milliseconds by which the timeout increases after each \
372 single-leader round",
373 default = 1_000
374 )]
375 timeout_increment_ms: u64,
376 #[graphql(
377 desc = "The age of an incoming tracked or protected message after which the \
378 validators start transitioning the chain to fallback mode, in milliseconds.",
379 default = 86_400_000
380 )]
381 fallback_duration_ms: u64,
382 ) -> Result<ChainId, Error> {
383 let owners = if let Some(weights) = weights {
384 if weights.len() != owners.len() {
385 return Err(Error::new(format!(
386 "There are {} owners but {} weights.",
387 owners.len(),
388 weights.len()
389 )));
390 }
391 owners.into_iter().zip(weights).collect::<Vec<_>>()
392 } else {
393 owners
394 .into_iter()
395 .zip(iter::repeat(100))
396 .collect::<Vec<_>>()
397 };
398 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
399 let timeout_config = TimeoutConfig {
400 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
401 base_timeout: TimeDelta::from_millis(base_timeout_ms),
402 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
403 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
404 };
405 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
406 let balance = balance.unwrap_or(Amount::ZERO);
407 let description = self
408 .apply_client_command(&chain_id, move |client| {
409 let ownership = ownership.clone();
410 let application_permissions = application_permissions.clone().unwrap_or_default();
411 async move {
412 let result = client
413 .open_chain(ownership, application_permissions, balance)
414 .await
415 .map_err(Error::from)
416 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
417 (result, client)
418 }
419 })
420 .await?;
421 Ok(description.id())
422 }
423
424 async fn close_chain(
426 &self,
427 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
428 ) -> Result<Option<CryptoHash>, Error> {
429 let maybe_cert = self
430 .apply_client_command(&chain_id, |client| async move {
431 let result = client.close_chain().await.map_err(Error::from);
432 (result, client)
433 })
434 .await?;
435 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
436 }
437
438 async fn change_owner(
440 &self,
441 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
442 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
443 ) -> Result<CryptoHash, Error> {
444 let operation = SystemOperation::ChangeOwnership {
445 super_owners: vec![new_owner],
446 owners: Vec::new(),
447 first_leader: None,
448 multi_leader_rounds: 2,
449 open_multi_leader_rounds: false,
450 timeout_config: TimeoutConfig::default(),
451 };
452 self.execute_system_operation(operation, chain_id).await
453 }
454
455 #[expect(clippy::too_many_arguments)]
457 async fn change_multiple_owners(
458 &self,
459 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
460 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
461 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
462 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
463 #[graphql(
464 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
465 )]
466 open_multi_leader_rounds: bool,
467 #[graphql(desc = "The leader of the first single-leader round. \
468 If not set, this is random like other rounds.")]
469 first_leader: Option<AccountOwner>,
470 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
471 fast_round_ms: Option<u64>,
472 #[graphql(
473 desc = "The duration of the first single-leader and all multi-leader rounds",
474 default = 10_000
475 )]
476 base_timeout_ms: u64,
477 #[graphql(
478 desc = "The number of milliseconds by which the timeout increases after each \
479 single-leader round",
480 default = 1_000
481 )]
482 timeout_increment_ms: u64,
483 #[graphql(
484 desc = "The age of an incoming tracked or protected message after which the \
485 validators start transitioning the chain to fallback mode, in milliseconds.",
486 default = 86_400_000
487 )]
488 fallback_duration_ms: u64,
489 ) -> Result<CryptoHash, Error> {
490 let operation = SystemOperation::ChangeOwnership {
491 super_owners: Vec::new(),
492 owners: new_owners.into_iter().zip(new_weights).collect(),
493 first_leader,
494 multi_leader_rounds,
495 open_multi_leader_rounds,
496 timeout_config: TimeoutConfig {
497 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
498 base_timeout: TimeDelta::from_millis(base_timeout_ms),
499 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
500 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
501 },
502 };
503 self.execute_system_operation(operation, chain_id).await
504 }
505
506 #[expect(clippy::too_many_arguments)]
508 async fn change_application_permissions(
509 &self,
510 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
511 #[graphql(
512 desc = "These applications are allowed to manage the chain: close it, change \
513 application permissions, and change ownership."
514 )]
515 manage_chain: Vec<ApplicationId>,
516 #[graphql(
517 desc = "If this is `None`, all system operations and application operations are allowed.
518If it is `Some`, only operations from the specified applications are allowed,
519and no system operations."
520 )]
521 execute_operations: Option<Vec<ApplicationId>>,
522 #[graphql(
523 desc = "At least one operation or incoming message from each of these applications must occur in every block."
524 )]
525 mandatory_applications: Vec<ApplicationId>,
526 #[graphql(
527 desc = "These applications are allowed to perform calls to services as oracles."
528 )]
529 call_service_as_oracle: Option<Vec<ApplicationId>>,
530 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
531 make_http_requests: Option<Vec<ApplicationId>>,
532 ) -> Result<CryptoHash, Error> {
533 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
534 execute_operations,
535 mandatory_applications,
536 manage_chain,
537 call_service_as_oracle,
538 make_http_requests,
539 });
540 self.execute_system_operation(operation, chain_id).await
541 }
542
543 async fn create_committee(
547 &self,
548 chain_id: ChainId,
549 committee: Committee,
550 ) -> Result<CryptoHash, Error> {
551 Ok(self
552 .apply_client_command(&chain_id, move |client| {
553 let committee = committee.clone();
554 async move {
555 let result = client
556 .stage_new_committee(committee)
557 .await
558 .map_err(Error::from);
559 (result, client)
560 }
561 })
562 .await?
563 .hash())
564 }
565
566 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
570 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
571 self.execute_system_operation(operation, chain_id).await
572 }
573
574 async fn publish_module(
576 &self,
577 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
578 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
579 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
580 service: Bytecode,
581 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
582 vm_runtime: VmRuntime,
583 ) -> Result<ModuleId, Error> {
584 self.apply_client_command(&chain_id, move |client| {
585 let contract = contract.clone();
586 let service = service.clone();
587 async move {
588 let result = client
589 .publish_module(contract, service, vm_runtime)
590 .await
591 .map_err(Error::from)
592 .map(|outcome| outcome.map(|(module_id, _)| module_id));
593 (result, client)
594 }
595 })
596 .await
597 }
598
599 async fn publish_data_blob(
601 &self,
602 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
603 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
604 ) -> Result<CryptoHash, Error> {
605 self.apply_client_command(&chain_id, |client| {
606 let bytes = bytes.clone();
607 async move {
608 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
609 (result, client)
610 }
611 })
612 .await
613 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
614 }
615
616 async fn create_application(
618 &self,
619 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
620 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
621 #[graphql(desc = "The JSON serialization of the parameters of the application")]
622 parameters: String,
623 #[graphql(
624 desc = "The JSON serialization of the instantiation argument of the application"
625 )]
626 instantiation_argument: String,
627 #[graphql(desc = "The dependencies of the application being created")]
628 required_application_ids: Vec<ApplicationId>,
629 ) -> Result<ApplicationId, Error> {
630 self.apply_client_command(&chain_id, move |client| {
631 let parameters = parameters.as_bytes().to_vec();
632 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
633 let required_application_ids = required_application_ids.clone();
634 async move {
635 let result = client
636 .create_application_untyped(
637 module_id,
638 parameters,
639 instantiation_argument,
640 required_application_ids,
641 )
642 .await
643 .map_err(Error::from)
644 .map(|outcome| outcome.map(|(application_id, _)| application_id));
645 (result, client)
646 }
647 })
648 .await
649 }
650}
651
652#[async_graphql::Object(cache_control(no_cache))]
653impl<C> QueryRoot<C>
654where
655 C: ClientContext + 'static,
656{
657 async fn chain(
658 &self,
659 chain_id: ChainId,
660 ) -> Result<
661 ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
662 Error,
663 > {
664 let client = self
665 .context
666 .lock()
667 .await
668 .make_chain_client(chain_id)
669 .await?;
670 let view = client.chain_state_view().await?;
671 Ok(ChainStateExtendedView::new(view))
672 }
673
674 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
675 let client = self
676 .context
677 .lock()
678 .await
679 .make_chain_client(chain_id)
680 .await?;
681 let applications = client
682 .chain_state_view()
683 .await?
684 .execution_state
685 .list_applications()
686 .await?;
687
688 let overviews = applications
689 .into_iter()
690 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
691 .collect();
692
693 Ok(overviews)
694 }
695
696 async fn chains(&self) -> Result<Chains, Error> {
697 Ok(Chains {
698 list: self
699 .context
700 .lock()
701 .await
702 .wallet()
703 .chain_ids()
704 .try_collect()
705 .await?,
706 default: self.default_chain,
707 })
708 }
709
710 async fn block(
711 &self,
712 hash: Option<CryptoHash>,
713 chain_id: ChainId,
714 ) -> Result<Option<ConfirmedBlock>, Error> {
715 let client = self
716 .context
717 .lock()
718 .await
719 .make_chain_client(chain_id)
720 .await?;
721 let hash = match hash {
722 Some(hash) => Some(hash),
723 None => client.chain_info().await?.block_hash,
724 };
725 if let Some(hash) = hash {
726 let block = client.read_confirmed_block(hash).await?;
727 Ok(Some(block))
728 } else {
729 Ok(None)
730 }
731 }
732
733 async fn events_from_index(
734 &self,
735 chain_id: ChainId,
736 stream_id: StreamId,
737 start_index: u32,
738 ) -> Result<Vec<IndexAndEvent>, Error> {
739 Ok(self
740 .context
741 .lock()
742 .await
743 .make_chain_client(chain_id)
744 .await?
745 .events_from_index(stream_id, start_index)
746 .await?)
747 }
748
749 async fn blocks(
750 &self,
751 from: Option<CryptoHash>,
752 chain_id: ChainId,
753 limit: Option<u32>,
754 ) -> Result<Vec<ConfirmedBlock>, Error> {
755 let client = self
756 .context
757 .lock()
758 .await
759 .make_chain_client(chain_id)
760 .await?;
761 let limit = limit.unwrap_or(10);
762 let from = match from {
763 Some(from) => Some(from),
764 None => client.chain_info().await?.block_hash,
765 };
766 let Some(from) = from else {
767 return Ok(vec![]);
768 };
769 let mut hash = Some(from);
770 let mut values = Vec::new();
771 for _ in 0..limit {
772 let Some(next_hash) = hash else {
773 break;
774 };
775 let value = client.read_confirmed_block(next_hash).await?;
776 hash = value.block().header.previous_block_hash;
777 values.push(value);
778 }
779 Ok(values)
780 }
781
782 async fn version(&self) -> linera_version::VersionInfo {
784 linera_version::VersionInfo::default()
785 }
786}
787
788struct ChainStateViewExtension(ChainId);
792
793#[async_graphql::Object(cache_control(no_cache))]
794impl ChainStateViewExtension {
795 async fn chain_id(&self) -> ChainId {
796 self.0
797 }
798}
799
800#[derive(MergedObject)]
801struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
802where
803 C: linera_views::context::Context + Clone + Send + Sync + 'static,
804 C::Extra: linera_execution::ExecutionRuntimeContext;
805
806pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
809where
810 C: linera_views::context::Context + Clone + Send + Sync + 'static;
811
812impl<C> ContainerType for ReadOnlyChainStateView<C>
813where
814 C: linera_views::context::Context + Clone + Send + Sync + 'static,
815{
816 async fn resolve_field(
817 &self,
818 context: &async_graphql::Context<'_>,
819 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
820 self.0.resolve_field(context).await
821 }
822}
823
824impl<C> OutputType for ReadOnlyChainStateView<C>
825where
826 C: linera_views::context::Context + Clone + Send + Sync + 'static,
827{
828 fn type_name() -> Cow<'static, str> {
829 ChainStateView::<C>::type_name()
830 }
831
832 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
833 ChainStateView::<C>::create_type_info(registry)
834 }
835
836 async fn resolve(
837 &self,
838 context: &async_graphql::ContextSelectionSet<'_>,
839 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
840 ) -> async_graphql::ServerResult<async_graphql::Value> {
841 self.0.resolve(context, field).await
842 }
843}
844
845impl<C> ChainStateExtendedView<C>
846where
847 C: linera_views::context::Context + Clone + Send + Sync + 'static,
848 C::Extra: linera_execution::ExecutionRuntimeContext,
849{
850 fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
851 Self(
852 ChainStateViewExtension(view.chain_id()),
853 ReadOnlyChainStateView(view),
854 )
855 }
856}
857
858#[derive(SimpleObject)]
859pub struct ApplicationOverview {
860 id: ApplicationId,
861 description: ApplicationDescription,
862 link: String,
863}
864
865impl ApplicationOverview {
866 fn new(
867 id: ApplicationId,
868 description: ApplicationDescription,
869 port: NonZeroU16,
870 chain_id: ChainId,
871 ) -> Self {
872 Self {
873 id,
874 description,
875 link: format!(
876 "http://localhost:{}/chains/{}/applications/{}",
877 port.get(),
878 chain_id,
879 id
880 ),
881 }
882 }
883}
884
885pub enum NodeServiceSchema<C>
887where
888 C: ClientContext + 'static,
889{
890 Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
892 ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
894}
895
896impl<C> NodeServiceSchema<C>
897where
898 C: ClientContext,
899{
900 pub async fn execute(&self, request: impl Into<Request>) -> Response {
902 match self {
903 Self::Full(schema) => schema.execute(request).await,
904 Self::ReadOnly(schema) => schema.execute(request).await,
905 }
906 }
907
908 pub fn sdl(&self) -> String {
910 match self {
911 Self::Full(schema) => schema.sdl(),
912 Self::ReadOnly(schema) => schema.sdl(),
913 }
914 }
915}
916
917impl<C> Clone for NodeServiceSchema<C>
918where
919 C: ClientContext,
920{
921 fn clone(&self) -> Self {
922 match self {
923 Self::Full(schema) => Self::Full(schema.clone()),
924 Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
925 }
926 }
927}
928
929pub struct NodeService<C>
932where
933 C: ClientContext + 'static,
934{
935 config: ChainListenerConfig,
936 port: NonZeroU16,
937 #[cfg(with_metrics)]
938 metrics_port: NonZeroU16,
939 default_chain: Option<ChainId>,
940 context: Arc<Mutex<C>>,
941 read_only: bool,
943}
944
945impl<C> Clone for NodeService<C>
946where
947 C: ClientContext + 'static,
948{
949 fn clone(&self) -> Self {
950 Self {
951 config: self.config.clone(),
952 port: self.port,
953 #[cfg(with_metrics)]
954 metrics_port: self.metrics_port,
955 default_chain: self.default_chain,
956 context: Arc::clone(&self.context),
957 read_only: self.read_only,
958 }
959 }
960}
961
962impl<C> NodeService<C>
963where
964 C: ClientContext,
965{
966 pub fn new(
968 config: ChainListenerConfig,
969 port: NonZeroU16,
970 #[cfg(with_metrics)] metrics_port: NonZeroU16,
971 default_chain: Option<ChainId>,
972 context: Arc<Mutex<C>>,
973 read_only: bool,
974 ) -> Self {
975 Self {
976 config,
977 port,
978 #[cfg(with_metrics)]
979 metrics_port,
980 default_chain,
981 context,
982 read_only,
983 }
984 }
985
986 #[cfg(with_metrics)]
987 pub fn metrics_address(&self) -> SocketAddr {
988 SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
989 }
990
991 pub fn schema(&self) -> NodeServiceSchema<C> {
992 let query = QueryRoot {
993 context: Arc::clone(&self.context),
994 port: self.port,
995 default_chain: self.default_chain,
996 };
997 let subscription = SubscriptionRoot {
998 context: Arc::clone(&self.context),
999 };
1000
1001 if self.read_only {
1002 NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1003 } else {
1004 NodeServiceSchema::Full(
1005 Schema::build(
1006 query,
1007 MutationRoot {
1008 context: Arc::clone(&self.context),
1009 },
1010 subscription,
1011 )
1012 .finish(),
1013 )
1014 }
1015 }
1016
1017 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
1019 pub async fn run(
1020 self,
1021 cancellation_token: CancellationToken,
1022 command_receiver: UnboundedReceiver<ListenerCommand>,
1023 ) -> Result<(), anyhow::Error> {
1024 let port = self.port.get();
1025 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
1026 let application_handler =
1027 axum::routing::get(util::graphiql).post(Self::application_handler);
1028
1029 #[cfg(with_metrics)]
1030 monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
1031
1032 let base_router = Router::new()
1033 .route("/", index_handler)
1034 .route(
1035 "/chains/{chain_id}/applications/{application_id}",
1036 application_handler,
1037 )
1038 .route("/ready", axum::routing::get(|| async { "ready!" }));
1039
1040 let app = match self.schema() {
1042 NodeServiceSchema::Full(schema) => {
1043 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1044 }
1045 NodeServiceSchema::ReadOnly(schema) => {
1046 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1047 }
1048 }
1049 .layer(Extension(self.clone()))
1050 .layer(CorsLayer::permissive());
1052
1053 info!("GraphiQL IDE: http://localhost:{}", port);
1054
1055 let storage = self.context.lock().await.storage().clone();
1056
1057 let chain_listener = ChainListener::new(
1058 self.config,
1059 self.context,
1060 storage,
1061 cancellation_token.clone(),
1062 command_receiver,
1063 true,
1064 )
1065 .run()
1066 .await?;
1067 let mut chain_listener = Box::pin(chain_listener).fuse();
1068 let tcp_listener =
1069 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1070 let server = axum::serve(tcp_listener, app)
1071 .with_graceful_shutdown(cancellation_token.cancelled_owned())
1072 .into_future();
1073 futures::select! {
1074 result = chain_listener => result?,
1075 result = Box::pin(server).fuse() => result?,
1076 };
1077
1078 Ok(())
1079 }
1080
1081 async fn handle_service_request(
1083 &self,
1084 application_id: ApplicationId,
1085 request: Vec<u8>,
1086 chain_id: ChainId,
1087 block_hash: Option<CryptoHash>,
1088 ) -> Result<Vec<u8>, NodeServiceError> {
1089 let QueryOutcome {
1090 response,
1091 operations,
1092 } = self
1093 .query_user_application(application_id, request, chain_id, block_hash)
1094 .await?;
1095 if operations.is_empty() {
1096 return Ok(response);
1097 }
1098
1099 if self.read_only {
1100 return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1101 }
1102
1103 trace!("Query requested a new block with operations: {operations:?}");
1104 let client = self
1105 .context
1106 .lock()
1107 .await
1108 .make_chain_client(chain_id)
1109 .await?;
1110 let hash = loop {
1111 let timeout = match client
1112 .execute_operations(operations.clone(), vec![])
1113 .await?
1114 {
1115 ClientOutcome::Committed(certificate) => break certificate.hash(),
1116 ClientOutcome::Conflict(certificate) => {
1117 return Err(chain_client::Error::Conflict(certificate.hash()).into());
1118 }
1119 ClientOutcome::WaitForTimeout(timeout) => timeout,
1120 };
1121 let mut stream = client.subscribe().map_err(|_| {
1122 chain_client::Error::InternalError("Could not subscribe to the local node.")
1123 })?;
1124 util::wait_for_next_round(&mut stream, timeout).await;
1125 };
1126 let response = async_graphql::Response::new(hash.to_value());
1127 Ok(serde_json::to_vec(&response)?)
1128 }
1129
1130 async fn query_user_application(
1132 &self,
1133 application_id: ApplicationId,
1134 bytes: Vec<u8>,
1135 chain_id: ChainId,
1136 block_hash: Option<CryptoHash>,
1137 ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
1138 let query = Query::User {
1139 application_id,
1140 bytes,
1141 };
1142 let client = self
1143 .context
1144 .lock()
1145 .await
1146 .make_chain_client(chain_id)
1147 .await?;
1148 let QueryOutcome {
1149 response,
1150 operations,
1151 } = client.query_application(query, block_hash).await?;
1152 match response {
1153 QueryResponse::System(_) => {
1154 unreachable!("cannot get a system response for a user query")
1155 }
1156 QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1157 response: user_response_bytes,
1158 operations,
1159 }),
1160 }
1161 }
1162
1163 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1165 service
1166 .0
1167 .schema()
1168 .execute(request.into_inner())
1169 .await
1170 .into()
1171 }
1172
1173 async fn application_handler(
1177 Path((chain_id, application_id)): Path<(String, String)>,
1178 service: Extension<Self>,
1179 request: String,
1180 ) -> Result<Vec<u8>, NodeServiceError> {
1181 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1182 let application_id: ApplicationId = application_id.parse()?;
1183
1184 debug!(
1185 %chain_id,
1186 %application_id,
1187 "processing request for application:\n{:?}",
1188 &request
1189 );
1190 let response = service
1191 .0
1192 .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1193 .await?;
1194
1195 Ok(response)
1196 }
1197}