1#[cfg(not(web))]
7use std::time::Duration;
8
9use custom_debug_derive::Debug;
10use futures::{channel::mpsc, StreamExt as _};
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14 data_types::{
15 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20};
21use linera_views::{batch::Batch, context::Context, views::View};
22use oneshot::Sender;
23use reqwest::{header::HeaderMap, Client, Url};
24
25use crate::{
26 system::{CreateApplicationResult, OpenChainConfig, Recipient},
27 util::RespondExt,
28 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
29 ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
30 UserContractCode, UserServiceCode,
31};
32
#[cfg(with_metrics)]
mod metrics {
    //! Latency histograms for loading user application code.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a label-less latency histogram whose buckets are produced by
    /// `exponential_bucket_latencies(250.0)`.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        let no_labels = &[];
        let buckets = exponential_bucket_latencies(250.0);
        register_histogram_vec(name, description, no_labels, buckets)
    }

    /// Histogram of the time spent in `load_contract`.
    pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_contract_latency", "Load contract latency"));

    /// Histogram of the time spent in `load_service`.
    pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("load_service_latency", "Load service latency"));
}
60
/// Sending half of the unbounded channel that carries [`ExecutionRequest`]s.
pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
62
63impl<C> ExecutionStateView<C>
64where
65 C: Context + Clone + Send + Sync + 'static,
66 C::Extra: ExecutionRuntimeContext,
67{
68 pub(crate) async fn load_contract(
69 &mut self,
70 id: ApplicationId,
71 txn_tracker: &mut TransactionTracker,
72 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
73 #[cfg(with_metrics)]
74 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
75 let blob_id = id.description_blob_id();
76 let description = match txn_tracker.created_blobs().get(&blob_id) {
77 Some(blob) => bcs::from_bytes(blob.bytes())?,
78 None => self.system.describe_application(id, txn_tracker).await?,
79 };
80 let code = self
81 .context()
82 .extra()
83 .get_user_contract(&description)
84 .await?;
85 Ok((code, description))
86 }
87
88 pub(crate) async fn load_service(
89 &mut self,
90 id: ApplicationId,
91 txn_tracker: &mut TransactionTracker,
92 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
93 #[cfg(with_metrics)]
94 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
95 let blob_id = id.description_blob_id();
96 let description = match txn_tracker.created_blobs().get(&blob_id) {
97 Some(blob) => bcs::from_bytes(blob.bytes())?,
98 None => self.system.describe_application(id, txn_tracker).await?,
99 };
100 let code = self
101 .context()
102 .extra()
103 .get_user_service(&description)
104 .await?;
105 Ok((code, description))
106 }
107
108 pub(crate) async fn handle_request(
110 &mut self,
111 request: ExecutionRequest,
112 resource_controller: &mut ResourceController<Option<AccountOwner>>,
113 ) -> Result<(), ExecutionError> {
114 use ExecutionRequest::*;
115 match request {
116 #[cfg(not(web))]
117 LoadContract {
118 id,
119 callback,
120 mut txn_tracker,
121 } => {
122 let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
123 callback.respond((code, description, txn_tracker))
124 }
125 #[cfg(not(web))]
126 LoadService {
127 id,
128 callback,
129 mut txn_tracker,
130 } => {
131 let (code, description) = self.load_service(id, &mut txn_tracker).await?;
132 callback.respond((code, description, txn_tracker))
133 }
134
135 ChainBalance { callback } => {
136 let balance = *self.system.balance.get();
137 callback.respond(balance);
138 }
139
140 OwnerBalance { owner, callback } => {
141 let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
142 callback.respond(balance);
143 }
144
145 OwnerBalances { callback } => {
146 let balances = self.system.balances.index_values().await?;
147 callback.respond(balances.into_iter().collect());
148 }
149
150 BalanceOwners { callback } => {
151 let owners = self.system.balances.indices().await?;
152 callback.respond(owners);
153 }
154
155 Transfer {
156 source,
157 destination,
158 amount,
159 signer,
160 application_id,
161 callback,
162 } => callback.respond(
163 self.system
164 .transfer(
165 signer,
166 Some(application_id),
167 source,
168 Recipient::Account(destination),
169 amount,
170 )
171 .await?,
172 ),
173
174 Claim {
175 source,
176 destination,
177 amount,
178 signer,
179 application_id,
180 callback,
181 } => callback.respond(
182 self.system
183 .claim(
184 signer,
185 Some(application_id),
186 source.owner,
187 source.chain_id,
188 Recipient::Account(destination),
189 amount,
190 )
191 .await?,
192 ),
193
194 SystemTimestamp { callback } => {
195 let timestamp = *self.system.timestamp.get();
196 callback.respond(timestamp);
197 }
198
199 ChainOwnership { callback } => {
200 let ownership = self.system.ownership.get().clone();
201 callback.respond(ownership);
202 }
203
204 ContainsKey { id, key, callback } => {
205 let view = self.users.try_load_entry(&id).await?;
206 let result = match view {
207 Some(view) => view.contains_key(&key).await?,
208 None => false,
209 };
210 callback.respond(result);
211 }
212
213 ContainsKeys { id, keys, callback } => {
214 let view = self.users.try_load_entry(&id).await?;
215 let result = match view {
216 Some(view) => view.contains_keys(keys).await?,
217 None => vec![false; keys.len()],
218 };
219 callback.respond(result);
220 }
221
222 ReadMultiValuesBytes { id, keys, callback } => {
223 let view = self.users.try_load_entry(&id).await?;
224 let values = match view {
225 Some(view) => view.multi_get(keys).await?,
226 None => vec![None; keys.len()],
227 };
228 callback.respond(values);
229 }
230
231 ReadValueBytes { id, key, callback } => {
232 let view = self.users.try_load_entry(&id).await?;
233 let result = match view {
234 Some(view) => view.get(&key).await?,
235 None => None,
236 };
237 callback.respond(result);
238 }
239
240 FindKeysByPrefix {
241 id,
242 key_prefix,
243 callback,
244 } => {
245 let view = self.users.try_load_entry(&id).await?;
246 let result = match view {
247 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
248 None => Vec::new(),
249 };
250 callback.respond(result);
251 }
252
253 FindKeyValuesByPrefix {
254 id,
255 key_prefix,
256 callback,
257 } => {
258 let view = self.users.try_load_entry(&id).await?;
259 let result = match view {
260 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
261 None => Vec::new(),
262 };
263 callback.respond(result);
264 }
265
266 WriteBatch {
267 id,
268 batch,
269 callback,
270 } => {
271 let mut view = self.users.try_load_entry_mut(&id).await?;
272 view.write_batch(batch).await?;
273 callback.respond(());
274 }
275
276 OpenChain {
277 ownership,
278 balance,
279 parent_id,
280 block_height,
281 application_permissions,
282 timestamp,
283 callback,
284 mut txn_tracker,
285 } => {
286 let config = OpenChainConfig {
287 ownership,
288 balance,
289 application_permissions,
290 };
291 let chain_id = self
292 .system
293 .open_chain(config, parent_id, block_height, timestamp, &mut txn_tracker)
294 .await?;
295 callback.respond((chain_id, txn_tracker));
296 }
297
298 CloseChain {
299 application_id,
300 callback,
301 } => {
302 let app_permissions = self.system.application_permissions.get();
303 if !app_permissions.can_close_chain(&application_id) {
304 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
305 } else {
306 self.system.close_chain().await?;
307 callback.respond(Ok(()));
308 }
309 }
310
311 ChangeApplicationPermissions {
312 application_id,
313 application_permissions,
314 callback,
315 } => {
316 let app_permissions = self.system.application_permissions.get();
317 if !app_permissions.can_change_application_permissions(&application_id) {
318 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
319 } else {
320 self.system
321 .application_permissions
322 .set(application_permissions);
323 callback.respond(Ok(()));
324 }
325 }
326
327 CreateApplication {
328 chain_id,
329 block_height,
330 module_id,
331 parameters,
332 required_application_ids,
333 callback,
334 txn_tracker,
335 } => {
336 let create_application_result = self
337 .system
338 .create_application(
339 chain_id,
340 block_height,
341 module_id,
342 parameters,
343 required_application_ids,
344 txn_tracker,
345 )
346 .await?;
347 callback.respond(Ok(create_application_result));
348 }
349
350 PerformHttpRequest {
351 request,
352 http_responses_are_oracle_responses,
353 callback,
354 } => {
355 let headers = request
356 .headers
357 .into_iter()
358 .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
359 .collect::<Result<HeaderMap, ExecutionError>>()?;
360
361 let url = Url::parse(&request.url)?;
362 let host = url
363 .host_str()
364 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
365
366 let (_epoch, committee) = self
367 .system
368 .current_committee()
369 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
370 let allowed_hosts = &committee.policy().http_request_allow_list;
371
372 ensure!(
373 allowed_hosts.contains(host),
374 ExecutionError::UnauthorizedHttpRequest(url)
375 );
376
377 #[cfg_attr(web, allow(unused_mut))]
378 let mut request = Client::new()
379 .request(request.method.into(), url)
380 .body(request.body)
381 .headers(headers);
382 #[cfg(not(web))]
383 {
384 request = request.timeout(Duration::from_millis(
385 committee.policy().http_request_timeout_ms,
386 ));
387 }
388
389 let response = request.send().await?;
390
391 let mut response_size_limit = committee.policy().maximum_http_response_bytes;
392
393 if http_responses_are_oracle_responses {
394 response_size_limit =
395 response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
396 }
397
398 callback.respond(
399 self.receive_http_response(response, response_size_limit)
400 .await?,
401 );
402 }
403
404 ReadBlobContent { blob_id, callback } => {
405 let blob = self.system.read_blob_content(blob_id).await?;
406 if blob_id.blob_type == BlobType::Data {
407 resource_controller
408 .with_state(&mut self.system)
409 .await?
410 .track_blob_read(blob.bytes().len() as u64)?;
411 }
412 let is_new = self
413 .system
414 .blob_used(&mut TransactionTracker::default(), blob_id)
415 .await?;
416 callback.respond((blob, is_new))
417 }
418
419 AssertBlobExists { blob_id, callback } => {
420 self.system.assert_blob_exists(blob_id).await?;
421 if blob_id.blob_type == BlobType::Data {
423 resource_controller
424 .with_state(&mut self.system)
425 .await?
426 .track_blob_read(0)?;
427 }
428 callback.respond(
429 self.system
430 .blob_used(&mut TransactionTracker::default(), blob_id)
431 .await?,
432 )
433 }
434
435 NextEventIndex {
436 stream_id,
437 callback,
438 } => {
439 let count = self
440 .stream_event_counts
441 .get_mut_or_default(&stream_id)
442 .await?;
443 let index = *count;
444 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
445 callback.respond(index)
446 }
447
448 ReadEvent { event_id, callback } => {
449 let event = self.context().extra().get_event(event_id.clone()).await?;
450 let event = event.ok_or(ExecutionError::EventsNotFound(vec![event_id]))?;
451 callback.respond(event);
452 }
453
454 SubscribeToEvents {
455 chain_id,
456 stream_id,
457 subscriber_app_id,
458 callback,
459 } => {
460 let subscriptions = self
461 .system
462 .event_subscriptions
463 .get_mut_or_default(&(chain_id, stream_id))
464 .await?;
465 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
466 subscriptions.next_index
467 } else {
468 0
469 };
470 callback.respond(next_index);
471 }
472
473 UnsubscribeFromEvents {
474 chain_id,
475 stream_id,
476 subscriber_app_id,
477 callback,
478 } => {
479 let key = (chain_id, stream_id);
480 let subscriptions = self
481 .system
482 .event_subscriptions
483 .get_mut_or_default(&key)
484 .await?;
485 subscriptions.applications.remove(&subscriber_app_id);
486 if subscriptions.applications.is_empty() {
487 self.system.event_subscriptions.remove(&key)?;
488 }
489 callback.respond(());
490 }
491
492 GetApplicationPermissions { callback } => {
493 let app_permissions = self.system.application_permissions.get();
494 callback.respond(app_permissions.clone());
495 }
496 }
497
498 Ok(())
499 }
500}
501
502impl<C> ExecutionStateView<C>
503where
504 C: Context + Clone + Send + Sync + 'static,
505 C::Extra: ExecutionRuntimeContext,
506{
507 async fn receive_http_response(
511 &mut self,
512 response: reqwest::Response,
513 size_limit: u64,
514 ) -> Result<http::Response, ExecutionError> {
515 let status = response.status().as_u16();
516 let maybe_content_length = response.content_length();
517
518 let headers = response
519 .headers()
520 .iter()
521 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
522 .collect::<Vec<_>>();
523
524 let total_header_size = headers
525 .iter()
526 .map(|header| (header.name.len() + header.value.len()) as u64)
527 .sum();
528
529 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
530 ExecutionError::HttpResponseSizeLimitExceeded {
531 limit: size_limit,
532 size: total_header_size,
533 },
534 )?;
535
536 if let Some(content_length) = maybe_content_length {
537 if content_length > remaining_bytes {
538 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
539 limit: size_limit,
540 size: content_length + total_header_size,
541 });
542 }
543 }
544
545 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
546 let mut body_stream = response.bytes_stream();
547
548 while let Some(bytes) = body_stream.next().await.transpose()? {
549 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
550 ExecutionError::HttpResponseSizeLimitExceeded {
551 limit: size_limit,
552 size: bytes.len() as u64 + (size_limit - remaining_bytes),
553 },
554 )?;
555
556 body.extend(&bytes);
557 }
558
559 Ok(http::Response {
560 status,
561 headers,
562 body,
563 })
564 }
565}
566
567#[derive(Debug)]
569pub enum ExecutionRequest {
570 #[cfg(not(web))]
571 LoadContract {
572 id: ApplicationId,
573 #[debug(skip)]
574 callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
575 #[debug(skip)]
576 txn_tracker: TransactionTracker,
577 },
578
579 #[cfg(not(web))]
580 LoadService {
581 id: ApplicationId,
582 #[debug(skip)]
583 callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
584 #[debug(skip)]
585 txn_tracker: TransactionTracker,
586 },
587
588 ChainBalance {
589 #[debug(skip)]
590 callback: Sender<Amount>,
591 },
592
593 OwnerBalance {
594 owner: AccountOwner,
595 #[debug(skip)]
596 callback: Sender<Amount>,
597 },
598
599 OwnerBalances {
600 #[debug(skip)]
601 callback: Sender<Vec<(AccountOwner, Amount)>>,
602 },
603
604 BalanceOwners {
605 #[debug(skip)]
606 callback: Sender<Vec<AccountOwner>>,
607 },
608
609 Transfer {
610 source: AccountOwner,
611 destination: Account,
612 amount: Amount,
613 #[debug(skip_if = Option::is_none)]
614 signer: Option<AccountOwner>,
615 application_id: ApplicationId,
616 #[debug(skip)]
617 callback: Sender<Option<OutgoingMessage>>,
618 },
619
620 Claim {
621 source: Account,
622 destination: Account,
623 amount: Amount,
624 #[debug(skip_if = Option::is_none)]
625 signer: Option<AccountOwner>,
626 application_id: ApplicationId,
627 #[debug(skip)]
628 callback: Sender<OutgoingMessage>,
629 },
630
631 SystemTimestamp {
632 #[debug(skip)]
633 callback: Sender<Timestamp>,
634 },
635
636 ChainOwnership {
637 #[debug(skip)]
638 callback: Sender<ChainOwnership>,
639 },
640
641 ReadValueBytes {
642 id: ApplicationId,
643 #[debug(with = hex_debug)]
644 key: Vec<u8>,
645 #[debug(skip)]
646 callback: Sender<Option<Vec<u8>>>,
647 },
648
649 ContainsKey {
650 id: ApplicationId,
651 key: Vec<u8>,
652 #[debug(skip)]
653 callback: Sender<bool>,
654 },
655
656 ContainsKeys {
657 id: ApplicationId,
658 #[debug(with = hex_vec_debug)]
659 keys: Vec<Vec<u8>>,
660 callback: Sender<Vec<bool>>,
661 },
662
663 ReadMultiValuesBytes {
664 id: ApplicationId,
665 #[debug(with = hex_vec_debug)]
666 keys: Vec<Vec<u8>>,
667 #[debug(skip)]
668 callback: Sender<Vec<Option<Vec<u8>>>>,
669 },
670
671 FindKeysByPrefix {
672 id: ApplicationId,
673 #[debug(with = hex_debug)]
674 key_prefix: Vec<u8>,
675 #[debug(skip)]
676 callback: Sender<Vec<Vec<u8>>>,
677 },
678
679 FindKeyValuesByPrefix {
680 id: ApplicationId,
681 #[debug(with = hex_debug)]
682 key_prefix: Vec<u8>,
683 #[debug(skip)]
684 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
685 },
686
687 WriteBatch {
688 id: ApplicationId,
689 batch: Batch,
690 #[debug(skip)]
691 callback: Sender<()>,
692 },
693
694 OpenChain {
695 ownership: ChainOwnership,
696 #[debug(skip_if = Amount::is_zero)]
697 balance: Amount,
698 parent_id: ChainId,
699 block_height: BlockHeight,
700 application_permissions: ApplicationPermissions,
701 timestamp: Timestamp,
702 #[debug(skip)]
703 txn_tracker: TransactionTracker,
704 #[debug(skip)]
705 callback: Sender<(ChainId, TransactionTracker)>,
706 },
707
708 CloseChain {
709 application_id: ApplicationId,
710 #[debug(skip)]
711 callback: Sender<Result<(), ExecutionError>>,
712 },
713
714 ChangeApplicationPermissions {
715 application_id: ApplicationId,
716 application_permissions: ApplicationPermissions,
717 #[debug(skip)]
718 callback: Sender<Result<(), ExecutionError>>,
719 },
720
721 CreateApplication {
722 chain_id: ChainId,
723 block_height: BlockHeight,
724 module_id: ModuleId,
725 parameters: Vec<u8>,
726 required_application_ids: Vec<ApplicationId>,
727 #[debug(skip)]
728 txn_tracker: TransactionTracker,
729 #[debug(skip)]
730 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
731 },
732
733 PerformHttpRequest {
734 request: http::Request,
735 http_responses_are_oracle_responses: bool,
736 #[debug(skip)]
737 callback: Sender<http::Response>,
738 },
739
740 ReadBlobContent {
741 blob_id: BlobId,
742 #[debug(skip)]
743 callback: Sender<(BlobContent, bool)>,
744 },
745
746 AssertBlobExists {
747 blob_id: BlobId,
748 #[debug(skip)]
749 callback: Sender<bool>,
750 },
751
752 NextEventIndex {
753 stream_id: StreamId,
754 #[debug(skip)]
755 callback: Sender<u32>,
756 },
757
758 ReadEvent {
759 event_id: EventId,
760 callback: oneshot::Sender<Vec<u8>>,
761 },
762
763 SubscribeToEvents {
764 chain_id: ChainId,
765 stream_id: StreamId,
766 subscriber_app_id: ApplicationId,
767 #[debug(skip)]
768 callback: Sender<u32>,
769 },
770
771 UnsubscribeFromEvents {
772 chain_id: ChainId,
773 stream_id: StreamId,
774 subscriber_app_id: ApplicationId,
775 #[debug(skip)]
776 callback: Sender<()>,
777 },
778
779 GetApplicationPermissions {
780 #[debug(skip)]
781 callback: Sender<ApplicationPermissions>,
782 },
783}