1use std::{
5 collections::{BTreeMap, BTreeSet},
6 future::Future,
7 mem, vec,
8};
9
10use custom_debug_derive::Debug;
11use linera_base::{
12 crypto::CryptoHash,
13 data_types::{Blob, BlobContent, Cursor, Event, OracleResponse, StreamUpdate, Timestamp},
14 ensure,
15 identifiers::{ApplicationId, BlobId, ChainId, StreamId},
16};
17
18use crate::{ExecutionError, OutgoingMessage};
19
20type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
21
22#[derive(Debug, Default)]
25pub struct TransactionTracker {
26 #[debug(skip_if = Option::is_none)]
27 replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
28 #[debug(skip_if = Vec::is_empty)]
29 oracle_responses: Vec<OracleResponse>,
30 #[debug(skip_if = Vec::is_empty)]
31 outgoing_messages: Vec<OutgoingMessage>,
32 local_time: Timestamp,
34 transaction_index: u32,
36 next_application_index: u32,
37 next_chain_index: u32,
38 events: Vec<Event>,
40 blobs: BTreeMap<BlobId, BlobContent>,
50 previously_created_blobs: BTreeMap<BlobId, BlobContent>,
52 operation_result: Option<Vec<u8>>,
54 streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
56 blobs_published: BTreeSet<BlobId>,
58 free_blob_ids: BTreeSet<BlobId>,
60 #[debug(skip_if = Option::is_none)]
66 prepared_checkpoint: Option<PreparedCheckpoint>,
67}
68
69#[derive(Clone, Debug)]
71pub struct PreparedCheckpoint {
72 pub blobs: Vec<Blob>,
75 pub origin_cursors: Vec<(ChainId, Cursor)>,
81 pub inbox_cursors: Vec<(ChainId, Cursor)>,
86 pub outbox_block_hashes: Vec<CryptoHash>,
90}
91
92#[derive(Debug, Default)]
94pub struct TransactionOutcome {
95 #[debug(skip_if = Vec::is_empty)]
97 pub oracle_responses: Vec<OracleResponse>,
98 #[debug(skip_if = Vec::is_empty)]
100 pub outgoing_messages: Vec<OutgoingMessage>,
101 pub next_application_index: u32,
103 pub next_chain_index: u32,
105 pub events: Vec<Event>,
107 pub blobs: Vec<Blob>,
109 pub operation_result: Vec<u8>,
111 pub blobs_published: BTreeSet<BlobId>,
113 pub free_blob_ids: BTreeSet<BlobId>,
115}
116
117impl TransactionTracker {
118 pub fn new(
120 local_time: Timestamp,
121 transaction_index: u32,
122 next_application_index: u32,
123 next_chain_index: u32,
124 oracle_responses: Option<Vec<OracleResponse>>,
125 blobs: &[Vec<Blob>],
126 ) -> Self {
127 let mut previously_created_blobs = BTreeMap::new();
128 for tx_blobs in blobs {
129 for blob in tx_blobs {
130 previously_created_blobs.insert(blob.id(), blob.content().clone());
131 }
132 }
133 TransactionTracker {
134 local_time,
135 transaction_index,
136 next_application_index,
137 next_chain_index,
138 replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
139 previously_created_blobs,
140 ..Self::default()
141 }
142 }
143
144 pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, BlobContent>) -> Self {
146 self.blobs = blobs;
147 self
148 }
149
150 pub fn set_prepared_checkpoint(&mut self, prepared: PreparedCheckpoint) {
154 self.prepared_checkpoint = Some(prepared);
155 }
156
157 pub fn take_prepared_checkpoint(&mut self) -> Option<PreparedCheckpoint> {
159 self.prepared_checkpoint.take()
160 }
161
162 pub fn local_time(&self) -> Timestamp {
164 self.local_time
165 }
166
167 pub fn set_local_time(&mut self, local_time: Timestamp) {
169 self.local_time = local_time;
170 }
171
172 pub fn transaction_index(&self) -> u32 {
174 self.transaction_index
175 }
176
177 pub fn peek_application_index(&self) -> u32 {
179 self.next_application_index
180 }
181
182 pub fn next_application_index(&mut self) -> u32 {
184 let index = self.next_application_index;
185 self.next_application_index += 1;
186 index
187 }
188
189 pub fn next_chain_index(&mut self) -> u32 {
191 let index = self.next_chain_index;
192 self.next_chain_index += 1;
193 index
194 }
195
196 pub fn add_outgoing_message(&mut self, message: OutgoingMessage) {
198 self.outgoing_messages.push(message);
199 }
200
201 pub fn add_outgoing_messages(&mut self, messages: impl IntoIterator<Item = OutgoingMessage>) {
203 for message in messages {
204 self.add_outgoing_message(message);
205 }
206 }
207
208 pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
210 self.events.push(Event {
211 stream_id,
212 index,
213 value,
214 });
215 }
216
217 pub fn get_blob_content(&self, blob_id: &BlobId) -> Option<&BlobContent> {
219 if let Some(content) = self.blobs.get(blob_id) {
220 return Some(content);
221 }
222 self.previously_created_blobs.get(blob_id)
223 }
224
225 pub fn add_created_blob(&mut self, blob: Blob) {
227 self.blobs.insert(blob.id(), blob.into_content());
228 }
229
230 pub fn add_published_blob(&mut self, blob_id: BlobId) {
232 self.blobs_published.insert(blob_id);
233 }
234
235 pub fn mark_blob_free(&mut self, blob_id: BlobId) {
237 self.free_blob_ids.insert(blob_id);
238 }
239
240 pub fn created_blobs(&self) -> &BTreeMap<BlobId, BlobContent> {
242 &self.blobs
243 }
244
245 pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
247 self.operation_result = result
248 }
249
250 pub async fn oracle<F, G>(&mut self, f: F) -> Result<&OracleResponse, ExecutionError>
254 where
255 F: FnOnce() -> G,
256 G: Future<Output = Result<OracleResponse, ExecutionError>>,
257 {
258 let response = match self.next_replayed_oracle_response()? {
259 Some(response) => response,
260 None => f().await?,
261 };
262 self.oracle_responses.push(response);
263 Ok(self.oracle_responses.last().unwrap())
264 }
265
266 pub fn add_stream_to_process(
268 &mut self,
269 application_id: ApplicationId,
270 chain_id: ChainId,
271 stream_id: StreamId,
272 previous_index: u32,
273 next_index: u32,
274 ) {
275 if next_index == previous_index {
276 return; }
278 self.streams_to_process
279 .entry(application_id)
280 .or_default()
281 .entry((chain_id, stream_id))
282 .and_modify(|(pi, ni)| {
283 *pi = (*pi).min(previous_index);
284 *ni = (*ni).max(next_index);
285 })
286 .or_insert_with(|| (previous_index, next_index));
287 }
288
289 pub fn remove_stream_to_process(
291 &mut self,
292 application_id: ApplicationId,
293 chain_id: ChainId,
294 stream_id: StreamId,
295 ) {
296 let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
297 return;
298 };
299 if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
300 self.streams_to_process.remove(&application_id);
301 }
302 }
303
304 pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
306 mem::take(&mut self.streams_to_process)
307 .into_iter()
308 .map(|(app_id, streams)| {
309 let updates = streams
310 .into_iter()
311 .map(
312 |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
313 chain_id,
314 stream_id,
315 previous_index,
316 next_index,
317 },
318 )
319 .collect();
320 (app_id, updates)
321 })
322 .collect()
323 }
324
325 pub fn replay_oracle_response(
328 &mut self,
329 oracle_response: OracleResponse,
330 ) -> Result<bool, ExecutionError> {
331 let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
332 ensure!(
333 recorded_response == oracle_response,
334 ExecutionError::OracleResponseMismatch
335 );
336 true
337 } else {
338 false
339 };
340 self.oracle_responses.push(oracle_response);
341 Ok(replaying)
342 }
343
344 fn next_replayed_oracle_response(&mut self) -> Result<Option<OracleResponse>, ExecutionError> {
352 let Some(responses) = &mut self.replaying_oracle_responses else {
353 return Ok(None); };
355 let response = responses
356 .next()
357 .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
358 Ok(Some(response))
359 }
360
361 pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
363 let TransactionTracker {
364 replaying_oracle_responses,
365 oracle_responses,
366 outgoing_messages,
367 local_time: _,
368 transaction_index: _,
369 next_application_index,
370 next_chain_index,
371 events,
372 blobs,
373 previously_created_blobs: _,
374 operation_result,
375 streams_to_process,
376 blobs_published,
377 free_blob_ids,
378 prepared_checkpoint: _,
379 } = self;
380 ensure!(
381 streams_to_process.is_empty(),
382 ExecutionError::UnprocessedStreams
383 );
384 if let Some(mut responses) = replaying_oracle_responses {
385 ensure!(
386 responses.next().is_none(),
387 ExecutionError::UnexpectedOracleResponse
388 );
389 }
390 let blobs = blobs
391 .into_iter()
392 .map(|(blob_id, content)| Blob::new_with_hash_unchecked(blob_id, content))
393 .collect::<Vec<_>>();
394 Ok(TransactionOutcome {
395 outgoing_messages,
396 oracle_responses,
397 next_application_index,
398 next_chain_index,
399 events,
400 blobs,
401 operation_result: operation_result.unwrap_or_default(),
402 blobs_published,
403 free_blob_ids,
404 })
405 }
406}
407
408#[cfg(with_testing)]
409impl TransactionTracker {
410 pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
413 TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses), &[])
414 }
415
416 pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
419 where
420 T: IntoIterator,
421 T::Item: std::borrow::Borrow<BlobId>,
422 {
423 use std::borrow::Borrow;
424
425 let oracle_responses = blob_ids
426 .into_iter()
427 .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
428 .collect();
429 TransactionTracker::new_replaying(oracle_responses)
430 }
431}