1use std::future::Future;
6use std::net::SocketAddr;
7use std::ops::ControlFlow;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use futures::Stream;
13use scylla_cql::deserialize::result::RawRowLendingIterator;
14use scylla_cql::deserialize::row::{ColumnIterator, DeserializeRow};
15use scylla_cql::deserialize::{DeserializationError, TypeCheckError};
16use scylla_cql::frame::frame_errors::ResultMetadataAndRowsCountParseError;
17use scylla_cql::frame::request::query::PagingState;
18use scylla_cql::frame::response::result::RawMetadataAndRawRows;
19use scylla_cql::frame::response::NonErrorResponse;
20use scylla_cql::frame::types::SerialConsistency;
21use scylla_cql::serialize::row::SerializedValues;
22use scylla_cql::Consistency;
23use std::result::Result;
24use thiserror::Error;
25use tokio::sync::mpsc;
26
27use crate::client::execution_profile::ExecutionProfileInner;
28use crate::cluster::{ClusterState, NodeRef};
29use crate::deserialize::DeserializeOwnedRow;
30use crate::errors::{RequestAttemptError, RequestError};
31use crate::frame::response::result;
32use crate::network::Connection;
33use crate::observability::driver_tracing::RequestSpan;
34use crate::observability::history::{self, HistoryListener};
35#[cfg(feature = "metrics")]
36use crate::observability::metrics::Metrics;
37use crate::policies::load_balancing::{self, RoutingInfo};
38use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
39use crate::response::query_result::ColumnSpecs;
40use crate::response::{NonErrorQueryResponse, QueryResponse};
41use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
42use crate::statement::unprepared::Statement;
43use tracing::{trace, trace_span, warn, Instrument};
44use uuid::Uuid;
45
46macro_rules! ready_some_ok {
50 ($e:expr) => {
51 match $e {
52 Poll::Ready(Some(Ok(x))) => x,
53 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
54 Poll::Ready(None) => return Poll::Ready(None),
55 Poll::Pending => return Poll::Pending,
56 }
57 };
58}
59
60struct ReceivedPage {
61 rows: RawMetadataAndRawRows,
62 tracing_id: Option<Uuid>,
63}
64
65pub(crate) struct PreparedIteratorConfig {
66 pub(crate) prepared: PreparedStatement,
67 pub(crate) values: SerializedValues,
68 pub(crate) execution_profile: Arc<ExecutionProfileInner>,
69 pub(crate) cluster_state: Arc<ClusterState>,
70 #[cfg(feature = "metrics")]
71 pub(crate) metrics: Arc<Metrics>,
72}
73
74mod checked_channel_sender {
77 use scylla_cql::frame::response::result::RawMetadataAndRawRows;
78 use std::marker::PhantomData;
79 use tokio::sync::mpsc;
80 use uuid::Uuid;
81
82 use super::{NextPageError, ReceivedPage};
83
84 pub(crate) struct SendAttemptedProof<T>(PhantomData<T>);
88
89 pub(crate) struct ProvingSender<T>(mpsc::Sender<T>);
91
92 impl<T> From<mpsc::Sender<T>> for ProvingSender<T> {
93 fn from(s: mpsc::Sender<T>) -> Self {
94 Self(s)
95 }
96 }
97
98 impl<T> ProvingSender<T> {
99 pub(crate) async fn send(
100 &self,
101 value: T,
102 ) -> (SendAttemptedProof<T>, Result<(), mpsc::error::SendError<T>>) {
103 (SendAttemptedProof(PhantomData), self.0.send(value).await)
104 }
105 }
106
107 type ResultPage = Result<ReceivedPage, NextPageError>;
108
109 impl ProvingSender<ResultPage> {
110 pub(crate) async fn send_empty_page(
111 &self,
112 tracing_id: Option<Uuid>,
113 ) -> (
114 SendAttemptedProof<ResultPage>,
115 Result<(), mpsc::error::SendError<ResultPage>>,
116 ) {
117 let empty_page = ReceivedPage {
118 rows: RawMetadataAndRawRows::mock_empty(),
119 tracing_id,
120 };
121 self.send(Ok(empty_page)).await
122 }
123 }
124}
125
126use checked_channel_sender::{ProvingSender, SendAttemptedProof};
127
128type PageSendAttemptedProof = SendAttemptedProof<Result<ReceivedPage, NextPageError>>;
129
130struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
133 sender: ProvingSender<Result<ReceivedPage, NextPageError>>,
134
135 page_query: QueryFunc,
138
139 statement_info: RoutingInfo<'a>,
140 query_is_idempotent: bool,
141 query_consistency: Consistency,
142 retry_session: Box<dyn RetrySession>,
143 execution_profile: Arc<ExecutionProfileInner>,
144 #[cfg(feature = "metrics")]
145 metrics: Arc<Metrics>,
146
147 paging_state: PagingState,
148
149 history_listener: Option<Arc<dyn HistoryListener>>,
150 current_request_id: Option<history::RequestId>,
151 current_attempt_id: Option<history::AttemptId>,
152
153 parent_span: tracing::Span,
154 span_creator: SpanCreatorFunc,
155}
156
157impl<QueryFunc, QueryFut, SpanCreator> PagerWorker<'_, QueryFunc, SpanCreator>
158where
159 QueryFunc: Fn(Arc<Connection>, Consistency, PagingState) -> QueryFut,
160 QueryFut: Future<Output = Result<QueryResponse, RequestAttemptError>>,
161 SpanCreator: Fn() -> RequestSpan,
162{
163 async fn work(mut self, cluster_state: Arc<ClusterState>) -> PageSendAttemptedProof {
165 let load_balancer = self.execution_profile.load_balancing_policy.clone();
166 let statement_info = self.statement_info.clone();
167 let query_plan =
168 load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_state);
169
170 let mut last_error: RequestError = RequestError::EmptyPlan;
171 let mut current_consistency: Consistency = self.query_consistency;
172
173 self.log_request_start();
174
175 'nodes_in_plan: for (node, shard) in query_plan {
176 let span =
177 trace_span!(parent: &self.parent_span, "Executing query", node = %node.address);
178 let connection: Arc<Connection> = match node
181 .connection_for_shard(shard)
182 .instrument(span.clone())
183 .await
184 {
185 Ok(connection) => connection,
186 Err(e) => {
187 trace!(
188 parent: &span,
189 error = %e,
190 "Choosing connection failed"
191 );
192 last_error = e.into();
193 continue 'nodes_in_plan;
195 }
196 };
197
198 'same_node_retries: loop {
199 trace!(parent: &span, "Execution started");
200 let queries_result: Result<PageSendAttemptedProof, RequestAttemptError> = self
202 .query_pages(&connection, current_consistency, node)
203 .instrument(span.clone())
204 .await;
205
206 let request_error: RequestAttemptError = match queries_result {
207 Ok(proof) => {
208 trace!(parent: &span, "Request succeeded");
209 return proof;
213 }
214 Err(error) => {
215 trace!(
216 parent: &span,
217 error = %error,
218 "Request failed"
219 );
220 error
221 }
222 };
223
224 let query_info = RequestInfo {
226 error: &request_error,
227 is_idempotent: self.query_is_idempotent,
228 consistency: self.query_consistency,
229 };
230
231 let retry_decision = self.retry_session.decide_should_retry(query_info);
232 trace!(
233 parent: &span,
234 retry_decision = ?retry_decision
235 );
236
237 self.log_attempt_error(&request_error, &retry_decision);
238
239 last_error = request_error.into();
240
241 match retry_decision {
242 RetryDecision::RetrySameTarget(cl) => {
243 #[cfg(feature = "metrics")]
244 self.metrics.inc_retries_num();
245 current_consistency = cl.unwrap_or(current_consistency);
246 continue 'same_node_retries;
247 }
248 RetryDecision::RetryNextTarget(cl) => {
249 #[cfg(feature = "metrics")]
250 self.metrics.inc_retries_num();
251 current_consistency = cl.unwrap_or(current_consistency);
252 continue 'nodes_in_plan;
253 }
254 RetryDecision::DontRetry => break 'nodes_in_plan,
255 RetryDecision::IgnoreWriteError => {
256 warn!("Ignoring error during fetching pages; stopping fetching.");
257 let (proof, _) = self.sender.send_empty_page(None).await;
264 return proof;
265 }
266 };
267 }
268 }
269
270 self.log_request_error(&last_error);
271 let (proof, _) = self
272 .sender
273 .send(Err(NextPageError::RequestFailure(last_error)))
274 .await;
275 proof
276 }
277
278 async fn query_pages(
284 &mut self,
285 connection: &Arc<Connection>,
286 consistency: Consistency,
287 node: NodeRef<'_>,
288 ) -> Result<PageSendAttemptedProof, RequestAttemptError> {
289 loop {
290 let request_span = (self.span_creator)();
291 match self
292 .query_one_page(connection, consistency, node, &request_span)
293 .instrument(request_span.span().clone())
294 .await?
295 {
296 ControlFlow::Break(proof) => return Ok(proof),
297 ControlFlow::Continue(_) => {}
298 }
299 }
300 }
301
302 async fn query_one_page(
303 &mut self,
304 connection: &Arc<Connection>,
305 consistency: Consistency,
306 node: NodeRef<'_>,
307 request_span: &RequestSpan,
308 ) -> Result<ControlFlow<PageSendAttemptedProof, ()>, RequestAttemptError> {
309 #[cfg(feature = "metrics")]
310 self.metrics.inc_total_paged_queries();
311 let query_start = std::time::Instant::now();
312
313 trace!(
314 connection = %connection.get_connect_address(),
315 "Sending"
316 );
317 self.log_attempt_start(connection.get_connect_address());
318
319 let query_response =
320 (self.page_query)(connection.clone(), consistency, self.paging_state.clone())
321 .await
322 .and_then(QueryResponse::into_non_error_query_response);
323
324 let elapsed = query_start.elapsed();
325
326 request_span.record_shard_id(connection);
327
328 match query_response {
329 Ok(NonErrorQueryResponse {
330 response:
331 NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))),
332 tracing_id,
333 ..
334 }) => {
335 #[cfg(feature = "metrics")]
336 let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
337 self.log_attempt_success();
338 self.log_request_success();
339 self.execution_profile
340 .load_balancing_policy
341 .on_request_success(&self.statement_info, elapsed, node);
342
343 request_span.record_raw_rows_fields(&rows);
344
345 let received_page = ReceivedPage { rows, tracing_id };
346
347 let (proof, res) = self.sender.send(Ok(received_page)).await;
349 if res.is_err() {
350 return Ok(ControlFlow::Break(proof));
352 }
353
354 match paging_state_response.into_paging_control_flow() {
355 ControlFlow::Continue(paging_state) => {
356 self.paging_state = paging_state;
357 }
358 ControlFlow::Break(()) => {
359 return Ok(ControlFlow::Break(proof));
361 }
362 }
363
364 self.retry_session.reset();
366 self.log_request_start();
367
368 Ok(ControlFlow::Continue(()))
369 }
370 Err(err) => {
371 #[cfg(feature = "metrics")]
372 self.metrics.inc_failed_paged_queries();
373 self.execution_profile
374 .load_balancing_policy
375 .on_request_failure(&self.statement_info, elapsed, node, &err);
376 Err(err)
377 }
378 Ok(NonErrorQueryResponse {
379 response: NonErrorResponse::Result(_),
380 tracing_id,
381 ..
382 }) => {
383 let (proof, _) = self.sender.send_empty_page(tracing_id).await;
388 Ok(ControlFlow::Break(proof))
389 }
390 Ok(response) => {
391 #[cfg(feature = "metrics")]
392 self.metrics.inc_failed_paged_queries();
393 let err =
394 RequestAttemptError::UnexpectedResponse(response.response.to_response_kind());
395 self.execution_profile
396 .load_balancing_policy
397 .on_request_failure(&self.statement_info, elapsed, node, &err);
398 Err(err)
399 }
400 }
401 }
402
403 fn log_request_start(&mut self) {
404 let history_listener: &dyn HistoryListener = match &self.history_listener {
405 Some(hl) => &**hl,
406 None => return,
407 };
408
409 self.current_request_id = Some(history_listener.log_request_start());
410 }
411
412 fn log_request_success(&mut self) {
413 let history_listener: &dyn HistoryListener = match &self.history_listener {
414 Some(hl) => &**hl,
415 None => return,
416 };
417
418 let request_id: history::RequestId = match &self.current_request_id {
419 Some(id) => *id,
420 None => return,
421 };
422
423 history_listener.log_request_success(request_id);
424 }
425
426 fn log_request_error(&mut self, error: &RequestError) {
427 let history_listener: &dyn HistoryListener = match &self.history_listener {
428 Some(hl) => &**hl,
429 None => return,
430 };
431
432 let request_id: history::RequestId = match &self.current_request_id {
433 Some(id) => *id,
434 None => return,
435 };
436
437 history_listener.log_request_error(request_id, error);
438 }
439
440 fn log_attempt_start(&mut self, node_addr: SocketAddr) {
441 let history_listener: &dyn HistoryListener = match &self.history_listener {
442 Some(hl) => &**hl,
443 None => return,
444 };
445
446 let request_id: history::RequestId = match &self.current_request_id {
447 Some(id) => *id,
448 None => return,
449 };
450
451 self.current_attempt_id =
452 Some(history_listener.log_attempt_start(request_id, None, node_addr));
453 }
454
455 fn log_attempt_success(&mut self) {
456 let history_listener: &dyn HistoryListener = match &self.history_listener {
457 Some(hl) => &**hl,
458 None => return,
459 };
460
461 let attempt_id: history::AttemptId = match &self.current_attempt_id {
462 Some(id) => *id,
463 None => return,
464 };
465
466 history_listener.log_attempt_success(attempt_id);
467 }
468
469 fn log_attempt_error(&mut self, error: &RequestAttemptError, retry_decision: &RetryDecision) {
470 let history_listener: &dyn HistoryListener = match &self.history_listener {
471 Some(hl) => &**hl,
472 None => return,
473 };
474
475 let attempt_id: history::AttemptId = match &self.current_attempt_id {
476 Some(id) => *id,
477 None => return,
478 };
479
480 history_listener.log_attempt_error(attempt_id, error, retry_decision);
481 }
482}
483
484struct SingleConnectionPagerWorker<Fetcher> {
488 sender: ProvingSender<Result<ReceivedPage, NextPageError>>,
489 fetcher: Fetcher,
490}
491
492impl<Fetcher, FetchFut> SingleConnectionPagerWorker<Fetcher>
493where
494 Fetcher: Fn(PagingState) -> FetchFut + Send + Sync,
495 FetchFut: Future<Output = Result<QueryResponse, RequestAttemptError>> + Send,
496{
497 async fn work(mut self) -> PageSendAttemptedProof {
498 match self.do_work().await {
499 Ok(proof) => proof,
500 Err(err) => {
501 let (proof, _) = self
502 .sender
503 .send(Err(NextPageError::RequestFailure(
504 RequestError::LastAttemptError(err),
505 )))
506 .await;
507 proof
508 }
509 }
510 }
511
512 async fn do_work(&mut self) -> Result<PageSendAttemptedProof, RequestAttemptError> {
513 let mut paging_state = PagingState::start();
514 loop {
515 let result = (self.fetcher)(paging_state).await?;
516 let response = result.into_non_error_query_response()?;
517 match response.response {
518 NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => {
519 let (proof, send_result) = self
520 .sender
521 .send(Ok(ReceivedPage {
522 rows,
523 tracing_id: response.tracing_id,
524 }))
525 .await;
526
527 if send_result.is_err() {
528 return Ok(proof);
530 }
531
532 match paging_state_response.into_paging_control_flow() {
533 ControlFlow::Continue(new_paging_state) => {
534 paging_state = new_paging_state;
535 }
536 ControlFlow::Break(()) => {
537 return Ok(proof);
539 }
540 }
541 }
542 NonErrorResponse::Result(_) => {
543 let (proof, _) = self.sender.send_empty_page(response.tracing_id).await;
548 return Ok(proof);
549 }
550 _ => {
551 return Err(RequestAttemptError::UnexpectedResponse(
552 response.response.to_response_kind(),
553 ));
554 }
555 }
556 }
557 }
558}
559
560#[derive(Debug)]
568pub struct QueryPager {
569 current_page: RawRowLendingIterator,
570 page_receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
571 tracing_ids: Vec<Uuid>,
572}
573
574impl QueryPager {
580 async fn next(&mut self) -> Option<Result<ColumnIterator, NextRowError>> {
589 let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
590 match res {
591 Some(Ok(())) => {}
592 Some(Err(err)) => return Some(Err(err)),
593 None => return None,
594 }
595
596 Some(
598 self.current_page
599 .next()
600 .unwrap()
601 .map_err(NextRowError::RowDeserializationError),
602 )
603 }
604
605 fn poll_fill_page<'r>(
607 mut self: Pin<&'r mut Self>,
608 cx: &mut Context<'_>,
609 ) -> Poll<Option<Result<(), NextRowError>>> {
610 if !self.is_current_page_exhausted() {
611 return Poll::Ready(Some(Ok(())));
612 }
613 ready_some_ok!(self.as_mut().poll_next_page(cx));
614 if self.is_current_page_exhausted() {
615 cx.waker().wake_by_ref();
618 Poll::Pending
619 } else {
620 Poll::Ready(Some(Ok(())))
621 }
622 }
623
624 fn poll_next_page<'r>(
630 mut self: Pin<&'r mut Self>,
631 cx: &mut Context<'_>,
632 ) -> Poll<Option<Result<(), NextRowError>>> {
633 let mut s = self.as_mut();
634
635 let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
636
637 let raw_rows_with_deserialized_metadata =
638 received_page.rows.deserialize_metadata().map_err(|err| {
639 NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err))
640 })?;
641 s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata);
642
643 if let Some(tracing_id) = received_page.tracing_id {
644 s.tracing_ids.push(tracing_id);
645 }
646
647 Poll::Ready(Some(Ok(())))
648 }
649
650 #[inline]
655 pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
656 &self,
657 ) -> Result<(), TypeCheckError> {
658 RowT::type_check(self.column_specs().as_slice())
659 }
660
661 #[inline]
666 pub fn rows_stream<RowT: 'static + for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>>(
667 self,
668 ) -> Result<TypedRowStream<RowT>, TypeCheckError> {
669 TypedRowStream::<RowT>::new(self)
670 }
671
672 pub(crate) async fn new_for_query(
673 statement: Statement,
674 execution_profile: Arc<ExecutionProfileInner>,
675 cluster_state: Arc<ClusterState>,
676 #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
677 ) -> Result<Self, NextPageError> {
678 let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
679
680 let consistency = statement
681 .config
682 .consistency
683 .unwrap_or(execution_profile.consistency);
684 let serial_consistency = statement
685 .config
686 .serial_consistency
687 .unwrap_or(execution_profile.serial_consistency);
688
689 let page_size = statement.get_validated_page_size();
690
691 let routing_info = RoutingInfo {
692 consistency,
693 serial_consistency,
694 ..Default::default()
695 };
696
697 let retry_session = statement
698 .get_retry_policy()
699 .map(|rp| &**rp)
700 .unwrap_or(&*execution_profile.retry_policy)
701 .new_session();
702
703 let parent_span = tracing::Span::current();
704 let worker_task = async move {
705 let statement_ref = &statement;
706
707 let page_query = |connection: Arc<Connection>,
708 consistency: Consistency,
709 paging_state: PagingState| {
710 async move {
711 connection
712 .query_raw_with_consistency(
713 statement_ref,
714 consistency,
715 serial_consistency,
716 Some(page_size),
717 paging_state,
718 )
719 .await
720 }
721 };
722
723 let query_ref = &statement;
724
725 let span_creator = move || {
726 let span = RequestSpan::new_query(&query_ref.contents);
727 span.record_request_size(0);
728 span
729 };
730
731 let worker = PagerWorker {
732 sender: sender.into(),
733 page_query,
734 statement_info: routing_info,
735 query_is_idempotent: statement.config.is_idempotent,
736 query_consistency: consistency,
737 retry_session,
738 execution_profile,
739 #[cfg(feature = "metrics")]
740 metrics,
741 paging_state: PagingState::start(),
742 history_listener: statement.config.history_listener.clone(),
743 current_request_id: None,
744 current_attempt_id: None,
745 parent_span,
746 span_creator,
747 };
748
749 worker.work(cluster_state).await
750 };
751
752 Self::new_from_worker_future(worker_task, receiver).await
753 }
754
755 pub(crate) async fn new_for_prepared_statement(
756 config: PreparedIteratorConfig,
757 ) -> Result<Self, NextPageError> {
758 let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
759
760 let consistency = config
761 .prepared
762 .config
763 .consistency
764 .unwrap_or(config.execution_profile.consistency);
765 let serial_consistency = config
766 .prepared
767 .config
768 .serial_consistency
769 .unwrap_or(config.execution_profile.serial_consistency);
770
771 let page_size = config.prepared.get_validated_page_size();
772
773 let retry_session = config
774 .prepared
775 .get_retry_policy()
776 .map(|rp| &**rp)
777 .unwrap_or(&*config.execution_profile.retry_policy)
778 .new_session();
779
780 let parent_span = tracing::Span::current();
781 let worker_task = async move {
782 let prepared_ref = &config.prepared;
783 let values_ref = &config.values;
784
785 let (partition_key, token) = match prepared_ref
786 .extract_partition_key_and_calculate_token(
787 prepared_ref.get_partitioner_name(),
788 values_ref,
789 ) {
790 Ok(res) => res.unzip(),
791 Err(err) => {
792 let (proof, _res) = ProvingSender::from(sender)
793 .send(Err(NextPageError::PartitionKeyError(err)))
794 .await;
795 return proof;
796 }
797 };
798
799 let table_spec = config.prepared.get_table_spec();
800 let statement_info = RoutingInfo {
801 consistency,
802 serial_consistency,
803 token,
804 table: table_spec,
805 is_confirmed_lwt: config.prepared.is_confirmed_lwt(),
806 };
807
808 let page_query = |connection: Arc<Connection>,
809 consistency: Consistency,
810 paging_state: PagingState| async move {
811 connection
812 .execute_raw_with_consistency(
813 prepared_ref,
814 values_ref,
815 consistency,
816 serial_consistency,
817 Some(page_size),
818 paging_state,
819 )
820 .await
821 };
822
823 let serialized_values_size = config.values.buffer_size();
824
825 let replicas: Option<smallvec::SmallVec<[_; 8]>> =
826 if let (Some(table_spec), Some(token)) =
827 (statement_info.table, statement_info.token)
828 {
829 Some(
830 config
831 .cluster_state
832 .get_token_endpoints_iter(table_spec, token)
833 .map(|(node, shard)| (node.clone(), shard))
834 .collect(),
835 )
836 } else {
837 None
838 };
839
840 let span_creator = move || {
841 let span = RequestSpan::new_prepared(
842 partition_key.as_ref().map(|pk| pk.iter()),
843 token,
844 serialized_values_size,
845 );
846 if let Some(replicas) = replicas.as_ref() {
847 span.record_replicas(replicas.iter().map(|(node, shard)| (node, *shard)));
848 }
849 span
850 };
851
852 let worker = PagerWorker {
853 sender: sender.into(),
854 page_query,
855 statement_info,
856 query_is_idempotent: config.prepared.config.is_idempotent,
857 query_consistency: consistency,
858 retry_session,
859 execution_profile: config.execution_profile,
860 #[cfg(feature = "metrics")]
861 metrics: config.metrics,
862 paging_state: PagingState::start(),
863 history_listener: config.prepared.config.history_listener.clone(),
864 current_request_id: None,
865 current_attempt_id: None,
866 parent_span,
867 span_creator,
868 };
869
870 worker.work(config.cluster_state).await
871 };
872
873 Self::new_from_worker_future(worker_task, receiver).await
874 }
875
876 pub(crate) async fn new_for_connection_query_iter(
877 query: Statement,
878 connection: Arc<Connection>,
879 consistency: Consistency,
880 serial_consistency: Option<SerialConsistency>,
881 ) -> Result<Self, NextPageError> {
882 let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
883
884 let page_size = query.get_validated_page_size();
885
886 let worker_task = async move {
887 let worker = SingleConnectionPagerWorker {
888 sender: sender.into(),
889 fetcher: |paging_state| {
890 connection.query_raw_with_consistency(
891 &query,
892 consistency,
893 serial_consistency,
894 Some(page_size),
895 paging_state,
896 )
897 },
898 };
899 worker.work().await
900 };
901
902 Self::new_from_worker_future(worker_task, receiver).await
903 }
904
905 pub(crate) async fn new_for_connection_execute_iter(
906 prepared: PreparedStatement,
907 values: SerializedValues,
908 connection: Arc<Connection>,
909 consistency: Consistency,
910 serial_consistency: Option<SerialConsistency>,
911 ) -> Result<Self, NextPageError> {
912 let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
913
914 let page_size = prepared.get_validated_page_size();
915
916 let worker_task = async move {
917 let worker = SingleConnectionPagerWorker {
918 sender: sender.into(),
919 fetcher: |paging_state| {
920 connection.execute_raw_with_consistency(
921 &prepared,
922 &values,
923 consistency,
924 serial_consistency,
925 Some(page_size),
926 paging_state,
927 )
928 },
929 };
930 worker.work().await
931 };
932
933 Self::new_from_worker_future(worker_task, receiver).await
934 }
935
936 async fn new_from_worker_future(
937 worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
938 mut receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
939 ) -> Result<Self, NextPageError> {
940 tokio::task::spawn(worker_task);
941
942 let page_received = receiver.recv().await.unwrap()?;
948 let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
949
950 Ok(Self {
951 current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
952 page_receiver: receiver,
953 tracing_ids: if let Some(tracing_id) = page_received.tracing_id {
954 vec![tracing_id]
955 } else {
956 Vec::new()
957 },
958 })
959 }
960
961 #[inline]
963 pub fn tracing_ids(&self) -> &[Uuid] {
964 &self.tracing_ids
965 }
966
967 #[inline]
969 pub fn column_specs(&self) -> ColumnSpecs<'_, '_> {
970 ColumnSpecs::new(self.current_page.metadata().col_specs())
971 }
972
973 fn is_current_page_exhausted(&self) -> bool {
974 self.current_page.rows_remaining() == 0
975 }
976}
977
978pub struct TypedRowStream<RowT: 'static> {
983 raw_row_lending_stream: QueryPager,
984 _phantom: std::marker::PhantomData<RowT>,
985}
986
987impl<T> std::fmt::Debug for TypedRowStream<T> {
991 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
992 f.debug_struct("TypedRowStream")
993 .field("raw_row_lending_stream", &self.raw_row_lending_stream)
994 .finish()
995 }
996}
997
998impl<RowT> Unpin for TypedRowStream<RowT> {}
999
1000impl<RowT> TypedRowStream<RowT>
1001where
1002 RowT: for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>,
1003{
1004 fn new(raw_stream: QueryPager) -> Result<Self, TypeCheckError> {
1005 raw_stream.type_check::<RowT>()?;
1006
1007 Ok(Self {
1008 raw_row_lending_stream: raw_stream,
1009 _phantom: Default::default(),
1010 })
1011 }
1012}
1013
1014impl<RowT> TypedRowStream<RowT> {
1015 #[inline]
1017 pub fn tracing_ids(&self) -> &[Uuid] {
1018 self.raw_row_lending_stream.tracing_ids()
1019 }
1020
1021 #[inline]
1023 pub fn column_specs(&self) -> ColumnSpecs {
1024 self.raw_row_lending_stream.column_specs()
1025 }
1026}
1027
1028impl<RowT> Stream for TypedRowStream<RowT>
1032where
1033 RowT: DeserializeOwnedRow,
1034{
1035 type Item = Result<RowT, NextRowError>;
1036
1037 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1038 let next_fut = async {
1039 self.raw_row_lending_stream.next().await.map(|res| {
1040 res.and_then(|column_iterator| {
1041 <RowT as DeserializeRow>::deserialize(column_iterator)
1042 .map_err(NextRowError::RowDeserializationError)
1043 })
1044 })
1045 };
1046
1047 futures::pin_mut!(next_fut);
1048 let value = ready_some_ok!(next_fut.poll(cx));
1049 Poll::Ready(Some(Ok(value)))
1050 }
1051}
1052
1053#[derive(Error, Debug, Clone)]
1055#[non_exhaustive]
1056pub enum NextPageError {
1057 #[error("Failed to extract PK and compute token required for routing: {0}")]
1059 PartitionKeyError(#[from] PartitionKeyError),
1060
1061 #[error(transparent)]
1063 RequestFailure(#[from] RequestError),
1064
1065 #[error("Failed to deserialize result metadata associated with next page response: {0}")]
1067 ResultMetadataParseError(#[from] ResultMetadataAndRowsCountParseError),
1068}
1069
1070#[derive(Error, Debug, Clone)]
1072#[non_exhaustive]
1073pub enum NextRowError {
1074 #[error("Failed to fetch next page of result: {0}")]
1076 NextPageError(#[from] NextPageError),
1077
1078 #[error("Row deserialization error: {0}")]
1080 RowDeserializationError(#[from] DeserializationError),
1081}