1use std::{
5 collections::BTreeMap,
6 ops::{Deref, DerefMut},
7 vec,
8};
9
10use allocative::Allocative;
11use futures::{FutureExt, StreamExt};
12use linera_base::{
13 crypto::{BcsHashable, CryptoHash},
14 data_types::{Blob, BlobContent, BlockHeight, OracleResponse, StreamUpdate},
15 ensure,
16 identifiers::{AccountOwner, BlobId, BlobType, ChainId, GenericApplicationId, StreamId},
17 time::Instant,
18};
19use linera_views::{
20 context::Context,
21 historical_hash_wrapper::HistoricallyHashableView,
22 key_value_store_view::KeyValueStoreView,
23 map_view::MapView,
24 reentrant_collection_view::ReentrantCollectionView,
25 views::{ClonableView, ReplaceContext, View},
26 ViewError,
27};
28use serde::{Deserialize, Serialize};
29#[cfg(with_testing)]
30use {
31 crate::{
32 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
33 },
34 linera_views::context::MemoryContext,
35 std::sync::Arc,
36};
37
38use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
39use crate::{
40 execution_state_actor::ExecutionStateActor,
41 resources::ResourceController,
42 system::{SystemExecutionStateView, SystemMessage},
43 transaction_tracker::PreparedCheckpoint,
44 ApplicationDescription, ApplicationId, ExecutionError, ExecutionRuntimeContext, JsVec, Message,
45 MessageContext, OperationContext, OutgoingMessage, ProcessStreamsContext, Query, QueryContext,
46 QueryOutcome, ServiceSyncRuntime, Timestamp, TransactionTracker,
47};
48
49#[derive(Debug, ClonableView, View, Allocative)]
51#[allocative(bound = "C")]
52pub struct ExecutionStateViewInner<C> {
53 pub system: SystemExecutionStateView<C>,
55 pub users: ReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
57 pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
59 pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
61}
62
63impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateViewInner<C> {
64 type Target = ExecutionStateViewInner<C2>;
65
66 async fn with_context(
67 &mut self,
68 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
69 ) -> Self::Target {
70 ExecutionStateViewInner {
71 system: self.system.with_context(ctx.clone()).await,
72 users: self.users.with_context(ctx.clone()).await,
73 previous_message_blocks: self.previous_message_blocks.with_context(ctx.clone()).await,
74 previous_event_blocks: self.previous_event_blocks.with_context(ctx.clone()).await,
75 }
76 }
77}
78
79#[derive(Debug, ClonableView, View, Allocative)]
81#[allocative(bound = "C")]
82pub struct ExecutionStateView<C> {
83 inner: HistoricallyHashableView<C, ExecutionStateViewInner<C>>,
84}
85
86impl<C> Deref for ExecutionStateView<C> {
87 type Target = ExecutionStateViewInner<C>;
88
89 fn deref(&self) -> &ExecutionStateViewInner<C> {
90 self.inner.deref()
91 }
92}
93
94impl<C> DerefMut for ExecutionStateView<C> {
95 fn deref_mut(&mut self) -> &mut ExecutionStateViewInner<C> {
96 self.inner.deref_mut()
97 }
98}
99
100impl<C> ExecutionStateView<C>
101where
102 C: Context + Clone + 'static,
103 C::Extra: ExecutionRuntimeContext,
104{
105 pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
107 #[derive(Serialize, Deserialize)]
108 struct ExecutionStateViewHash([u8; 32]);
109 impl BcsHashable<'_> for ExecutionStateViewHash {}
110 let hash = self.inner.historical_hash().await?;
111 Ok(CryptoHash::new(&ExecutionStateViewHash(hash.into())))
112 }
113
114 pub async fn prepare_checkpoint(
128 &mut self,
129 maximum_blob_size: u64,
130 ) -> Result<Vec<Blob>, ExecutionError> {
131 let mut had_system_event_block = false;
136 self.previous_event_blocks
137 .for_each_index_while(|stream_id| {
138 if matches!(stream_id.application_id, GenericApplicationId::System) {
139 had_system_event_block = true;
140 Ok(false)
141 } else {
142 Ok(true)
143 }
144 })
145 .await?;
146 ensure!(
147 !had_system_event_block,
148 ExecutionError::CheckpointPreconditionFailed("chain has published system events")
149 );
150
151 let (bytes, _content_hash) = self.inner.dump_content().await?;
152 let chunk_size = usize::try_from(maximum_blob_size).unwrap_or(usize::MAX);
153 Ok(bytes
154 .chunks(chunk_size)
155 .map(|chunk| {
156 Blob::new(BlobContent::new(
157 BlobType::CheckpointExecutionState,
158 chunk.to_vec(),
159 ))
160 })
161 .collect())
162 }
163
164 pub async fn apply_checkpoint(
172 &mut self,
173 prepared: PreparedCheckpoint,
174 txn_tracker: &mut TransactionTracker,
175 ) -> Result<(), ExecutionError> {
176 let PreparedCheckpoint {
177 blobs,
178 origin_cursors,
179 inbox_cursors,
180 outbox_block_hashes,
181 } = prepared;
182 let execution_state_blobs = blobs.iter().map(|blob| blob.id().hash).collect();
183 let used_blobs = self.system.used_blobs.indices().await?;
184 for blob in blobs {
185 txn_tracker.add_created_blob(blob);
186 }
187 txn_tracker.replay_oracle_response(OracleResponse::Checkpoint {
188 execution_state_blobs,
189 used_blobs,
190 outbox_block_hashes,
191 inbox_cursors,
192 })?;
193 for (origin, latest_received_cursor) in origin_cursors {
194 txn_tracker.add_outgoing_message(OutgoingMessage::new(
195 origin,
196 Message::System(SystemMessage::CheckpointAck {
197 latest_received_cursor,
198 }),
199 ));
200 }
201 self.system.pending_checkpoint_ack_targets.clear();
205 Ok(())
206 }
207
208 pub async fn restore_from_content(&mut self, bytes: &[u8]) -> Result<(), ViewError> {
212 self.inner.restore_from_content(bytes).await?;
213 Ok(())
214 }
215}
216
217impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
218 type Target = ExecutionStateView<C2>;
219
220 async fn with_context(
221 &mut self,
222 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
223 ) -> Self::Target {
224 ExecutionStateView {
225 inner: self.inner.with_context(ctx.clone()).await,
226 }
227 }
228}
229
230pub struct ServiceRuntimeEndpoint {
232 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
234 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
236}
237
238#[cfg(with_testing)]
239impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
240where
241 MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
242{
243 pub async fn simulate_instantiation(
245 &mut self,
246 contract: UserContractCode,
247 local_time: linera_base::data_types::Timestamp,
248 application_description: ApplicationDescription,
249 instantiation_argument: Vec<u8>,
250 contract_blob: Blob,
251 service_blob: Blob,
252 ) -> Result<(), ExecutionError> {
253 let chain_id = application_description.creator_chain_id;
254 assert_eq!(chain_id, self.context().extra().chain_id);
255 let context = OperationContext {
256 chain_id,
257 authenticated_owner: None,
258 height: application_description.block_height,
259 round: None,
260 timestamp: local_time,
261 };
262
263 let action = UserAction::Instantiate(context, instantiation_argument);
264 let next_application_index = application_description.application_index + 1;
265 let next_chain_index = 0;
266
267 let application_id = From::from(&application_description);
268 let blob = Blob::new_application_description(&application_description);
269
270 self.system.used_blobs.insert(&blob.id())?;
271 self.system.used_blobs.insert(&contract_blob.id())?;
272 self.system.used_blobs.insert(&service_blob.id())?;
273
274 self.context()
275 .extra()
276 .user_contracts()
277 .pin()
278 .insert(application_id, contract);
279
280 self.context()
281 .extra()
282 .add_blobs([
283 contract_blob,
284 service_blob,
285 Blob::new_application_description(&application_description),
286 ])
287 .await?;
288
289 let tracker = ResourceTracker::default();
290 let policy = ResourceControlPolicy::no_fees();
291 let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
292 let mut txn_tracker = TransactionTracker::new(
293 local_time,
294 0,
295 next_application_index,
296 next_chain_index,
297 None,
298 &[],
299 );
300 txn_tracker.add_created_blob(blob);
301 Box::pin(
302 ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
303 .run_user_action(application_id, action, context.refund_grant_to(), None),
304 )
305 .await?;
306
307 Ok(())
308 }
309}
310
311pub enum UserAction {
312 Instantiate(OperationContext, Vec<u8>),
313 Operation(OperationContext, Vec<u8>),
314 Message(MessageContext, Vec<u8>),
315 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
316 SummarizeEvents(ProcessStreamsContext, Vec<StreamUpdate>),
317}
318
319impl UserAction {
320 pub(crate) fn signer(&self) -> Option<AccountOwner> {
321 match self {
322 UserAction::Instantiate(context, _) => context.authenticated_owner,
323 UserAction::Operation(context, _) => context.authenticated_owner,
324 UserAction::ProcessStreams(_, _) => None,
325 UserAction::SummarizeEvents(_, _) => None,
326 UserAction::Message(context, _) => context.authenticated_owner,
327 }
328 }
329
330 pub(crate) fn height(&self) -> BlockHeight {
331 match self {
332 UserAction::Instantiate(context, _) => context.height,
333 UserAction::Operation(context, _) => context.height,
334 UserAction::ProcessStreams(context, _) => context.height,
335 UserAction::SummarizeEvents(context, _) => context.height,
336 UserAction::Message(context, _) => context.height,
337 }
338 }
339
340 pub(crate) fn round(&self) -> Option<u32> {
341 match self {
342 UserAction::Instantiate(context, _) => context.round,
343 UserAction::Operation(context, _) => context.round,
344 UserAction::ProcessStreams(context, _) => context.round,
345 UserAction::SummarizeEvents(context, _) => context.round,
346 UserAction::Message(context, _) => context.round,
347 }
348 }
349
350 pub(crate) fn timestamp(&self) -> Timestamp {
351 match self {
352 UserAction::Instantiate(context, _) => context.timestamp,
353 UserAction::Operation(context, _) => context.timestamp,
354 UserAction::ProcessStreams(context, _) => context.timestamp,
355 UserAction::SummarizeEvents(context, _) => context.timestamp,
356 UserAction::Message(context, _) => context.timestamp,
357 }
358 }
359}
360
361impl<C> ExecutionStateView<C>
362where
363 C: Context + Clone + 'static,
364 C::Extra: ExecutionRuntimeContext,
365{
366 pub async fn query_application(
368 &mut self,
369 context: QueryContext,
370 query: Query,
371 endpoint: Option<&mut ServiceRuntimeEndpoint>,
372 ) -> Result<QueryOutcome, ExecutionError> {
373 assert_eq!(context.chain_id, self.context().extra().chain_id());
374 match query {
375 Query::System(query) => {
376 let outcome = self.system.handle_query(context, query);
377 Ok(outcome.into())
378 }
379 Query::User {
380 application_id,
381 bytes,
382 } => {
383 let outcome = match endpoint {
384 Some(endpoint) => {
385 self.query_user_application_with_long_lived_service(
386 application_id,
387 context,
388 bytes,
389 &mut endpoint.incoming_execution_requests,
390 &endpoint.runtime_request_sender,
391 )
392 .await?
393 }
394 None => {
395 self.query_user_application(application_id, context, bytes)
396 .await?
397 }
398 };
399 Ok(outcome.into())
400 }
401 }
402 }
403
404 async fn query_user_application(
405 &mut self,
406 application_id: ApplicationId,
407 context: QueryContext,
408 query: Vec<u8>,
409 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
410 self.query_user_application_with_deadline(
411 application_id,
412 context,
413 query,
414 None,
415 BTreeMap::new(),
416 )
417 .await
418 }
419
420 pub(crate) async fn query_user_application_with_deadline(
421 &mut self,
422 application_id: ApplicationId,
423 context: QueryContext,
424 query: Vec<u8>,
425 deadline: Option<Instant>,
426 created_blobs: BTreeMap<BlobId, BlobContent>,
427 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
428 let (execution_state_sender, mut execution_state_receiver) =
429 futures::channel::mpsc::unbounded();
430 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
431 let mut resource_controller = ResourceController::default();
432 let thread_pool = self.context().extra().thread_pool().clone();
433 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
434
435 let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
436
437 let service_runtime_task = thread_pool
438 .run_send(JsVec(codes), move |codes| async move {
439 let mut runtime = ServiceSyncRuntime::new_with_deadline(
440 execution_state_sender,
441 context,
442 deadline,
443 );
444
445 for (code, description) in codes.0.into_iter().zip(descriptions) {
446 runtime.preload_service(ApplicationId::from(&description), code, description);
447 }
448
449 runtime.run_query(application_id, query)
450 })
451 .await;
452
453 while let Some(request) = execution_state_receiver.next().await {
454 actor.handle_request(request).await?;
455 }
456
457 service_runtime_task.await?
458 }
459
460 async fn query_user_application_with_long_lived_service(
461 &mut self,
462 application_id: ApplicationId,
463 context: QueryContext,
464 query: Vec<u8>,
465 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
466 ExecutionRequest,
467 >,
468 runtime_request_sender: &std::sync::mpsc::Sender<ServiceRuntimeRequest>,
469 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
470 let (outcome_sender, outcome_receiver) = oneshot::channel();
471 let mut outcome_receiver = outcome_receiver.fuse();
472
473 runtime_request_sender
474 .send(ServiceRuntimeRequest::Query {
475 application_id,
476 context,
477 query,
478 callback: outcome_sender,
479 })
480 .expect("Service runtime thread should only stop when `request_sender` is dropped");
481
482 let mut txn_tracker = TransactionTracker::default();
483 let mut resource_controller = ResourceController::default();
484 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
485
486 loop {
487 futures::select! {
488 maybe_request = incoming_execution_requests.next() => {
489 if let Some(request) = maybe_request {
490 actor.handle_request(request).await?;
491 }
492 }
493 outcome = &mut outcome_receiver => {
494 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
495 }
496 }
497 }
498 }
499
500 pub async fn list_applications(
502 &self,
503 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
504 let mut applications = vec![];
505 for app_id in self.users.indices().await? {
506 let blob_id = app_id.description_blob_id();
507 let blob_content = self.system.read_blob_content(blob_id).await?;
508 let application_description = bcs::from_bytes(blob_content.bytes())?;
509 applications.push((app_id, application_description));
510 }
511 Ok(applications)
512 }
513}