1use std::{collections::BTreeMap, vec};
5
6use allocative::Allocative;
7use futures::{FutureExt, StreamExt};
8use linera_base::{
9 data_types::{BlobContent, BlockHeight, StreamUpdate},
10 identifiers::{AccountOwner, BlobId},
11 time::Instant,
12};
13use linera_views::{
14 context::Context,
15 key_value_store_view::KeyValueStoreView,
16 reentrant_collection_view::HashedReentrantCollectionView,
17 views::{ClonableView, ReplaceContext, View},
18};
19use linera_views_derive::CryptoHashView;
20#[cfg(with_testing)]
21use {
22 crate::{
23 ResourceControlPolicy, ResourceTracker, TestExecutionRuntimeContext, UserContractCode,
24 },
25 linera_base::data_types::Blob,
26 linera_views::context::MemoryContext,
27 std::sync::Arc,
28};
29
30use super::{execution_state_actor::ExecutionRequest, runtime::ServiceRuntimeRequest};
31use crate::{
32 execution_state_actor::ExecutionStateActor, resources::ResourceController,
33 system::SystemExecutionStateView, ApplicationDescription, ApplicationId, ExecutionError,
34 ExecutionRuntimeConfig, ExecutionRuntimeContext, MessageContext, OperationContext,
35 ProcessStreamsContext, Query, QueryContext, QueryOutcome, ServiceSyncRuntime, Timestamp,
36 TransactionTracker,
37};
38
39#[derive(Debug, ClonableView, CryptoHashView, Allocative)]
41#[allocative(bound = "C")]
42pub struct ExecutionStateView<C> {
43 pub system: SystemExecutionStateView<C>,
45 pub users: HashedReentrantCollectionView<C, ApplicationId, KeyValueStoreView<C>>,
47}
48
49impl<C: Context, C2: Context> ReplaceContext<C2> for ExecutionStateView<C> {
50 type Target = ExecutionStateView<C2>;
51
52 async fn with_context(
53 &mut self,
54 ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
55 ) -> Self::Target {
56 ExecutionStateView {
57 system: self.system.with_context(ctx.clone()).await,
58 users: self.users.with_context(ctx.clone()).await,
59 }
60 }
61}
62
63pub struct ServiceRuntimeEndpoint {
65 pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver<ExecutionRequest>,
67 pub runtime_request_sender: std::sync::mpsc::Sender<ServiceRuntimeRequest>,
69}
70
71#[cfg(with_testing)]
72impl ExecutionStateView<MemoryContext<TestExecutionRuntimeContext>>
73where
74 MemoryContext<TestExecutionRuntimeContext>: Context + Clone + Send + Sync + 'static,
75{
76 pub async fn simulate_instantiation(
78 &mut self,
79 contract: UserContractCode,
80 local_time: linera_base::data_types::Timestamp,
81 application_description: ApplicationDescription,
82 instantiation_argument: Vec<u8>,
83 contract_blob: Blob,
84 service_blob: Blob,
85 ) -> Result<(), ExecutionError> {
86 let chain_id = application_description.creator_chain_id;
87 assert_eq!(chain_id, self.context().extra().chain_id);
88 let context = OperationContext {
89 chain_id,
90 authenticated_owner: None,
91 height: application_description.block_height,
92 round: None,
93 timestamp: local_time,
94 };
95
96 let action = UserAction::Instantiate(context, instantiation_argument);
97 let next_application_index = application_description.application_index + 1;
98 let next_chain_index = 0;
99
100 let application_id = From::from(&application_description);
101 let blob = Blob::new_application_description(&application_description);
102
103 self.system.used_blobs.insert(&blob.id())?;
104 self.system.used_blobs.insert(&contract_blob.id())?;
105 self.system.used_blobs.insert(&service_blob.id())?;
106
107 self.context()
108 .extra()
109 .user_contracts()
110 .pin()
111 .insert(application_id, contract);
112
113 self.context()
114 .extra()
115 .add_blobs([
116 contract_blob,
117 service_blob,
118 Blob::new_application_description(&application_description),
119 ])
120 .await?;
121
122 let tracker = ResourceTracker::default();
123 let policy = ResourceControlPolicy::no_fees();
124 let mut resource_controller = ResourceController::new(Arc::new(policy), tracker, None);
125 let mut txn_tracker = TransactionTracker::new(
126 local_time,
127 0,
128 next_application_index,
129 next_chain_index,
130 None,
131 &[],
132 );
133 txn_tracker.add_created_blob(blob);
134 Box::pin(
135 ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller)
136 .run_user_action(application_id, action, context.refund_grant_to(), None),
137 )
138 .await?;
139
140 Ok(())
141 }
142}
143
144pub enum UserAction {
145 Instantiate(OperationContext, Vec<u8>),
146 Operation(OperationContext, Vec<u8>),
147 Message(MessageContext, Vec<u8>),
148 ProcessStreams(ProcessStreamsContext, Vec<StreamUpdate>),
149}
150
151impl UserAction {
152 pub(crate) fn signer(&self) -> Option<AccountOwner> {
153 match self {
154 UserAction::Instantiate(context, _) => context.authenticated_owner,
155 UserAction::Operation(context, _) => context.authenticated_owner,
156 UserAction::ProcessStreams(_, _) => None,
157 UserAction::Message(context, _) => context.authenticated_owner,
158 }
159 }
160
161 pub(crate) fn height(&self) -> BlockHeight {
162 match self {
163 UserAction::Instantiate(context, _) => context.height,
164 UserAction::Operation(context, _) => context.height,
165 UserAction::ProcessStreams(context, _) => context.height,
166 UserAction::Message(context, _) => context.height,
167 }
168 }
169
170 pub(crate) fn round(&self) -> Option<u32> {
171 match self {
172 UserAction::Instantiate(context, _) => context.round,
173 UserAction::Operation(context, _) => context.round,
174 UserAction::ProcessStreams(context, _) => context.round,
175 UserAction::Message(context, _) => context.round,
176 }
177 }
178
179 pub(crate) fn timestamp(&self) -> Timestamp {
180 match self {
181 UserAction::Instantiate(context, _) => context.timestamp,
182 UserAction::Operation(context, _) => context.timestamp,
183 UserAction::ProcessStreams(context, _) => context.timestamp,
184 UserAction::Message(context, _) => context.timestamp,
185 }
186 }
187}
188
189impl<C> ExecutionStateView<C>
190where
191 C: Context + Clone + Send + Sync + 'static,
192 C::Extra: ExecutionRuntimeContext,
193{
194 pub async fn query_application(
195 &mut self,
196 context: QueryContext,
197 query: Query,
198 endpoint: Option<&mut ServiceRuntimeEndpoint>,
199 ) -> Result<QueryOutcome, ExecutionError> {
200 assert_eq!(context.chain_id, self.context().extra().chain_id());
201 match query {
202 Query::System(query) => {
203 let outcome = self.system.handle_query(context, query);
204 Ok(outcome.into())
205 }
206 Query::User {
207 application_id,
208 bytes,
209 } => {
210 let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config();
211 let outcome = match endpoint {
212 Some(endpoint) => {
213 self.query_user_application_with_long_lived_service(
214 application_id,
215 context,
216 bytes,
217 &mut endpoint.incoming_execution_requests,
218 &mut endpoint.runtime_request_sender,
219 )
220 .await?
221 }
222 None => {
223 self.query_user_application(application_id, context, bytes)
224 .await?
225 }
226 };
227 Ok(outcome.into())
228 }
229 }
230 }
231
232 async fn query_user_application(
233 &mut self,
234 application_id: ApplicationId,
235 context: QueryContext,
236 query: Vec<u8>,
237 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
238 self.query_user_application_with_deadline(
239 application_id,
240 context,
241 query,
242 None,
243 BTreeMap::new(),
244 )
245 .await
246 }
247
248 pub(crate) async fn query_user_application_with_deadline(
249 &mut self,
250 application_id: ApplicationId,
251 context: QueryContext,
252 query: Vec<u8>,
253 deadline: Option<Instant>,
254 created_blobs: BTreeMap<BlobId, BlobContent>,
255 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
256 let (execution_state_sender, mut execution_state_receiver) =
257 futures::channel::mpsc::unbounded();
258 let mut txn_tracker = TransactionTracker::default().with_blobs(created_blobs);
259 let mut resource_controller = ResourceController::default();
260 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
261 let (code, description) = actor.load_service(application_id).await?;
262
263 let service_runtime_task = linera_base::task::Blocking::spawn(move |mut codes| {
264 let mut runtime =
265 ServiceSyncRuntime::new_with_deadline(execution_state_sender, context, deadline);
266
267 async move {
268 let code = codes.next().await.expect("we send this immediately below");
269 runtime.preload_service(application_id, code, description)?;
270 runtime.run_query(application_id, query)
271 }
272 })
273 .await;
274
275 service_runtime_task.send(code)?;
276
277 while let Some(request) = execution_state_receiver.next().await {
278 actor.handle_request(request).await?;
279 }
280
281 service_runtime_task.join().await
282 }
283
284 async fn query_user_application_with_long_lived_service(
285 &mut self,
286 application_id: ApplicationId,
287 context: QueryContext,
288 query: Vec<u8>,
289 incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver<
290 ExecutionRequest,
291 >,
292 runtime_request_sender: &mut std::sync::mpsc::Sender<ServiceRuntimeRequest>,
293 ) -> Result<QueryOutcome<Vec<u8>>, ExecutionError> {
294 let (outcome_sender, outcome_receiver) = oneshot::channel();
295 let mut outcome_receiver = outcome_receiver.fuse();
296
297 runtime_request_sender
298 .send(ServiceRuntimeRequest::Query {
299 application_id,
300 context,
301 query,
302 callback: outcome_sender,
303 })
304 .expect("Service runtime thread should only stop when `request_sender` is dropped");
305
306 let mut txn_tracker = TransactionTracker::default();
307 let mut resource_controller = ResourceController::default();
308 let mut actor = ExecutionStateActor::new(self, &mut txn_tracker, &mut resource_controller);
309
310 loop {
311 futures::select! {
312 maybe_request = incoming_execution_requests.next() => {
313 if let Some(request) = maybe_request {
314 actor.handle_request(request).await?;
315 }
316 }
317 outcome = &mut outcome_receiver => {
318 return outcome.map_err(|_| ExecutionError::MissingRuntimeResponse)?;
319 }
320 }
321 }
322 }
323
324 pub async fn list_applications(
325 &self,
326 ) -> Result<Vec<(ApplicationId, ApplicationDescription)>, ExecutionError> {
327 let mut applications = vec![];
328 for app_id in self.users.indices().await? {
329 let blob_id = app_id.description_blob_id();
330 let blob_content = self.system.read_blob_content(blob_id).await?;
331 let application_description = bcs::from_bytes(blob_content.bytes())?;
332 applications.push((app_id, application_description));
333 }
334 Ok(applications)
335 }
336}