1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
use std::borrow::Cow;
use std::sync::Arc;

use crate::history::HistoryListener;
use crate::retry_policy::RetryPolicy;
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
use crate::transport::execution_profile::ExecutionProfileHandle;

use super::StatementConfig;
use super::{Consistency, SerialConsistency};
pub use crate::frame::request::batch::BatchType;

/// CQL batch statement.
///
/// This represents a CQL batch that can be executed on a server.
#[derive(Clone)]
pub struct Batch {
    pub(crate) config: StatementConfig,

    pub statements: Vec<BatchStatement>,
    batch_type: BatchType,
}

impl Batch {
    /// Creates a new, empty `Batch` of `batch_type` type.
    pub fn new(batch_type: BatchType) -> Self {
        Self {
            batch_type,
            ..Default::default()
        }
    }

    /// Creates an empty batch, with the configuration of existing batch.
    pub(crate) fn new_from(batch: &Batch) -> Batch {
        let batch_type = batch.get_type();
        let config = batch.config.clone();
        Batch {
            batch_type,
            config,
            ..Default::default()
        }
    }

    /// Creates a new, empty `Batch` of `batch_type` type with the provided statements.
    pub fn new_with_statements(batch_type: BatchType, statements: Vec<BatchStatement>) -> Self {
        Self {
            batch_type,
            statements,
            ..Default::default()
        }
    }

    /// Appends a new statement to the batch.
    pub fn append_statement(&mut self, statement: impl Into<BatchStatement>) {
        self.statements.push(statement.into());
    }

    /// Gets type of batch.
    pub fn get_type(&self) -> BatchType {
        self.batch_type
    }

    /// Sets the consistency to be used when executing this batch.
    pub fn set_consistency(&mut self, c: Consistency) {
        self.config.consistency = Some(c);
    }

    /// Gets the consistency to be used when executing this batch if it is filled.
    /// If this is empty, the default_consistency of the session will be used.
    pub fn get_consistency(&self) -> Option<Consistency> {
        self.config.consistency
    }

    /// Sets the serial consistency to be used when executing this batch.
    /// (Ignored unless the batch is an LWT)
    pub fn set_serial_consistency(&mut self, sc: Option<SerialConsistency>) {
        self.config.serial_consistency = Some(sc);
    }

    /// Gets the serial consistency to be used when executing this batch.
    /// (Ignored unless the batch is an LWT)
    pub fn get_serial_consistency(&self) -> Option<SerialConsistency> {
        self.config.serial_consistency.flatten()
    }

    /// Sets the idempotence of this batch
    /// A query is idempotent if it can be applied multiple times without changing the result of the initial application
    /// If set to `true` we can be sure that it is idempotent
    /// If set to `false` it is unknown whether it is idempotent
    /// This is used in [`RetryPolicy`] to decide if retrying a query is safe
    pub fn set_is_idempotent(&mut self, is_idempotent: bool) {
        self.config.is_idempotent = is_idempotent;
    }

    /// Gets the idempotence of this batch
    pub fn get_is_idempotent(&self) -> bool {
        self.config.is_idempotent
    }

    /// Enable or disable CQL Tracing for this batch
    /// If enabled session.batch() will return a QueryResult containing tracing_id
    /// which can be used to query tracing information about the execution of this query
    pub fn set_tracing(&mut self, should_trace: bool) {
        self.config.tracing = should_trace;
    }

    /// Gets whether tracing is enabled for this batch
    pub fn get_tracing(&self) -> bool {
        self.config.tracing
    }

    /// Sets the default timestamp for this batch in microseconds.
    /// If not None, it will replace the server side assigned timestamp as default timestamp for
    /// all the statements contained in the batch.
    pub fn set_timestamp(&mut self, timestamp: Option<i64>) {
        self.config.timestamp = timestamp
    }

    /// Gets the default timestamp for this batch in microseconds.
    pub fn get_timestamp(&self) -> Option<i64> {
        self.config.timestamp
    }

    /// Set the retry policy for this batch, overriding the one from execution profile if not None.
    #[inline]
    pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
        self.config.retry_policy = retry_policy;
    }

    /// Get the retry policy set for the batch.
    #[inline]
    pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
        self.config.retry_policy.as_ref()
    }

    /// Sets the listener capable of listening what happens during query execution.
    pub fn set_history_listener(&mut self, history_listener: Arc<dyn HistoryListener>) {
        self.config.history_listener = Some(history_listener);
    }

    /// Removes the listener set by `set_history_listener`.
    pub fn remove_history_listener(&mut self) -> Option<Arc<dyn HistoryListener>> {
        self.config.history_listener.take()
    }

    /// Associates the batch with execution profile referred by the provided handle.
    /// Handle may be later remapped to another profile, and batch will reflect those changes.
    pub fn set_execution_profile_handle(&mut self, profile_handle: Option<ExecutionProfileHandle>) {
        self.config.execution_profile_handle = profile_handle;
    }

    /// Borrows the execution profile handle associated with this batch.
    pub fn get_execution_profile_handle(&self) -> Option<&ExecutionProfileHandle> {
        self.config.execution_profile_handle.as_ref()
    }
}

impl Default for Batch {
    fn default() -> Self {
        Self {
            statements: Vec::new(),
            batch_type: BatchType::Logged,
            config: Default::default(),
        }
    }
}

/// This enum represents a CQL statement, that can be part of batch.
#[derive(Clone)]
pub enum BatchStatement {
    Query(Query),
    PreparedStatement(PreparedStatement),
}

