1use std::{borrow::Cow, future::IntoFuture, iter, net::SocketAddr, num::NonZeroU16, sync::Arc};
5
6use async_graphql::{
7 futures_util::Stream, resolver_utils::ContainerType, Error, MergedObject, OutputType,
8 ScalarType, Schema, SimpleObject, Subscription,
9};
10use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
11use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
12use futures::{lock::Mutex, Future, FutureExt as _, TryStreamExt as _};
13use linera_base::{
14 crypto::{CryptoError, CryptoHash},
15 data_types::{
16 Amount, ApplicationDescription, ApplicationPermissions, Bytecode, Epoch, TimeDelta,
17 },
18 identifiers::{
19 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
20 },
21 ownership::{ChainOwnership, TimeoutConfig},
22 vm::VmRuntime,
23 BcsHexParseError,
24};
25use linera_chain::{
26 types::{ConfirmedBlock, GenericCertificate},
27 ChainStateView,
28};
29use linera_client::chain_listener::{
30 ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
31};
32use linera_core::{
33 client::chain_client::{self, ChainClient},
34 data_types::ClientOutcome,
35 wallet::Wallet as _,
36 worker::Notification,
37};
38use linera_execution::{
39 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
40 SystemOperation,
41};
42#[cfg(with_metrics)]
43use linera_metrics::monitoring_server;
44use linera_sdk::linera_base_types::BlobContent;
45use serde::{Deserialize, Serialize};
46use serde_json::json;
47use tokio::sync::{mpsc::UnboundedReceiver, OwnedRwLockReadGuard};
48use tokio_util::sync::CancellationToken;
49use tower_http::cors::CorsLayer;
50use tracing::{debug, error, info, instrument, trace};
51
52use crate::util;
53
54#[derive(SimpleObject, Serialize, Deserialize, Clone)]
55pub struct Chains {
56 pub list: Vec<ChainId>,
57 pub default: Option<ChainId>,
58}
59
60pub struct QueryRoot<C> {
62 context: Arc<Mutex<C>>,
63 port: NonZeroU16,
64 default_chain: Option<ChainId>,
65}
66
67pub struct SubscriptionRoot<C> {
69 context: Arc<Mutex<C>>,
70}
71
72pub struct MutationRoot<C> {
74 context: Arc<Mutex<C>>,
75}
76
77#[derive(Debug, thiserror::Error)]
78enum NodeServiceError {
79 #[error(transparent)]
80 ChainClient(#[from] chain_client::Error),
81 #[error(transparent)]
82 BcsHex(#[from] BcsHexParseError),
83 #[error(transparent)]
84 Json(#[from] serde_json::Error),
85 #[error("malformed chain ID: {0}")]
86 InvalidChainId(CryptoError),
87 #[error(transparent)]
88 Client(#[from] linera_client::Error),
89}
90
91impl IntoResponse for NodeServiceError {
92 fn into_response(self) -> response::Response {
93 let status = if let NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) = self
94 {
95 StatusCode::BAD_REQUEST
96 } else {
97 StatusCode::INTERNAL_SERVER_ERROR
98 };
99 let body = json!({"error": self.to_string()}).to_string();
100 (status, body).into_response()
101 }
102}
103
104#[Subscription]
105impl<C> SubscriptionRoot<C>
106where
107 C: ClientContext + 'static,
108{
109 async fn notifications(
111 &self,
112 chain_id: ChainId,
113 ) -> Result<impl Stream<Item = Notification>, Error> {
114 let client = self
115 .context
116 .lock()
117 .await
118 .make_chain_client(chain_id)
119 .await?;
120 Ok(client.subscribe()?)
121 }
122}
123
124impl<C> MutationRoot<C>
125where
126 C: ClientContext,
127{
128 async fn execute_system_operation(
129 &self,
130 system_operation: SystemOperation,
131 chain_id: ChainId,
132 ) -> Result<CryptoHash, Error> {
133 let certificate = self
134 .apply_client_command(&chain_id, move |client| {
135 let operation = Operation::system(system_operation.clone());
136 async move {
137 let result = client
138 .execute_operation(operation)
139 .await
140 .map_err(Error::from);
141 (result, client)
142 }
143 })
144 .await?;
145 Ok(certificate.hash())
146 }
147
148 async fn apply_client_command<F, Fut, T>(
152 &self,
153 chain_id: &ChainId,
154 mut f: F,
155 ) -> Result<T, Error>
156 where
157 F: FnMut(ChainClient<C::Environment>) -> Fut,
158 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
159 {
160 loop {
161 let client = self
162 .context
163 .lock()
164 .await
165 .make_chain_client(*chain_id)
166 .await?;
167 let mut stream = client.subscribe()?;
168 let (result, client) = f(client).await;
169 self.context.lock().await.update_wallet(&client).await?;
170 let timeout = match result? {
171 ClientOutcome::Committed(t) => return Ok(t),
172 ClientOutcome::WaitForTimeout(timeout) => timeout,
173 };
174 drop(client);
175 util::wait_for_next_round(&mut stream, timeout).await;
176 }
177 }
178}
179
180#[async_graphql::Object(cache_control(no_cache))]
181impl<C> MutationRoot<C>
182where
183 C: ClientContext + 'static,
184{
185 async fn process_inbox(
187 &self,
188 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
189 ) -> Result<Vec<CryptoHash>, Error> {
190 let mut hashes = Vec::new();
191 loop {
192 let client = self
193 .context
194 .lock()
195 .await
196 .make_chain_client(chain_id)
197 .await?;
198 let result = client.process_inbox().await;
199 self.context.lock().await.update_wallet(&client).await?;
200 let (certificates, maybe_timeout) = result?;
201 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
202 match maybe_timeout {
203 None => return Ok(hashes),
204 Some(timestamp) => {
205 let mut stream = client.subscribe()?;
206 drop(client);
207 util::wait_for_next_round(&mut stream, timestamp).await;
208 }
209 }
210 }
211 }
212
213 async fn sync(
218 &self,
219 #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
220 ) -> Result<u64, Error> {
221 let client = self
222 .context
223 .lock()
224 .await
225 .make_chain_client(chain_id)
226 .await?;
227 let info = client.synchronize_from_validators().await?;
228 self.context.lock().await.update_wallet(&client).await?;
229 Ok(info.next_block_height.0)
230 }
231
232 async fn retry_pending_block(
234 &self,
235 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
236 ) -> Result<Option<CryptoHash>, Error> {
237 let client = self
238 .context
239 .lock()
240 .await
241 .make_chain_client(chain_id)
242 .await?;
243 let outcome = client.process_pending_block().await?;
244 self.context.lock().await.update_wallet(&client).await?;
245 match outcome {
246 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
247 ClientOutcome::Committed(None) => Ok(None),
248 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
249 "Please try again at {}",
250 timeout.timestamp
251 ))),
252 }
253 }
254
255 async fn transfer(
258 &self,
259 #[graphql(desc = "The chain which native tokens are being transferred from.")]
260 chain_id: ChainId,
261 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
262 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
263 #[graphql(desc = "The amount being transferred.")] amount: Amount,
264 ) -> Result<CryptoHash, Error> {
265 self.apply_client_command(&chain_id, move |client| async move {
266 let result = client
267 .transfer(owner, amount, recipient)
268 .await
269 .map_err(Error::from)
270 .map(|outcome| outcome.map(|certificate| certificate.hash()));
271 (result, client)
272 })
273 .await
274 }
275
276 async fn claim(
280 &self,
281 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
282 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
283 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
284 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
285 #[graphql(desc = "The amount being transferred.")] amount: Amount,
286 ) -> Result<CryptoHash, Error> {
287 self.apply_client_command(&chain_id, move |client| async move {
288 let result = client
289 .claim(owner, target_id, recipient, amount)
290 .await
291 .map_err(Error::from)
292 .map(|outcome| outcome.map(|certificate| certificate.hash()));
293 (result, client)
294 })
295 .await
296 }
297
298 async fn read_data_blob(
301 &self,
302 chain_id: ChainId,
303 hash: CryptoHash,
304 ) -> Result<CryptoHash, Error> {
305 self.apply_client_command(&chain_id, move |client| async move {
306 let result = client
307 .read_data_blob(hash)
308 .await
309 .map_err(Error::from)
310 .map(|outcome| outcome.map(|certificate| certificate.hash()));
311 (result, client)
312 })
313 .await
314 }
315
316 async fn open_chain(
318 &self,
319 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
320 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
321 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
322 balance: Option<Amount>,
323 ) -> Result<ChainId, Error> {
324 let ownership = ChainOwnership::single(owner);
325 let balance = balance.unwrap_or(Amount::ZERO);
326 let description = self
327 .apply_client_command(&chain_id, move |client| {
328 let ownership = ownership.clone();
329 async move {
330 let result = client
331 .open_chain(ownership, ApplicationPermissions::default(), balance)
332 .await
333 .map_err(Error::from)
334 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
335 (result, client)
336 }
337 })
338 .await?;
339 Ok(description.id())
340 }
341
342 #[expect(clippy::too_many_arguments)]
344 async fn open_multi_owner_chain(
345 &self,
346 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
347 #[graphql(desc = "Permissions for applications on the new chain")]
348 application_permissions: Option<ApplicationPermissions>,
349 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
350 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
351 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
352 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
353 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
354 fast_round_ms: Option<u64>,
355 #[graphql(
356 desc = "The duration of the first single-leader and all multi-leader rounds",
357 default = 10_000
358 )]
359 base_timeout_ms: u64,
360 #[graphql(
361 desc = "The number of milliseconds by which the timeout increases after each \
362 single-leader round",
363 default = 1_000
364 )]
365 timeout_increment_ms: u64,
366 #[graphql(
367 desc = "The age of an incoming tracked or protected message after which the \
368 validators start transitioning the chain to fallback mode, in milliseconds.",
369 default = 86_400_000
370 )]
371 fallback_duration_ms: u64,
372 ) -> Result<ChainId, Error> {
373 let owners = if let Some(weights) = weights {
374 if weights.len() != owners.len() {
375 return Err(Error::new(format!(
376 "There are {} owners but {} weights.",
377 owners.len(),
378 weights.len()
379 )));
380 }
381 owners.into_iter().zip(weights).collect::<Vec<_>>()
382 } else {
383 owners
384 .into_iter()
385 .zip(iter::repeat(100))
386 .collect::<Vec<_>>()
387 };
388 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
389 let timeout_config = TimeoutConfig {
390 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
391 base_timeout: TimeDelta::from_millis(base_timeout_ms),
392 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
393 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
394 };
395 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
396 let balance = balance.unwrap_or(Amount::ZERO);
397 let description = self
398 .apply_client_command(&chain_id, move |client| {
399 let ownership = ownership.clone();
400 let application_permissions = application_permissions.clone().unwrap_or_default();
401 async move {
402 let result = client
403 .open_chain(ownership, application_permissions, balance)
404 .await
405 .map_err(Error::from)
406 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
407 (result, client)
408 }
409 })
410 .await?;
411 Ok(description.id())
412 }
413
414 async fn close_chain(
416 &self,
417 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
418 ) -> Result<Option<CryptoHash>, Error> {
419 let maybe_cert = self
420 .apply_client_command(&chain_id, |client| async move {
421 let result = client.close_chain().await.map_err(Error::from);
422 (result, client)
423 })
424 .await?;
425 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
426 }
427
428 async fn change_owner(
430 &self,
431 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
432 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
433 ) -> Result<CryptoHash, Error> {
434 let operation = SystemOperation::ChangeOwnership {
435 super_owners: vec![new_owner],
436 owners: Vec::new(),
437 first_leader: None,
438 multi_leader_rounds: 2,
439 open_multi_leader_rounds: false,
440 timeout_config: TimeoutConfig::default(),
441 };
442 self.execute_system_operation(operation, chain_id).await
443 }
444
445 #[expect(clippy::too_many_arguments)]
447 async fn change_multiple_owners(
448 &self,
449 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
450 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
451 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
452 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
453 #[graphql(
454 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
455 )]
456 open_multi_leader_rounds: bool,
457 #[graphql(desc = "The leader of the first single-leader round. \
458 If not set, this is random like other rounds.")]
459 first_leader: Option<AccountOwner>,
460 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
461 fast_round_ms: Option<u64>,
462 #[graphql(
463 desc = "The duration of the first single-leader and all multi-leader rounds",
464 default = 10_000
465 )]
466 base_timeout_ms: u64,
467 #[graphql(
468 desc = "The number of milliseconds by which the timeout increases after each \
469 single-leader round",
470 default = 1_000
471 )]
472 timeout_increment_ms: u64,
473 #[graphql(
474 desc = "The age of an incoming tracked or protected message after which the \
475 validators start transitioning the chain to fallback mode, in milliseconds.",
476 default = 86_400_000
477 )]
478 fallback_duration_ms: u64,
479 ) -> Result<CryptoHash, Error> {
480 let operation = SystemOperation::ChangeOwnership {
481 super_owners: Vec::new(),
482 owners: new_owners.into_iter().zip(new_weights).collect(),
483 first_leader,
484 multi_leader_rounds,
485 open_multi_leader_rounds,
486 timeout_config: TimeoutConfig {
487 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
488 base_timeout: TimeDelta::from_millis(base_timeout_ms),
489 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
490 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
491 },
492 };
493 self.execute_system_operation(operation, chain_id).await
494 }
495
496 #[expect(clippy::too_many_arguments)]
498 async fn change_application_permissions(
499 &self,
500 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
501 #[graphql(desc = "These applications are allowed to close the current chain.")]
502 close_chain: Vec<ApplicationId>,
503 #[graphql(
504 desc = "If this is `None`, all system operations and application operations are allowed.
505If it is `Some`, only operations from the specified applications are allowed,
506and no system operations."
507 )]
508 execute_operations: Option<Vec<ApplicationId>>,
509 #[graphql(
510 desc = "At least one operation or incoming message from each of these applications must occur in every block."
511 )]
512 mandatory_applications: Vec<ApplicationId>,
513 #[graphql(desc = "These applications are allowed to change the application permissions.")]
514 change_application_permissions: Vec<ApplicationId>,
515 #[graphql(
516 desc = "These applications are allowed to perform calls to services as oracles."
517 )]
518 call_service_as_oracle: Option<Vec<ApplicationId>>,
519 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
520 make_http_requests: Option<Vec<ApplicationId>>,
521 ) -> Result<CryptoHash, Error> {
522 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
523 execute_operations,
524 mandatory_applications,
525 close_chain,
526 change_application_permissions,
527 call_service_as_oracle,
528 make_http_requests,
529 });
530 self.execute_system_operation(operation, chain_id).await
531 }
532
533 async fn create_committee(
537 &self,
538 chain_id: ChainId,
539 committee: Committee,
540 ) -> Result<CryptoHash, Error> {
541 Ok(self
542 .apply_client_command(&chain_id, move |client| {
543 let committee = committee.clone();
544 async move {
545 let result = client
546 .stage_new_committee(committee)
547 .await
548 .map_err(Error::from);
549 (result, client)
550 }
551 })
552 .await?
553 .hash())
554 }
555
556 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
560 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
561 self.execute_system_operation(operation, chain_id).await
562 }
563
564 async fn publish_module(
566 &self,
567 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
568 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
569 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
570 service: Bytecode,
571 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
572 vm_runtime: VmRuntime,
573 ) -> Result<ModuleId, Error> {
574 self.apply_client_command(&chain_id, move |client| {
575 let contract = contract.clone();
576 let service = service.clone();
577 async move {
578 let result = client
579 .publish_module(contract, service, vm_runtime)
580 .await
581 .map_err(Error::from)
582 .map(|outcome| outcome.map(|(module_id, _)| module_id));
583 (result, client)
584 }
585 })
586 .await
587 }
588
589 async fn publish_data_blob(
591 &self,
592 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
593 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
594 ) -> Result<CryptoHash, Error> {
595 self.apply_client_command(&chain_id, |client| {
596 let bytes = bytes.clone();
597 async move {
598 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
599 (result, client)
600 }
601 })
602 .await
603 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
604 }
605
606 async fn create_application(
608 &self,
609 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
610 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
611 #[graphql(desc = "The JSON serialization of the parameters of the application")]
612 parameters: String,
613 #[graphql(
614 desc = "The JSON serialization of the instantiation argument of the application"
615 )]
616 instantiation_argument: String,
617 #[graphql(desc = "The dependencies of the application being created")]
618 required_application_ids: Vec<ApplicationId>,
619 ) -> Result<ApplicationId, Error> {
620 self.apply_client_command(&chain_id, move |client| {
621 let parameters = parameters.as_bytes().to_vec();
622 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
623 let required_application_ids = required_application_ids.clone();
624 async move {
625 let result = client
626 .create_application_untyped(
627 module_id,
628 parameters,
629 instantiation_argument,
630 required_application_ids,
631 )
632 .await
633 .map_err(Error::from)
634 .map(|outcome| outcome.map(|(application_id, _)| application_id));
635 (result, client)
636 }
637 })
638 .await
639 }
640}
641
642#[async_graphql::Object(cache_control(no_cache))]
643impl<C> QueryRoot<C>
644where
645 C: ClientContext + 'static,
646{
647 async fn chain(
648 &self,
649 chain_id: ChainId,
650 ) -> Result<
651 ChainStateExtendedView<<C::Environment as linera_core::Environment>::StorageContext>,
652 Error,
653 > {
654 let client = self
655 .context
656 .lock()
657 .await
658 .make_chain_client(chain_id)
659 .await?;
660 let view = client.chain_state_view().await?;
661 Ok(ChainStateExtendedView::new(view))
662 }
663
664 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
665 let client = self
666 .context
667 .lock()
668 .await
669 .make_chain_client(chain_id)
670 .await?;
671 let applications = client
672 .chain_state_view()
673 .await?
674 .execution_state
675 .list_applications()
676 .await?;
677
678 let overviews = applications
679 .into_iter()
680 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
681 .collect();
682
683 Ok(overviews)
684 }
685
686 async fn chains(&self) -> Result<Chains, Error> {
687 Ok(Chains {
688 list: self
689 .context
690 .lock()
691 .await
692 .wallet()
693 .chain_ids()
694 .try_collect()
695 .await?,
696 default: self.default_chain,
697 })
698 }
699
700 async fn block(
701 &self,
702 hash: Option<CryptoHash>,
703 chain_id: ChainId,
704 ) -> Result<Option<ConfirmedBlock>, Error> {
705 let client = self
706 .context
707 .lock()
708 .await
709 .make_chain_client(chain_id)
710 .await?;
711 let hash = match hash {
712 Some(hash) => Some(hash),
713 None => client.chain_info().await?.block_hash,
714 };
715 if let Some(hash) = hash {
716 let block = client.read_confirmed_block(hash).await?;
717 Ok(Some(block))
718 } else {
719 Ok(None)
720 }
721 }
722
723 async fn events_from_index(
724 &self,
725 chain_id: ChainId,
726 stream_id: StreamId,
727 start_index: u32,
728 ) -> Result<Vec<IndexAndEvent>, Error> {
729 Ok(self
730 .context
731 .lock()
732 .await
733 .make_chain_client(chain_id)
734 .await?
735 .events_from_index(stream_id, start_index)
736 .await?)
737 }
738
739 async fn blocks(
740 &self,
741 from: Option<CryptoHash>,
742 chain_id: ChainId,
743 limit: Option<u32>,
744 ) -> Result<Vec<ConfirmedBlock>, Error> {
745 let client = self
746 .context
747 .lock()
748 .await
749 .make_chain_client(chain_id)
750 .await?;
751 let limit = limit.unwrap_or(10);
752 let from = match from {
753 Some(from) => Some(from),
754 None => client.chain_info().await?.block_hash,
755 };
756 let Some(from) = from else {
757 return Ok(vec![]);
758 };
759 let mut hash = Some(from);
760 let mut values = Vec::new();
761 for _ in 0..limit {
762 let Some(next_hash) = hash else {
763 break;
764 };
765 let value = client.read_confirmed_block(next_hash).await?;
766 hash = value.block().header.previous_block_hash;
767 values.push(value);
768 }
769 Ok(values)
770 }
771
772 async fn version(&self) -> linera_version::VersionInfo {
774 linera_version::VersionInfo::default()
775 }
776}
777
778struct ChainStateViewExtension(ChainId);
782
783#[async_graphql::Object(cache_control(no_cache))]
784impl ChainStateViewExtension {
785 async fn chain_id(&self) -> ChainId {
786 self.0
787 }
788}
789
790#[derive(MergedObject)]
791struct ChainStateExtendedView<C>(ChainStateViewExtension, ReadOnlyChainStateView<C>)
792where
793 C: linera_views::context::Context + Clone + Send + Sync + 'static,
794 C::Extra: linera_execution::ExecutionRuntimeContext;
795
796pub struct ReadOnlyChainStateView<C>(OwnedRwLockReadGuard<ChainStateView<C>>)
799where
800 C: linera_views::context::Context + Clone + Send + Sync + 'static;
801
802impl<C> ContainerType for ReadOnlyChainStateView<C>
803where
804 C: linera_views::context::Context + Clone + Send + Sync + 'static,
805{
806 async fn resolve_field(
807 &self,
808 context: &async_graphql::Context<'_>,
809 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
810 self.0.resolve_field(context).await
811 }
812}
813
814impl<C> OutputType for ReadOnlyChainStateView<C>
815where
816 C: linera_views::context::Context + Clone + Send + Sync + 'static,
817{
818 fn type_name() -> Cow<'static, str> {
819 ChainStateView::<C>::type_name()
820 }
821
822 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
823 ChainStateView::<C>::create_type_info(registry)
824 }
825
826 async fn resolve(
827 &self,
828 context: &async_graphql::ContextSelectionSet<'_>,
829 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
830 ) -> async_graphql::ServerResult<async_graphql::Value> {
831 self.0.resolve(context, field).await
832 }
833}
834
835impl<C> ChainStateExtendedView<C>
836where
837 C: linera_views::context::Context + Clone + Send + Sync + 'static,
838 C::Extra: linera_execution::ExecutionRuntimeContext,
839{
840 fn new(view: OwnedRwLockReadGuard<ChainStateView<C>>) -> Self {
841 Self(
842 ChainStateViewExtension(view.chain_id()),
843 ReadOnlyChainStateView(view),
844 )
845 }
846}
847
848#[derive(SimpleObject)]
849pub struct ApplicationOverview {
850 id: ApplicationId,
851 description: ApplicationDescription,
852 link: String,
853}
854
855impl ApplicationOverview {
856 fn new(
857 id: ApplicationId,
858 description: ApplicationDescription,
859 port: NonZeroU16,
860 chain_id: ChainId,
861 ) -> Self {
862 Self {
863 id,
864 description,
865 link: format!(
866 "http://localhost:{}/chains/{}/applications/{}",
867 port.get(),
868 chain_id,
869 id
870 ),
871 }
872 }
873}
874
875pub struct NodeService<C>
878where
879 C: ClientContext + 'static,
880{
881 config: ChainListenerConfig,
882 port: NonZeroU16,
883 #[cfg(with_metrics)]
884 metrics_port: NonZeroU16,
885 default_chain: Option<ChainId>,
886 context: Arc<Mutex<C>>,
887}
888
889impl<C> Clone for NodeService<C>
890where
891 C: ClientContext + 'static,
892{
893 fn clone(&self) -> Self {
894 Self {
895 config: self.config.clone(),
896 port: self.port,
897 #[cfg(with_metrics)]
898 metrics_port: self.metrics_port,
899 default_chain: self.default_chain,
900 context: Arc::clone(&self.context),
901 }
902 }
903}
904
905impl<C> NodeService<C>
906where
907 C: ClientContext,
908{
909 pub fn new(
911 config: ChainListenerConfig,
912 port: NonZeroU16,
913 #[cfg(with_metrics)] metrics_port: NonZeroU16,
914 default_chain: Option<ChainId>,
915 context: Arc<Mutex<C>>,
916 ) -> Self {
917 Self {
918 config,
919 port,
920 #[cfg(with_metrics)]
921 metrics_port,
922 default_chain,
923 context,
924 }
925 }
926
927 #[cfg(with_metrics)]
928 pub fn metrics_address(&self) -> SocketAddr {
929 SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
930 }
931
932 pub fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>> {
933 Schema::build(
934 QueryRoot {
935 context: Arc::clone(&self.context),
936 port: self.port,
937 default_chain: self.default_chain,
938 },
939 MutationRoot {
940 context: Arc::clone(&self.context),
941 },
942 SubscriptionRoot {
943 context: Arc::clone(&self.context),
944 },
945 )
946 .finish()
947 }
948
949 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
951 pub async fn run(
952 self,
953 cancellation_token: CancellationToken,
954 command_receiver: UnboundedReceiver<ListenerCommand>,
955 ) -> Result<(), anyhow::Error> {
956 let port = self.port.get();
957 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
958 let application_handler =
959 axum::routing::get(util::graphiql).post(Self::application_handler);
960
961 #[cfg(with_metrics)]
962 monitoring_server::start_metrics(self.metrics_address(), cancellation_token.clone());
963
964 let app = Router::new()
965 .route("/", index_handler)
966 .route(
967 "/chains/{chain_id}/applications/{application_id}",
968 application_handler,
969 )
970 .route("/ready", axum::routing::get(|| async { "ready!" }))
971 .route_service("/ws", GraphQLSubscription::new(self.schema()))
972 .layer(Extension(self.clone()))
973 .layer(CorsLayer::permissive());
975
976 info!("GraphiQL IDE: http://localhost:{}", port);
977
978 let storage = self.context.lock().await.storage().clone();
979
980 let chain_listener = ChainListener::new(
981 self.config,
982 self.context,
983 storage,
984 cancellation_token.clone(),
985 command_receiver,
986 )
987 .run(true)
988 .await?;
989 let mut chain_listener = Box::pin(chain_listener).fuse();
990 let tcp_listener =
991 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
992 let server = axum::serve(tcp_listener, app)
993 .with_graceful_shutdown(cancellation_token.cancelled_owned())
994 .into_future();
995 futures::select! {
996 result = chain_listener => result?,
997 result = Box::pin(server).fuse() => result?,
998 };
999
1000 Ok(())
1001 }
1002
1003 async fn handle_service_request(
1005 &self,
1006 application_id: ApplicationId,
1007 request: Vec<u8>,
1008 chain_id: ChainId,
1009 block_hash: Option<CryptoHash>,
1010 ) -> Result<Vec<u8>, NodeServiceError> {
1011 let QueryOutcome {
1012 response,
1013 operations,
1014 } = self
1015 .query_user_application(application_id, request, chain_id, block_hash)
1016 .await?;
1017 if operations.is_empty() {
1018 return Ok(response);
1019 }
1020
1021 trace!("Query requested a new block with operations: {operations:?}");
1022 let client = self
1023 .context
1024 .lock()
1025 .await
1026 .make_chain_client(chain_id)
1027 .await?;
1028 let hash = loop {
1029 let timeout = match client
1030 .execute_operations(operations.clone(), vec![])
1031 .await?
1032 {
1033 ClientOutcome::Committed(certificate) => break certificate.hash(),
1034 ClientOutcome::WaitForTimeout(timeout) => timeout,
1035 };
1036 let mut stream = client.subscribe().map_err(|_| {
1037 chain_client::Error::InternalError("Could not subscribe to the local node.")
1038 })?;
1039 util::wait_for_next_round(&mut stream, timeout).await;
1040 };
1041 let response = async_graphql::Response::new(hash.to_value());
1042 Ok(serde_json::to_vec(&response)?)
1043 }
1044
1045 async fn query_user_application(
1047 &self,
1048 application_id: ApplicationId,
1049 bytes: Vec<u8>,
1050 chain_id: ChainId,
1051 block_hash: Option<CryptoHash>,
1052 ) -> Result<QueryOutcome<Vec<u8>>, NodeServiceError> {
1053 let query = Query::User {
1054 application_id,
1055 bytes,
1056 };
1057 let client = self
1058 .context
1059 .lock()
1060 .await
1061 .make_chain_client(chain_id)
1062 .await?;
1063 let QueryOutcome {
1064 response,
1065 operations,
1066 } = client.query_application(query, block_hash).await?;
1067 match response {
1068 QueryResponse::System(_) => {
1069 unreachable!("cannot get a system response for a user query")
1070 }
1071 QueryResponse::User(user_response_bytes) => Ok(QueryOutcome {
1072 response: user_response_bytes,
1073 operations,
1074 }),
1075 }
1076 }
1077
1078 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1080 service
1081 .0
1082 .schema()
1083 .execute(request.into_inner())
1084 .await
1085 .into()
1086 }
1087
1088 async fn application_handler(
1092 Path((chain_id, application_id)): Path<(String, String)>,
1093 service: Extension<Self>,
1094 request: String,
1095 ) -> Result<Vec<u8>, NodeServiceError> {
1096 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1097 let application_id: ApplicationId = application_id.parse()?;
1098
1099 debug!(
1100 %chain_id,
1101 %application_id,
1102 "processing request for application:\n{:?}",
1103 &request
1104 );
1105 let response = service
1106 .0
1107 .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1108 .await?;
1109
1110 Ok(response)
1111 }
1112}