1use std::{
5 collections::BTreeMap,
6 ops::{Deref, DerefMut},
7 vec,
8};
9
10use allocative::Allocative;
11use futures::{FutureExt, StreamExt};
12use linera_base::{
13 crypto::{BcsHashable, CryptoHash},
14 data_types::{BlobContent, BlockHeight, StreamUpdate},
15 identifiers::{AccountOwner, BlobId, ChainId, StreamId},
16 time::Instant,
17};
18use linera_views::{
19 context::Context,
20 historical_hash_wrapper::HistoricallyHashableView,
21 key_value_store_view::KeyValueStoreView,
22 map_view::MapView,
23 reentrant_collection_view::ReentrantCollectionView,
24 views::{ClonableView, ReplaceContext, View},
25 ViewError,
26};
27use serde::{Deserialize, Serialize};
28#[cfg(with_testing)]
29use {
30 crate::{
31 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
32 },
33 linera_base::data_types::Blob,
34 linera_views::context::MemoryContext,
35 std::sync::Arc,
36};
37
38use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
39use crate::{
40 execution_state_actor::ExecutionStateActor, resources::ResourceController,
41 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
42 ExecutionRuntimeContext, JsVec, MessageContext, OperationContext, ProcessStreamsContext, Query,
43 QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp, TransactionTracker,
44};
45
46#[derive(Debug, ClonableView, View, Allocative)]
48#[allocative(bound = "C")]
49pub struct ExecutionStateViewInner<C> {
50 pub system: SystemExecutionStateView<C>,
52 pub users: ReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
54 pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
56 pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
58}
59
60impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateViewInner<C> {
61 type Target = ExecutionStateViewInner<C2>;
62
63 async fn with_context(
64 &mut self,
65 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
66 ) -> Self::Target {
67 ExecutionStateViewInner {
68 system: self.system.with_context(ctx.clone()).await,
69 users: self.users.with_context(ctx.clone()).await,
70 previous_message_blocks: self.previous_message_blocks.with_context(ctx.clone()).await,
71 previous_event_blocks: self.previous_event_blocks.with_context(ctx.clone()).await,
72 }
73 }
74}
75
76#[derive(Debug, ClonableView, View, Allocative)]
78#[allocative(bound = "C")]
79pub struct ExecutionStateView<C> {
80 inner: HistoricallyHashableView<C, ExecutionStateViewInner<C>>,
81}
82
83impl<C> Deref for ExecutionStateView<C> {
84 type Target = ExecutionStateViewInner<C>;
85
86 fn deref(&self) -> &ExecutionStateViewInner<C> {
87 self.inner.deref()
88 }
89}
90
91impl<C> DerefMut for ExecutionStateView<C> {
92 fn deref_mut(&mut self) -> &mut ExecutionStateViewInner<C> {
93 self.inner.deref_mut()
94 }
95}
96
97impl<C> ExecutionStateView<C>
98where
99 C: Context + Clone + 'static,
100 C::Extra: ExecutionRuntimeContext,
101{
102 pub async fn crypto_hash_mut(&mut self) -> Result<CryptoHash, ViewError> {
103 #[derive(Serialize, Deserialize)]
104 struct ExecutionStateViewHash([u8; 32]);
105 impl BcsHashable<'_> for ExecutionStateViewHash {}
106 let hash = self.inner.historical_hash().await?;
107 Ok(CryptoHash::new(&ExecutionStateViewHash(hash.into())))
108 }
109}
110
111impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
112 type Target = ExecutionStateView<C2>;
113
114 async fn with_context(
115 &mut self,
116 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
117 ) -> Self::Target {
118 ExecutionStateView {
119 inner: self.inner.with_context(ctx.clone()).await,
120 }
121 }
122}
123
124pub struct ServiceRuntimeEndpoint {
126 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
128 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
130}
131
132#[cfg(with_testing)]
133impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
134where
135 MemoryContext<TestExecutionRuntimeContext>: Context + Clone + 'static,
136{
137 pub async fn simulate_instantiation(
139 &mut self,
140 contract: UserContractCode,
141 local_time: linera_base::data_types::Timestamp,
142 application_description: ApplicationDescription,
143 instantiation_argument: Vec<u8>,
144 contract_blob: Blob,
145 service_blob: Blob,
146 ) -> Result<(), ExecutionError> {
147 let chain_id = application_description.creator_chain_id;
148 assert_eq!(chain_id, self.context().extra().chain_id);
149 let context = OperationContext {
150 chain_id,
151 authenticated_owner: None,
152 height: application_description.block_height,
153 round: None,
154 timestamp: local_time,
155 };
156
157 let action = UserAction::Instantiate(context, instantiation_argument);
158 let next_application_index = application_description.application_index + 1;
159 let next_chain_index = 0;
160
161 let application_id = From::from(&application_description);
162 let blob = Blob::new_application_description(&application_description);
163
164 self.system.used_blobs.insert(&blob.id())?;
165 self.system.used_blobs.insert(&contract_blob.id())?;
166 self.system.used_blobs.insert(&service_blob.id())?;
167
168 self.context()
169 .extra()
170 .user_contracts()
171 .pin()
172 .insert(application_id, contract);
173
174 self.context()
175 .extra()
176 .add_blobs([
177 contract_blob,
178 service_blob,
179 Blob::new_application_description(&application_description),
180 ])
181 .await?;
182
183 let tracker = ResourceTracker::default();
184 let policy = ResourceControlPolicy::no_fees();
185 let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
186 let mut txn_tracker = TransactionTracker::new(
187 local_time,
188 0,
189 next_application_index,
190 next_chain_index,
191 None,
192 &[],
193 );
194 txn_tracker.add_created_blob(blob);
195 Box::pin(
196 ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
197 .run_user_action(application_id, action, context.refund_grant_to(), None),
198 )
199 .await?;
200
201 Ok(())
202 }
203}
204
205pub enum UserAction {
206 Instantiate(OperationContext, Vec<u8>),
207 Operation(OperationContext, Vec<u8>),
208 Message(MessageContext, Vec<u8>),
209 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
210}
211
212impl UserAction {
213 pub(crate) fn signer(&self) -> Option<AccountOwner> {
214 match self {
215 UserAction::Instantiate(context, _) => context.authenticated_owner,
216 UserAction::Operation(context, _) => context.authenticated_owner,
217 UserAction::ProcessStreams(_, _) => None,
218 UserAction::Message(context, _) => context.authenticated_owner,
219 }
220 }
221
222 pub(crate) fn height(&self) -> BlockHeight {
223 match self {
224 UserAction::Instantiate(context, _) => context.height,
225 UserAction::Operation(context, _) => context.height,
226 UserAction::ProcessStreams(context, _) => context.height,
227 UserAction::Message(context, _) => context.height,
228 }
229 }
230
231 pub(crate) fn round(&self) -> Option<u32> {
232 match self {
233 UserAction::Instantiate(context, _) => context.round,
234 UserAction::Operation(context, _) => context.round,
235 UserAction::ProcessStreams(context, _) => context.round,
236 UserAction::Message(context, _) => context.round,
237 }
238 }
239
240 pub(crate) fn timestamp(&self) -> Timestamp {
241 match self {
242 UserAction::Instantiate(context, _) => context.timestamp,
243 UserAction::Operation(context, _) => context.timestamp,
244 UserAction::ProcessStreams(context, _) => context.timestamp,
245 UserAction::Message(context, _) => context.timestamp,
246 }
247 }
248}
249
250impl<C> ExecutionStateView<C>
251where
252 C: Context + Clone + 'static,
253 C::Extra: ExecutionRuntimeContext,
254{
255 pub async fn query_application(
256 &mut self,
257 context: QueryContext,
258 query: Query,
259 endpoint: Option<&mut ServiceRuntimeEndpoint>,
260 ) -> Result<QueryOutcome, ExecutionError> {
261 assert_eq!(context.chain_id, self.context().extra().chain_id());
262 match query {
263 Query::System(query) => {
264 let outcome = self.system.handle_query(context, query);
265 Ok(outcome.into())
266 }
267 Query::User {
268 application_id,
269 bytes,
270 } => {
271 let outcome = match endpoint {
272 Some(endpoint) => {
273 self.query_user_application_with_long_lived_service(
274 application_id,
275 context,
276 bytes,
277 &mut endpoint.incoming_execution_requests,
278 &mut endpoint.runtime_request_sender,
279 )
280 .await?
281 }
282 None => {
283 self.query_user_application(application_id, context, bytes)
284 .await?
285 }
286 };
287 Ok(outcome.into())
288 }
289 }
290 }
291
292 async fn query_user_application(
293 &mut self,
294 application_id: ApplicationId,
295 context: QueryContext,
296 query: Vec<u8>,
297 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
298 self.query_user_application_with_deadline(
299 application_id,
300 context,
301 query,
302 None,
303 BTreeMap::new(),
304 )
305 .await
306 }
307
308 pub(crate) async fn query_user_application_with_deadline(
309 &mut self,
310 application_id: ApplicationId,
311 context: QueryContext,
312 query: Vec<u8>,
313 deadline: Option<Instant>,
314 created_blobs: BTreeMap<BlobId, BlobContent>,
315 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
316 let (execution_state_sender, mut execution_state_receiver) =
317 futures::channel::mpsc::unbounded();
318 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
319 let mut resource_controller = ResourceController::default();
320 let thread_pool = self.context().extra().thread_pool().clone();
321 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
322
323 let (codes, descriptions) = actor.service_and_dependencies(application_id).await?;
324
325 let service_runtime_task = thread_pool
326 .run_send(JsVec(codes), move |codes| async move {
327 let mut runtime = ServiceSyncRuntime::new_with_deadline(
328 execution_state_sender,
329 context,
330 deadline,
331 );
332
333 for (code, description) in codes.0.into_iter().zip(descriptions) {
334 runtime.preload_service(
335 ApplicationId::from(&description),
336 code,
337 description,
338 )?;
339 }
340
341 runtime.run_query(application_id, query)
342 })
343 .await;
344
345 while let Some(request) = execution_state_receiver.next().await {
346 actor.handle_request(request).await?;
347 }
348
349 service_runtime_task.await?
350 }
351
352 async fn query_user_application_with_long_lived_service(
353 &mut self,
354 application_id: ApplicationId,
355 context: QueryContext,
356 query: Vec<u8>,
357 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
358 ExecutionRequest,
359 >,
360 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
361 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
362 let (outcome_sender, outcome_receiver) = oneshot::channel();
363 let mut outcome_receiver = outcome_receiver.fuse();
364
365 runtime_request_sender
366 .send(ServiceRuntimeRequest::Query {
367 application_id,
368 context,
369 query,
370 callback: outcome_sender,
371 })
372 .expect("Service runtime thread should only stop when `request_sender` is dropped");
373
374 let mut txn_tracker = TransactionTracker::default();
375 let mut resource_controller = ResourceController::default();
376 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
377
378 loop {
379 futures::select! {
380 maybe_request = incoming_execution_requests.next() => {
381 if let Some(request) = maybe_request {
382 actor.handle_request(request).await?;
383 }
384 }
385 outcome = &mut outcome_receiver => {
386 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
387 }
388 }
389 }
390 }
391
392 pub async fn list_applications(
393 &self,
394 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
395 let mut applications = vec![];
396 for app_id in self.users.indices().await? {
397 let blob_id = app_id.description_blob_id();
398 let blob_content = self.system.read_blob_content(blob_id).await?;
399 let application_description = bcs::from_bytes(blob_content.bytes())?;
400 applications.push((app_id, application_description));
401 }
402 Ok(applications)
403 }
404}