scylla/observability/
history.rs

1//! Collecting history of request executions - retries, speculative, etc.
2use std::{
3    collections::BTreeMap,
4    fmt::{Debug, Display},
5    net::SocketAddr,
6    sync::Mutex,
7    time::SystemTime,
8};
9
10use crate::errors::{RequestAttemptError, RequestError};
11use crate::policies::retry::RetryDecision;
12use chrono::{DateTime, Utc};
13
14use tracing::warn;
15
16/// Id of a single request, i.e. a single call to Session::{query,execute}_{unpaged,single_page}/etc.
17#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
18pub struct RequestId(pub usize);
19
20/// Id of a single attempt within a request run - a single request sent on some connection.
21#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
22pub struct AttemptId(pub usize);
23
24/// Id of a speculative execution fiber.
25/// When speculative execution is enabled the driver will start multiple
26/// speculative threads, each of them performing sequential attempts.
27#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
28pub struct SpeculativeId(pub usize);
29
30/// Any type implementing this trait can be passed to Session
31/// to collect execution history of specific requests.\
32/// In order to use it call `set_history_listener` on
33/// `Query`, `PreparedStatement`, etc...\
34/// The listener has to generate unique IDs for new requests, attempts and speculative fibers.
35/// These ids are then used by the caller to identify them.\
36/// It's important to note that even after a request is finished there still might come events related to it.
37/// These events come from speculative futures that didn't notice the request is done already.
38pub trait HistoryListener: Debug + Send + Sync {
39    /// Log that a request has started on request start - right after the call to Session::{query,execute}_*/batch.
40    fn log_request_start(&self) -> RequestId;
41
42    /// Log that request was successful - called right before returning the result from Session::query_*, execute_*, etc.
43    fn log_request_success(&self, request_id: RequestId);
44
45    /// Log that request ended with an error - called right before returning the error from Session::query_*, execute_*, etc.
46    fn log_request_error(&self, request_id: RequestId, error: &RequestError);
47
48    /// Log that a new speculative fiber has started.
49    fn log_new_speculative_fiber(&self, request_id: RequestId) -> SpeculativeId;
50
51    /// Log that an attempt has started - request has been sent on some Connection, now awaiting for an answer.
52    fn log_attempt_start(
53        &self,
54        request_id: RequestId,
55        speculative_id: Option<SpeculativeId>,
56        node_addr: SocketAddr,
57    ) -> AttemptId;
58
59    /// Log that an attempt succeeded.
60    fn log_attempt_success(&self, attempt_id: AttemptId);
61
62    /// Log that an attempt ended with an error. The error and decision whether to retry the attempt are also included in the log.
63    fn log_attempt_error(
64        &self,
65        attempt_id: AttemptId,
66        error: &RequestAttemptError,
67        retry_decision: &RetryDecision,
68    );
69}
70
71pub type TimePoint = DateTime<Utc>;
72
73/// HistoryCollector can be used as HistoryListener to collect all the request history events.
74/// Each event is marked with an UTC timestamp.
75#[derive(Debug, Default)]
76pub struct HistoryCollector {
77    data: Mutex<HistoryCollectorData>,
78}
79
80#[derive(Debug, Clone)]
81pub struct HistoryCollectorData {
82    events: Vec<(HistoryEvent, TimePoint)>,
83    next_request_id: RequestId,
84    next_speculative_fiber_id: SpeculativeId,
85    next_attempt_id: AttemptId,
86}
87
88#[derive(Debug, Clone)]
89pub enum HistoryEvent {
90    NewRequest(RequestId),
91    RequestSuccess(RequestId),
92    RequestError(RequestId, RequestError),
93    NewSpeculativeFiber(SpeculativeId, RequestId),
94    NewAttempt(AttemptId, RequestId, Option<SpeculativeId>, SocketAddr),
95    AttemptSuccess(AttemptId),
96    AttemptError(AttemptId, RequestAttemptError, RetryDecision),
97}
98
99impl HistoryCollectorData {
100    fn new() -> HistoryCollectorData {
101        HistoryCollectorData {
102            events: Vec::new(),
103            next_request_id: RequestId(0),
104            next_speculative_fiber_id: SpeculativeId(0),
105            next_attempt_id: AttemptId(0),
106        }
107    }
108
109    fn add_event(&mut self, event: HistoryEvent) {
110        let event_time: TimePoint = SystemTime::now().into();
111        self.events.push((event, event_time));
112    }
113}
114
115impl Default for HistoryCollectorData {
116    fn default() -> HistoryCollectorData {
117        HistoryCollectorData::new()
118    }
119}
120
121impl HistoryCollector {
122    /// Creates a new HistoryCollector with empty data.
123    pub fn new() -> HistoryCollector {
124        HistoryCollector::default()
125    }
126
127    /// Clones the data collected by the collector.
128    pub fn clone_collected(&self) -> HistoryCollectorData {
129        self.do_with_data(|data| data.clone())
130    }
131
132    /// Takes the data out of the collector. The collected events are cleared.\
133    /// It's possible that after finishing a request and taking out the events
134    /// new ones will still come - from requests that haven't been cancelled yet.
135    pub fn take_collected(&self) -> HistoryCollectorData {
136        self.do_with_data(|data| {
137            let mut data_to_swap = HistoryCollectorData {
138                events: Vec::new(),
139                next_request_id: data.next_request_id,
140                next_speculative_fiber_id: data.next_speculative_fiber_id,
141                next_attempt_id: data.next_attempt_id,
142            };
143            std::mem::swap(&mut data_to_swap, data);
144            data_to_swap
145        })
146    }
147
148    /// Clone the collected events and convert them to StructuredHistory.
149    pub fn clone_structured_history(&self) -> StructuredHistory {
150        StructuredHistory::from(&self.clone_collected())
151    }
152
153    /// Take the collected events out, just like in `take_collected` and convert them to StructuredHistory.
154    pub fn take_structured_history(&self) -> StructuredHistory {
155        StructuredHistory::from(&self.take_collected())
156    }
157
158    /// Lock the data mutex and perform an operation on it.
159    fn do_with_data<OpRetType>(
160        &self,
161        do_fn: impl Fn(&mut HistoryCollectorData) -> OpRetType,
162    ) -> OpRetType {
163        match self.data.lock() {
164            Ok(mut data) => do_fn(&mut data),
165            Err(poison_error) => {
166                // Avoid panicking on poisoned mutex - HistoryCollector isn't that important.
167                // Print a warning and do the operation on dummy data so that the code compiles.
168                warn!("HistoryCollector - mutex poisoned! Error: {}", poison_error);
169                let mut dummy_data: HistoryCollectorData = HistoryCollectorData::default();
170                do_fn(&mut dummy_data)
171            }
172        }
173    }
174}
175
176impl HistoryListener for HistoryCollector {
177    fn log_request_start(&self) -> RequestId {
178        self.do_with_data(|data| {
179            let new_request_id: RequestId = data.next_request_id;
180            data.next_request_id.0 += 1;
181            data.add_event(HistoryEvent::NewRequest(new_request_id));
182            new_request_id
183        })
184    }
185
186    fn log_request_success(&self, request_id: RequestId) {
187        self.do_with_data(|data| {
188            data.add_event(HistoryEvent::RequestSuccess(request_id));
189        })
190    }
191
192    fn log_request_error(&self, request_id: RequestId, error: &RequestError) {
193        self.do_with_data(|data| {
194            data.add_event(HistoryEvent::RequestError(request_id, error.clone()))
195        })
196    }
197
198    fn log_new_speculative_fiber(&self, request_id: RequestId) -> SpeculativeId {
199        self.do_with_data(|data| {
200            let new_speculative_id: SpeculativeId = data.next_speculative_fiber_id;
201            data.next_speculative_fiber_id.0 += 1;
202            data.add_event(HistoryEvent::NewSpeculativeFiber(
203                new_speculative_id,
204                request_id,
205            ));
206            new_speculative_id
207        })
208    }
209
210    fn log_attempt_start(
211        &self,
212        request_id: RequestId,
213        speculative_id: Option<SpeculativeId>,
214        node_addr: SocketAddr,
215    ) -> AttemptId {
216        self.do_with_data(|data| {
217            let new_attempt_id: AttemptId = data.next_attempt_id;
218            data.next_attempt_id.0 += 1;
219            data.add_event(HistoryEvent::NewAttempt(
220                new_attempt_id,
221                request_id,
222                speculative_id,
223                node_addr,
224            ));
225            new_attempt_id
226        })
227    }
228
229    fn log_attempt_success(&self, attempt_id: AttemptId) {
230        self.do_with_data(|data| data.add_event(HistoryEvent::AttemptSuccess(attempt_id)))
231    }
232
233    fn log_attempt_error(
234        &self,
235        attempt_id: AttemptId,
236        error: &RequestAttemptError,
237        retry_decision: &RetryDecision,
238    ) {
239        self.do_with_data(|data| {
240            data.add_event(HistoryEvent::AttemptError(
241                attempt_id,
242                error.clone(),
243                retry_decision.clone(),
244            ))
245        })
246    }
247}
248
249/// Structured representation of requests history.\
250/// HistoryCollector collects raw events which later can be converted
251/// to this pretty representation.\
252/// It has a `Display` impl which can be used for printing pretty request history.
253#[derive(Debug, Clone)]
254pub struct StructuredHistory {
255    pub requests: Vec<RequestHistory>,
256}
257
258#[derive(Debug, Clone)]
259pub struct RequestHistory {
260    pub start_time: TimePoint,
261    pub non_speculative_fiber: FiberHistory,
262    pub speculative_fibers: Vec<FiberHistory>,
263    pub result: Option<RequestHistoryResult>,
264}
265
266#[derive(Debug, Clone)]
267pub enum RequestHistoryResult {
268    Success(TimePoint),
269    Error(TimePoint, RequestError),
270}
271
272#[derive(Debug, Clone)]
273pub struct FiberHistory {
274    pub start_time: TimePoint,
275    pub attempts: Vec<AttemptHistory>,
276}
277
278#[derive(Debug, Clone)]
279pub struct AttemptHistory {
280    pub send_time: TimePoint,
281    pub node_addr: SocketAddr,
282    pub result: Option<AttemptResult>,
283}
284
285#[derive(Debug, Clone)]
286pub enum AttemptResult {
287    Success(TimePoint),
288    Error(TimePoint, RequestAttemptError, RetryDecision),
289}
290
291impl From<&HistoryCollectorData> for StructuredHistory {
292    fn from(data: &HistoryCollectorData) -> StructuredHistory {
293        let mut attempts: BTreeMap<AttemptId, AttemptHistory> = BTreeMap::new();
294        let mut requests: BTreeMap<RequestId, RequestHistory> = BTreeMap::new();
295        let mut fibers: BTreeMap<SpeculativeId, FiberHistory> = BTreeMap::new();
296
297        // Collect basic data about requests, attempts and speculative fibers
298        for (event, event_time) in &data.events {
299            match event {
300                HistoryEvent::NewAttempt(attempt_id, _, _, node_addr) => {
301                    attempts.insert(
302                        *attempt_id,
303                        AttemptHistory {
304                            send_time: *event_time,
305                            node_addr: *node_addr,
306                            result: None,
307                        },
308                    );
309                }
310                HistoryEvent::AttemptSuccess(attempt_id) => {
311                    if let Some(attempt) = attempts.get_mut(attempt_id) {
312                        attempt.result = Some(AttemptResult::Success(*event_time));
313                    }
314                }
315                HistoryEvent::AttemptError(attempt_id, error, retry_decision) => {
316                    match attempts.get_mut(attempt_id) {
317                        Some(attempt) => {
318                            if attempt.result.is_some() {
319                                warn!("StructuredHistory - attempt with id {:?} has multiple results", attempt_id);
320                            }
321                            attempt.result = Some(AttemptResult::Error(*event_time, error.clone(), retry_decision.clone()));
322                        },
323                        None => warn!("StructuredHistory - attempt with id {:?} finished with an error but not created", attempt_id)
324                    }
325                }
326                HistoryEvent::NewRequest(request_id) => {
327                    requests.insert(
328                        *request_id,
329                        RequestHistory {
330                            start_time: *event_time,
331                            non_speculative_fiber: FiberHistory {
332                                start_time: *event_time,
333                                attempts: Vec::new(),
334                            },
335                            speculative_fibers: Vec::new(),
336                            result: None,
337                        },
338                    );
339                }
340                HistoryEvent::RequestSuccess(request_id) => {
341                    if let Some(request) = requests.get_mut(request_id) {
342                        request.result = Some(RequestHistoryResult::Success(*event_time));
343                    }
344                }
345                HistoryEvent::RequestError(request_id, error) => {
346                    if let Some(request) = requests.get_mut(request_id) {
347                        request.result =
348                            Some(RequestHistoryResult::Error(*event_time, error.clone()));
349                    }
350                }
351                HistoryEvent::NewSpeculativeFiber(speculative_id, _) => {
352                    fibers.insert(
353                        *speculative_id,
354                        FiberHistory {
355                            start_time: *event_time,
356                            attempts: Vec::new(),
357                        },
358                    );
359                }
360            }
361        }
362
363        // Move attempts to their speculative fibers
364        for (event, _) in &data.events {
365            if let HistoryEvent::NewAttempt(attempt_id, request_id, speculative_id, _) = event {
366                if let Some(attempt) = attempts.remove(attempt_id) {
367                    match speculative_id {
368                        Some(spec_id) => {
369                            if let Some(spec_fiber) = fibers.get_mut(spec_id) {
370                                spec_fiber.attempts.push(attempt);
371                            }
372                        }
373                        None => {
374                            if let Some(request) = requests.get_mut(request_id) {
375                                request.non_speculative_fiber.attempts.push(attempt);
376                            }
377                        }
378                    }
379                }
380            }
381        }
382
383        // Move speculative fibers to their requests
384        for (event, _) in &data.events {
385            if let HistoryEvent::NewSpeculativeFiber(speculative_id, request_id) = event {
386                if let Some(fiber) = fibers.remove(speculative_id) {
387                    if let Some(request) = requests.get_mut(request_id) {
388                        request.speculative_fibers.push(fiber);
389                    }
390                }
391            }
392        }
393
394        StructuredHistory {
395            requests: requests.into_values().collect(),
396        }
397    }
398}
399
400/// StructuredHistory should be used for printing request history.
401impl Display for StructuredHistory {
402    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
403        writeln!(f, "Requests History:")?;
404        for (i, request) in self.requests.iter().enumerate() {
405            writeln!(f, "=== Request #{} ===", i)?;
406            writeln!(f, "| start_time: {}", request.start_time)?;
407            writeln!(f, "| Non-speculative attempts:")?;
408            write_fiber_attempts(&request.non_speculative_fiber, f)?;
409            for (spec_i, speculative_fiber) in request.speculative_fibers.iter().enumerate() {
410                writeln!(f, "|")?;
411                writeln!(f, "|")?;
412                writeln!(f, "| > Speculative fiber #{}", spec_i)?;
413                writeln!(f, "| fiber start time: {}", speculative_fiber.start_time)?;
414                write_fiber_attempts(speculative_fiber, f)?;
415            }
416            writeln!(f, "|")?;
417            match &request.result {
418                Some(RequestHistoryResult::Success(succ_time)) => {
419                    writeln!(f, "| Request successful at {}", succ_time)?;
420                }
421                Some(RequestHistoryResult::Error(err_time, error)) => {
422                    writeln!(f, "| Request failed at {}", err_time)?;
423                    writeln!(f, "| Error: {}", error)?;
424                }
425                None => writeln!(f, "| Request still running - no final result yet")?,
426            };
427            writeln!(f, "=================")?;
428        }
429        Ok(())
430    }
431}
432
433fn write_fiber_attempts(fiber: &FiberHistory, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434    for (i, attempt) in fiber.attempts.iter().enumerate() {
435        if i != 0 {
436            writeln!(f, "|")?;
437        }
438        writeln!(f, "| - Attempt #{} sent to {}", i, attempt.node_addr)?;
439        writeln!(f, "|   request send time: {}", attempt.send_time)?;
440        match &attempt.result {
441            Some(AttemptResult::Success(time)) => writeln!(f, "|   Success at {}", time)?,
442            Some(AttemptResult::Error(time, err, retry_decision)) => {
443                writeln!(f, "|   Error at {}", time)?;
444                writeln!(f, "|   Error: {}", err)?;
445                writeln!(f, "|   Retry decision: {:?}", retry_decision)?;
446            }
447            None => writeln!(f, "|   No result yet")?,
448        };
449    }
450
451    Ok(())
452}
453
454#[cfg(test)]
455mod tests {
456    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
457
458    use crate::{
459        errors::{DbError, RequestAttemptError, RequestError},
460        policies::retry::RetryDecision,
461        test_utils::setup_tracing,
462    };
463
464    use super::{
465        AttemptId, AttemptResult, HistoryCollector, HistoryListener, RequestHistoryResult,
466        RequestId, SpeculativeId, StructuredHistory, TimePoint,
467    };
468    use assert_matches::assert_matches;
469    use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
470    use scylla_cql::{frame::response::CqlResponseKind, Consistency};
471
472    // Set a single time for all timestamps within StructuredHistory.
473    // HistoryCollector sets the timestamp to current time which changes with each test.
474    // Setting it to one makes it possible to test displaying consistently.
475    fn set_one_time(mut history: StructuredHistory) -> StructuredHistory {
476        let the_time: TimePoint = DateTime::<Utc>::from_naive_utc_and_offset(
477            NaiveDateTime::new(
478                NaiveDate::from_ymd_opt(2022, 2, 22).unwrap(),
479                NaiveTime::from_hms_opt(20, 22, 22).unwrap(),
480            ),
481            Utc,
482        );
483
484        for request in &mut history.requests {
485            request.start_time = the_time;
486            match &mut request.result {
487                Some(RequestHistoryResult::Success(succ_time)) => *succ_time = the_time,
488                Some(RequestHistoryResult::Error(err_time, _)) => *err_time = the_time,
489                None => {}
490            };
491
492            for fiber in std::iter::once(&mut request.non_speculative_fiber)
493                .chain(request.speculative_fibers.iter_mut())
494            {
495                fiber.start_time = the_time;
496                for attempt in &mut fiber.attempts {
497                    attempt.send_time = the_time;
498                    match &mut attempt.result {
499                        Some(AttemptResult::Success(succ_time)) => *succ_time = the_time,
500                        Some(AttemptResult::Error(err_time, _, _)) => *err_time = the_time,
501                        None => {}
502                    }
503                }
504            }
505        }
506
507        history
508    }
509
510    fn node1_addr() -> SocketAddr {
511        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 19042)
512    }
513
514    fn node2_addr() -> SocketAddr {
515        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 19042)
516    }
517
518    fn node3_addr() -> SocketAddr {
519        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3)), 19042)
520    }
521
522    fn unexpected_response(kind: CqlResponseKind) -> RequestAttemptError {
523        RequestAttemptError::UnexpectedResponse(kind)
524    }
525
526    fn unavailable_error() -> RequestAttemptError {
527        RequestAttemptError::DbError(
528            DbError::Unavailable {
529                consistency: Consistency::Quorum,
530                required: 2,
531                alive: 1,
532            },
533            "Not enough nodes to satisfy consistency".to_string(),
534        )
535    }
536
537    fn no_stream_id_error() -> RequestAttemptError {
538        RequestAttemptError::UnableToAllocStreamId
539    }
540
541    #[test]
542    fn empty_history() {
543        setup_tracing();
544        let history_collector = HistoryCollector::new();
545        let history: StructuredHistory = history_collector.clone_structured_history();
546
547        assert!(history.requests.is_empty());
548
549        let displayed = "Requests History:
550";
551        assert_eq!(displayed, format!("{}", history));
552    }
553
554    #[test]
555    fn empty_request() {
556        setup_tracing();
557        let history_collector = HistoryCollector::new();
558
559        let _request_id: RequestId = history_collector.log_request_start();
560
561        let history: StructuredHistory = history_collector.clone_structured_history();
562
563        assert_eq!(history.requests.len(), 1);
564        assert!(history.requests[0]
565            .non_speculative_fiber
566            .attempts
567            .is_empty());
568        assert!(history.requests[0].speculative_fibers.is_empty());
569
570        let displayed = "Requests History:
571=== Request #0 ===
572| start_time: 2022-02-22 20:22:22 UTC
573| Non-speculative attempts:
574|
575| Request still running - no final result yet
576=================
577";
578
579        assert_eq!(displayed, format!("{}", set_one_time(history)));
580    }
581
582    #[test]
583    fn one_attempt() {
584        setup_tracing();
585        let history_collector = HistoryCollector::new();
586
587        let request_id: RequestId = history_collector.log_request_start();
588        let attempt_id: AttemptId =
589            history_collector.log_attempt_start(request_id, None, node1_addr());
590        history_collector.log_attempt_success(attempt_id);
591        history_collector.log_request_success(request_id);
592
593        let history: StructuredHistory = history_collector.clone_structured_history();
594
595        assert_eq!(history.requests.len(), 1);
596        assert_eq!(history.requests[0].non_speculative_fiber.attempts.len(), 1);
597        assert!(history.requests[0].speculative_fibers.is_empty());
598        assert_matches!(
599            history.requests[0].non_speculative_fiber.attempts[0].result,
600            Some(AttemptResult::Success(_))
601        );
602
603        let displayed = "Requests History:
604=== Request #0 ===
605| start_time: 2022-02-22 20:22:22 UTC
606| Non-speculative attempts:
607| - Attempt #0 sent to 127.0.0.1:19042
608|   request send time: 2022-02-22 20:22:22 UTC
609|   Success at 2022-02-22 20:22:22 UTC
610|
611| Request successful at 2022-02-22 20:22:22 UTC
612=================
613";
614        assert_eq!(displayed, format!("{}", set_one_time(history)));
615    }
616
617    #[test]
618    fn two_error_atempts() {
619        setup_tracing();
620        let history_collector = HistoryCollector::new();
621
622        let request_id: RequestId = history_collector.log_request_start();
623
624        let attempt_id: AttemptId =
625            history_collector.log_attempt_start(request_id, None, node1_addr());
626        history_collector.log_attempt_error(
627            attempt_id,
628            &unexpected_response(CqlResponseKind::Ready),
629            &RetryDecision::RetrySameTarget(Some(Consistency::Quorum)),
630        );
631
632        let second_attempt_id: AttemptId =
633            history_collector.log_attempt_start(request_id, None, node1_addr());
634        history_collector.log_attempt_error(
635            second_attempt_id,
636            &unavailable_error(),
637            &RetryDecision::DontRetry,
638        );
639
640        history_collector.log_request_error(
641            request_id,
642            &RequestError::LastAttemptError(unavailable_error()),
643        );
644
645        let history: StructuredHistory = history_collector.clone_structured_history();
646
647        let displayed =
648"Requests History:
649=== Request #0 ===
650| start_time: 2022-02-22 20:22:22 UTC
651| Non-speculative attempts:
652| - Attempt #0 sent to 127.0.0.1:19042
653|   request send time: 2022-02-22 20:22:22 UTC
654|   Error at 2022-02-22 20:22:22 UTC
655|   Error: Received unexpected response from the server: READY. Expected RESULT or ERROR response.
656|   Retry decision: RetrySameTarget(Some(Quorum))
657|
658| - Attempt #1 sent to 127.0.0.1:19042
659|   request send time: 2022-02-22 20:22:22 UTC
660|   Error at 2022-02-22 20:22:22 UTC
661|   Error: Database returned an error: Not enough nodes are alive to satisfy required consistency level (consistency: Quorum, required: 2, alive: 1), Error message: Not enough nodes to satisfy consistency
662|   Retry decision: DontRetry
663|
664| Request failed at 2022-02-22 20:22:22 UTC
665| Error: Database returned an error: Not enough nodes are alive to satisfy required consistency level (consistency: Quorum, required: 2, alive: 1), Error message: Not enough nodes to satisfy consistency
666=================
667";
668        assert_eq!(displayed, format!("{}", set_one_time(history)));
669    }
670
671    #[test]
672    fn empty_fibers() {
673        setup_tracing();
674        let history_collector = HistoryCollector::new();
675
676        let request_id: RequestId = history_collector.log_request_start();
677        history_collector.log_new_speculative_fiber(request_id);
678        history_collector.log_new_speculative_fiber(request_id);
679        history_collector.log_new_speculative_fiber(request_id);
680
681        let history: StructuredHistory = history_collector.clone_structured_history();
682
683        assert_eq!(history.requests.len(), 1);
684        assert!(history.requests[0]
685            .non_speculative_fiber
686            .attempts
687            .is_empty());
688        assert_eq!(history.requests[0].speculative_fibers.len(), 3);
689        assert!(history.requests[0].speculative_fibers[0]
690            .attempts
691            .is_empty());
692        assert!(history.requests[0].speculative_fibers[1]
693            .attempts
694            .is_empty());
695        assert!(history.requests[0].speculative_fibers[2]
696            .attempts
697            .is_empty());
698
699        let displayed = "Requests History:
700=== Request #0 ===
701| start_time: 2022-02-22 20:22:22 UTC
702| Non-speculative attempts:
703|
704|
705| > Speculative fiber #0
706| fiber start time: 2022-02-22 20:22:22 UTC
707|
708|
709| > Speculative fiber #1
710| fiber start time: 2022-02-22 20:22:22 UTC
711|
712|
713| > Speculative fiber #2
714| fiber start time: 2022-02-22 20:22:22 UTC
715|
716| Request still running - no final result yet
717=================
718";
719        assert_eq!(displayed, format!("{}", set_one_time(history)));
720    }
721
722    #[test]
723    fn complex() {
724        setup_tracing();
725        let history_collector = HistoryCollector::new();
726
727        let request_id: RequestId = history_collector.log_request_start();
728
729        let attempt1: AttemptId =
730            history_collector.log_attempt_start(request_id, None, node1_addr());
731
732        let speculative1: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
733
734        let spec1_attempt1: AttemptId =
735            history_collector.log_attempt_start(request_id, Some(speculative1), node2_addr());
736
737        history_collector.log_attempt_error(
738            attempt1,
739            &unexpected_response(CqlResponseKind::Event),
740            &RetryDecision::RetryNextTarget(Some(Consistency::Quorum)),
741        );
742        let _attempt2: AttemptId =
743            history_collector.log_attempt_start(request_id, None, node3_addr());
744
745        let speculative2: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
746
747        let spec2_attempt1: AttemptId =
748            history_collector.log_attempt_start(request_id, Some(speculative2), node1_addr());
749        history_collector.log_attempt_error(
750            spec2_attempt1,
751            &no_stream_id_error(),
752            &RetryDecision::RetrySameTarget(Some(Consistency::Quorum)),
753        );
754
755        let spec2_attempt2: AttemptId =
756            history_collector.log_attempt_start(request_id, Some(speculative2), node1_addr());
757
758        let _speculative3: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
759        let speculative4: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
760
761        history_collector.log_attempt_error(
762            spec1_attempt1,
763            &unavailable_error(),
764            &RetryDecision::RetryNextTarget(Some(Consistency::Quorum)),
765        );
766
767        let _spec4_attempt1: AttemptId =
768            history_collector.log_attempt_start(request_id, Some(speculative4), node2_addr());
769
770        history_collector.log_attempt_success(spec2_attempt2);
771        history_collector.log_request_success(request_id);
772
773        let history: StructuredHistory = history_collector.clone_structured_history();
774
775        let displayed = "Requests History:
776=== Request #0 ===
777| start_time: 2022-02-22 20:22:22 UTC
778| Non-speculative attempts:
779| - Attempt #0 sent to 127.0.0.1:19042
780|   request send time: 2022-02-22 20:22:22 UTC
781|   Error at 2022-02-22 20:22:22 UTC
782|   Error: Received unexpected response from the server: EVENT. Expected RESULT or ERROR response.
783|   Retry decision: RetryNextTarget(Some(Quorum))
784|
785| - Attempt #1 sent to 127.0.0.3:19042
786|   request send time: 2022-02-22 20:22:22 UTC
787|   No result yet
788|
789|
790| > Speculative fiber #0
791| fiber start time: 2022-02-22 20:22:22 UTC
792| - Attempt #0 sent to 127.0.0.2:19042
793|   request send time: 2022-02-22 20:22:22 UTC
794|   Error at 2022-02-22 20:22:22 UTC
795|   Error: Database returned an error: Not enough nodes are alive to satisfy required consistency level (consistency: Quorum, required: 2, alive: 1), Error message: Not enough nodes to satisfy consistency
796|   Retry decision: RetryNextTarget(Some(Quorum))
797|
798|
799| > Speculative fiber #1
800| fiber start time: 2022-02-22 20:22:22 UTC
801| - Attempt #0 sent to 127.0.0.1:19042
802|   request send time: 2022-02-22 20:22:22 UTC
803|   Error at 2022-02-22 20:22:22 UTC
804|   Error: Unable to allocate stream id
805|   Retry decision: RetrySameTarget(Some(Quorum))
806|
807| - Attempt #1 sent to 127.0.0.1:19042
808|   request send time: 2022-02-22 20:22:22 UTC
809|   Success at 2022-02-22 20:22:22 UTC
810|
811|
812| > Speculative fiber #2
813| fiber start time: 2022-02-22 20:22:22 UTC
814|
815|
816| > Speculative fiber #3
817| fiber start time: 2022-02-22 20:22:22 UTC
818| - Attempt #0 sent to 127.0.0.2:19042
819|   request send time: 2022-02-22 20:22:22 UTC
820|   No result yet
821|
822| Request successful at 2022-02-22 20:22:22 UTC
823=================
824";
825        assert_eq!(displayed, format!("{}", set_one_time(history)));
826    }
827
828    #[test]
829    fn multiple_requests() {
830        setup_tracing();
831        let history_collector = HistoryCollector::new();
832
833        let request1_id: RequestId = history_collector.log_request_start();
834        let request1_attempt1: AttemptId =
835            history_collector.log_attempt_start(request1_id, None, node1_addr());
836        history_collector.log_attempt_error(
837            request1_attempt1,
838            &unexpected_response(CqlResponseKind::Supported),
839            &RetryDecision::RetryNextTarget(Some(Consistency::Quorum)),
840        );
841        let request1_attempt2: AttemptId =
842            history_collector.log_attempt_start(request1_id, None, node2_addr());
843        history_collector.log_attempt_success(request1_attempt2);
844        history_collector.log_request_success(request1_id);
845
846        let request2_id: RequestId = history_collector.log_request_start();
847        let request2_attempt1: AttemptId =
848            history_collector.log_attempt_start(request2_id, None, node1_addr());
849        history_collector.log_attempt_success(request2_attempt1);
850        history_collector.log_request_success(request2_id);
851
852        let history: StructuredHistory = history_collector.clone_structured_history();
853
854        let displayed = "Requests History:
855=== Request #0 ===
856| start_time: 2022-02-22 20:22:22 UTC
857| Non-speculative attempts:
858| - Attempt #0 sent to 127.0.0.1:19042
859|   request send time: 2022-02-22 20:22:22 UTC
860|   Error at 2022-02-22 20:22:22 UTC
861|   Error: Received unexpected response from the server: SUPPORTED. Expected RESULT or ERROR response.
862|   Retry decision: RetryNextTarget(Some(Quorum))
863|
864| - Attempt #1 sent to 127.0.0.2:19042
865|   request send time: 2022-02-22 20:22:22 UTC
866|   Success at 2022-02-22 20:22:22 UTC
867|
868| Request successful at 2022-02-22 20:22:22 UTC
869=================
870=== Request #1 ===
871| start_time: 2022-02-22 20:22:22 UTC
872| Non-speculative attempts:
873| - Attempt #0 sent to 127.0.0.1:19042
874|   request send time: 2022-02-22 20:22:22 UTC
875|   Success at 2022-02-22 20:22:22 UTC
876|
877| Request successful at 2022-02-22 20:22:22 UTC
878=================
879";
880        assert_eq!(displayed, format!("{}", set_one_time(history)));
881    }
882}