impl From<&str> for BatchStatement {
    fn from(s: &str) -> Self {
        BatchStatement::Query(Query::from(s))
    }
}

impl From<Query> for BatchStatement {
    fn from(q: Query) -> Self {
        BatchStatement::Query(q)
    }
}

impl From<PreparedStatement> for BatchStatement {
    fn from(p: PreparedStatement) -> Self {
        BatchStatement::PreparedStatement(p)
    }
}

impl<'a: 'b, 'b> From<&'a BatchStatement>
    for scylla_cql::frame::request::batch::BatchStatement<'b>
{
    fn from(val: &'a BatchStatement) -> Self {
        match val {
            BatchStatement::Query(query) => {
                scylla_cql::frame::request::batch::BatchStatement::Query {
                    text: Cow::Borrowed(&query.contents),
                }
            }
            BatchStatement::PreparedStatement(prepared) => {
                scylla_cql::frame::request::batch::BatchStatement::Prepared {
                    id: Cow::Borrowed(prepared.get_id()),
                }
            }
        }
    }
}

pub(crate) mod batch_values {
    use scylla_cql::types::serialize::batch::BatchValues;
    use scylla_cql::types::serialize::batch::BatchValuesIterator;
    use scylla_cql::types::serialize::row::RowSerializationContext;
    use scylla_cql::types::serialize::row::SerializedValues;
    use scylla_cql::types::serialize::{RowWriter, SerializationError};

    use crate::routing::Token;
    use crate::transport::errors::QueryError;

    use super::BatchStatement;

    // Takes an optional reference to the first statement in the batch and
    // the batch values, and tries to compute the token for the statement.
    // Returns the (optional) token and batch values. If the function needed
    // to serialize values for the first statement, the returned batch values
    // will cache the results of the serialization.
    //
    // NOTE: Batch values returned by this function might not type check
    // the first statement when it is serialized! However, if they don't,
    // then the first row was already checked by the function. It is assumed
    // that `statement` holds the first prepared statement of the batch (if
    // there is one), and that it will be used later to serialize the values.
    pub(crate) fn peek_first_token<'bv>(
        values: impl BatchValues + 'bv,
        statement: Option<&BatchStatement>,
    ) -> Result<(Option<Token>, impl BatchValues + 'bv), QueryError> {
        let mut values_iter = values.batch_values_iter();
        let (token, first_values) = match statement {
            Some(BatchStatement::PreparedStatement(ps)) => {
                let ctx = RowSerializationContext::from_prepared(ps.get_prepared_metadata());
                let (first_values, did_write) = SerializedValues::from_closure(|writer| {
                    values_iter
                        .serialize_next(&ctx, writer)
                        .transpose()
                        .map(|o| o.is_some())
                })?;
                if did_write {
                    let token = ps.calculate_token_untyped(&first_values)?;
                    (token, Some(first_values))
                } else {
                    (None, None)
                }
            }
            _ => (None, None),
        };

        // Need to do it explicitly, otherwise the next line will complain
        // that `values_iter` still borrows `values`.
        std::mem::drop(values_iter);

        // Reuse the already serialized first value via `BatchValuesFirstSerialized`.
        let values = BatchValuesFirstSerialized::new(values, first_values);

        Ok((token, values))
    }

    struct BatchValuesFirstSerialized<BV> {
        // Contains the first value of BV in a serialized form.
        // The first value in the iterator returned from `rest` should be skipped!
        first: Option<SerializedValues>,
        rest: BV,
    }

    impl<BV> BatchValuesFirstSerialized<BV> {
        fn new(rest: BV, first: Option<SerializedValues>) -> Self {
            Self { first, rest }
        }
    }

    impl<BV> BatchValues for BatchValuesFirstSerialized<BV>
    where
        BV: BatchValues,
    {
        type BatchValuesIter<'r>
            = BatchValuesFirstSerializedIterator<'r, BV::BatchValuesIter<'r>>
        where
            Self: 'r;

        fn batch_values_iter(&self) -> Self::BatchValuesIter<'_> {
            BatchValuesFirstSerializedIterator {
                first: self.first.as_ref(),
                rest: self.rest.batch_values_iter(),
            }
        }
    }

    struct BatchValuesFirstSerializedIterator<'f, BVI> {
        first: Option<&'f SerializedValues>,
        rest: BVI,
    }

    impl<'f, BVI> BatchValuesIterator<'f> for BatchValuesFirstSerializedIterator<'f, BVI>
    where
        BVI: BatchValuesIterator<'f>,
    {
        #[inline]
        fn serialize_next(
            &mut self,
            ctx: &RowSerializationContext<'_>,
            writer: &mut RowWriter,
        ) -> Option<Result<(), SerializationError>> {
            match self.first.take() {
                Some(sr) => {
                    writer.append_serialize_row(sr);
                    self.rest.skip_next();
                    Some(Ok(()))
                }
                None => self.rest.serialize_next(ctx, writer),
            }
        }

        #[inline]
        fn is_empty_next(&mut self) -> Option<bool> {
            match self.first.take() {
                Some(s) => {
                    self.rest.skip_next();
                    Some(s.is_empty())
                }
                None => self.rest.is_empty_next(),
            }
        }

        #[inline]
        fn skip_next(&mut self) -> Option<()> {
            self.first = None;
            self.rest.skip_next()
        }

        #[inline]
        fn count(self) -> usize
        where
            Self: Sized,
        {
            self.rest.count()
        }
    }
}