Skip to main content

linera_execution/
transaction_tracker.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    future::Future,
7    mem, vec,
8};
9
10use custom_debug_derive::Debug;
11use linera_base::{
12    crypto::CryptoHash,
13    data_types::{Blob, BlobContent, Cursor, Event, OracleResponse, StreamUpdate, Timestamp},
14    ensure,
15    identifiers::{ApplicationId, BlobId, ChainId, StreamId},
16};
17
18use crate::{ExecutionError, OutgoingMessage};
19
20type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
21
22/// Tracks oracle responses and execution outcomes of an ongoing transaction execution, as well
23/// as replayed oracle responses.
24#[derive(Debug, Default)]
25pub struct TransactionTracker {
26    #[debug(skip_if = Option::is_none)]
27    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
28    #[debug(skip_if = Vec::is_empty)]
29    oracle_responses: Vec<OracleResponse>,
30    #[debug(skip_if = Vec::is_empty)]
31    outgoing_messages: Vec<OutgoingMessage>,
32    /// The current local time.
33    local_time: Timestamp,
34    /// The index of the current transaction in the block.
35    transaction_index: u32,
36    next_application_index: u32,
37    next_chain_index: u32,
38    /// Events recorded by contracts' `emit` calls.
39    events: Vec<Event>,
40    /// Blobs created by contracts.
41    ///
42    /// As of right now, blobs created by the contracts are one of the following types:
43    /// - [`Data`]
44    /// - [`ContractBytecode`]
45    /// - [`ServiceBytecode`]
46    /// - [`EvmBytecode`]
47    /// - [`ApplicationDescription`]
48    /// - [`ChainDescription`]
49    blobs: BTreeMap<BlobId, BlobContent>,
50    /// The blobs created in the previous transactions.
51    previously_created_blobs: BTreeMap<BlobId, BlobContent>,
52    /// Operation result.
53    operation_result: Option<Vec<u8>>,
54    /// Streams that have been updated but not yet processed during this transaction.
55    streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
56    /// Published blobs this transaction refers to by [`BlobId`].
57    blobs_published: BTreeSet<BlobId>,
58    /// Blob IDs created or published by free apps (fees waived).
59    free_blob_ids: BTreeSet<BlobId>,
60    /// Inputs computed pre-block by the checkpoint pre-hook, stashed here so that the
61    /// matching `SystemOperation::Checkpoint` operation handler can use them when it
62    /// runs. The state dump and the inbox snapshot must both be captured before any
63    /// block-level mutation taints the chain state, so we collect them up front and
64    /// hand them off through this tracker.
65    #[debug(skip_if = Option::is_none)]
66    prepared_checkpoint: Option<PreparedCheckpoint>,
67}
68
69/// Pre-block-computed inputs for a `SystemOperation::Checkpoint` transaction.
70#[derive(Clone, Debug)]
71pub struct PreparedCheckpoint {
72    /// The execution-state dump split into blobs at the current epoch's
73    /// `maximum_blob_size`.
74    pub blobs: Vec<Blob>,
75    /// For each chain we've received messages from since our last own checkpoint, the
76    /// position past the last bundle we've consumed. Used to emit a
77    /// `SystemMessage::CheckpointAck` to each origin so the origin can later trim its
78    /// outbox dump. This is the delta over the previous checkpoint, filtered by
79    /// `pending_checkpoint_ack_targets` to break the notification ping-pong.
80    pub origin_cursors: Vec<(ChainId, Cursor)>,
81    /// For *every* inbox with a non-default `next_cursor_to_remove`, the cursor itself.
82    /// A bootstrapping node uses these to seed each inbox's `restored_cursor`, so a
83    /// sender that hasn't seen the matching `CheckpointAck` yet and re-pushes an
84    /// already-consumed bundle is a silent no-op rather than a duplicate consumption.
85    pub inbox_cursors: Vec<(ChainId, Cursor)>,
86    /// Hashes of every block on this chain that the chain's outboxes still reference,
87    /// taken before the block runs. Included in the oracle response so the checkpoint
88    /// block's certificate transitively certifies those older blocks.
89    pub outbox_block_hashes: Vec<CryptoHash>,
90}
91
92/// The [`TransactionTracker`] contents after a transaction has finished.
93#[derive(Debug, Default)]
94pub struct TransactionOutcome {
95    /// The recorded oracle responses.
96    #[debug(skip_if = Vec::is_empty)]
97    pub oracle_responses: Vec<OracleResponse>,
98    /// The messages to be sent to other chains.
99    #[debug(skip_if = Vec::is_empty)]
100    pub outgoing_messages: Vec<OutgoingMessage>,
101    /// The index to be assigned to the next created application.
102    pub next_application_index: u32,
103    /// The index to be assigned to the next created chain.
104    pub next_chain_index: u32,
105    /// Events recorded by contracts' `emit` calls.
106    pub events: Vec<Event>,
107    /// Blobs created by contracts.
108    pub blobs: Vec<Blob>,
109    /// Operation result.
110    pub operation_result: Vec<u8>,
111    /// Blobs published by this transaction.
112    pub blobs_published: BTreeSet<BlobId>,
113    /// Blob IDs created or published by free apps (fees waived).
114    pub free_blob_ids: BTreeSet<BlobId>,
115}
116
117impl TransactionTracker {
118    /// Creates a new [`TransactionTracker`].
119    pub fn new(
120        local_time: Timestamp,
121        transaction_index: u32,
122        next_application_index: u32,
123        next_chain_index: u32,
124        oracle_responses: Option<Vec<OracleResponse>>,
125        blobs: &[Vec<Blob>],
126    ) -> Self {
127        let mut previously_created_blobs = BTreeMap::new();
128        for tx_blobs in blobs {
129            for blob in tx_blobs {
130                previously_created_blobs.insert(blob.id(), blob.content().clone());
131            }
132        }
133        TransactionTracker {
134            local_time,
135            transaction_index,
136            next_application_index,
137            next_chain_index,
138            replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
139            previously_created_blobs,
140            ..Self::default()
141        }
142    }
143
144    /// Sets the blobs known to the tracker and returns the updated tracker.
145    pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, BlobContent>) -> Self {
146        self.blobs = blobs;
147        self
148    }
149
150    /// Stashes pre-block-computed checkpoint inputs on the tracker. The matching
151    /// `SystemOperation::Checkpoint` operation handler will retrieve them via
152    /// [`Self::take_prepared_checkpoint`] when the operation runs.
153    pub fn set_prepared_checkpoint(&mut self, prepared: PreparedCheckpoint) {
154        self.prepared_checkpoint = Some(prepared);
155    }
156
157    /// Takes the pre-block-computed checkpoint inputs, if any were stashed.
158    pub fn take_prepared_checkpoint(&mut self) -> Option<PreparedCheckpoint> {
159        self.prepared_checkpoint.take()
160    }
161
162    /// Returns the local time recorded by the tracker.
163    pub fn local_time(&self) -> Timestamp {
164        self.local_time
165    }
166
167    /// Sets the local time recorded by the tracker.
168    pub fn set_local_time(&mut self, local_time: Timestamp) {
169        self.local_time = local_time;
170    }
171
172    /// Returns the index of the current transaction in the block.
173    pub fn transaction_index(&self) -> u32 {
174        self.transaction_index
175    }
176
177    /// Returns the index that would be assigned to the next created application, without consuming it.
178    pub fn peek_application_index(&self) -> u32 {
179        self.next_application_index
180    }
181
182    /// Returns the index to be assigned to the next created application and increments the counter.
183    pub fn next_application_index(&mut self) -> u32 {
184        let index = self.next_application_index;
185        self.next_application_index += 1;
186        index
187    }
188
189    /// Returns the index to be assigned to the next created chain and increments the counter.
190    pub fn next_chain_index(&mut self) -> u32 {
191        let index = self.next_chain_index;
192        self.next_chain_index += 1;
193        index
194    }
195
196    /// Records an outgoing message.
197    pub fn add_outgoing_message(&mut self, message: OutgoingMessage) {
198        self.outgoing_messages.push(message);
199    }
200
201    /// Records multiple outgoing messages.
202    pub fn add_outgoing_messages(&mut self, messages: impl IntoIterator<Item = OutgoingMessage>) {
203        for message in messages {
204            self.add_outgoing_message(message);
205        }
206    }
207
208    /// Records an event emitted on the given stream.
209    pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
210        self.events.push(Event {
211            stream_id,
212            index,
213            value,
214        });
215    }
216
217    /// Returns the content of the blob with the given ID, if known to the tracker.
218    pub fn get_blob_content(&self, blob_id: &BlobId) -> Option<&BlobContent> {
219        if let Some(content) = self.blobs.get(blob_id) {
220            return Some(content);
221        }
222        self.previously_created_blobs.get(blob_id)
223    }
224
225    /// Records a blob created by this transaction.
226    pub fn add_created_blob(&mut self, blob: Blob) {
227        self.blobs.insert(blob.id(), blob.into_content());
228    }
229
230    /// Records a blob published by this transaction.
231    pub fn add_published_blob(&mut self, blob_id: BlobId) {
232        self.blobs_published.insert(blob_id);
233    }
234
235    /// Marks a blob as created/published by a free app, so its fees will be waived.
236    pub fn mark_blob_free(&mut self, blob_id: BlobId) {
237        self.free_blob_ids.insert(blob_id);
238    }
239
240    /// Returns the blobs created by this transaction.
241    pub fn created_blobs(&self) -> &BTreeMap<BlobId, BlobContent> {
242        &self.blobs
243    }
244
245    /// Records the result of the operation.
246    pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
247        self.operation_result = result
248    }
249
250    /// In replay mode, returns the next recorded oracle response. Otherwise executes `f` and
251    /// records and returns the result. `f` is the implementation of the actual oracle and is
252    /// only called in validation mode, so it does not have to be fully deterministic.
253    pub async fn oracle<F, G>(&mut self, f: F) -> Result<&OracleResponse, ExecutionError>
254    where
255        F: FnOnce() -> G,
256        G: Future<Output = Result<OracleResponse, ExecutionError>>,
257    {
258        let response = match self.next_replayed_oracle_response()? {
259            Some(response) => response,
260            None => f().await?,
261        };
262        self.oracle_responses.push(response);
263        Ok(self.oracle_responses.last().unwrap())
264    }
265
266    /// Records that the given range of events on a stream must be processed by the application.
267    pub fn add_stream_to_process(
268        &mut self,
269        application_id: ApplicationId,
270        chain_id: ChainId,
271        stream_id: StreamId,
272        previous_index: u32,
273        next_index: u32,
274    ) {
275        if next_index == previous_index {
276            return; // No new events in the stream.
277        }
278        self.streams_to_process
279            .entry(application_id)
280            .or_default()
281            .entry((chain_id, stream_id))
282            .and_modify(|(pi, ni)| {
283                *pi = (*pi).min(previous_index);
284                *ni = (*ni).max(next_index);
285            })
286            .or_insert_with(|| (previous_index, next_index));
287    }
288
289    /// Removes a stream from the set of streams to be processed by the application.
290    pub fn remove_stream_to_process(
291        &mut self,
292        application_id: ApplicationId,
293        chain_id: ChainId,
294        stream_id: StreamId,
295    ) {
296        let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
297            return;
298        };
299        if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
300            self.streams_to_process.remove(&application_id);
301        }
302    }
303
304    /// Takes the streams to be processed, grouped by application.
305    pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
306        mem::take(&mut self.streams_to_process)
307            .into_iter()
308            .map(|(app_id, streams)| {
309                let updates = streams
310                    .into_iter()
311                    .map(
312                        |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
313                            chain_id,
314                            stream_id,
315                            previous_index,
316                            next_index,
317                        },
318                    )
319                    .collect();
320                (app_id, updates)
321            })
322            .collect()
323    }
324
325    /// Adds the oracle response to the record.
326    /// If replaying, it also checks that it matches the next replayed one and returns `true`.
327    pub fn replay_oracle_response(
328        &mut self,
329        oracle_response: OracleResponse,
330    ) -> Result<bool, ExecutionError> {
331        let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
332            ensure!(
333                recorded_response == oracle_response,
334                ExecutionError::OracleResponseMismatch
335            );
336            true
337        } else {
338            false
339        };
340        self.oracle_responses.push(oracle_response);
341        Ok(replaying)
342    }
343
344    /// If in replay mode, returns the next oracle response, or an error if it is missing.
345    ///
346    /// If not in replay mode, `None` is returned, and the caller must execute the actual oracle
347    /// to obtain the value.
348    ///
349    /// In both cases, the value (returned or obtained from the oracle) must be recorded using
350    /// `add_oracle_response`.
351    fn next_replayed_oracle_response(&mut self) -> Result<Option<OracleResponse>, ExecutionError> {
352        let Some(responses) = &mut self.replaying_oracle_responses else {
353            return Ok(None); // Not in replay mode.
354        };
355        let response = responses
356            .next()
357            .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
358        Ok(Some(response))
359    }
360
361    /// Consumes the tracker and returns the resulting [`TransactionOutcome`].
362    pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
363        let TransactionTracker {
364            replaying_oracle_responses,
365            oracle_responses,
366            outgoing_messages,
367            local_time: _,
368            transaction_index: _,
369            next_application_index,
370            next_chain_index,
371            events,
372            blobs,
373            previously_created_blobs: _,
374            operation_result,
375            streams_to_process,
376            blobs_published,
377            free_blob_ids,
378            prepared_checkpoint: _,
379        } = self;
380        ensure!(
381            streams_to_process.is_empty(),
382            ExecutionError::UnprocessedStreams
383        );
384        if let Some(mut responses) = replaying_oracle_responses {
385            ensure!(
386                responses.next().is_none(),
387                ExecutionError::UnexpectedOracleResponse
388            );
389        }
390        let blobs = blobs
391            .into_iter()
392            .map(|(blob_id, content)| Blob::new_with_hash_unchecked(blob_id, content))
393            .collect::<Vec<_>>();
394        Ok(TransactionOutcome {
395            outgoing_messages,
396            oracle_responses,
397            next_application_index,
398            next_chain_index,
399            events,
400            blobs,
401            operation_result: operation_result.unwrap_or_default(),
402            blobs_published,
403            free_blob_ids,
404        })
405    }
406}
407
408#[cfg(with_testing)]
409impl TransactionTracker {
410    /// Creates a new [`TransactionTracker`] for testing, with default values and the given
411    /// oracle responses.
412    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
413        TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses), &[])
414    }
415
416    /// Creates a new [`TransactionTracker`] for testing, with default values and oracle responses
417    /// for the given blobs.
418    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
419    where
420        T: IntoIterator,
421        T::Item: std::borrow::Borrow<BlobId>,
422    {
423        use std::borrow::Borrow;
424
425        let oracle_responses = blob_ids
426            .into_iter()
427            .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
428            .collect();
429        TransactionTracker::new_replaying(oracle_responses)
430    }
431}