linera_execution/
transaction_tracker.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    mem, vec,
7};
8
9use custom_debug_derive::Debug;
10use linera_base::{
11    data_types::{ArithmeticError, Blob, Event, OracleResponse, StreamUpdate, Timestamp},
12    ensure,
13    identifiers::{ApplicationId, BlobId, ChainId, StreamId},
14};
15
16use crate::{ExecutionError, OutgoingMessage};
17
18type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
19
20/// Tracks oracle responses and execution outcomes of an ongoing transaction execution, as well
21/// as replayed oracle responses.
22#[derive(Debug, Default)]
23pub struct TransactionTracker {
24    #[debug(skip_if = Option::is_none)]
25    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
26    #[debug(skip_if = Vec::is_empty)]
27    oracle_responses: Vec<OracleResponse>,
28    #[debug(skip_if = Vec::is_empty)]
29    outgoing_messages: Vec<OutgoingMessage>,
30    /// The current local time.
31    local_time: Timestamp,
32    /// The index of the current transaction in the block.
33    transaction_index: u32,
34    next_application_index: u32,
35    next_chain_index: u32,
36    /// Events recorded by contracts' `emit` calls.
37    events: Vec<Event>,
38    /// Blobs created by contracts.
39    ///
40    /// As of right now, blobs created by the contracts are one of the two:
41    /// - [`OpenChain`]
42    /// - [`CreateApplication`]
43    blobs: BTreeMap<BlobId, Blob>,
44    /// Operation result.
45    operation_result: Option<Vec<u8>>,
46    /// Streams that have been updated but not yet processed during this transaction.
47    streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
48    /// Published blobs this transaction refers to by [`BlobId`].
49    blobs_published: BTreeSet<BlobId>,
50}
51
52/// The [`TransactionTracker`] contents after a transaction has finished.
53#[derive(Debug, Default)]
54pub struct TransactionOutcome {
55    #[debug(skip_if = Vec::is_empty)]
56    pub oracle_responses: Vec<OracleResponse>,
57    #[debug(skip_if = Vec::is_empty)]
58    pub outgoing_messages: Vec<OutgoingMessage>,
59    pub next_application_index: u32,
60    pub next_chain_index: u32,
61    /// Events recorded by contracts' `emit` calls.
62    pub events: Vec<Event>,
63    /// Blobs created by contracts.
64    pub blobs: Vec<Blob>,
65    /// Operation result.
66    pub operation_result: Vec<u8>,
67    /// Blobs published by this transaction.
68    pub blobs_published: BTreeSet<BlobId>,
69}
70
71impl TransactionTracker {
72    pub fn new(
73        local_time: Timestamp,
74        transaction_index: u32,
75        next_application_index: u32,
76        next_chain_index: u32,
77        oracle_responses: Option<Vec<OracleResponse>>,
78    ) -> Self {
79        TransactionTracker {
80            local_time,
81            transaction_index,
82            next_application_index,
83            next_chain_index,
84            replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
85            ..Self::default()
86        }
87    }
88
89    pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, Blob>) -> Self {
90        self.blobs = blobs;
91        self
92    }
93
94    pub fn local_time(&self) -> Timestamp {
95        self.local_time
96    }
97
98    pub fn set_local_time(&mut self, local_time: Timestamp) {
99        self.local_time = local_time;
100    }
101
102    pub fn transaction_index(&self) -> u32 {
103        self.transaction_index
104    }
105
106    pub fn next_application_index(&mut self) -> u32 {
107        let index = self.next_application_index;
108        self.next_application_index += 1;
109        index
110    }
111
112    pub fn next_chain_index(&mut self) -> u32 {
113        let index = self.next_chain_index;
114        self.next_chain_index += 1;
115        index
116    }
117
118    pub fn add_outgoing_message(
119        &mut self,
120        message: OutgoingMessage,
121    ) -> Result<(), ArithmeticError> {
122        self.outgoing_messages.push(message);
123        Ok(())
124    }
125
126    pub fn add_outgoing_messages(
127        &mut self,
128        messages: impl IntoIterator<Item = OutgoingMessage>,
129    ) -> Result<(), ArithmeticError> {
130        for message in messages {
131            self.add_outgoing_message(message)?;
132        }
133        Ok(())
134    }
135
136    pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
137        self.events.push(Event {
138            stream_id,
139            index,
140            value,
141        });
142    }
143
144    pub fn add_created_blob(&mut self, blob: Blob) {
145        self.blobs.insert(blob.id(), blob);
146    }
147
148    pub fn add_published_blob(&mut self, blob_id: BlobId) {
149        self.blobs_published.insert(blob_id);
150    }
151
152    pub fn created_blobs(&self) -> &BTreeMap<BlobId, Blob> {
153        &self.blobs
154    }
155
156    pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
157        self.oracle_responses.push(oracle_response);
158    }
159
160    pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
161        self.operation_result = result
162    }
163
164    pub fn add_stream_to_process(
165        &mut self,
166        application_id: ApplicationId,
167        chain_id: ChainId,
168        stream_id: StreamId,
169        previous_index: u32,
170        next_index: u32,
171    ) {
172        if next_index == previous_index {
173            return; // No new events in the stream.
174        }
175        self.streams_to_process
176            .entry(application_id)
177            .or_default()
178            .entry((chain_id, stream_id))
179            .and_modify(|(pi, ni)| {
180                *pi = (*pi).min(previous_index);
181                *ni = (*ni).max(next_index);
182            })
183            .or_insert_with(|| (previous_index, next_index));
184    }
185
186    pub fn remove_stream_to_process(
187        &mut self,
188        application_id: ApplicationId,
189        chain_id: ChainId,
190        stream_id: StreamId,
191    ) {
192        let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
193            return;
194        };
195        if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
196            self.streams_to_process.remove(&application_id);
197        }
198    }
199
200    pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
201        mem::take(&mut self.streams_to_process)
202            .into_iter()
203            .map(|(app_id, streams)| {
204                let updates = streams
205                    .into_iter()
206                    .map(
207                        |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
208                            chain_id,
209                            stream_id,
210                            previous_index,
211                            next_index,
212                        },
213                    )
214                    .collect();
215                (app_id, updates)
216            })
217            .collect()
218    }
219
220    /// Adds the oracle response to the record.
221    /// If replaying, it also checks that it matches the next replayed one and returns `true`.
222    pub fn replay_oracle_response(
223        &mut self,
224        oracle_response: OracleResponse,
225    ) -> Result<bool, ExecutionError> {
226        let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
227            ensure!(
228                recorded_response == oracle_response,
229                ExecutionError::OracleResponseMismatch
230            );
231            true
232        } else {
233            false
234        };
235        self.add_oracle_response(oracle_response);
236        Ok(replaying)
237    }
238
239    /// If in replay mode, returns the next oracle response, or an error if it is missing.
240    ///
241    /// If not in replay mode, `None` is returned, and the caller must execute the actual oracle
242    /// to obtain the value.
243    ///
244    /// In both cases, the value (returned or obtained from the oracle) must be recorded using
245    /// `add_oracle_response`.
246    pub fn next_replayed_oracle_response(
247        &mut self,
248    ) -> Result<Option<OracleResponse>, ExecutionError> {
249        let Some(responses) = &mut self.replaying_oracle_responses else {
250            return Ok(None); // Not in replay mode.
251        };
252        let response = responses
253            .next()
254            .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
255        Ok(Some(response))
256    }
257
258    pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
259        let TransactionTracker {
260            replaying_oracle_responses,
261            oracle_responses,
262            outgoing_messages,
263            local_time: _,
264            transaction_index: _,
265            next_application_index,
266            next_chain_index,
267            events,
268            blobs,
269            operation_result,
270            streams_to_process,
271            blobs_published,
272        } = self;
273        ensure!(
274            streams_to_process.is_empty(),
275            ExecutionError::UnprocessedStreams
276        );
277        if let Some(mut responses) = replaying_oracle_responses {
278            ensure!(
279                responses.next().is_none(),
280                ExecutionError::UnexpectedOracleResponse
281            );
282        }
283        Ok(TransactionOutcome {
284            outgoing_messages,
285            oracle_responses,
286            next_application_index,
287            next_chain_index,
288            events,
289            blobs: blobs.into_values().collect(),
290            operation_result: operation_result.unwrap_or_default(),
291            blobs_published,
292        })
293    }
294}
295
296#[cfg(with_testing)]
297impl TransactionTracker {
298    /// Creates a new [`TransactionTracker`] for testing, with default values and the given
299    /// oracle responses.
300    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
301        TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses))
302    }
303
304    /// Creates a new [`TransactionTracker`] for testing, with default values and oracle responses
305    /// for the given blobs.
306    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
307    where
308        T: IntoIterator,
309        T::Item: std::borrow::Borrow<BlobId>,
310    {
311        use std::borrow::Borrow;
312
313        let oracle_responses = blob_ids
314            .into_iter()
315            .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
316            .collect();
317        TransactionTracker::new_replaying(oracle_responses)
318    }
319}