linera_execution/
transaction_tracker.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    mem, vec,
7};
8
9use custom_debug_derive::Debug;
10use linera_base::{
11    data_types::{ArithmeticError, Blob, Event, OracleResponse, StreamUpdate, Timestamp},
12    ensure,
13    identifiers::{ApplicationId, BlobId, ChainId, StreamId},
14};
15
16use crate::{ExecutionError, OutgoingMessage};
17
18type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
19
20/// Tracks oracle responses and execution outcomes of an ongoing transaction execution, as well
21/// as replayed oracle responses.
22#[derive(Debug, Default)]
23pub struct TransactionTracker {
24    #[debug(skip_if = Option::is_none)]
25    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
26    #[debug(skip_if = Vec::is_empty)]
27    oracle_responses: Vec<OracleResponse>,
28    #[debug(skip_if = Vec::is_empty)]
29    outgoing_messages: Vec<OutgoingMessage>,
30    /// The current local time.
31    local_time: Timestamp,
32    /// The index of the current transaction in the block.
33    transaction_index: u32,
34    next_message_index: u32,
35    next_application_index: u32,
36    next_chain_index: u32,
37    /// Events recorded by contracts' `emit` calls.
38    events: Vec<Event>,
39    /// Blobs created by contracts.
40    ///
41    /// As of right now, blobs created by the contracts are one of the two:
42    /// - [`OpenChain`]
43    /// - [`CreateApplication`]
44    blobs: BTreeMap<BlobId, Blob>,
45    /// Operation result.
46    operation_result: Option<Vec<u8>>,
47    /// Streams that have been updated but not yet processed during this transaction.
48    streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
49    /// Published blobs this transaction refers to by [`BlobId`].
50    blobs_published: BTreeSet<BlobId>,
51}
52
53/// The [`TransactionTracker`] contents after a transaction has finished.
54#[derive(Debug, Default)]
55pub struct TransactionOutcome {
56    #[debug(skip_if = Vec::is_empty)]
57    pub oracle_responses: Vec<OracleResponse>,
58    #[debug(skip_if = Vec::is_empty)]
59    pub outgoing_messages: Vec<OutgoingMessage>,
60    pub next_message_index: u32,
61    pub next_application_index: u32,
62    pub next_chain_index: u32,
63    /// Events recorded by contracts' `emit` calls.
64    pub events: Vec<Event>,
65    /// Blobs created by contracts.
66    pub blobs: Vec<Blob>,
67    /// Operation result.
68    pub operation_result: Vec<u8>,
69    /// Blobs published by this transaction.
70    pub blobs_published: BTreeSet<BlobId>,
71}
72
73impl TransactionTracker {
74    pub fn new(
75        local_time: Timestamp,
76        transaction_index: u32,
77        next_message_index: u32,
78        next_application_index: u32,
79        next_chain_index: u32,
80        oracle_responses: Option<Vec<OracleResponse>>,
81    ) -> Self {
82        TransactionTracker {
83            local_time,
84            transaction_index,
85            next_message_index,
86            next_application_index,
87            next_chain_index,
88            replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
89            ..Self::default()
90        }
91    }
92
93    pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, Blob>) -> Self {
94        self.blobs = blobs;
95        self
96    }
97
98    pub fn local_time(&self) -> Timestamp {
99        self.local_time
100    }
101
102    pub fn set_local_time(&mut self, local_time: Timestamp) {
103        self.local_time = local_time;
104    }
105
106    pub fn transaction_index(&self) -> u32 {
107        self.transaction_index
108    }
109
110    pub fn next_message_index(&self) -> u32 {
111        self.next_message_index
112    }
113
114    pub fn next_application_index(&mut self) -> u32 {
115        let index = self.next_application_index;
116        self.next_application_index += 1;
117        index
118    }
119
120    pub fn next_chain_index(&mut self) -> u32 {
121        let index = self.next_chain_index;
122        self.next_chain_index += 1;
123        index
124    }
125
126    pub fn add_outgoing_message(
127        &mut self,
128        message: OutgoingMessage,
129    ) -> Result<(), ArithmeticError> {
130        self.next_message_index = self
131            .next_message_index
132            .checked_add(1)
133            .ok_or(ArithmeticError::Overflow)?;
134        self.outgoing_messages.push(message);
135        Ok(())
136    }
137
138    pub fn add_outgoing_messages(
139        &mut self,
140        messages: impl IntoIterator<Item = OutgoingMessage>,
141    ) -> Result<(), ArithmeticError> {
142        for message in messages {
143            self.add_outgoing_message(message)?;
144        }
145        Ok(())
146    }
147
148    pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
149        self.events.push(Event {
150            stream_id,
151            index,
152            value,
153        });
154    }
155
156    pub fn add_created_blob(&mut self, blob: Blob) {
157        self.blobs.insert(blob.id(), blob);
158    }
159
160    pub fn add_published_blob(&mut self, blob_id: BlobId) {
161        self.blobs_published.insert(blob_id);
162    }
163
164    pub fn created_blobs(&self) -> &BTreeMap<BlobId, Blob> {
165        &self.blobs
166    }
167
168    pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
169        self.oracle_responses.push(oracle_response);
170    }
171
172    pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
173        self.operation_result = result
174    }
175
176    pub fn add_stream_to_process(
177        &mut self,
178        application_id: ApplicationId,
179        chain_id: ChainId,
180        stream_id: StreamId,
181        previous_index: u32,
182        next_index: u32,
183    ) {
184        if next_index == previous_index {
185            return; // No new events in the stream.
186        }
187        self.streams_to_process
188            .entry(application_id)
189            .or_default()
190            .entry((chain_id, stream_id))
191            .and_modify(|(pi, ni)| {
192                *pi = (*pi).min(previous_index);
193                *ni = (*ni).max(next_index);
194            })
195            .or_insert_with(|| (previous_index, next_index));
196    }
197
198    pub fn remove_stream_to_process(
199        &mut self,
200        application_id: ApplicationId,
201        chain_id: ChainId,
202        stream_id: StreamId,
203    ) {
204        let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
205            return;
206        };
207        if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
208            self.streams_to_process.remove(&application_id);
209        }
210    }
211
212    pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
213        mem::take(&mut self.streams_to_process)
214            .into_iter()
215            .map(|(app_id, streams)| {
216                let updates = streams
217                    .into_iter()
218                    .map(
219                        |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
220                            chain_id,
221                            stream_id,
222                            previous_index,
223                            next_index,
224                        },
225                    )
226                    .collect();
227                (app_id, updates)
228            })
229            .collect()
230    }
231
232    /// Adds the oracle response to the record.
233    /// If replaying, it also checks that it matches the next replayed one and returns `true`.
234    pub fn replay_oracle_response(
235        &mut self,
236        oracle_response: OracleResponse,
237    ) -> Result<bool, ExecutionError> {
238        let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
239            ensure!(
240                recorded_response == oracle_response,
241                ExecutionError::OracleResponseMismatch
242            );
243            true
244        } else {
245            false
246        };
247        self.add_oracle_response(oracle_response);
248        Ok(replaying)
249    }
250
251    /// If in replay mode, returns the next oracle response, or an error if it is missing.
252    ///
253    /// If not in replay mode, `None` is returned, and the caller must execute the actual oracle
254    /// to obtain the value.
255    ///
256    /// In both cases, the value (returned or obtained from the oracle) must be recorded using
257    /// `add_oracle_response`.
258    pub fn next_replayed_oracle_response(
259        &mut self,
260    ) -> Result<Option<OracleResponse>, ExecutionError> {
261        let Some(responses) = &mut self.replaying_oracle_responses else {
262            return Ok(None); // Not in replay mode.
263        };
264        let response = responses
265            .next()
266            .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
267        Ok(Some(response))
268    }
269
270    pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
271        let TransactionTracker {
272            replaying_oracle_responses,
273            oracle_responses,
274            outgoing_messages,
275            local_time: _,
276            transaction_index: _,
277            next_message_index,
278            next_application_index,
279            next_chain_index,
280            events,
281            blobs,
282            operation_result,
283            streams_to_process,
284            blobs_published,
285        } = self;
286        ensure!(
287            streams_to_process.is_empty(),
288            ExecutionError::UnprocessedStreams
289        );
290        if let Some(mut responses) = replaying_oracle_responses {
291            ensure!(
292                responses.next().is_none(),
293                ExecutionError::UnexpectedOracleResponse
294            );
295        }
296        Ok(TransactionOutcome {
297            outgoing_messages,
298            oracle_responses,
299            next_message_index,
300            next_application_index,
301            next_chain_index,
302            events,
303            blobs: blobs.into_values().collect(),
304            operation_result: operation_result.unwrap_or_default(),
305            blobs_published,
306        })
307    }
308}
309
310#[cfg(with_testing)]
311impl TransactionTracker {
312    /// Creates a new [`TransactionTracker`] for testing, with default values and the given
313    /// oracle responses.
314    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
315        TransactionTracker::new(Timestamp::from(0), 0, 0, 0, 0, Some(oracle_responses))
316    }
317
318    /// Creates a new [`TransactionTracker`] for testing, with default values and oracle responses
319    /// for the given blobs.
320    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
321    where
322        T: IntoIterator,
323        T::Item: std::borrow::Borrow<BlobId>,
324    {
325        use std::borrow::Borrow;
326
327        let oracle_responses = blob_ids
328            .into_iter()
329            .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
330            .collect();
331        TransactionTracker::new_replaying(oracle_responses)
332    }
333}