1#[cfg(not(web))]
7use std::time::Duration;
8
9use custom_debug_derive::Debug;
10use futures::{channel::mpsc, StreamExt as _};
11#[cfg(with_metrics)]
12use linera_base::prometheus_util::MeasureLatency as _;
13use linera_base::{
14 data_types::{
15 Amount, ApplicationPermissions, ArithmeticError, BlobContent, BlockHeight, Timestamp,
16 },
17 ensure, hex_debug, hex_vec_debug, http,
18 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
19 ownership::ChainOwnership,
20};
21use linera_views::{batch::Batch, context::Context, views::View};
22use oneshot::Sender;
23use reqwest::{header::HeaderMap, Client, Url};
24
25use crate::{
26 system::{CreateApplicationResult, OpenChainConfig, Recipient},
27 util::RespondExt,
28 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext,
29 ExecutionStateView, ModuleId, OutgoingMessage, ResourceController, TransactionTracker,
30 UserContractCode, UserServiceCode,
31};
32
33#[cfg(with_metrics)]
34mod metrics {
35 use std::sync::LazyLock;
36
37 use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
38 use prometheus::HistogramVec;
39
40 pub static LOAD_CONTRACT_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
42 register_histogram_vec(
43 "load_contract_latency",
44 "Load contract latency",
45 &[],
46 exponential_bucket_latencies(250.0),
47 )
48 });
49
50 pub static LOAD_SERVICE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
52 register_histogram_vec(
53 "load_service_latency",
54 "Load service latency",
55 &[],
56 exponential_bucket_latencies(250.0),
57 )
58 });
59}
60
61pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<ExecutionRequest>;
62
63impl<C> ExecutionStateView<C>
64where
65 C: Context + Clone + Send + Sync + 'static,
66 C::Extra: ExecutionRuntimeContext,
67{
68 pub(crate) async fn load_contract(
69 &mut self,
70 id: ApplicationId,
71 txn_tracker: &mut TransactionTracker,
72 ) -> Result<(UserContractCode, ApplicationDescription), ExecutionError> {
73 #[cfg(with_metrics)]
74 let _latency = metrics::LOAD_CONTRACT_LATENCY.measure_latency();
75 let blob_id = id.description_blob_id();
76 let description = match txn_tracker.created_blobs().get(&blob_id) {
77 Some(description) => {
78 let blob = description.clone();
79 bcs::from_bytes(blob.bytes())?
80 }
81 None => self.system.describe_application(id, txn_tracker).await?,
82 };
83 let code = self
84 .context()
85 .extra()
86 .get_user_contract(&description)
87 .await?;
88 Ok((code, description))
89 }
90
91 pub(crate) async fn load_service(
92 &mut self,
93 id: ApplicationId,
94 txn_tracker: &mut TransactionTracker,
95 ) -> Result<(UserServiceCode, ApplicationDescription), ExecutionError> {
96 #[cfg(with_metrics)]
97 let _latency = metrics::LOAD_SERVICE_LATENCY.measure_latency();
98 let blob_id = id.description_blob_id();
99 let description = match txn_tracker.created_blobs().get(&blob_id) {
100 Some(description) => {
101 let blob = description.clone();
102 bcs::from_bytes(blob.bytes())?
103 }
104 None => self.system.describe_application(id, txn_tracker).await?,
105 };
106 let code = self
107 .context()
108 .extra()
109 .get_user_service(&description)
110 .await?;
111 Ok((code, description))
112 }
113
114 pub(crate) async fn handle_request(
116 &mut self,
117 request: ExecutionRequest,
118 resource_controller: &mut ResourceController<Option<AccountOwner>>,
119 ) -> Result<(), ExecutionError> {
120 use ExecutionRequest::*;
121 match request {
122 #[cfg(not(web))]
123 LoadContract {
124 id,
125 callback,
126 mut txn_tracker,
127 } => {
128 let (code, description) = self.load_contract(id, &mut txn_tracker).await?;
129 callback.respond((code, description, txn_tracker))
130 }
131 #[cfg(not(web))]
132 LoadService {
133 id,
134 callback,
135 mut txn_tracker,
136 } => {
137 let (code, description) = self.load_service(id, &mut txn_tracker).await?;
138 callback.respond((code, description, txn_tracker))
139 }
140
141 ChainBalance { callback } => {
142 let balance = *self.system.balance.get();
143 callback.respond(balance);
144 }
145
146 OwnerBalance { owner, callback } => {
147 let balance = self.system.balances.get(&owner).await?.unwrap_or_default();
148 callback.respond(balance);
149 }
150
151 OwnerBalances { callback } => {
152 let balances = self.system.balances.index_values().await?;
153 callback.respond(balances.into_iter().collect());
154 }
155
156 BalanceOwners { callback } => {
157 let owners = self.system.balances.indices().await?;
158 callback.respond(owners);
159 }
160
161 Transfer {
162 source,
163 destination,
164 amount,
165 signer,
166 application_id,
167 callback,
168 } => callback.respond(
169 self.system
170 .transfer(
171 signer,
172 Some(application_id),
173 source,
174 Recipient::Account(destination),
175 amount,
176 )
177 .await?,
178 ),
179
180 Claim {
181 source,
182 destination,
183 amount,
184 signer,
185 application_id,
186 callback,
187 } => callback.respond(
188 self.system
189 .claim(
190 signer,
191 Some(application_id),
192 source.owner,
193 source.chain_id,
194 Recipient::Account(destination),
195 amount,
196 )
197 .await?,
198 ),
199
200 SystemTimestamp { callback } => {
201 let timestamp = *self.system.timestamp.get();
202 callback.respond(timestamp);
203 }
204
205 ChainOwnership { callback } => {
206 let ownership = self.system.ownership.get().clone();
207 callback.respond(ownership);
208 }
209
210 ContainsKey { id, key, callback } => {
211 let view = self.users.try_load_entry(&id).await?;
212 let result = match view {
213 Some(view) => view.contains_key(&key).await?,
214 None => false,
215 };
216 callback.respond(result);
217 }
218
219 ContainsKeys { id, keys, callback } => {
220 let view = self.users.try_load_entry(&id).await?;
221 let result = match view {
222 Some(view) => view.contains_keys(keys).await?,
223 None => vec![false; keys.len()],
224 };
225 callback.respond(result);
226 }
227
228 ReadMultiValuesBytes { id, keys, callback } => {
229 let view = self.users.try_load_entry(&id).await?;
230 let values = match view {
231 Some(view) => view.multi_get(keys).await?,
232 None => vec![None; keys.len()],
233 };
234 callback.respond(values);
235 }
236
237 ReadValueBytes { id, key, callback } => {
238 let view = self.users.try_load_entry(&id).await?;
239 let result = match view {
240 Some(view) => view.get(&key).await?,
241 None => None,
242 };
243 callback.respond(result);
244 }
245
246 FindKeysByPrefix {
247 id,
248 key_prefix,
249 callback,
250 } => {
251 let view = self.users.try_load_entry(&id).await?;
252 let result = match view {
253 Some(view) => view.find_keys_by_prefix(&key_prefix).await?,
254 None => Vec::new(),
255 };
256 callback.respond(result);
257 }
258
259 FindKeyValuesByPrefix {
260 id,
261 key_prefix,
262 callback,
263 } => {
264 let view = self.users.try_load_entry(&id).await?;
265 let result = match view {
266 Some(view) => view.find_key_values_by_prefix(&key_prefix).await?,
267 None => Vec::new(),
268 };
269 callback.respond(result);
270 }
271
272 WriteBatch {
273 id,
274 batch,
275 callback,
276 } => {
277 let mut view = self.users.try_load_entry_mut(&id).await?;
278 view.write_batch(batch).await?;
279 callback.respond(());
280 }
281
282 OpenChain {
283 ownership,
284 balance,
285 parent_id,
286 block_height,
287 application_permissions,
288 timestamp,
289 callback,
290 mut txn_tracker,
291 } => {
292 let config = OpenChainConfig {
293 ownership,
294 balance,
295 application_permissions,
296 };
297 let chain_id = self
298 .system
299 .open_chain(config, parent_id, block_height, timestamp, &mut txn_tracker)
300 .await?;
301 callback.respond((chain_id, txn_tracker));
302 }
303
304 CloseChain {
305 application_id,
306 callback,
307 } => {
308 let app_permissions = self.system.application_permissions.get();
309 if !app_permissions.can_close_chain(&application_id) {
310 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
311 } else {
312 self.system.close_chain().await?;
313 callback.respond(Ok(()));
314 }
315 }
316
317 ChangeApplicationPermissions {
318 application_id,
319 application_permissions,
320 callback,
321 } => {
322 let app_permissions = self.system.application_permissions.get();
323 if !app_permissions.can_change_application_permissions(&application_id) {
324 callback.respond(Err(ExecutionError::UnauthorizedApplication(application_id)));
325 } else {
326 self.system
327 .application_permissions
328 .set(application_permissions);
329 callback.respond(Ok(()));
330 }
331 }
332
333 CreateApplication {
334 chain_id,
335 block_height,
336 module_id,
337 parameters,
338 required_application_ids,
339 callback,
340 txn_tracker,
341 } => {
342 let create_application_result = self
343 .system
344 .create_application(
345 chain_id,
346 block_height,
347 module_id,
348 parameters,
349 required_application_ids,
350 txn_tracker,
351 )
352 .await?;
353 callback.respond(Ok(create_application_result));
354 }
355
356 PerformHttpRequest {
357 request,
358 http_responses_are_oracle_responses,
359 callback,
360 } => {
361 let headers = request
362 .headers
363 .into_iter()
364 .map(|http::Header { name, value }| Ok((name.parse()?, value.try_into()?)))
365 .collect::<Result<HeaderMap, ExecutionError>>()?;
366
367 let url = Url::parse(&request.url)?;
368 let host = url
369 .host_str()
370 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
371
372 let (_epoch, committee) = self
373 .system
374 .current_committee()
375 .ok_or_else(|| ExecutionError::UnauthorizedHttpRequest(url.clone()))?;
376 let allowed_hosts = &committee.policy().http_request_allow_list;
377
378 ensure!(
379 allowed_hosts.contains(host),
380 ExecutionError::UnauthorizedHttpRequest(url)
381 );
382
383 #[cfg_attr(web, allow(unused_mut))]
384 let mut request = Client::new()
385 .request(request.method.into(), url)
386 .body(request.body)
387 .headers(headers);
388 #[cfg(not(web))]
389 {
390 request = request.timeout(Duration::from_millis(
391 committee.policy().http_request_timeout_ms,
392 ));
393 }
394
395 let response = request.send().await?;
396
397 let mut response_size_limit = committee.policy().maximum_http_response_bytes;
398
399 if http_responses_are_oracle_responses {
400 response_size_limit =
401 response_size_limit.min(committee.policy().maximum_oracle_response_bytes);
402 }
403
404 callback.respond(
405 self.receive_http_response(response, response_size_limit)
406 .await?,
407 );
408 }
409
410 ReadBlobContent { blob_id, callback } => {
411 let blob = self.system.read_blob_content(blob_id).await?;
412 if blob_id.blob_type == BlobType::Data {
413 resource_controller
414 .with_state(&mut self.system)
415 .await?
416 .track_blob_read(blob.bytes().len() as u64)?;
417 }
418 let is_new = self
419 .system
420 .blob_used(&mut TransactionTracker::default(), blob_id)
421 .await?;
422 callback.respond((blob, is_new))
423 }
424
425 AssertBlobExists { blob_id, callback } => {
426 self.system.assert_blob_exists(blob_id).await?;
427 if blob_id.blob_type == BlobType::Data {
429 resource_controller
430 .with_state(&mut self.system)
431 .await?
432 .track_blob_read(0)?;
433 }
434 callback.respond(
435 self.system
436 .blob_used(&mut TransactionTracker::default(), blob_id)
437 .await?,
438 )
439 }
440
441 NextEventIndex {
442 stream_id,
443 callback,
444 } => {
445 let count = self
446 .stream_event_counts
447 .get_mut_or_default(&stream_id)
448 .await?;
449 let index = *count;
450 *count = count.checked_add(1).ok_or(ArithmeticError::Overflow)?;
451 callback.respond(index)
452 }
453
454 ReadEvent { event_id, callback } => {
455 let event = self.context().extra().get_event(event_id.clone()).await?;
456 let event = event.ok_or(ExecutionError::EventsNotFound(vec![event_id]))?;
457 callback.respond(event);
458 }
459
460 SubscribeToEvents {
461 chain_id,
462 stream_id,
463 subscriber_app_id,
464 callback,
465 } => {
466 let subscriptions = self
467 .system
468 .event_subscriptions
469 .get_mut_or_default(&(chain_id, stream_id))
470 .await?;
471 let next_index = if subscriptions.applications.insert(subscriber_app_id) {
472 subscriptions.next_index
473 } else {
474 0
475 };
476 callback.respond(next_index);
477 }
478
479 UnsubscribeFromEvents {
480 chain_id,
481 stream_id,
482 subscriber_app_id,
483 callback,
484 } => {
485 let key = (chain_id, stream_id);
486 let subscriptions = self
487 .system
488 .event_subscriptions
489 .get_mut_or_default(&key)
490 .await?;
491 subscriptions.applications.remove(&subscriber_app_id);
492 if subscriptions.applications.is_empty() {
493 self.system.event_subscriptions.remove(&key)?;
494 }
495 callback.respond(());
496 }
497
498 GetApplicationPermissions { callback } => {
499 let app_permissions = self.system.application_permissions.get();
500 callback.respond(app_permissions.clone());
501 }
502 }
503
504 Ok(())
505 }
506}
507
508impl<C> ExecutionStateView<C>
509where
510 C: Context + Clone + Send + Sync + 'static,
511 C::Extra: ExecutionRuntimeContext,
512{
513 async fn receive_http_response(
517 &mut self,
518 response: reqwest::Response,
519 size_limit: u64,
520 ) -> Result<http::Response, ExecutionError> {
521 let status = response.status().as_u16();
522 let maybe_content_length = response.content_length();
523
524 let headers = response
525 .headers()
526 .iter()
527 .map(|(name, value)| http::Header::new(name.to_string(), value.as_bytes()))
528 .collect::<Vec<_>>();
529
530 let total_header_size = headers
531 .iter()
532 .map(|header| (header.name.len() + header.value.len()) as u64)
533 .sum();
534
535 let mut remaining_bytes = size_limit.checked_sub(total_header_size).ok_or(
536 ExecutionError::HttpResponseSizeLimitExceeded {
537 limit: size_limit,
538 size: total_header_size,
539 },
540 )?;
541
542 if let Some(content_length) = maybe_content_length {
543 if content_length > remaining_bytes {
544 return Err(ExecutionError::HttpResponseSizeLimitExceeded {
545 limit: size_limit,
546 size: content_length + total_header_size,
547 });
548 }
549 }
550
551 let mut body = Vec::with_capacity(maybe_content_length.unwrap_or(0) as usize);
552 let mut body_stream = response.bytes_stream();
553
554 while let Some(bytes) = body_stream.next().await.transpose()? {
555 remaining_bytes = remaining_bytes.checked_sub(bytes.len() as u64).ok_or(
556 ExecutionError::HttpResponseSizeLimitExceeded {
557 limit: size_limit,
558 size: bytes.len() as u64 + (size_limit - remaining_bytes),
559 },
560 )?;
561
562 body.extend(&bytes);
563 }
564
565 Ok(http::Response {
566 status,
567 headers,
568 body,
569 })
570 }
571}
572
573#[derive(Debug)]
575pub enum ExecutionRequest {
576 #[cfg(not(web))]
577 LoadContract {
578 id: ApplicationId,
579 #[debug(skip)]
580 callback: Sender<(UserContractCode, ApplicationDescription, TransactionTracker)>,
581 #[debug(skip)]
582 txn_tracker: TransactionTracker,
583 },
584
585 #[cfg(not(web))]
586 LoadService {
587 id: ApplicationId,
588 #[debug(skip)]
589 callback: Sender<(UserServiceCode, ApplicationDescription, TransactionTracker)>,
590 #[debug(skip)]
591 txn_tracker: TransactionTracker,
592 },
593
594 ChainBalance {
595 #[debug(skip)]
596 callback: Sender<Amount>,
597 },
598
599 OwnerBalance {
600 owner: AccountOwner,
601 #[debug(skip)]
602 callback: Sender<Amount>,
603 },
604
605 OwnerBalances {
606 #[debug(skip)]
607 callback: Sender<Vec<(AccountOwner, Amount)>>,
608 },
609
610 BalanceOwners {
611 #[debug(skip)]
612 callback: Sender<Vec<AccountOwner>>,
613 },
614
615 Transfer {
616 source: AccountOwner,
617 destination: Account,
618 amount: Amount,
619 #[debug(skip_if = Option::is_none)]
620 signer: Option<AccountOwner>,
621 application_id: ApplicationId,
622 #[debug(skip)]
623 callback: Sender<Option<OutgoingMessage>>,
624 },
625
626 Claim {
627 source: Account,
628 destination: Account,
629 amount: Amount,
630 #[debug(skip_if = Option::is_none)]
631 signer: Option<AccountOwner>,
632 application_id: ApplicationId,
633 #[debug(skip)]
634 callback: Sender<OutgoingMessage>,
635 },
636
637 SystemTimestamp {
638 #[debug(skip)]
639 callback: Sender<Timestamp>,
640 },
641
642 ChainOwnership {
643 #[debug(skip)]
644 callback: Sender<ChainOwnership>,
645 },
646
647 ReadValueBytes {
648 id: ApplicationId,
649 #[debug(with = hex_debug)]
650 key: Vec<u8>,
651 #[debug(skip)]
652 callback: Sender<Option<Vec<u8>>>,
653 },
654
655 ContainsKey {
656 id: ApplicationId,
657 key: Vec<u8>,
658 #[debug(skip)]
659 callback: Sender<bool>,
660 },
661
662 ContainsKeys {
663 id: ApplicationId,
664 #[debug(with = hex_vec_debug)]
665 keys: Vec<Vec<u8>>,
666 callback: Sender<Vec<bool>>,
667 },
668
669 ReadMultiValuesBytes {
670 id: ApplicationId,
671 #[debug(with = hex_vec_debug)]
672 keys: Vec<Vec<u8>>,
673 #[debug(skip)]
674 callback: Sender<Vec<Option<Vec<u8>>>>,
675 },
676
677 FindKeysByPrefix {
678 id: ApplicationId,
679 #[debug(with = hex_debug)]
680 key_prefix: Vec<u8>,
681 #[debug(skip)]
682 callback: Sender<Vec<Vec<u8>>>,
683 },
684
685 FindKeyValuesByPrefix {
686 id: ApplicationId,
687 #[debug(with = hex_debug)]
688 key_prefix: Vec<u8>,
689 #[debug(skip)]
690 callback: Sender<Vec<(Vec<u8>, Vec<u8>)>>,
691 },
692
693 WriteBatch {
694 id: ApplicationId,
695 batch: Batch,
696 #[debug(skip)]
697 callback: Sender<()>,
698 },
699
700 OpenChain {
701 ownership: ChainOwnership,
702 #[debug(skip_if = Amount::is_zero)]
703 balance: Amount,
704 parent_id: ChainId,
705 block_height: BlockHeight,
706 application_permissions: ApplicationPermissions,
707 timestamp: Timestamp,
708 #[debug(skip)]
709 txn_tracker: TransactionTracker,
710 #[debug(skip)]
711 callback: Sender<(ChainId, TransactionTracker)>,
712 },
713
714 CloseChain {
715 application_id: ApplicationId,
716 #[debug(skip)]
717 callback: Sender<Result<(), ExecutionError>>,
718 },
719
720 ChangeApplicationPermissions {
721 application_id: ApplicationId,
722 application_permissions: ApplicationPermissions,
723 #[debug(skip)]
724 callback: Sender<Result<(), ExecutionError>>,
725 },
726
727 CreateApplication {
728 chain_id: ChainId,
729 block_height: BlockHeight,
730 module_id: ModuleId,
731 parameters: Vec<u8>,
732 required_application_ids: Vec<ApplicationId>,
733 #[debug(skip)]
734 txn_tracker: TransactionTracker,
735 #[debug(skip)]
736 callback: Sender<Result<CreateApplicationResult, ExecutionError>>,
737 },
738
739 PerformHttpRequest {
740 request: http::Request,
741 http_responses_are_oracle_responses: bool,
742 #[debug(skip)]
743 callback: Sender<http::Response>,
744 },
745
746 ReadBlobContent {
747 blob_id: BlobId,
748 #[debug(skip)]
749 callback: Sender<(BlobContent, bool)>,
750 },
751
752 AssertBlobExists {
753 blob_id: BlobId,
754 #[debug(skip)]
755 callback: Sender<bool>,
756 },
757
758 NextEventIndex {
759 stream_id: StreamId,
760 #[debug(skip)]
761 callback: Sender<u32>,
762 },
763
764 ReadEvent {
765 event_id: EventId,
766 callback: oneshot::Sender<Vec<u8>>,
767 },
768
769 SubscribeToEvents {
770 chain_id: ChainId,
771 stream_id: StreamId,
772 subscriber_app_id: ApplicationId,
773 #[debug(skip)]
774 callback: Sender<u32>,
775 },
776
777 UnsubscribeFromEvents {
778 chain_id: ChainId,
779 stream_id: StreamId,
780 subscriber_app_id: ApplicationId,
781 #[debug(skip)]
782 callback: Sender<()>,
783 },
784
785 GetApplicationPermissions {
786 #[debug(skip)]
787 callback: Sender<ApplicationPermissions>,
788 },
789}