linera_execution/
transaction_tracker.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    future::Future,
7    mem, vec,
8};
9
10use custom_debug_derive::Debug;
11use linera_base::{
12    data_types::{Blob, BlobContent, Event, OracleResponse, StreamUpdate, Timestamp},
13    ensure,
14    identifiers::{ApplicationId, BlobId, ChainId, StreamId},
15};
16
17use crate::{ExecutionError, OutgoingMessage};
18
/// Pending stream updates of a single application: maps each `(ChainId, StreamId)` pair to the
/// `(previous_index, next_index)` pair recorded for that stream.
type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
20
/// Tracks oracle responses and execution outcomes of an ongoing transaction execution, as well
/// as replayed oracle responses.
#[derive(Debug, Default)]
pub struct TransactionTracker {
    /// Recorded oracle responses that are still to be replayed, or `None` when the tracker is
    /// not in replay mode.
    #[debug(skip_if = Option::is_none)]
    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
    /// Oracle responses recorded so far during this transaction.
    #[debug(skip_if = Vec::is_empty)]
    oracle_responses: Vec<OracleResponse>,
    /// Outgoing messages added so far during this transaction.
    #[debug(skip_if = Vec::is_empty)]
    outgoing_messages: Vec<OutgoingMessage>,
    /// The current local time.
    local_time: Timestamp,
    /// The index of the current transaction in the block.
    transaction_index: u32,
    /// The value that the next call to `next_application_index` will return.
    next_application_index: u32,
    /// The value that the next call to `next_chain_index` will return.
    next_chain_index: u32,
    /// Events recorded by contracts' `emit` calls.
    events: Vec<Event>,
    /// Blobs created by contracts.
    ///
    /// As of right now, blobs created by the contracts are one of the following types:
    /// - [`Data`]
    /// - [`ContractBytecode`]
    /// - [`ServiceBytecode`]
    /// - [`EvmBytecode`]
    /// - [`ApplicationDescription`]
    /// - [`ChainDescription`]
    blobs: BTreeMap<BlobId, BlobContent>,
    /// The blobs created in the previous transactions.
    previously_created_blobs: BTreeMap<BlobId, BlobContent>,
    /// Operation result.
    operation_result: Option<Vec<u8>>,
    /// Streams that have been updated but not yet processed during this transaction.
    streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
    /// Published blobs this transaction refers to by [`BlobId`].
    blobs_published: BTreeSet<BlobId>,
    /// Blob IDs created or published by free apps (fees waived).
    free_blob_ids: BTreeSet<BlobId>,
}
60
/// The [`TransactionTracker`] contents after a transaction has finished.
#[derive(Debug, Default)]
pub struct TransactionOutcome {
    /// Oracle responses recorded while the transaction was executed.
    #[debug(skip_if = Vec::is_empty)]
    pub oracle_responses: Vec<OracleResponse>,
    /// Outgoing messages added while the transaction was executed.
    #[debug(skip_if = Vec::is_empty)]
    pub outgoing_messages: Vec<OutgoingMessage>,
    /// The tracker's next application index at the end of the transaction.
    pub next_application_index: u32,
    /// The tracker's next chain index at the end of the transaction.
    pub next_chain_index: u32,
    /// Events recorded by contracts' `emit` calls.
    pub events: Vec<Event>,
    /// Blobs created by contracts.
    pub blobs: Vec<Blob>,
    /// Operation result; empty if the tracker held no result.
    pub operation_result: Vec<u8>,
    /// Blobs published by this transaction.
    pub blobs_published: BTreeSet<BlobId>,
    /// Blob IDs created or published by free apps (fees waived).
    pub free_blob_ids: BTreeSet<BlobId>,
}
81
82impl TransactionTracker {
83    pub fn new(
84        local_time: Timestamp,
85        transaction_index: u32,
86        next_application_index: u32,
87        next_chain_index: u32,
88        oracle_responses: Option<Vec<OracleResponse>>,
89        blobs: &[Vec<Blob>],
90    ) -> Self {
91        let mut previously_created_blobs = BTreeMap::new();
92        for tx_blobs in blobs {
93            for blob in tx_blobs {
94                previously_created_blobs.insert(blob.id(), blob.content().clone());
95            }
96        }
97        TransactionTracker {
98            local_time,
99            transaction_index,
100            next_application_index,
101            next_chain_index,
102            replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
103            previously_created_blobs,
104            ..Self::default()
105        }
106    }
107
108    pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, BlobContent>) -> Self {
109        self.blobs = blobs;
110        self
111    }
112
113    pub fn local_time(&self) -> Timestamp {
114        self.local_time
115    }
116
117    pub fn set_local_time(&mut self, local_time: Timestamp) {
118        self.local_time = local_time;
119    }
120
121    pub fn transaction_index(&self) -> u32 {
122        self.transaction_index
123    }
124
125    pub fn peek_application_index(&self) -> u32 {
126        self.next_application_index
127    }
128
129    pub fn next_application_index(&mut self) -> u32 {
130        let index = self.next_application_index;
131        self.next_application_index += 1;
132        index
133    }
134
135    pub fn next_chain_index(&mut self) -> u32 {
136        let index = self.next_chain_index;
137        self.next_chain_index += 1;
138        index
139    }
140
141    pub fn add_outgoing_message(&mut self, message: OutgoingMessage) {
142        self.outgoing_messages.push(message);
143    }
144
145    pub fn add_outgoing_messages(&mut self, messages: impl IntoIterator<Item = OutgoingMessage>) {
146        for message in messages {
147            self.add_outgoing_message(message);
148        }
149    }
150
151    pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
152        self.events.push(Event {
153            stream_id,
154            index,
155            value,
156        });
157    }
158
159    pub fn get_blob_content(&self, blob_id: &BlobId) -> Option<&BlobContent> {
160        if let Some(content) = self.blobs.get(blob_id) {
161            return Some(content);
162        }
163        self.previously_created_blobs.get(blob_id)
164    }
165
166    pub fn add_created_blob(&mut self, blob: Blob) {
167        self.blobs.insert(blob.id(), blob.into_content());
168    }
169
170    pub fn add_published_blob(&mut self, blob_id: BlobId) {
171        self.blobs_published.insert(blob_id);
172    }
173
174    /// Marks a blob as created/published by a free app, so its fees will be waived.
175    pub fn mark_blob_free(&mut self, blob_id: BlobId) {
176        self.free_blob_ids.insert(blob_id);
177    }
178
179    pub fn created_blobs(&self) -> &BTreeMap<BlobId, BlobContent> {
180        &self.blobs
181    }
182
183    pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
184        self.operation_result = result
185    }
186
187    /// In replay mode, returns the next recorded oracle response. Otherwise executes `f` and
188    /// records and returns the result. `f` is the implementation of the actual oracle and is
189    /// only called in validation mode, so it does not have to be fully deterministic.
190    pub async fn oracle<F, G>(&mut self, f: F) -> Result<&OracleResponse, ExecutionError>
191    where
192        F: FnOnce() -> G,
193        G: Future<Output = Result<OracleResponse, ExecutionError>>,
194    {
195        let response = match self.next_replayed_oracle_response()? {
196            Some(response) => response,
197            None => f().await?,
198        };
199        self.oracle_responses.push(response);
200        Ok(self.oracle_responses.last().unwrap())
201    }
202
203    pub fn add_stream_to_process(
204        &mut self,
205        application_id: ApplicationId,
206        chain_id: ChainId,
207        stream_id: StreamId,
208        previous_index: u32,
209        next_index: u32,
210    ) {
211        if next_index == previous_index {
212            return; // No new events in the stream.
213        }
214        self.streams_to_process
215            .entry(application_id)
216            .or_default()
217            .entry((chain_id, stream_id))
218            .and_modify(|(pi, ni)| {
219                *pi = (*pi).min(previous_index);
220                *ni = (*ni).max(next_index);
221            })
222            .or_insert_with(|| (previous_index, next_index));
223    }
224
225    pub fn remove_stream_to_process(
226        &mut self,
227        application_id: ApplicationId,
228        chain_id: ChainId,
229        stream_id: StreamId,
230    ) {
231        let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
232            return;
233        };
234        if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
235            self.streams_to_process.remove(&application_id);
236        }
237    }
238
239    pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
240        mem::take(&mut self.streams_to_process)
241            .into_iter()
242            .map(|(app_id, streams)| {
243                let updates = streams
244                    .into_iter()
245                    .map(
246                        |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
247                            chain_id,
248                            stream_id,
249                            previous_index,
250                            next_index,
251                        },
252                    )
253                    .collect();
254                (app_id, updates)
255            })
256            .collect()
257    }
258
259    /// Adds the oracle response to the record.
260    /// If replaying, it also checks that it matches the next replayed one and returns `true`.
261    pub fn replay_oracle_response(
262        &mut self,
263        oracle_response: OracleResponse,
264    ) -> Result<bool, ExecutionError> {
265        let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
266            ensure!(
267                recorded_response == oracle_response,
268                ExecutionError::OracleResponseMismatch
269            );
270            true
271        } else {
272            false
273        };
274        self.oracle_responses.push(oracle_response);
275        Ok(replaying)
276    }
277
278    /// If in replay mode, returns the next oracle response, or an error if it is missing.
279    ///
280    /// If not in replay mode, `None` is returned, and the caller must execute the actual oracle
281    /// to obtain the value.
282    ///
283    /// In both cases, the value (returned or obtained from the oracle) must be recorded using
284    /// `add_oracle_response`.
285    fn next_replayed_oracle_response(&mut self) -> Result<Option<OracleResponse>, ExecutionError> {
286        let Some(responses) = &mut self.replaying_oracle_responses else {
287            return Ok(None); // Not in replay mode.
288        };
289        let response = responses
290            .next()
291            .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
292        Ok(Some(response))
293    }
294
295    pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
296        let TransactionTracker {
297            replaying_oracle_responses,
298            oracle_responses,
299            outgoing_messages,
300            local_time: _,
301            transaction_index: _,
302            next_application_index,
303            next_chain_index,
304            events,
305            blobs,
306            previously_created_blobs: _,
307            operation_result,
308            streams_to_process,
309            blobs_published,
310            free_blob_ids,
311        } = self;
312        ensure!(
313            streams_to_process.is_empty(),
314            ExecutionError::UnprocessedStreams
315        );
316        if let Some(mut responses) = replaying_oracle_responses {
317            ensure!(
318                responses.next().is_none(),
319                ExecutionError::UnexpectedOracleResponse
320            );
321        }
322        let blobs = blobs
323            .into_iter()
324            .map(|(blob_id, content)| Blob::new_with_hash_unchecked(blob_id, content))
325            .collect::<Vec<_>>();
326        Ok(TransactionOutcome {
327            outgoing_messages,
328            oracle_responses,
329            next_application_index,
330            next_chain_index,
331            events,
332            blobs,
333            operation_result: operation_result.unwrap_or_default(),
334            blobs_published,
335            free_blob_ids,
336        })
337    }
338}
339
340#[cfg(with_testing)]
341impl TransactionTracker {
342    /// Creates a new [`TransactionTracker`] for testing, with default values and the given
343    /// oracle responses.
344    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
345        TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses), &[])
346    }
347
348    /// Creates a new [`TransactionTracker`] for testing, with default values and oracle responses
349    /// for the given blobs.
350    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
351    where
352        T: IntoIterator,
353        T::Item: std::borrow::Borrow<BlobId>,
354    {
355        use std::borrow::Borrow;
356
357        let oracle_responses = blob_ids
358            .into_iter()
359            .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
360            .collect();
361        TransactionTracker::new_replaying(oracle_responses)
362    }
363}