1use std::{
5 borrow::Cow,
6 future::IntoFuture,
7 iter,
8 net::SocketAddr,
9 num::NonZeroU16,
10 sync::{Arc, Mutex as StdMutex},
11};
12
13use async_graphql::{
14 futures_util::Stream,
15 registry::{MetaType, MetaTypeId, Registry},
16 resolver_utils::ContainerType,
17 EmptyMutation, Error, MergedObject, OutputType, Positioned, Request, Response, ScalarType,
18 Schema, SimpleObject, Subscription,
19};
20use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
21use axum::{extract::Path, http::StatusCode, response, response::IntoResponse, Extension, Router};
22use futures::{lock::Mutex, Future, FutureExt as _, StreamExt as _, TryStreamExt as _};
23use linera_base::{
24 crypto::{CryptoError, CryptoHash},
25 data_types::{
26 Amount, ApplicationDescription, ApplicationPermissions, BlockHeight, Bytecode, Epoch,
27 TimeDelta,
28 },
29 identifiers::{
30 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
31 },
32 ownership::{ChainOwnership, TimeoutConfig},
33 vm::VmRuntime,
34 BcsHexParseError,
35};
36use linera_chain::{
37 types::{ConfirmedBlock, GenericCertificate},
38 ChainStateView,
39};
40use linera_client::chain_listener::{
41 ChainListener, ChainListenerConfig, ClientContext, ListenerCommand,
42};
43use linera_core::{
44 client::chain_client::{self, ChainClient},
45 data_types::ClientOutcome,
46 wallet::Wallet as _,
47 worker::{ChainStateViewReadGuard, Notification, Reason},
48};
49use linera_execution::{
50 committee::Committee, system::AdminOperation, Operation, Query, QueryOutcome, QueryResponse,
51 SystemOperation,
52};
53#[cfg(with_metrics)]
54use linera_metrics::monitoring_server;
55use linera_sdk::linera_base_types::BlobContent;
56use linera_storage::Storage;
57use lru::LruCache;
58use serde::{Deserialize, Serialize};
59use serde_json::json;
60use tokio::sync::mpsc::UnboundedReceiver;
61use tokio_util::sync::CancellationToken;
62use tower_http::cors::CorsLayer;
63use tracing::{debug, info, instrument, trace};
64
65use crate::util;
66
/// Wrapper around a `String` that is exposed to GraphQL as the `JSON` scalar.
///
/// The `OutputType` implementation for this type emits the wrapped string under
/// async-graphql's raw-value token instead of as an ordinary string value. The payload is
/// presumably already-serialized JSON (NOTE(review): confirm against the producers).
#[derive(Clone)]
struct RawJson(String);
74
75impl OutputType for RawJson {
76 fn type_name() -> Cow<'static, str> {
77 Cow::Borrowed("JSON")
78 }
79
80 fn create_type_info(registry: &mut Registry) -> String {
81 registry.create_output_type::<Self, _>(MetaTypeId::Scalar, |_| MetaType::Scalar {
82 name: "JSON".to_string(),
83 description: Some("A scalar that can represent any JSON value.".to_string()),
84 is_valid: None,
85 visible: None,
86 inaccessible: false,
87 tags: Default::default(),
88 specified_by_url: None,
89 directive_invocations: Default::default(),
90 requires_scopes: Default::default(),
91 })
92 }
93
94 async fn resolve(
95 &self,
96 _ctx: &async_graphql::ContextSelectionSet<'_>,
97 _field: &Positioned<async_graphql::parser::types::Field>,
98 ) -> async_graphql::ServerResult<async_graphql::Value> {
99 Ok(async_graphql::Value::Object(
104 std::iter::once((
105 async_graphql::Name::new(async_graphql_value::RAW_VALUE_TOKEN),
106 async_graphql::Value::String(self.0.clone()),
107 ))
108 .collect(),
109 ))
110 }
111}
112
// Result of the `chains` query: the chain IDs listed by the wallet plus the optional
// default chain.
// (Plain `//` comments are used on purpose: `///` doc comments on a `SimpleObject` are
// picked up by async-graphql as schema descriptions and would change the SDL.)
#[derive(SimpleObject, Serialize, Deserialize, Clone)]
pub struct Chains {
    // Chain IDs collected from the wallet.
    pub list: Vec<ChainId>,
    // The default chain, if one is configured.
    pub default: Option<ChainId>,
}
118
/// Root object backing the GraphQL queries of the node service.
pub struct QueryRoot<C> {
    /// Shared client context; locked whenever a resolver needs a chain client or the wallet.
    context: Arc<Mutex<C>>,
    /// Port embedded in the application links returned by the `applications` query.
    port: NonZeroU16,
    /// Default chain reported by the `chains` query, if any.
    default_chain: Option<ChainId>,
}
125
/// Root object backing the GraphQL subscriptions of the node service.
pub struct SubscriptionRoot<C> {
    /// Shared client context; locked to create chain clients and handed to the query
    /// subscription manager.
    context: Arc<Mutex<C>>,
    /// Manager for registered subscription queries. When `None`, `query_result` fails
    /// with "no subscription queries registered".
    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
    /// Token cloned into every `query_result` subscription.
    cancellation_token: CancellationToken,
}
132
/// Root object backing the GraphQL mutations of the node service.
pub struct MutationRoot<C> {
    /// Shared client context; locked to create chain clients and to update the wallet.
    context: Arc<Mutex<C>>,
}
137
/// Error type of the node service. It is turned into an HTTP response by its
/// `IntoResponse` implementation.
#[derive(Debug, thiserror::Error)]
enum NodeServiceError {
    /// Error reported by the chain client.
    #[error(transparent)]
    ChainClient(#[from] chain_client::Error),
    /// Failure to parse BCS-hex input.
    #[error(transparent)]
    BcsHex(#[from] BcsHexParseError),
    /// JSON (de)serialization failure.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// The supplied chain ID could not be parsed.
    #[error("malformed chain ID: {0}")]
    InvalidChainId(CryptoError),
    /// Error reported by `linera_client`.
    #[error(transparent)]
    Client(#[from] linera_client::Error),
    /// A query tried to schedule operations while the service is read-only.
    #[error("scheduling operations from queries is disabled in read-only mode")]
    ReadOnlyModeOperationsNotAllowed,
}
153
154impl IntoResponse for NodeServiceError {
155 fn into_response(self) -> response::Response {
156 let status = match self {
157 NodeServiceError::InvalidChainId(_) | NodeServiceError::BcsHex(_) => {
158 StatusCode::BAD_REQUEST
159 }
160 NodeServiceError::ReadOnlyModeOperationsNotAllowed => StatusCode::FORBIDDEN,
161 _ => StatusCode::INTERNAL_SERVER_ERROR,
162 };
163 let body = json!({"error": self.to_string()}).to_string();
164 (status, body).into_response()
165 }
166}
167
168#[Subscription]
169impl<C> SubscriptionRoot<C>
170where
171 C: ClientContext + 'static,
172{
173 async fn notifications(
175 &self,
176 chain_id: ChainId,
177 ) -> Result<impl Stream<Item = Notification>, Error> {
178 let client = self
179 .context
180 .lock()
181 .await
182 .make_chain_client(chain_id)
183 .await?;
184 Ok(client.subscribe()?)
185 }
186
187 async fn query_result(
190 &self,
191 #[graphql(desc = "Name of the registered subscription query.")] name: String,
192 #[graphql(desc = "The chain to watch.")] chain_id: ChainId,
193 #[graphql(desc = "The application to query.")] application_id: ApplicationId,
194 ) -> Result<impl Stream<Item = RawJson>, Error> {
195 let manager = self
196 .query_subscriptions
197 .as_ref()
198 .ok_or_else(|| Error::new("no subscription queries registered"))?;
199
200 let key = crate::query_subscription::SubscriptionKey {
201 name,
202 chain_id,
203 application_id,
204 };
205
206 let receiver = manager
207 .subscribe(
208 &key,
209 Arc::clone(&self.context),
210 self.cancellation_token.clone(),
211 )
212 .map_err(|e| Error::new(e.to_string()))?;
213
214 let current = receiver.borrow().clone();
219 let changes = tokio_stream::wrappers::WatchStream::from_changes(receiver)
220 .filter_map(|value| async move { value });
221 Ok(futures::stream::iter(current).chain(changes).map(RawJson))
222 }
223}
224
225impl<C> MutationRoot<C>
226where
227 C: ClientContext,
228{
229 async fn execute_system_operation(
230 &self,
231 system_operation: SystemOperation,
232 chain_id: ChainId,
233 ) -> Result<CryptoHash, Error> {
234 let certificate = self
235 .apply_client_command(&chain_id, move |client| {
236 let operation = Operation::system(system_operation.clone());
237 async move {
238 let result = client
239 .execute_operation(operation)
240 .await
241 .map_err(Error::from);
242 (result, client)
243 }
244 })
245 .await?;
246 Ok(certificate.hash())
247 }
248
249 async fn apply_client_command<F, Fut, T>(
253 &self,
254 chain_id: &ChainId,
255 mut f: F,
256 ) -> Result<T, Error>
257 where
258 F: FnMut(ChainClient<C::Environment>) -> Fut,
259 Fut: Future<Output = (Result<ClientOutcome<T>, Error>, ChainClient<C::Environment>)>,
260 {
261 loop {
262 let client = self
263 .context
264 .lock()
265 .await
266 .make_chain_client(*chain_id)
267 .await?;
268 let mut stream = client.subscribe()?;
269 let (result, client) = f(client).await;
270 self.context.lock().await.update_wallet(&client).await?;
271 let timeout = match result? {
272 ClientOutcome::Committed(t) => return Ok(t),
273 ClientOutcome::Conflict(certificate) => {
274 return Err(chain_client::Error::Conflict(certificate.hash()).into());
275 }
276 ClientOutcome::WaitForTimeout(timeout) => timeout,
277 };
278 drop(client);
279 util::wait_for_next_round(&mut stream, timeout).await;
280 }
281 }
282}
283
284#[async_graphql::Object(cache_control(no_cache))]
285impl<C> MutationRoot<C>
286where
287 C: ClientContext + 'static,
288{
289 async fn process_inbox(
291 &self,
292 #[graphql(desc = "The chain whose inbox is being processed.")] chain_id: ChainId,
293 ) -> Result<Vec<CryptoHash>, Error> {
294 let mut hashes = Vec::new();
295 loop {
296 let client = self
297 .context
298 .lock()
299 .await
300 .make_chain_client(chain_id)
301 .await?;
302 let result = client.process_inbox().await;
303 self.context.lock().await.update_wallet(&client).await?;
304 let (certificates, maybe_timeout) = result?;
305 hashes.extend(certificates.into_iter().map(|cert| cert.hash()));
306 match maybe_timeout {
307 None => return Ok(hashes),
308 Some(timestamp) => {
309 let mut stream = client.subscribe()?;
310 drop(client);
311 util::wait_for_next_round(&mut stream, timestamp).await;
312 }
313 }
314 }
315 }
316
317 async fn sync(
322 &self,
323 #[graphql(desc = "The chain being synchronized.")] chain_id: ChainId,
324 ) -> Result<u64, Error> {
325 let client = self
326 .context
327 .lock()
328 .await
329 .make_chain_client(chain_id)
330 .await?;
331 let info = client.synchronize_from_validators().await?;
332 self.context.lock().await.update_wallet(&client).await?;
333 Ok(info.next_block_height.0)
334 }
335
336 async fn retry_pending_block(
338 &self,
339 #[graphql(desc = "The chain on whose block is being retried.")] chain_id: ChainId,
340 ) -> Result<Option<CryptoHash>, Error> {
341 let client = self
342 .context
343 .lock()
344 .await
345 .make_chain_client(chain_id)
346 .await?;
347 let outcome = client.process_pending_block().await?;
348 self.context.lock().await.update_wallet(&client).await?;
349 match outcome {
350 ClientOutcome::Committed(Some(certificate)) => Ok(Some(certificate.hash())),
351 ClientOutcome::Committed(None) => Ok(None),
352 ClientOutcome::WaitForTimeout(timeout) => Err(Error::from(format!(
353 "Please try again at {}",
354 timeout.timestamp
355 ))),
356 ClientOutcome::Conflict(certificate) => Err(Error::from(format!(
357 "A different block was committed: {}",
358 certificate.hash()
359 ))),
360 }
361 }
362
363 async fn transfer(
366 &self,
367 #[graphql(desc = "The chain which native tokens are being transferred from.")]
368 chain_id: ChainId,
369 #[graphql(desc = "The account being debited on the chain.")] owner: AccountOwner,
370 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
371 #[graphql(desc = "The amount being transferred.")] amount: Amount,
372 ) -> Result<CryptoHash, Error> {
373 self.apply_client_command(&chain_id, move |client| async move {
374 let result = client
375 .transfer(owner, amount, recipient)
376 .await
377 .map_err(Error::from)
378 .map(|outcome| outcome.map(|certificate| certificate.hash()));
379 (result, client)
380 })
381 .await
382 }
383
384 async fn claim(
388 &self,
389 #[graphql(desc = "The chain for whom owner is one of the owner.")] chain_id: ChainId,
390 #[graphql(desc = "The owner of chain targetId being debited.")] owner: AccountOwner,
391 #[graphql(desc = "The chain whose owner is being debited.")] target_id: ChainId,
392 #[graphql(desc = "The recipient of the transfer.")] recipient: Account,
393 #[graphql(desc = "The amount being transferred.")] amount: Amount,
394 ) -> Result<CryptoHash, Error> {
395 self.apply_client_command(&chain_id, move |client| async move {
396 let result = client
397 .claim(owner, target_id, recipient, amount)
398 .await
399 .map_err(Error::from)
400 .map(|outcome| outcome.map(|certificate| certificate.hash()));
401 (result, client)
402 })
403 .await
404 }
405
406 async fn read_data_blob(
409 &self,
410 chain_id: ChainId,
411 hash: CryptoHash,
412 ) -> Result<CryptoHash, Error> {
413 self.apply_client_command(&chain_id, move |client| async move {
414 let result = client
415 .read_data_blob(hash)
416 .await
417 .map_err(Error::from)
418 .map(|outcome| outcome.map(|certificate| certificate.hash()));
419 (result, client)
420 })
421 .await
422 }
423
424 async fn open_chain(
426 &self,
427 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
428 #[graphql(desc = "The owner of the new chain.")] owner: AccountOwner,
429 #[graphql(desc = "The balance of the chain being created. Zero if `None`.")]
430 balance: Option<Amount>,
431 ) -> Result<ChainId, Error> {
432 let ownership = ChainOwnership::single(owner);
433 let balance = balance.unwrap_or(Amount::ZERO);
434 let description = self
435 .apply_client_command(&chain_id, move |client| {
436 let ownership = ownership.clone();
437 async move {
438 let result = client
439 .open_chain(ownership, ApplicationPermissions::default(), balance)
440 .await
441 .map_err(Error::from)
442 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
443 (result, client)
444 }
445 })
446 .await?;
447 Ok(description.id())
448 }
449
450 #[expect(clippy::too_many_arguments)]
452 async fn open_multi_owner_chain(
453 &self,
454 #[graphql(desc = "The chain paying for the creation of the new chain.")] chain_id: ChainId,
455 #[graphql(desc = "Permissions for applications on the new chain")]
456 application_permissions: Option<ApplicationPermissions>,
457 #[graphql(desc = "The owners of the chain")] owners: Vec<AccountOwner>,
458 #[graphql(desc = "The weights of the owners")] weights: Option<Vec<u64>>,
459 #[graphql(desc = "The number of multi-leader rounds")] multi_leader_rounds: Option<u32>,
460 #[graphql(desc = "The balance of the chain. Zero if `None`")] balance: Option<Amount>,
461 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
462 fast_round_ms: Option<u64>,
463 #[graphql(
464 desc = "The duration of the first single-leader and all multi-leader rounds",
465 default = 10_000
466 )]
467 base_timeout_ms: u64,
468 #[graphql(
469 desc = "The number of milliseconds by which the timeout increases after each \
470 single-leader round",
471 default = 1_000
472 )]
473 timeout_increment_ms: u64,
474 #[graphql(
475 desc = "The age of an incoming tracked or protected message after which the \
476 validators start transitioning the chain to fallback mode, in milliseconds.",
477 default = 86_400_000
478 )]
479 fallback_duration_ms: u64,
480 ) -> Result<ChainId, Error> {
481 let owners = if let Some(weights) = weights {
482 if weights.len() != owners.len() {
483 return Err(Error::new(format!(
484 "There are {} owners but {} weights.",
485 owners.len(),
486 weights.len()
487 )));
488 }
489 owners.into_iter().zip(weights).collect::<Vec<_>>()
490 } else {
491 owners
492 .into_iter()
493 .zip(iter::repeat(100))
494 .collect::<Vec<_>>()
495 };
496 let multi_leader_rounds = multi_leader_rounds.unwrap_or(u32::MAX);
497 let timeout_config = TimeoutConfig {
498 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
499 base_timeout: TimeDelta::from_millis(base_timeout_ms),
500 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
501 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
502 };
503 let ownership = ChainOwnership::multiple(owners, multi_leader_rounds, timeout_config);
504 let balance = balance.unwrap_or(Amount::ZERO);
505 let description = self
506 .apply_client_command(&chain_id, move |client| {
507 let ownership = ownership.clone();
508 let application_permissions = application_permissions.clone().unwrap_or_default();
509 async move {
510 let result = client
511 .open_chain(ownership, application_permissions, balance)
512 .await
513 .map_err(Error::from)
514 .map(|outcome| outcome.map(|(chain_id, _)| chain_id));
515 (result, client)
516 }
517 })
518 .await?;
519 Ok(description.id())
520 }
521
522 async fn close_chain(
524 &self,
525 #[graphql(desc = "The chain being closed.")] chain_id: ChainId,
526 ) -> Result<Option<CryptoHash>, Error> {
527 let maybe_cert = self
528 .apply_client_command(&chain_id, |client| async move {
529 let result = client.close_chain().await.map_err(Error::from);
530 (result, client)
531 })
532 .await?;
533 Ok(maybe_cert.as_ref().map(GenericCertificate::hash))
534 }
535
536 async fn change_owner(
538 &self,
539 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
540 #[graphql(desc = "The new single owner of the chain")] new_owner: AccountOwner,
541 ) -> Result<CryptoHash, Error> {
542 let operation = SystemOperation::ChangeOwnership {
543 super_owners: vec![new_owner],
544 owners: Vec::new(),
545 first_leader: None,
546 multi_leader_rounds: 5,
547 open_multi_leader_rounds: false,
548 timeout_config: TimeoutConfig::default(),
549 };
550 self.execute_system_operation(operation, chain_id).await
551 }
552
553 #[expect(clippy::too_many_arguments)]
555 async fn change_multiple_owners(
556 &self,
557 #[graphql(desc = "The chain whose ownership changes")] chain_id: ChainId,
558 #[graphql(desc = "The new list of owners of the chain")] new_owners: Vec<AccountOwner>,
559 #[graphql(desc = "The new list of weights of the owners")] new_weights: Vec<u64>,
560 #[graphql(desc = "The multi-leader round of the chain")] multi_leader_rounds: u32,
561 #[graphql(
562 desc = "Whether multi-leader rounds are unrestricted, that is not limited to chain owners."
563 )]
564 open_multi_leader_rounds: bool,
565 #[graphql(desc = "The leader of the first single-leader round. \
566 If not set, this is random like other rounds.")]
567 first_leader: Option<AccountOwner>,
568 #[graphql(desc = "The duration of the fast round, in milliseconds; default: no timeout")]
569 fast_round_ms: Option<u64>,
570 #[graphql(
571 desc = "The duration of the first single-leader and all multi-leader rounds",
572 default = 10_000
573 )]
574 base_timeout_ms: u64,
575 #[graphql(
576 desc = "The number of milliseconds by which the timeout increases after each \
577 single-leader round",
578 default = 1_000
579 )]
580 timeout_increment_ms: u64,
581 #[graphql(
582 desc = "The age of an incoming tracked or protected message after which the \
583 validators start transitioning the chain to fallback mode, in milliseconds.",
584 default = 86_400_000
585 )]
586 fallback_duration_ms: u64,
587 ) -> Result<CryptoHash, Error> {
588 let operation = SystemOperation::ChangeOwnership {
589 super_owners: Vec::new(),
590 owners: new_owners.into_iter().zip(new_weights).collect(),
591 first_leader,
592 multi_leader_rounds,
593 open_multi_leader_rounds,
594 timeout_config: TimeoutConfig {
595 fast_round_duration: fast_round_ms.map(TimeDelta::from_millis),
596 base_timeout: TimeDelta::from_millis(base_timeout_ms),
597 timeout_increment: TimeDelta::from_millis(timeout_increment_ms),
598 fallback_duration: TimeDelta::from_millis(fallback_duration_ms),
599 },
600 };
601 self.execute_system_operation(operation, chain_id).await
602 }
603
604 #[expect(clippy::too_many_arguments)]
606 async fn change_application_permissions(
607 &self,
608 #[graphql(desc = "The chain whose permissions are being changed")] chain_id: ChainId,
609 #[graphql(
610 desc = "These applications are allowed to manage the chain: close it, change \
611 application permissions, and change ownership."
612 )]
613 manage_chain: Vec<ApplicationId>,
614 #[graphql(
615 desc = "If this is `None`, all system operations and application operations are allowed.
616If it is `Some`, only operations from the specified applications are allowed,
617and no system operations."
618 )]
619 execute_operations: Option<Vec<ApplicationId>>,
620 #[graphql(
621 desc = "At least one operation or incoming message from each of these applications must occur in every block."
622 )]
623 mandatory_applications: Vec<ApplicationId>,
624 #[graphql(
625 desc = "These applications are allowed to perform calls to services as oracles."
626 )]
627 call_service_as_oracle: Option<Vec<ApplicationId>>,
628 #[graphql(desc = "These applications are allowed to perform HTTP requests.")]
629 make_http_requests: Option<Vec<ApplicationId>>,
630 ) -> Result<CryptoHash, Error> {
631 let operation = SystemOperation::ChangeApplicationPermissions(ApplicationPermissions {
632 execute_operations,
633 mandatory_applications,
634 manage_chain,
635 call_service_as_oracle,
636 make_http_requests,
637 });
638 self.execute_system_operation(operation, chain_id).await
639 }
640
641 async fn create_committee(
645 &self,
646 chain_id: ChainId,
647 committee: Committee,
648 ) -> Result<CryptoHash, Error> {
649 Ok(self
650 .apply_client_command(&chain_id, move |client| {
651 let committee = committee.clone();
652 async move {
653 let result = client
654 .stage_new_committee(committee)
655 .await
656 .map_err(Error::from);
657 (result, client)
658 }
659 })
660 .await?
661 .hash())
662 }
663
664 async fn remove_committee(&self, chain_id: ChainId, epoch: Epoch) -> Result<CryptoHash, Error> {
668 let operation = SystemOperation::Admin(AdminOperation::RemoveCommittee { epoch });
669 self.execute_system_operation(operation, chain_id).await
670 }
671
672 async fn publish_module(
676 &self,
677 #[graphql(desc = "The chain publishing the module")] chain_id: ChainId,
678 #[graphql(desc = "The bytecode of the contract code")] contract: Bytecode,
679 #[graphql(desc = "The bytecode of the service code (only relevant for WebAssembly)")]
680 service: Bytecode,
681 #[graphql(desc = "The virtual machine being used (either Wasm or Evm)")]
682 vm_runtime: VmRuntime,
683 #[graphql(desc = "Optional JSON-encoded `Formats` description bytes")] formats: Option<
684 Vec<u8>,
685 >,
686 ) -> Result<ModuleId, Error> {
687 self.apply_client_command(&chain_id, move |client| {
688 let contract = contract.clone();
689 let service = service.clone();
690 let formats = formats.clone();
691 async move {
692 let result = client
693 .publish_module(contract, service, vm_runtime, formats)
694 .await
695 .map_err(Error::from)
696 .map(|outcome| outcome.map(|(module_id, _)| module_id));
697 (result, client)
698 }
699 })
700 .await
701 }
702
703 async fn publish_data_blob(
705 &self,
706 #[graphql(desc = "The chain paying for the blob publication")] chain_id: ChainId,
707 #[graphql(desc = "The content of the data blob being created")] bytes: Vec<u8>,
708 ) -> Result<CryptoHash, Error> {
709 self.apply_client_command(&chain_id, |client| {
710 let bytes = bytes.clone();
711 async move {
712 let result = client.publish_data_blob(bytes).await.map_err(Error::from);
713 (result, client)
714 }
715 })
716 .await
717 .map(|_| CryptoHash::new(&BlobContent::new_data(bytes)))
718 }
719
720 async fn create_application(
722 &self,
723 #[graphql(desc = "The chain paying for the creation of the application")] chain_id: ChainId,
724 #[graphql(desc = "The module ID of the application being created")] module_id: ModuleId,
725 #[graphql(desc = "The JSON serialization of the parameters of the application")]
726 parameters: String,
727 #[graphql(
728 desc = "The JSON serialization of the instantiation argument of the application"
729 )]
730 instantiation_argument: String,
731 #[graphql(desc = "The dependencies of the application being created")]
732 required_application_ids: Vec<ApplicationId>,
733 ) -> Result<ApplicationId, Error> {
734 self.apply_client_command(&chain_id, move |client| {
735 let parameters = parameters.as_bytes().to_vec();
736 let instantiation_argument = instantiation_argument.as_bytes().to_vec();
737 let required_application_ids = required_application_ids.clone();
738 async move {
739 let result = client
740 .create_application_untyped(
741 module_id,
742 parameters,
743 instantiation_argument,
744 required_application_ids,
745 )
746 .await
747 .map_err(Error::from)
748 .map(|outcome| outcome.map(|(application_id, _)| application_id));
749 (result, client)
750 }
751 })
752 .await
753 }
754}
755
756#[async_graphql::Object(cache_control(no_cache))]
757impl<C> QueryRoot<C>
758where
759 C: ClientContext + 'static,
760{
761 async fn chain(
762 &self,
763 chain_id: ChainId,
764 ) -> Result<ChainStateExtendedView<<C::Environment as linera_core::Environment>::Storage>, Error>
765 {
766 let client = self
767 .context
768 .lock()
769 .await
770 .make_chain_client(chain_id)
771 .await?;
772 let view = client.chain_state_view().await?;
773 Ok(ChainStateExtendedView::new(view))
774 }
775
776 async fn applications(&self, chain_id: ChainId) -> Result<Vec<ApplicationOverview>, Error> {
777 let client = self
778 .context
779 .lock()
780 .await
781 .make_chain_client(chain_id)
782 .await?;
783 let applications = client
784 .chain_state_view()
785 .await?
786 .execution_state
787 .list_applications()
788 .await?;
789
790 let overviews = applications
791 .into_iter()
792 .map(|(id, description)| ApplicationOverview::new(id, description, self.port, chain_id))
793 .collect();
794
795 Ok(overviews)
796 }
797
798 async fn chains(&self) -> Result<Chains, Error> {
799 Ok(Chains {
800 list: self
801 .context
802 .lock()
803 .await
804 .wallet()
805 .chain_ids()
806 .try_collect()
807 .await?,
808 default: self.default_chain,
809 })
810 }
811
812 async fn block(
813 &self,
814 hash: Option<CryptoHash>,
815 chain_id: ChainId,
816 ) -> Result<Option<Arc<ConfirmedBlock>>, Error> {
817 let client = self
818 .context
819 .lock()
820 .await
821 .make_chain_client(chain_id)
822 .await?;
823 let hash = match hash {
824 Some(hash) => Some(hash),
825 None => client.chain_info().await?.block_hash,
826 };
827 if let Some(hash) = hash {
828 Ok(Some(client.read_confirmed_block(hash).await?))
829 } else {
830 Ok(None)
831 }
832 }
833
834 async fn events_from_index(
835 &self,
836 chain_id: ChainId,
837 stream_id: StreamId,
838 start_index: u32,
839 ) -> Result<Vec<IndexAndEvent>, Error> {
840 Ok(self
841 .context
842 .lock()
843 .await
844 .make_chain_client(chain_id)
845 .await?
846 .events_from_index(stream_id, start_index)
847 .await?)
848 }
849
850 async fn blocks(
851 &self,
852 from: Option<CryptoHash>,
853 chain_id: ChainId,
854 limit: Option<u32>,
855 ) -> Result<Vec<Arc<ConfirmedBlock>>, Error> {
856 let client = self
857 .context
858 .lock()
859 .await
860 .make_chain_client(chain_id)
861 .await?;
862 let limit = limit.unwrap_or(10);
863 let from = match from {
864 Some(from) => Some(from),
865 None => client.chain_info().await?.block_hash,
866 };
867 let Some(from) = from else {
868 return Ok(vec![]);
869 };
870 let mut hash = Some(from);
871 let mut values = Vec::new();
872 for _ in 0..limit {
873 let Some(next_hash) = hash else {
874 break;
875 };
876 let value = client.read_confirmed_block(next_hash).await?;
877 hash = value.block().header.previous_block_hash;
878 values.push(value);
879 }
880 Ok(values)
881 }
882
883 async fn version(&self) -> linera_version::VersionInfo {
885 linera_version::VersionInfo::default()
886 }
887
888 async fn application_formats(
892 &self,
893 chain_id: ChainId,
894 formats_blob_hash: CryptoHash,
895 ) -> Result<Option<Vec<u8>>, Error> {
896 let client = self
897 .context
898 .lock()
899 .await
900 .make_chain_client(chain_id)
901 .await?;
902 let blob_id = linera_base::identifiers::BlobId::new(
903 formats_blob_hash,
904 linera_base::identifiers::BlobType::ApplicationFormats,
905 );
906 let blob = client.storage_client().read_blob(blob_id).await?;
907 Ok(blob.map(|b| b.bytes().to_vec()))
908 }
909}
910
/// Holds a chain ID so it can be exposed as an extra GraphQL field (`chain_id`) alongside
/// the chain state view in `ChainStateExtendedView`.
struct ChainStateViewExtension(ChainId);
915
916#[async_graphql::Object(cache_control(no_cache))]
917impl ChainStateViewExtension {
918 async fn chain_id(&self) -> ChainId {
919 self.0
920 }
921}
922
// Merged GraphQL object combining the `chain_id` extension with the read-only chain state
// view. (`//` rather than `///`, so the derive cannot pick it up as a schema description.)
#[derive(MergedObject)]
struct ChainStateExtendedView<S: Storage>(ChainStateViewExtension, ReadOnlyChainStateView<S>);
925
/// Wrapper around a `ChainStateViewReadGuard`. Its GraphQL trait implementations report the
/// type name and type info of `ChainStateView` and resolve fields through the guard.
pub struct ReadOnlyChainStateView<S: Storage>(ChainStateViewReadGuard<S>);
929
930impl<S: Storage> ContainerType for ReadOnlyChainStateView<S>
931where
932 ChainStateView<S::Context>: ContainerType,
933{
934 async fn resolve_field(
935 &self,
936 context: &async_graphql::Context<'_>,
937 ) -> async_graphql::ServerResult<Option<async_graphql::Value>> {
938 self.0.resolve_field(context).await
939 }
940}
941
942impl<S: Storage> OutputType for ReadOnlyChainStateView<S>
943where
944 ChainStateView<S::Context>: OutputType,
945{
946 fn type_name() -> Cow<'static, str> {
947 ChainStateView::<S::Context>::type_name()
948 }
949
950 fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
951 ChainStateView::<S::Context>::create_type_info(registry)
952 }
953
954 async fn resolve(
955 &self,
956 context: &async_graphql::ContextSelectionSet<'_>,
957 field: &async_graphql::Positioned<async_graphql::parser::types::Field>,
958 ) -> async_graphql::ServerResult<async_graphql::Value> {
959 self.0.resolve(context, field).await
960 }
961}
962
963impl<S: Storage> ChainStateExtendedView<S> {
964 fn new(view: ChainStateViewReadGuard<S>) -> Self {
965 Self(
966 ChainStateViewExtension(view.chain_id()),
967 ReadOnlyChainStateView(view),
968 )
969 }
970}
971
// Summary of one application, as returned by the `applications` query.
// (`//` comments only: `///` on a `SimpleObject` would become schema descriptions.)
#[derive(SimpleObject)]
pub struct ApplicationOverview {
    // ID of the application.
    id: ApplicationId,
    // Description of the application.
    description: ApplicationDescription,
    // URL of the form `http://localhost:<port>/chains/<chain>/applications/<id>`.
    link: String,
}
978
979impl ApplicationOverview {
980 fn new(
981 id: ApplicationId,
982 description: ApplicationDescription,
983 port: NonZeroU16,
984 chain_id: ChainId,
985 ) -> Self {
986 Self {
987 id,
988 description,
989 link: format!(
990 "http://localhost:{}/chains/{}/applications/{}",
991 port.get(),
992 chain_id,
993 id
994 ),
995 }
996 }
997}
998
/// The GraphQL schema of the node service, in one of two flavours.
pub enum NodeServiceSchema<C>
where
    C: ClientContext + 'static,
{
    /// Schema with queries, mutations (`MutationRoot`) and subscriptions.
    Full(Schema<QueryRoot<C>, MutationRoot<C>, SubscriptionRoot<C>>),
    /// Schema with queries and subscriptions only; mutations are `EmptyMutation`.
    ReadOnly(Schema<QueryRoot<C>, EmptyMutation, SubscriptionRoot<C>>),
}
1009
1010impl<C> NodeServiceSchema<C>
1011where
1012 C: ClientContext,
1013{
1014 pub async fn execute(&self, request: impl Into<Request>) -> Response {
1016 match self {
1017 Self::Full(schema) => schema.execute(request).await,
1018 Self::ReadOnly(schema) => schema.execute(request).await,
1019 }
1020 }
1021
1022 pub fn sdl(&self) -> String {
1024 match self {
1025 Self::Full(schema) => schema.sdl(),
1026 Self::ReadOnly(schema) => schema.sdl(),
1027 }
1028 }
1029}
1030
1031impl<C> Clone for NodeServiceSchema<C>
1032where
1033 C: ClientContext,
1034{
1035 fn clone(&self) -> Self {
1036 match self {
1037 Self::Full(schema) => Self::Full(schema.clone()),
1038 Self::ReadOnly(schema) => Self::ReadOnly(schema.clone()),
1039 }
1040 }
1041}
1042
/// Prometheus metrics for the query response cache. Only compiled with `with_metrics`.
#[cfg(with_metrics)]
mod query_cache_metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{register_int_counter_vec, register_int_gauge};
    use prometheus::{IntCounterVec, IntGauge};

    /// Counter incremented on every cache lookup that finds a response.
    pub static QUERY_CACHE_HIT: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("query_response_cache_hit", "Query response cache hits", &[])
    });

    /// Counter incremented on every cache lookup that finds nothing.
    pub static QUERY_CACHE_MISS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_miss",
            "Query response cache misses",
            &[],
        )
    });

    /// Counter of cache invalidations.
    pub static QUERY_CACHE_INVALIDATION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "query_response_cache_invalidation",
            "Query response cache invalidations (per chain)",
            &[],
        )
    });

    /// Gauge tracking the number of cached responses across all chains.
    pub static QUERY_CACHE_ENTRIES: LazyLock<IntGauge> = LazyLock::new(|| {
        register_int_gauge(
            "query_response_cache_entries",
            "Current number of cached query responses across all chains",
        )
    });
}
1077
/// Cached query responses of a single chain.
struct PerChainCache {
    /// LRU map from `(application ID, request bytes)` to response bytes.
    lru: LruCache<(ApplicationId, Vec<u8>), Vec<u8>>,
    /// Block height the cached entries belong to; `insert` ignores responses computed at a
    /// lower height.
    next_block_height: BlockHeight,
}
1084
/// Per-chain cache of application query responses, plus bookkeeping about which chains
/// have been marked as subscribed.
struct QueryResponseCache {
    /// One mutex-protected LRU cache per chain, created on first insert.
    chains: papaya::HashMap<ChainId, StdMutex<PerChainCache>>,
    /// Chains that have been marked as subscribed.
    subscribed: papaya::HashSet<ChainId>,
    /// Optional sender of notifications; unset until `set_notification_sender` is called.
    notification_sender: StdMutex<Option<tokio::sync::mpsc::UnboundedSender<Notification>>>,
    /// Capacity given to each chain's LRU cache.
    capacity_per_chain: std::num::NonZeroUsize,
}
1101
1102impl QueryResponseCache {
1103 fn new(capacity_per_chain: usize) -> Self {
1104 Self {
1105 chains: papaya::HashMap::new(),
1106 subscribed: papaya::HashSet::new(),
1107 notification_sender: StdMutex::new(None),
1108 capacity_per_chain: std::num::NonZeroUsize::new(capacity_per_chain)
1109 .expect("capacity must be > 0"),
1110 }
1111 }
1112
1113 fn set_notification_sender(&self, sender: tokio::sync::mpsc::UnboundedSender<Notification>) {
1115 *self
1116 .notification_sender
1117 .lock()
1118 .expect("sender mutex poisoned") = Some(sender);
1119 }
1120
1121 fn notification_sender(&self) -> Option<tokio::sync::mpsc::UnboundedSender<Notification>> {
1123 self.notification_sender
1124 .lock()
1125 .expect("sender mutex poisoned")
1126 .clone()
1127 }
1128
1129 fn mark_subscribed(&self, chain_id: ChainId) {
1131 self.subscribed.pin().insert(chain_id);
1132 }
1133
1134 fn needs_subscription(&self, chain_id: &ChainId) -> bool {
1136 !self.subscribed.pin().contains(chain_id)
1137 }
1138
1139 fn mark_all_subscribed(&self, chain_ids: &[ChainId]) {
1141 let pinned = self.subscribed.pin();
1142 for &chain_id in chain_ids {
1143 pinned.insert(chain_id);
1144 }
1145 }
1146
1147 fn get(&self, chain_id: ChainId, app_id: &ApplicationId, request: &[u8]) -> Option<Vec<u8>> {
1150 let pinned = self.chains.pin();
1151 let result = pinned.get(&chain_id).and_then(|mutex| {
1152 mutex
1153 .lock()
1154 .expect("LRU mutex poisoned")
1155 .lru
1156 .get(&(*app_id, request.to_vec()))
1157 .cloned()
1158 });
1159 #[cfg(with_metrics)]
1160 {
1161 let metric = if result.is_some() {
1162 &query_cache_metrics::QUERY_CACHE_HIT
1163 } else {
1164 &query_cache_metrics::QUERY_CACHE_MISS
1165 };
1166 metric.with_label_values(&[]).inc();
1167 }
1168 result
1169 }
1170
1171 fn insert(
1175 &self,
1176 chain_id: ChainId,
1177 app_id: ApplicationId,
1178 request: Vec<u8>,
1179 response: Vec<u8>,
1180 next_block_height: BlockHeight,
1181 ) {
1182 let pinned = self.chains.pin();
1183 let capacity = self.capacity_per_chain;
1184 let mutex = pinned.get_or_insert_with(chain_id, || {
1185 StdMutex::new(PerChainCache {
1186 lru: LruCache::new(capacity),
1187 next_block_height,
1188 })
1189 });
1190 let mut cache = mutex.lock().expect("LRU mutex poisoned");
1191 if next_block_height < cache.next_block_height {
1192 return; }
1194 if next_block_height > cache.next_block_height {
1198 debug!(
1199 "Unexpected query cache invalidation for chain {chain_id}:\
1200 {next_block_height} > {}",
1201 cache.next_block_height
1202 );
1203 #[cfg(with_metrics)]
1204 {
1205 #[expect(
1206 clippy::cast_possible_wrap,
1207 reason = "LRU cache size fits in i64 for any realistic cache"
1208 )]
1209 let cache_len = cache.lru.len() as i64;
1210 query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1211 query_cache_metrics::QUERY_CACHE_INVALIDATION
1212 .with_label_values(&[])
1213 .inc();
1214 }
1215 cache.lru.clear();
1216 cache.next_block_height = next_block_height;
1217 }
1218 #[cfg(with_metrics)]
1219 let prev_len = cache.lru.len();
1220 cache.lru.put((app_id, request), response);
1221 #[cfg(with_metrics)]
1222 if cache.lru.len() != prev_len {
1223 query_cache_metrics::QUERY_CACHE_ENTRIES.inc();
1224 }
1225 }
1226
1227 fn invalidate_chain(&self, chain_id: &ChainId, next_block_height: BlockHeight) {
1230 let pinned = self.chains.pin();
1231 let capacity = self.capacity_per_chain;
1232 let mutex = pinned.get_or_insert_with(*chain_id, || {
1233 StdMutex::new(PerChainCache {
1234 lru: LruCache::new(capacity),
1235 next_block_height,
1236 })
1237 });
1238 let mut cache = mutex.lock().expect("LRU mutex poisoned");
1239 if next_block_height > cache.next_block_height {
1240 #[cfg(with_metrics)]
1241 {
1242 #[expect(
1243 clippy::cast_possible_wrap,
1244 reason = "LRU cache size fits in i64 for any realistic cache"
1245 )]
1246 let cache_len = cache.lru.len() as i64;
1247 query_cache_metrics::QUERY_CACHE_ENTRIES.sub(cache_len);
1248 query_cache_metrics::QUERY_CACHE_INVALIDATION
1249 .with_label_values(&[])
1250 .inc();
1251 }
1252 cache.lru.clear();
1253 cache.next_block_height = next_block_height;
1254 } else {
1255 debug!(
1256 "Query cache for chain {chain_id} was already invalidated:\
1257 {next_block_height} <= {}",
1258 cache.next_block_height
1259 );
1260 }
1261 }
1262}
1263
/// The node service: an HTTP/WebSocket server exposing GraphQL endpoints for the node and
/// for user applications.
pub struct NodeService<C>
where
    C: ClientContext + 'static,
{
    /// Configuration handed to the chain listener started by `run`.
    config: ChainListenerConfig,
    /// TCP port the HTTP server binds to.
    port: NonZeroU16,
    /// TCP port of the metrics server.
    #[cfg(with_metrics)]
    metrics_port: NonZeroU16,
    /// Optional default chain, passed on to the GraphQL query root.
    default_chain: Option<ChainId>,
    /// Client context shared by the handlers and the chain listener.
    context: Arc<Mutex<C>>,
    /// When set, the schema has no mutations and application queries that request
    /// operations are rejected.
    read_only: bool,
    /// Cache of application query responses; `None` disables caching.
    query_cache: Option<Arc<QueryResponseCache>>,
    /// Optional query subscription manager, passed on to the GraphQL subscription root.
    query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
    /// Cancellation token passed on to the GraphQL subscription root.
    cancellation_token: CancellationToken,
    /// Passed to the metrics server when it is started.
    enable_memory_profiling: bool,
    /// When set, `run` only serves requests and does not start the chain listener.
    pause: bool,
}
1286
1287impl<C> Clone for NodeService<C>
1288where
1289 C: ClientContext + 'static,
1290{
1291 fn clone(&self) -> Self {
1292 Self {
1293 config: self.config.clone(),
1294 port: self.port,
1295 #[cfg(with_metrics)]
1296 metrics_port: self.metrics_port,
1297 default_chain: self.default_chain,
1298 context: Arc::clone(&self.context),
1299 read_only: self.read_only,
1300 query_cache: self.query_cache.clone(),
1301 query_subscriptions: self.query_subscriptions.clone(),
1302 cancellation_token: self.cancellation_token.clone(),
1303 enable_memory_profiling: self.enable_memory_profiling,
1304 pause: self.pause,
1305 }
1306 }
1307}
1308
1309impl<C> NodeService<C>
1310where
1311 C: ClientContext,
1312{
1313 #[expect(clippy::too_many_arguments)]
1319 pub fn new(
1320 config: ChainListenerConfig,
1321 port: NonZeroU16,
1322 #[cfg(with_metrics)] metrics_port: NonZeroU16,
1323 default_chain: Option<ChainId>,
1324 context: Arc<Mutex<C>>,
1325 read_only: bool,
1326 query_cache_size: Option<usize>,
1327 query_subscriptions: Option<Arc<crate::query_subscription::QuerySubscriptionManager>>,
1328 cancellation_token: CancellationToken,
1329 enable_memory_profiling: bool,
1330 pause: bool,
1331 ) -> Self {
1332 let query_cache = query_cache_size.map(|size| Arc::new(QueryResponseCache::new(size)));
1333 Self {
1334 config,
1335 port,
1336 #[cfg(with_metrics)]
1337 metrics_port,
1338 default_chain,
1339 context,
1340 read_only,
1341 query_cache,
1342 query_subscriptions,
1343 cancellation_token,
1344 enable_memory_profiling,
1345 pause,
1346 }
1347 }
1348
1349 #[cfg(with_metrics)]
1350 pub fn metrics_address(&self) -> SocketAddr {
1351 SocketAddr::from(([0, 0, 0, 0], self.metrics_port.get()))
1352 }
1353
1354 pub fn schema(&self) -> NodeServiceSchema<C> {
1355 let query = QueryRoot {
1356 context: Arc::clone(&self.context),
1357 port: self.port,
1358 default_chain: self.default_chain,
1359 };
1360 let subscription = SubscriptionRoot {
1361 context: Arc::clone(&self.context),
1362 query_subscriptions: self.query_subscriptions.clone(),
1363 cancellation_token: self.cancellation_token.clone(),
1364 };
1365
1366 if self.read_only {
1367 NodeServiceSchema::ReadOnly(Schema::build(query, EmptyMutation, subscription).finish())
1368 } else {
1369 NodeServiceSchema::Full(
1370 Schema::build(
1371 query,
1372 MutationRoot {
1373 context: Arc::clone(&self.context),
1374 },
1375 subscription,
1376 )
1377 .finish(),
1378 )
1379 }
1380 }
1381
1382 #[instrument(name = "node_service", level = "info", skip_all, fields(port = ?self.port))]
1384 pub async fn run(
1385 self,
1386 cancellation_token: CancellationToken,
1387 command_receiver: UnboundedReceiver<ListenerCommand>,
1388 ) -> Result<(), anyhow::Error> {
1389 let port = self.port.get();
1390 let index_handler = axum::routing::get(util::graphiql).post(Self::index_handler);
1391 let application_handler =
1392 axum::routing::get(util::graphiql).post(Self::application_handler);
1393
1394 #[cfg(with_metrics)]
1395 monitoring_server::start_metrics_with_profiling(
1396 self.metrics_address(),
1397 cancellation_token.clone(),
1398 self.enable_memory_profiling,
1399 )
1400 .await;
1401
1402 let base_router = Router::new()
1403 .route("/", index_handler)
1404 .route(
1405 "/chains/{chain_id}/applications/{application_id}",
1406 application_handler,
1407 )
1408 .route("/ready", axum::routing::get(|| async { "ready!" }));
1409
1410 let app = match self.schema() {
1412 NodeServiceSchema::Full(schema) => {
1413 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1414 }
1415 NodeServiceSchema::ReadOnly(schema) => {
1416 base_router.route_service("/ws", GraphQLSubscription::new(schema))
1417 }
1418 }
1419 .layer(Extension(self.clone()))
1420 .layer(CorsLayer::permissive());
1422
1423 info!("GraphiQL IDE: http://localhost:{}", port);
1424
1425 if let Some(cache) = &self.query_cache {
1427 let guard = self.context.lock().await;
1428 let chain_ids: Vec<ChainId> = guard.wallet().chain_ids().try_collect().await?;
1429 let (tx, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1430 guard.client().subscribe_extra(chain_ids.clone(), &tx);
1431 cache.mark_all_subscribed(&chain_ids);
1432 cache.set_notification_sender(tx);
1433 drop(guard);
1434 let cache = Arc::clone(cache);
1435 tokio::spawn(async move {
1436 while let Some(notification) = receiver.recv().await {
1437 if let Reason::NewBlock { height, .. } = notification.reason {
1438 let next_block_height = height
1439 .try_add_one()
1440 .expect("block height should not overflow");
1441 cache.invalidate_chain(¬ification.chain_id, next_block_height);
1442 }
1443 }
1444 });
1445 }
1446
1447 let tcp_listener =
1448 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1449 let server = axum::serve(tcp_listener, app)
1450 .with_graceful_shutdown(cancellation_token.clone().cancelled_owned())
1451 .into_future();
1452
1453 if self.pause {
1454 info!("Running in paused mode: chain synchronization is disabled");
1455 server.await?;
1456 } else {
1457 let storage = self.context.lock().await.storage().clone();
1458 let chain_listener = ChainListener::new(
1459 self.config,
1460 self.context,
1461 storage,
1462 cancellation_token.clone(),
1463 command_receiver,
1464 true,
1465 )
1466 .run()
1467 .await?;
1468 let mut chain_listener = Box::pin(chain_listener).fuse();
1469 futures::select! {
1470 result = chain_listener => result?,
1471 result = Box::pin(server).fuse() => result?,
1472 };
1473 }
1474
1475 Ok(())
1476 }
1477
1478 async fn handle_service_request(
1480 &self,
1481 application_id: ApplicationId,
1482 request: Vec<u8>,
1483 chain_id: ChainId,
1484 block_hash: Option<CryptoHash>,
1485 ) -> Result<Vec<u8>, NodeServiceError> {
1486 let cache = block_hash
1488 .is_none()
1489 .then_some(self.query_cache.as_ref())
1490 .flatten();
1491
1492 if let Some(cache) = cache {
1494 if let Some(cached) = cache.get(chain_id, &application_id, &request) {
1495 return Ok(cached);
1496 }
1497 }
1498
1499 let (
1500 QueryOutcome {
1501 response,
1502 operations,
1503 },
1504 block_height,
1505 ) = self
1506 .query_user_application(application_id, request.clone(), chain_id, block_hash)
1507 .await?;
1508 if operations.is_empty() {
1509 if let Some(cache) = cache {
1510 if cache.needs_subscription(&chain_id) {
1512 if let Some(sender) = cache.notification_sender() {
1513 self.context
1514 .lock()
1515 .await
1516 .client()
1517 .subscribe_extra(vec![chain_id], &sender);
1518 cache.mark_subscribed(chain_id);
1519 }
1520 }
1521 cache.insert(
1522 chain_id,
1523 application_id,
1524 request,
1525 response.clone(),
1526 block_height,
1527 );
1528 }
1529 return Ok(response);
1530 }
1531
1532 if self.read_only {
1533 return Err(NodeServiceError::ReadOnlyModeOperationsNotAllowed);
1534 }
1535
1536 trace!("Query requested a new block with operations: {operations:?}");
1537 let client = self
1538 .context
1539 .lock()
1540 .await
1541 .make_chain_client(chain_id)
1542 .await?;
1543 let hash = loop {
1544 let timeout = match client
1545 .execute_operations(operations.clone(), vec![])
1546 .await?
1547 {
1548 ClientOutcome::Committed(certificate) => break certificate.hash(),
1549 ClientOutcome::Conflict(certificate) => {
1550 return Err(chain_client::Error::Conflict(certificate.hash()).into());
1551 }
1552 ClientOutcome::WaitForTimeout(timeout) => timeout,
1553 };
1554 let mut stream = client.subscribe().map_err(|_| {
1555 chain_client::Error::InternalError("Could not subscribe to the local node.")
1556 })?;
1557 util::wait_for_next_round(&mut stream, timeout).await;
1558 };
1559 let response = async_graphql::Response::new(hash.to_value());
1560 Ok(serde_json::to_vec(&response)?)
1561 }
1562
1563 async fn query_user_application(
1566 &self,
1567 application_id: ApplicationId,
1568 bytes: Vec<u8>,
1569 chain_id: ChainId,
1570 block_hash: Option<CryptoHash>,
1571 ) -> Result<(QueryOutcome<Vec<u8>>, BlockHeight), NodeServiceError> {
1572 let query = Query::User {
1573 application_id,
1574 bytes,
1575 };
1576 let client = self
1577 .context
1578 .lock()
1579 .await
1580 .make_chain_client(chain_id)
1581 .await?;
1582 let (
1583 QueryOutcome {
1584 response,
1585 operations,
1586 },
1587 next_block_height,
1588 ) = client.query_application(query, block_hash).await?;
1589 match response {
1590 QueryResponse::System(_) => {
1591 unreachable!("cannot get a system response for a user query")
1592 }
1593 QueryResponse::User(user_response_bytes) => Ok((
1594 QueryOutcome {
1595 response: user_response_bytes,
1596 operations,
1597 },
1598 next_block_height,
1599 )),
1600 }
1601 }
1602
1603 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1605 service
1606 .0
1607 .schema()
1608 .execute(request.into_inner())
1609 .await
1610 .into()
1611 }
1612
1613 async fn application_handler(
1617 Path((chain_id, application_id)): Path<(String, String)>,
1618 service: Extension<Self>,
1619 request: String,
1620 ) -> Result<Vec<u8>, NodeServiceError> {
1621 let chain_id: ChainId = chain_id.parse().map_err(NodeServiceError::InvalidChainId)?;
1622 let application_id: ApplicationId = application_id.parse()?;
1623
1624 debug!(
1625 %chain_id,
1626 %application_id,
1627 "processing request for application:\n{:?}",
1628 &request
1629 );
1630 let response = service
1631 .0
1632 .handle_service_request(application_id, request.into_bytes(), chain_id, None)
1633 .await?;
1634
1635 Ok(response)
1636 }
1637}
1638
/// Unit tests for [`QueryResponseCache`].
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::CryptoHash,
        data_types::BlockHeight,
        identifiers::{ApplicationId, ChainId},
    };

    use super::QueryResponseCache;

    /// Returns a deterministic chain ID derived from `n`.
    fn test_chain(n: u64) -> ChainId {
        ChainId(CryptoHash::test_hash(format!("chain-{n}")))
    }

    /// Returns a deterministic application ID derived from `n`.
    fn test_app(n: u64) -> ApplicationId {
        ApplicationId::new(CryptoHash::test_hash(format!("app-{n}")))
    }

    /// A lookup misses before the insert and hits afterwards.
    #[test]
    fn cache_hit_and_miss() {
        let cache = QueryResponseCache::new(100);
        let chain = test_chain(0);
        let app = test_app(0);
        let request = b"query { balance }".to_vec();
        let response = b"{ \"balance\": 42 }".to_vec();

        assert!(cache.get(chain, &app, &request).is_none());

        cache.insert(
            chain,
            app,
            request.clone(),
            response.clone(),
            BlockHeight(1),
        );

        assert_eq!(cache.get(chain, &app, &request), Some(response));
    }

    /// Invalidating one chain leaves the entries of other chains untouched.
    #[test]
    fn per_chain_isolation() {
        let cache = QueryResponseCache::new(100);
        let chain_a = test_chain(0);
        let chain_b = test_chain(1);
        let app = test_app(0);
        let request = b"q".to_vec();
        let response = b"r".to_vec();

        cache.insert(
            chain_a,
            app,
            request.clone(),
            response.clone(),
            BlockHeight(1),
        );

        cache.invalidate_chain(&chain_b, BlockHeight(1));
        assert_eq!(cache.get(chain_a, &app, &request), Some(response));
    }

    /// Invalidating a chain at a higher height drops all of its entries.
    #[test]
    fn invalidation_clears_all_entries() {
        let cache = QueryResponseCache::new(100);
        let chain = test_chain(0);
        let app = test_app(0);

        cache.insert(chain, app, b"q1".to_vec(), b"r1".to_vec(), BlockHeight(1));
        cache.insert(chain, app, b"q2".to_vec(), b"r2".to_vec(), BlockHeight(1));

        cache.invalidate_chain(&chain, BlockHeight(2));
        assert!(cache.get(chain, &app, b"q1").is_none());
        assert!(cache.get(chain, &app, b"q2").is_none());
    }

    /// With capacity 2, inserting a third entry evicts the least recently used one.
    #[test]
    fn lru_eviction() {
        let cache = QueryResponseCache::new(2);
        let chain = test_chain(0);
        let app = test_app(0);

        cache.insert(chain, app, b"q1".to_vec(), b"r1".to_vec(), BlockHeight(1));
        cache.insert(chain, app, b"q2".to_vec(), b"r2".to_vec(), BlockHeight(1));
        cache.insert(chain, app, b"q3".to_vec(), b"r3".to_vec(), BlockHeight(1));

        assert!(cache.get(chain, &app, b"q1").is_none());
        assert!(cache.get(chain, &app, b"q2").is_some());
        assert!(cache.get(chain, &app, b"q3").is_some());
    }

    /// A response computed before an invalidation must not be cached after it.
    #[test]
    fn stale_insert_rejected_after_invalidation() {
        let cache = QueryResponseCache::new(100);
        let chain = test_chain(0);
        let app = test_app(0);

        cache.insert(chain, app, b"q0".to_vec(), b"r0".to_vec(), BlockHeight(3));
        let stale_height = BlockHeight(3);

        cache.invalidate_chain(&chain, BlockHeight(4));

        cache.insert(chain, app, b"q".to_vec(), b"stale".to_vec(), stale_height);

        assert!(cache.get(chain, &app, b"q").is_none());
        // The entry cached before the invalidation is gone as well.
        assert!(cache.get(chain, &app, b"q0").is_none());
    }

    /// An insert tagged with a higher height discards the entries of the older state.
    #[test]
    fn newer_insert_discards_older_entries() {
        let cache = QueryResponseCache::new(100);
        let chain = test_chain(0);
        let app = test_app(0);

        cache.insert(chain, app, b"q1".to_vec(), b"r1".to_vec(), BlockHeight(1));
        cache.insert(chain, app, b"q2".to_vec(), b"r2".to_vec(), BlockHeight(2));

        assert!(cache.get(chain, &app, b"q1").is_none());
        assert_eq!(cache.get(chain, &app, b"q2"), Some(b"r2".to_vec()));
    }

    /// Chains need a subscription until they are marked as subscribed.
    #[test]
    fn subscription_tracking() {
        let cache = QueryResponseCache::new(1);
        let chain_a = test_chain(0);
        let chain_b = test_chain(1);
        let chain_c = test_chain(2);

        assert!(cache.needs_subscription(&chain_a));

        cache.mark_subscribed(chain_a);
        assert!(!cache.needs_subscription(&chain_a));
        assert!(cache.needs_subscription(&chain_b));

        cache.mark_all_subscribed(&[chain_b, chain_c]);
        assert!(!cache.needs_subscription(&chain_b));
        assert!(!cache.needs_subscription(&chain_c));
    }
}