1use std::{
3 collections::BTreeMap,
4 fmt::{Debug, Display},
5 net::SocketAddr,
6 sync::Mutex,
7 time::SystemTime,
8};
9
10use crate::errors::{RequestAttemptError, RequestError};
11use crate::policies::retry::RetryDecision;
12use chrono::{DateTime, Utc};
13
14use tracing::warn;
15
16#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
18pub struct RequestId(pub usize);
19
20#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
22pub struct AttemptId(pub usize);
23
24#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
28pub struct SpeculativeId(pub usize);
29
30pub trait HistoryListener: Debug + Send + Sync {
39 fn log_request_start(&self) -> RequestId;
41
42 fn log_request_success(&self, request_id: RequestId);
44
45 fn log_request_error(&self, request_id: RequestId, error: &RequestError);
47
48 fn log_new_speculative_fiber(&self, request_id: RequestId) -> SpeculativeId;
50
51 fn log_attempt_start(
53 &self,
54 request_id: RequestId,
55 speculative_id: Option<SpeculativeId>,
56 node_addr: SocketAddr,
57 ) -> AttemptId;
58
59 fn log_attempt_success(&self, attempt_id: AttemptId);
61
62 fn log_attempt_error(
64 &self,
65 attempt_id: AttemptId,
66 error: &RequestAttemptError,
67 retry_decision: &RetryDecision,
68 );
69}
70
71pub type TimePoint = DateTime<Utc>;
72
73#[derive(Debug, Default)]
76pub struct HistoryCollector {
77 data: Mutex<HistoryCollectorData>,
78}
79
80#[derive(Debug, Clone)]
81pub struct HistoryCollectorData {
82 events: Vec<(HistoryEvent, TimePoint)>,
83 next_request_id: RequestId,
84 next_speculative_fiber_id: SpeculativeId,
85 next_attempt_id: AttemptId,
86}
87
88#[derive(Debug, Clone)]
89pub enum HistoryEvent {
90 NewRequest(RequestId),
91 RequestSuccess(RequestId),
92 RequestError(RequestId, RequestError),
93 NewSpeculativeFiber(SpeculativeId, RequestId),
94 NewAttempt(AttemptId, RequestId, Option<SpeculativeId>, SocketAddr),
95 AttemptSuccess(AttemptId),
96 AttemptError(AttemptId, RequestAttemptError, RetryDecision),
97}
98
99impl HistoryCollectorData {
100 fn new() -> HistoryCollectorData {
101 HistoryCollectorData {
102 events: Vec::new(),
103 next_request_id: RequestId(0),
104 next_speculative_fiber_id: SpeculativeId(0),
105 next_attempt_id: AttemptId(0),
106 }
107 }
108
109 fn add_event(&mut self, event: HistoryEvent) {
110 let event_time: TimePoint = SystemTime::now().into();
111 self.events.push((event, event_time));
112 }
113}
114
115impl Default for HistoryCollectorData {
116 fn default() -> HistoryCollectorData {
117 HistoryCollectorData::new()
118 }
119}
120
121impl HistoryCollector {
122 pub fn new() -> HistoryCollector {
124 HistoryCollector::default()
125 }
126
127 pub fn clone_collected(&self) -> HistoryCollectorData {
129 self.do_with_data(|data| data.clone())
130 }
131
132 pub fn take_collected(&self) -> HistoryCollectorData {
136 self.do_with_data(|data| {
137 let mut data_to_swap = HistoryCollectorData {
138 events: Vec::new(),
139 next_request_id: data.next_request_id,
140 next_speculative_fiber_id: data.next_speculative_fiber_id,
141 next_attempt_id: data.next_attempt_id,
142 };
143 std::mem::swap(&mut data_to_swap, data);
144 data_to_swap
145 })
146 }
147
148 pub fn clone_structured_history(&self) -> StructuredHistory {
150 StructuredHistory::from(&self.clone_collected())
151 }
152
153 pub fn take_structured_history(&self) -> StructuredHistory {
155 StructuredHistory::from(&self.take_collected())
156 }
157
158 fn do_with_data<OpRetType>(
160 &self,
161 do_fn: impl Fn(&mut HistoryCollectorData) -> OpRetType,
162 ) -> OpRetType {
163 match self.data.lock() {
164 Ok(mut data) => do_fn(&mut data),
165 Err(poison_error) => {
166 warn!("HistoryCollector - mutex poisoned! Error: {}", poison_error);
169 let mut dummy_data: HistoryCollectorData = HistoryCollectorData::default();
170 do_fn(&mut dummy_data)
171 }
172 }
173 }
174}
175
176impl HistoryListener for HistoryCollector {
177 fn log_request_start(&self) -> RequestId {
178 self.do_with_data(|data| {
179 let new_request_id: RequestId = data.next_request_id;
180 data.next_request_id.0 += 1;
181 data.add_event(HistoryEvent::NewRequest(new_request_id));
182 new_request_id
183 })
184 }
185
186 fn log_request_success(&self, request_id: RequestId) {
187 self.do_with_data(|data| {
188 data.add_event(HistoryEvent::RequestSuccess(request_id));
189 })
190 }
191
192 fn log_request_error(&self, request_id: RequestId, error: &RequestError) {
193 self.do_with_data(|data| {
194 data.add_event(HistoryEvent::RequestError(request_id, error.clone()))
195 })
196 }
197
198 fn log_new_speculative_fiber(&self, request_id: RequestId) -> SpeculativeId {
199 self.do_with_data(|data| {
200 let new_speculative_id: SpeculativeId = data.next_speculative_fiber_id;
201 data.next_speculative_fiber_id.0 += 1;
202 data.add_event(HistoryEvent::NewSpeculativeFiber(
203 new_speculative_id,
204 request_id,
205 ));
206 new_speculative_id
207 })
208 }
209
210 fn log_attempt_start(
211 &self,
212 request_id: RequestId,
213 speculative_id: Option<SpeculativeId>,
214 node_addr: SocketAddr,
215 ) -> AttemptId {
216 self.do_with_data(|data| {
217 let new_attempt_id: AttemptId = data.next_attempt_id;
218 data.next_attempt_id.0 += 1;
219 data.add_event(HistoryEvent::NewAttempt(
220 new_attempt_id,
221 request_id,
222 speculative_id,
223 node_addr,
224 ));
225 new_attempt_id
226 })
227 }
228
229 fn log_attempt_success(&self, attempt_id: AttemptId) {
230 self.do_with_data(|data| data.add_event(HistoryEvent::AttemptSuccess(attempt_id)))
231 }
232
233 fn log_attempt_error(
234 &self,
235 attempt_id: AttemptId,
236 error: &RequestAttemptError,
237 retry_decision: &RetryDecision,
238 ) {
239 self.do_with_data(|data| {
240 data.add_event(HistoryEvent::AttemptError(
241 attempt_id,
242 error.clone(),
243 retry_decision.clone(),
244 ))
245 })
246 }
247}
248
249#[derive(Debug, Clone)]
254pub struct StructuredHistory {
255 pub requests: Vec<RequestHistory>,
256}
257
258#[derive(Debug, Clone)]
259pub struct RequestHistory {
260 pub start_time: TimePoint,
261 pub non_speculative_fiber: FiberHistory,
262 pub speculative_fibers: Vec<FiberHistory>,
263 pub result: Option<RequestHistoryResult>,
264}
265
266#[derive(Debug, Clone)]
267pub enum RequestHistoryResult {
268 Success(TimePoint),
269 Error(TimePoint, RequestError),
270}
271
272#[derive(Debug, Clone)]
273pub struct FiberHistory {
274 pub start_time: TimePoint,
275 pub attempts: Vec<AttemptHistory>,
276}
277
278#[derive(Debug, Clone)]
279pub struct AttemptHistory {
280 pub send_time: TimePoint,
281 pub node_addr: SocketAddr,
282 pub result: Option<AttemptResult>,
283}
284
285#[derive(Debug, Clone)]
286pub enum AttemptResult {
287 Success(TimePoint),
288 Error(TimePoint, RequestAttemptError, RetryDecision),
289}
290
291impl From<&HistoryCollectorData> for StructuredHistory {
292 fn from(data: &HistoryCollectorData) -> StructuredHistory {
293 let mut attempts: BTreeMap<AttemptId, AttemptHistory> = BTreeMap::new();
294 let mut requests: BTreeMap<RequestId, RequestHistory> = BTreeMap::new();
295 let mut fibers: BTreeMap<SpeculativeId, FiberHistory> = BTreeMap::new();
296
297 for (event, event_time) in &data.events {
299 match event {
300 HistoryEvent::NewAttempt(attempt_id, _, _, node_addr) => {
301 attempts.insert(
302 *attempt_id,
303 AttemptHistory {
304 send_time: *event_time,
305 node_addr: *node_addr,
306 result: None,
307 },
308 );
309 }
310 HistoryEvent::AttemptSuccess(attempt_id) => {
311 if let Some(attempt) = attempts.get_mut(attempt_id) {
312 attempt.result = Some(AttemptResult::Success(*event_time));
313 }
314 }
315 HistoryEvent::AttemptError(attempt_id, error, retry_decision) => {
316 match attempts.get_mut(attempt_id) {
317 Some(attempt) => {
318 if attempt.result.is_some() {
319 warn!("StructuredHistory - attempt with id {:?} has multiple results", attempt_id);
320 }
321 attempt.result = Some(AttemptResult::Error(*event_time, error.clone(), retry_decision.clone()));
322 },
323 None => warn!("StructuredHistory - attempt with id {:?} finished with an error but not created", attempt_id)
324 }
325 }
326 HistoryEvent::NewRequest(request_id) => {
327 requests.insert(
328 *request_id,
329 RequestHistory {
330 start_time: *event_time,
331 non_speculative_fiber: FiberHistory {
332 start_time: *event_time,
333 attempts: Vec::new(),
334 },
335 speculative_fibers: Vec::new(),
336 result: None,
337 },
338 );
339 }
340 HistoryEvent::RequestSuccess(request_id) => {
341 if let Some(request) = requests.get_mut(request_id) {
342 request.result = Some(RequestHistoryResult::Success(*event_time));
343 }
344 }
345 HistoryEvent::RequestError(request_id, error) => {
346 if let Some(request) = requests.get_mut(request_id) {
347 request.result =
348 Some(RequestHistoryResult::Error(*event_time, error.clone()));
349 }
350 }
351 HistoryEvent::NewSpeculativeFiber(speculative_id, _) => {
352 fibers.insert(
353 *speculative_id,
354 FiberHistory {
355 start_time: *event_time,
356 attempts: Vec::new(),
357 },
358 );
359 }
360 }
361 }
362
363 for (event, _) in &data.events {
365 if let HistoryEvent::NewAttempt(attempt_id, request_id, speculative_id, _) = event {
366 if let Some(attempt) = attempts.remove(attempt_id) {
367 match speculative_id {
368 Some(spec_id) => {
369 if let Some(spec_fiber) = fibers.get_mut(spec_id) {
370 spec_fiber.attempts.push(attempt);
371 }
372 }
373 None => {
374 if let Some(request) = requests.get_mut(request_id) {
375 request.non_speculative_fiber.attempts.push(attempt);
376 }
377 }
378 }
379 }
380 }
381 }
382
383 for (event, _) in &data.events {
385 if let HistoryEvent::NewSpeculativeFiber(speculative_id, request_id) = event {
386 if let Some(fiber) = fibers.remove(speculative_id) {
387 if let Some(request) = requests.get_mut(request_id) {
388 request.speculative_fibers.push(fiber);
389 }
390 }
391 }
392 }
393
394 StructuredHistory {
395 requests: requests.into_values().collect(),
396 }
397 }
398}
399
400impl Display for StructuredHistory {
402 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
403 writeln!(f, "Requests History:")?;
404 for (i, request) in self.requests.iter().enumerate() {
405 writeln!(f, "=== Request #{} ===", i)?;
406 writeln!(f, "| start_time: {}", request.start_time)?;
407 writeln!(f, "| Non-speculative attempts:")?;
408 write_fiber_attempts(&request.non_speculative_fiber, f)?;
409 for (spec_i, speculative_fiber) in request.speculative_fibers.iter().enumerate() {
410 writeln!(f, "|")?;
411 writeln!(f, "|")?;
412 writeln!(f, "| > Speculative fiber #{}", spec_i)?;
413 writeln!(f, "| fiber start time: {}", speculative_fiber.start_time)?;
414 write_fiber_attempts(speculative_fiber, f)?;
415 }
416 writeln!(f, "|")?;
417 match &request.result {
418 Some(RequestHistoryResult::Success(succ_time)) => {
419 writeln!(f, "| Request successful at {}", succ_time)?;
420 }
421 Some(RequestHistoryResult::Error(err_time, error)) => {
422 writeln!(f, "| Request failed at {}", err_time)?;
423 writeln!(f, "| Error: {}", error)?;
424 }
425 None => writeln!(f, "| Request still running - no final result yet")?,
426 };
427 writeln!(f, "=================")?;
428 }
429 Ok(())
430 }
431}
432
433fn write_fiber_attempts(fiber: &FiberHistory, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 for (i, attempt) in fiber.attempts.iter().enumerate() {
435 if i != 0 {
436 writeln!(f, "|")?;
437 }
438 writeln!(f, "| - Attempt #{} sent to {}", i, attempt.node_addr)?;
439 writeln!(f, "| request send time: {}", attempt.send_time)?;
440 match &attempt.result {
441 Some(AttemptResult::Success(time)) => writeln!(f, "| Success at {}", time)?,
442 Some(AttemptResult::Error(time, err, retry_decision)) => {
443 writeln!(f, "| Error at {}", time)?;
444 writeln!(f, "| Error: {}", err)?;
445 writeln!(f, "| Retry decision: {:?}", retry_decision)?;
446 }
447 None => writeln!(f, "| No result yet")?,
448 };
449 }
450
451 Ok(())
452}
453
454#[cfg(test)]
455mod tests {
456 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
457
458 use crate::{
459 errors::{DbError, RequestAttemptError, RequestError},
460 policies::retry::RetryDecision,
461 test_utils::setup_tracing,
462 };
463
464 use super::{
465 AttemptId, AttemptResult, HistoryCollector, HistoryListener, RequestHistoryResult,
466 RequestId, SpeculativeId, StructuredHistory, TimePoint,
467 };
468 use assert_matches::assert_matches;
469 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
470 use scylla_cql::{frame::response::CqlResponseKind, Consistency};
471
472 fn set_one_time(mut history: StructuredHistory) -> StructuredHistory {
476 let the_time: TimePoint = DateTime::<Utc>::from_naive_utc_and_offset(
477 NaiveDateTime::new(
478 NaiveDate::from_ymd_opt(2022, 2, 22).unwrap(),
479 NaiveTime::from_hms_opt(20, 22, 22).unwrap(),
480 ),
481 Utc,
482 );
483
484 for request in &mut history.requests {
485 request.start_time = the_time;
486 match &mut request.result {
487 Some(RequestHistoryResult::Success(succ_time)) => *succ_time = the_time,
488 Some(RequestHistoryResult::Error(err_time, _)) => *err_time = the_time,
489 None => {}
490 };
491
492 for fiber in std::iter::once(&mut request.non_speculative_fiber)
493 .chain(request.speculative_fibers.iter_mut())
494 {
495 fiber.start_time = the_time;
496 for attempt in &mut fiber.attempts {
497 attempt.send_time = the_time;
498 match &mut attempt.result {
499 Some(AttemptResult::Success(succ_time)) => *succ_time = the_time,
500 Some(AttemptResult::Error(err_time, _, _)) => *err_time = the_time,
501 None => {}
502 }
503 }
504 }
505 }
506
507 history
508 }
509
510 fn node1_addr() -> SocketAddr {
511 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 19042)
512 }
513
514 fn node2_addr() -> SocketAddr {
515 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 19042)
516 }
517
518 fn node3_addr() -> SocketAddr {
519 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3)), 19042)
520 }
521
522 fn unexpected_response(kind: CqlResponseKind) -> RequestAttemptError {
523 RequestAttemptError::UnexpectedResponse(kind)
524 }
525
526 fn unavailable_error() -> RequestAttemptError {
527 RequestAttemptError::DbError(
528 DbError::Unavailable {
529 consistency: Consistency::Quorum,
530 required: 2,
531 alive: 1,
532 },
533 "Not enough nodes to satisfy consistency".to_string(),
534 )
535 }
536
537 fn no_stream_id_error() -> RequestAttemptError {
538 RequestAttemptError::UnableToAllocStreamId
539 }
540
541 #[test]
542 fn empty_history() {
543 setup_tracing();
544 let history_collector = HistoryCollector::new();
545 let history: StructuredHistory = history_collector.clone_structured_history();
546
547 assert!(history.requests.is_empty());
548
549 let displayed = "Requests History:
550";
551 assert_eq!(displayed, format!("{}", history));
552 }
553
554 #[test]
555 fn empty_request() {
556 setup_tracing();
557 let history_collector = HistoryCollector::new();
558
559 let _request_id: RequestId = history_collector.log_request_start();
560
561 let history: StructuredHistory = history_collector.clone_structured_history();
562
563 assert_eq!(history.requests.len(), 1);
564 assert!(history.requests[0]
565 .non_speculative_fiber
566 .attempts
567 .is_empty());
568 assert!(history.requests[0].speculative_fibers.is_empty());
569
570 let displayed = "Requests History:
571=== Request #0 ===
572| start_time: 2022-02-22 20:22:22 UTC
573| Non-speculative attempts:
574|
575| Request still running - no final result yet
576=================
577";
578
579 assert_eq!(displayed, format!("{}", set_one_time(history)));
580 }
581
582 #[test]
583 fn one_attempt() {
584 setup_tracing();
585 let history_collector = HistoryCollector::new();
586
587 let request_id: RequestId = history_collector.log_request_start();
588 let attempt_id: AttemptId =
589 history_collector.log_attempt_start(request_id, None, node1_addr());
590 history_collector.log_attempt_success(attempt_id);
591 history_collector.log_request_success(request_id);
592
593 let history: StructuredHistory = history_collector.clone_structured_history();
594
595 assert_eq!(history.requests.len(), 1);
596 assert_eq!(history.requests[0].non_speculative_fiber.attempts.len(), 1);
597 assert!(history.requests[0].speculative_fibers.is_empty());
598 assert_matches!(
599 history.requests[0].non_speculative_fiber.attempts[0].result,
600 Some(AttemptResult::Success(_))
601 );
602
603 let displayed = "Requests History:
604=== Request #0 ===
605| start_time: 2022-02-22 20:22:22 UTC
606| Non-speculative attempts:
607| - Attempt #0 sent to 127.0.0.1:19042
608| request send time: 2022-02-22 20:22:22 UTC
609| Success at 2022-02-22 20:22:22 UTC
610|
611| Request successful at 2022-02-22 20:22:22 UTC
612=================
613";
614 assert_eq!(displayed, format!("{}", set_one_time(history)));
615 }
616
617 #[test]
618 fn two_error_atempts() {
619 setup_tracing();
620 let history_collector = HistoryCollector::new();
621
622 let request_id: RequestId = history_collector.log_request_start();
623
624 let attempt_id: AttemptId =
625 history_collector.log_attempt_start(request_id, None, node1_addr());
626 history_collector.log_attempt_error(
627 attempt_id,
628 &unexpected_response(CqlResponseKind::Ready),
629 &RetryDecision::RetrySameTarget(Some(Consistency::Quorum)),
630 );
631
632 let second_attempt_id: AttemptId =
633 history_collector.log_attempt_start(request_id, None, node1_addr());
634 history_collector.log_attempt_error(
635 second_attempt_id,
636 &unavailable_error(),
637 &RetryDecision::DontRetry,
638 );
639
640 history_collector.log_request_error(
641 request_id,
642 &RequestError::LastAttemptError(unavailable_error()),
643 );
644
645 let history: StructuredHistory = history_collector.clone_structured_history();
646
647 let displayed =
648"Requests History:
649=== Request #0 ===
650| start_time: 2022-02-22 20:22:22 UTC
651| Non-speculative attempts:
652| - Attempt #0 sent to 127.0.0.1:19042
653| request send time: 2022-02-22 20:22:22 UTC
654| Error at 2022-02-22 20:22:22 UTC
655| Error: Received unexpected response from the server: READY. Expected RESULT or ERROR response.
656| Retry decision: RetrySameTarget(Some(Quorum))
657|
658| - Attempt #1 sent to 127.0.0.1:19042
659| request send time: 2022-02-22 20:22:22 UTC
660| Error at 2022-02-22 20:22:22 UTC
661| Error: Database returned an error: Not enough nodes are alive to satisfy required consistency level (consistency: Quorum, required: 2, alive: 1), Error message: Not enough nodes to satisfy consistency
662| Retry decision: DontRetry
663|
664| Request failed at 2022-02-22 20:22:22 UTC
665| Error: Database returned an error: Not enough nodes are alive to satisfy required consistency level (consistency: Quorum, required: 2, alive: 1), Error message: Not enough nodes to satisfy consistency
666=================
667";
668 assert_eq!(displayed, format!("{}", set_one_time(history)));
669 }
670
671 #[test]
672 fn empty_fibers() {
673 setup_tracing();
674 let history_collector = HistoryCollector::new();
675
676 let request_id: RequestId = history_collector.log_request_start();
677 history_collector.log_new_speculative_fiber(request_id);
678 history_collector.log_new_speculative_fiber(request_id);
679 history_collector.log_new_speculative_fiber(request_id);
680
681 let history: StructuredHistory = history_collector.clone_structured_history();
682
683 assert_eq!(history.requests.len(), 1);
684 assert!(history.requests[0]
685 .non_speculative_fiber
686 .attempts
687 .is_empty());
688 assert_eq!(history.requests[0].speculative_fibers.len(), 3);
689 assert!(history.requests[0].speculative_fibers[0]
690 .attempts
691 .is_empty());
692 assert!(history.requests[0].speculative_fibers[1]
693 .attempts
694 .is_empty());
695 assert!(history.requests[0].speculative_fibers[2]
696 .attempts
697 .is_empty());
698
699 let displayed = "Requests History:
700=== Request #0 ===
701| start_time: 2022-02-22 20:22:22 UTC
702| Non-speculative attempts:
703|
704|
705| > Speculative fiber #0
706| fiber start time: 2022-02-22 20:22:22 UTC
707|
708|
709| > Speculative fiber #1
710| fiber start time: 2022-02-22 20:22:22 UTC
711|
712|
713| > Speculative fiber #2
714| fiber start time: 2022-02-22 20:22:22 UTC
715|
716| Request still running - no final result yet
717=================
718";
719 assert_eq!(displayed, format!("{}", set_one_time(history)));
720 }
721
722 #[test]
723 fn complex() {
724 setup_tracing();
725 let history_collector = HistoryCollector::new();
726
727 let request_id: RequestId = history_collector.log_request_start();
728
729 let attempt1: AttemptId =
730 history_collector.log_attempt_start(request_id, None, node1_addr());
731
732 let speculative1: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
733
734 let spec1_attempt1: AttemptId =
735 history_collector.log_attempt_start(request_id, Some(speculative1), node2_addr());
736
737 history_collector.log_attempt_error(
738 attempt1,
739 &unexpected_response(CqlResponseKind::Event),
740 &RetryDecision::RetryNextTarget(Some(Consistency::Quorum)),
741 );
742 let _attempt2: AttemptId =
743 history_collector.log_attempt_start(request_id, None, node3_addr());
744
745 let speculative2: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
746
747 let spec2_attempt1: AttemptId =
748 history_collector.log_attempt_start(request_id, Some(speculative2), node1_addr());
749 history_collector.log_attempt_error(
750 spec2_attempt1,
751 &no_stream_id_error(),
752 &RetryDecision::RetrySameTarget(Some(Consistency::Quorum)),
753 );
754
755 let spec2_attempt2: AttemptId =
756 history_collector.log_attempt_start(request_id, Some(speculative2), node1_addr());
757
758 let _speculative3: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
759 let speculative4: SpeculativeId = history_collector.log_new_speculative_fiber(request_id);
760
761 history_collector.log_attempt_error(
762 spec1_attempt1,
763 &unavailable_error(),
764 &RetryDecision::RetryNextTarget(Some(Consistency::Quorum)),
765 );
766
767 let _spec4_attempt1: AttemptId =
768 history_collector.log_attempt_start(request_id, Some(speculative4), node2_addr());
769
770 history_collector.log_attempt_success(spec2_attempt2);
771 history_collector.log_request_success(request_id);
772
773 let history: StructuredHistory = history_collector.clone_structured_history();
774
775 let displayed = "Requests History:
776=== Request #0 ===
777| start_time: 2022-02-22 20:22:22 UTC
778| Non-speculative attempts:
779| - Attempt #0 sent to 127.0.0.1:19042
780| request send time: 2022-02-22 20:22:22 UTC
781| Error at 2022-02-22 20:22:22 UTC
782| Error: Received unexpected response from the server: EVENT. Expected RESULT or ERROR response.
783| Retry decision: RetryNextTarget(Some(Quorum))
784|
785| - Attempt #1 sent to 127.0.0.3:19042
786| request send time: 2022-02-22 20:22:22 UTC
787| No result yet
788|
789|
790| > Speculative fiber #0
791| fiber start time: 2022-02-22 20:22:22 UTC
792| - Attempt #0 sent to 127.0.0.2:19042
793| request send time: 2022-02-22 20:22:22 UTC
794| Error at 2022-02-22 20:22:22 UTC
795| Error: Database returned an error: Not enough nodes are alive to satisfy required consistency level (consistency: Quorum, required: 2, alive: 1), Error message: Not enough nodes to satisfy consistency
796| Retry decision: RetryNextTarget(Some(Quorum))
797|
798|
799| > Speculative fiber #1
800| fiber start time: 2022-02-22 20:22:22 UTC
801| - Attempt #0 sent to 127.0.0.1:19042
802| request send time: 2022-02-22 20:22:22 UTC
803| Error at 2022-02-22 20:22:22 UTC
804| Error: Unable to allocate stream id
805| Retry decision: RetrySameTarget(Some(Quorum))
806|
807| - Attempt #1 sent to 127.0.0.1:19042
808| request send time: 2022-02-22 20:22:22 UTC
809| Success at 2022-02-22 20:22:22 UTC
810|
811|
812| > Speculative fiber #2
813| fiber start time: 2022-02-22 20:22:22 UTC
814|
815|
816| > Speculative fiber #3
817| fiber start time: 2022-02-22 20:22:22 UTC
818| - Attempt #0 sent to 127.0.0.2:19042
819| request send time: 2022-02-22 20:22:22 UTC
820| No result yet
821|
822| Request successful at 2022-02-22 20:22:22 UTC
823=================
824";
825 assert_eq!(displayed, format!("{}", set_one_time(history)));
826 }
827
828 #[test]
829 fn multiple_requests() {
830 setup_tracing();
831 let history_collector = HistoryCollector::new();
832
833 let request1_id: RequestId = history_collector.log_request_start();
834 let request1_attempt1: AttemptId =
835 history_collector.log_attempt_start(request1_id, None, node1_addr());
836 history_collector.log_attempt_error(
837 request1_attempt1,
838 &unexpected_response(CqlResponseKind::Supported),
839 &RetryDecision::RetryNextTarget(Some(Consistency::Quorum)),
840 );
841 let request1_attempt2: AttemptId =
842 history_collector.log_attempt_start(request1_id, None, node2_addr());
843 history_collector.log_attempt_success(request1_attempt2);
844 history_collector.log_request_success(request1_id);
845
846 let request2_id: RequestId = history_collector.log_request_start();
847 let request2_attempt1: AttemptId =
848 history_collector.log_attempt_start(request2_id, None, node1_addr());
849 history_collector.log_attempt_success(request2_attempt1);
850 history_collector.log_request_success(request2_id);
851
852 let history: StructuredHistory = history_collector.clone_structured_history();
853
854 let displayed = "Requests History:
855=== Request #0 ===
856| start_time: 2022-02-22 20:22:22 UTC
857| Non-speculative attempts:
858| - Attempt #0 sent to 127.0.0.1:19042
859| request send time: 2022-02-22 20:22:22 UTC
860| Error at 2022-02-22 20:22:22 UTC
861| Error: Received unexpected response from the server: SUPPORTED. Expected RESULT or ERROR response.
862| Retry decision: RetryNextTarget(Some(Quorum))
863|
864| - Attempt #1 sent to 127.0.0.2:19042
865| request send time: 2022-02-22 20:22:22 UTC
866| Success at 2022-02-22 20:22:22 UTC
867|
868| Request successful at 2022-02-22 20:22:22 UTC
869=================
870=== Request #1 ===
871| start_time: 2022-02-22 20:22:22 UTC
872| Non-speculative attempts:
873| - Attempt #0 sent to 127.0.0.1:19042
874| request send time: 2022-02-22 20:22:22 UTC
875| Success at 2022-02-22 20:22:22 UTC
876|
877| Request successful at 2022-02-22 20:22:22 UTC
878=================
879";
880 assert_eq!(displayed, format!("{}", set_one_time(history)));
881 }
882}