scylla/client/
pager.rs

1//! Entities that provide automated transparent paging of a query.
2//! They enable consuming result of a paged query as a stream over rows,
3//! which abstracts over page boundaries.
4
5use std::future::Future;
6use std::net::SocketAddr;
7use std::ops::ControlFlow;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use futures::Stream;
13use scylla_cql::deserialize::result::RawRowLendingIterator;
14use scylla_cql::deserialize::row::{ColumnIterator, DeserializeRow};
15use scylla_cql::deserialize::{DeserializationError, TypeCheckError};
16use scylla_cql::frame::frame_errors::ResultMetadataAndRowsCountParseError;
17use scylla_cql::frame::request::query::PagingState;
18use scylla_cql::frame::response::result::RawMetadataAndRawRows;
19use scylla_cql::frame::response::NonErrorResponse;
20use scylla_cql::frame::types::SerialConsistency;
21use scylla_cql::serialize::row::SerializedValues;
22use scylla_cql::Consistency;
23use std::result::Result;
24use thiserror::Error;
25use tokio::sync::mpsc;
26
27use crate::client::execution_profile::ExecutionProfileInner;
28use crate::cluster::{ClusterState, NodeRef};
29use crate::deserialize::DeserializeOwnedRow;
30use crate::errors::{RequestAttemptError, RequestError};
31use crate::frame::response::result;
32use crate::network::Connection;
33use crate::observability::driver_tracing::RequestSpan;
34use crate::observability::history::{self, HistoryListener};
35#[cfg(feature = "metrics")]
36use crate::observability::metrics::Metrics;
37use crate::policies::load_balancing::{self, RoutingInfo};
38use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
39use crate::response::query_result::ColumnSpecs;
40use crate::response::{NonErrorQueryResponse, QueryResponse};
41use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
42use crate::statement::unprepared::Statement;
43use tracing::{trace, trace_span, warn, Instrument};
44use uuid::Uuid;
45
// Like std::task::ready!, but handles the whole stack of Poll<Option<Result<>>>.
// If it matches Poll::Ready(Some(Ok(_))), then it returns the innermost value,
// otherwise it returns from the surrounding function.
macro_rules! ready_some_ok {
    ($e:expr) => {
        match $e {
            // A value is available - unwrap all three layers and yield it.
            Poll::Ready(Some(Ok(x))) => x,
            // An error occurred - propagate it, converting into the caller's error type.
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
            // The stream is exhausted - propagate the end-of-stream marker.
            Poll::Ready(None) => return Poll::Ready(None),
            // Not ready yet - bubble Pending up to the executor.
            Poll::Pending => return Poll::Pending,
        }
    };
}
59
/// A single page of results delivered by a background worker to the
/// [QueryPager] through the channel.
struct ReceivedPage {
    // Raw (not yet deserialized) rows together with raw result metadata.
    rows: RawMetadataAndRawRows,
    // Tracing id returned by the server for this page's query, if tracing was enabled.
    tracing_id: Option<Uuid>,
}
64
/// Everything needed to spawn a background pager for a prepared statement.
pub(crate) struct PreparedIteratorConfig {
    // The prepared statement to execute page-by-page.
    pub(crate) prepared: PreparedStatement,
    // Serialized bind values for the statement.
    pub(crate) values: SerializedValues,
    // Execution profile (load balancing, retry policy, consistency defaults, ...).
    pub(crate) execution_profile: Arc<ExecutionProfileInner>,
    // Snapshot of cluster metadata used for routing.
    pub(crate) cluster_state: Arc<ClusterState>,
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Arc<Metrics>,
}
73
74// A separate module is used here so that the parent module cannot construct
75// SendAttemptedProof directly.
76mod checked_channel_sender {
77    use scylla_cql::frame::response::result::RawMetadataAndRawRows;
78    use std::marker::PhantomData;
79    use tokio::sync::mpsc;
80    use uuid::Uuid;
81
82    use super::{NextPageError, ReceivedPage};
83
84    /// A value whose existence proves that there was an attempt
85    /// to send an item of type T through a channel.
86    /// Can only be constructed by ProvingSender::send.
87    pub(crate) struct SendAttemptedProof<T>(PhantomData<T>);
88
89    /// An mpsc::Sender which returns proofs that it attempted to send items.
90    pub(crate) struct ProvingSender<T>(mpsc::Sender<T>);
91
92    impl<T> From<mpsc::Sender<T>> for ProvingSender<T> {
93        fn from(s: mpsc::Sender<T>) -> Self {
94            Self(s)
95        }
96    }
97
98    impl<T> ProvingSender<T> {
99        pub(crate) async fn send(
100            &self,
101            value: T,
102        ) -> (SendAttemptedProof<T>, Result<(), mpsc::error::SendError<T>>) {
103            (SendAttemptedProof(PhantomData), self.0.send(value).await)
104        }
105    }
106
107    type ResultPage = Result<ReceivedPage, NextPageError>;
108
109    impl ProvingSender<ResultPage> {
110        pub(crate) async fn send_empty_page(
111            &self,
112            tracing_id: Option<Uuid>,
113        ) -> (
114            SendAttemptedProof<ResultPage>,
115            Result<(), mpsc::error::SendError<ResultPage>>,
116        ) {
117            let empty_page = ReceivedPage {
118                rows: RawMetadataAndRawRows::mock_empty(),
119                tracing_id,
120            };
121            self.send(Ok(empty_page)).await
122        }
123    }
124}
125
126use checked_channel_sender::{ProvingSender, SendAttemptedProof};
127
128type PageSendAttemptedProof = SendAttemptedProof<Result<ReceivedPage, NextPageError>>;
129
130// PagerWorker works in the background to fetch pages
131// QueryPager receives them through a channel
// PagerWorker works in the background to fetch pages
// QueryPager receives them through a channel
struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
    // Channel endpoint used to push fetched pages (or errors) to the QueryPager.
    sender: ProvingSender<Result<ReceivedPage, NextPageError>>,

    // Closure used to perform a single page query
    // AsyncFn(Arc<Connection>, Option<Arc<[u8]>>) -> Result<QueryResponse, RequestAttemptError>
    page_query: QueryFunc,

    // Routing information handed to the load balancing policy.
    statement_info: RoutingInfo<'a>,
    query_is_idempotent: bool,
    query_consistency: Consistency,
    retry_session: Box<dyn RetrySession>,
    execution_profile: Arc<ExecutionProfileInner>,
    #[cfg(feature = "metrics")]
    metrics: Arc<Metrics>,

    // Paging state to resume fetching from; updated after every received page.
    paging_state: PagingState,

    // Optional listener notified about request/attempt lifecycle events,
    // together with the ids of the request/attempt currently in flight.
    history_listener: Option<Arc<dyn HistoryListener>>,
    current_request_id: Option<history::RequestId>,
    current_attempt_id: Option<history::AttemptId>,

    parent_span: tracing::Span,
    span_creator: SpanCreatorFunc,
}
156
impl<QueryFunc, QueryFut, SpanCreator> PagerWorker<'_, QueryFunc, SpanCreator>
where
    QueryFunc: Fn(Arc<Connection>, Consistency, PagingState) -> QueryFut,
    QueryFut: Future<Output = Result<QueryResponse, RequestAttemptError>>,
    SpanCreator: Fn() -> RequestSpan,
{
    // Contract: this function MUST send at least one item through self.sender
    async fn work(mut self, cluster_state: Arc<ClusterState>) -> PageSendAttemptedProof {
        let load_balancer = self.execution_profile.load_balancing_policy.clone();
        let statement_info = self.statement_info.clone();
        let query_plan =
            load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_state);

        // Last error seen; reported if the whole plan is exhausted without success.
        let mut last_error: RequestError = RequestError::EmptyPlan;
        let mut current_consistency: Consistency = self.query_consistency;

        self.log_request_start();

        'nodes_in_plan: for (node, shard) in query_plan {
            let span =
                trace_span!(parent: &self.parent_span, "Executing query", node = %node.address);
            // For each node in the plan choose a connection to use
            // This connection will be reused for same node retries to preserve paging cache on the shard
            let connection: Arc<Connection> = match node
                .connection_for_shard(shard)
                .instrument(span.clone())
                .await
            {
                Ok(connection) => connection,
                Err(e) => {
                    trace!(
                        parent: &span,
                        error = %e,
                        "Choosing connection failed"
                    );
                    last_error = e.into();
                    // Broken connection doesn't count as a failed query, don't log in metrics
                    continue 'nodes_in_plan;
                }
            };

            'same_node_retries: loop {
                trace!(parent: &span, "Execution started");
                // Query pages until an error occurs
                let queries_result: Result<PageSendAttemptedProof, RequestAttemptError> = self
                    .query_pages(&connection, current_consistency, node)
                    .instrument(span.clone())
                    .await;

                let request_error: RequestAttemptError = match queries_result {
                    Ok(proof) => {
                        trace!(parent: &span, "Request succeeded");
                        // query_pages returned Ok, so we are guaranteed
                        // that it attempted to send at least one page
                        // through self.sender and we can safely return now.
                        return proof;
                    }
                    Err(error) => {
                        trace!(
                            parent: &span,
                            error = %error,
                            "Request failed"
                        );
                        error
                    }
                };

                // Use retry policy to decide what to do next
                let query_info = RequestInfo {
                    error: &request_error,
                    is_idempotent: self.query_is_idempotent,
                    consistency: self.query_consistency,
                };

                let retry_decision = self.retry_session.decide_should_retry(query_info);
                trace!(
                    parent: &span,
                    retry_decision = ?retry_decision
                );

                self.log_attempt_error(&request_error, &retry_decision);

                last_error = request_error.into();

                match retry_decision {
                    RetryDecision::RetrySameTarget(cl) => {
                        #[cfg(feature = "metrics")]
                        self.metrics.inc_retries_num();
                        current_consistency = cl.unwrap_or(current_consistency);
                        continue 'same_node_retries;
                    }
                    RetryDecision::RetryNextTarget(cl) => {
                        #[cfg(feature = "metrics")]
                        self.metrics.inc_retries_num();
                        current_consistency = cl.unwrap_or(current_consistency);
                        continue 'nodes_in_plan;
                    }
                    RetryDecision::DontRetry => break 'nodes_in_plan,
                    RetryDecision::IgnoreWriteError => {
                        warn!("Ignoring error during fetching pages; stopping fetching.");
                        // If we are here then, most likely, we didn't send
                        // anything through the self.sender channel.
                        // Although we are in an awkward situation (_iter
                        // interface isn't meant for sending writes),
                        // we must attempt to send something because
                        // the iterator expects it.
                        let (proof, _) = self.sender.send_empty_page(None).await;
                        return proof;
                    }
                };
            }
        }

        // The whole plan failed; report the last error to the QueryPager.
        // This also satisfies the "at least one send" contract.
        self.log_request_error(&last_error);
        let (proof, _) = self
            .sender
            .send(Err(NextPageError::RequestFailure(last_error)))
            .await;
        proof
    }

    // Given a working connection query as many pages as possible until the first error.
    //
    // Contract: this function must either:
    // - Return an error
    // - Return Ok but have attempted to send a page via self.sender
    async fn query_pages(
        &mut self,
        connection: &Arc<Connection>,
        consistency: Consistency,
        node: NodeRef<'_>,
    ) -> Result<PageSendAttemptedProof, RequestAttemptError> {
        loop {
            let request_span = (self.span_creator)();
            match self
                .query_one_page(connection, consistency, node, &request_span)
                .instrument(request_span.span().clone())
                .await?
            {
                ControlFlow::Break(proof) => return Ok(proof),
                ControlFlow::Continue(_) => {}
            }
        }
    }

    // Fetches a single page and pushes it to the QueryPager.
    //
    // Returns Ok(Break(proof)) when fetching should stop (last page reached,
    // receiver dropped, or a non-rows result arrived), Ok(Continue(())) when
    // the next page should be fetched, and Err on a failed attempt.
    async fn query_one_page(
        &mut self,
        connection: &Arc<Connection>,
        consistency: Consistency,
        node: NodeRef<'_>,
        request_span: &RequestSpan,
    ) -> Result<ControlFlow<PageSendAttemptedProof, ()>, RequestAttemptError> {
        #[cfg(feature = "metrics")]
        self.metrics.inc_total_paged_queries();
        let query_start = std::time::Instant::now();

        trace!(
            connection = %connection.get_connect_address(),
            "Sending"
        );
        self.log_attempt_start(connection.get_connect_address());

        let query_response =
            (self.page_query)(connection.clone(), consistency, self.paging_state.clone())
                .await
                .and_then(QueryResponse::into_non_error_query_response);

        let elapsed = query_start.elapsed();

        request_span.record_shard_id(connection);

        match query_response {
            Ok(NonErrorQueryResponse {
                response:
                    NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))),
                tracing_id,
                ..
            }) => {
                #[cfg(feature = "metrics")]
                let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
                self.log_attempt_success();
                self.log_request_success();
                self.execution_profile
                    .load_balancing_policy
                    .on_request_success(&self.statement_info, elapsed, node);

                request_span.record_raw_rows_fields(&rows);

                let received_page = ReceivedPage { rows, tracing_id };

                // Send next page to QueryPager
                let (proof, res) = self.sender.send(Ok(received_page)).await;
                if res.is_err() {
                    // channel was closed, QueryPager was dropped - should shutdown
                    return Ok(ControlFlow::Break(proof));
                }

                match paging_state_response.into_paging_control_flow() {
                    ControlFlow::Continue(paging_state) => {
                        self.paging_state = paging_state;
                    }
                    ControlFlow::Break(()) => {
                        // Reached the last query, shutdown
                        return Ok(ControlFlow::Break(proof));
                    }
                }

                // Query succeeded, reset retry policy for future retries
                self.retry_session.reset();
                self.log_request_start();

                Ok(ControlFlow::Continue(()))
            }
            Err(err) => {
                #[cfg(feature = "metrics")]
                self.metrics.inc_failed_paged_queries();
                self.execution_profile
                    .load_balancing_policy
                    .on_request_failure(&self.statement_info, elapsed, node, &err);
                Err(err)
            }
            Ok(NonErrorQueryResponse {
                response: NonErrorResponse::Result(_),
                tracing_id,
                ..
            }) => {
                // We have most probably sent a modification statement (e.g. INSERT or UPDATE),
                // so let's return an empty iterator as suggested in #631.

                // We must attempt to send something because the iterator expects it.
                let (proof, _) = self.sender.send_empty_page(tracing_id).await;
                Ok(ControlFlow::Break(proof))
            }
            Ok(response) => {
                #[cfg(feature = "metrics")]
                self.metrics.inc_failed_paged_queries();
                let err =
                    RequestAttemptError::UnexpectedResponse(response.response.to_response_kind());
                self.execution_profile
                    .load_balancing_policy
                    .on_request_failure(&self.statement_info, elapsed, node, &err);
                Err(err)
            }
        }
    }

    // Notifies the history listener (if any) that a new request has started
    // and stores the returned request id for subsequent events.
    fn log_request_start(&mut self) {
        let history_listener: &dyn HistoryListener = match &self.history_listener {
            Some(hl) => &**hl,
            None => return,
        };

        self.current_request_id = Some(history_listener.log_request_start());
    }

    // Notifies the history listener (if any) that the current request succeeded.
    fn log_request_success(&mut self) {
        let history_listener: &dyn HistoryListener = match &self.history_listener {
            Some(hl) => &**hl,
            None => return,
        };

        let request_id: history::RequestId = match &self.current_request_id {
            Some(id) => *id,
            None => return,
        };

        history_listener.log_request_success(request_id);
    }

    // Notifies the history listener (if any) that the current request failed.
    fn log_request_error(&mut self, error: &RequestError) {
        let history_listener: &dyn HistoryListener = match &self.history_listener {
            Some(hl) => &**hl,
            None => return,
        };

        let request_id: history::RequestId = match &self.current_request_id {
            Some(id) => *id,
            None => return,
        };

        history_listener.log_request_error(request_id, error);
    }

    // Notifies the history listener (if any) that an attempt has started
    // and stores the returned attempt id for subsequent events.
    fn log_attempt_start(&mut self, node_addr: SocketAddr) {
        let history_listener: &dyn HistoryListener = match &self.history_listener {
            Some(hl) => &**hl,
            None => return,
        };

        let request_id: history::RequestId = match &self.current_request_id {
            Some(id) => *id,
            None => return,
        };

        self.current_attempt_id =
            Some(history_listener.log_attempt_start(request_id, None, node_addr));
    }

    // Notifies the history listener (if any) that the current attempt succeeded.
    fn log_attempt_success(&mut self) {
        let history_listener: &dyn HistoryListener = match &self.history_listener {
            Some(hl) => &**hl,
            None => return,
        };

        let attempt_id: history::AttemptId = match &self.current_attempt_id {
            Some(id) => *id,
            None => return,
        };

        history_listener.log_attempt_success(attempt_id);
    }

    // Notifies the history listener (if any) that the current attempt failed,
    // together with the retry decision that was taken for it.
    fn log_attempt_error(&mut self, error: &RequestAttemptError, retry_decision: &RetryDecision) {
        let history_listener: &dyn HistoryListener = match &self.history_listener {
            Some(hl) => &**hl,
            None => return,
        };

        let attempt_id: history::AttemptId = match &self.current_attempt_id {
            Some(id) => *id,
            None => return,
        };

        history_listener.log_attempt_error(attempt_id, error, retry_decision);
    }
}
483
/// A massively simplified version of the PagerWorker. It does not have
/// any complicated logic related to retries, it just fetches pages from
/// a single connection.
struct SingleConnectionPagerWorker<Fetcher> {
    // Channel endpoint used to push fetched pages (or errors) to the QueryPager.
    sender: ProvingSender<Result<ReceivedPage, NextPageError>>,
    // Closure that fetches a single page, given the current paging state.
    fetcher: Fetcher,
}
491
impl<Fetcher, FetchFut> SingleConnectionPagerWorker<Fetcher>
where
    Fetcher: Fn(PagingState) -> FetchFut + Send + Sync,
    FetchFut: Future<Output = Result<QueryResponse, RequestAttemptError>> + Send,
{
    // Drives do_work() to completion, guaranteeing that at least one item is
    // sent through self.sender even if fetching fails.
    async fn work(mut self) -> PageSendAttemptedProof {
        match self.do_work().await {
            Ok(proof) => proof,
            Err(err) => {
                // Fetching failed - forward the error to the QueryPager.
                // This also satisfies the "at least one send" contract.
                let (proof, _) = self
                    .sender
                    .send(Err(NextPageError::RequestFailure(
                        RequestError::LastAttemptError(err),
                    )))
                    .await;
                proof
            }
        }
    }

    // Fetches consecutive pages until the last page is reached, the receiver
    // is dropped, or an error occurs.
    async fn do_work(&mut self) -> Result<PageSendAttemptedProof, RequestAttemptError> {
        let mut paging_state = PagingState::start();
        loop {
            let result = (self.fetcher)(paging_state).await?;
            let response = result.into_non_error_query_response()?;
            match response.response {
                NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => {
                    let (proof, send_result) = self
                        .sender
                        .send(Ok(ReceivedPage {
                            rows,
                            tracing_id: response.tracing_id,
                        }))
                        .await;

                    if send_result.is_err() {
                        // channel was closed, QueryPager was dropped - should shutdown
                        return Ok(proof);
                    }

                    match paging_state_response.into_paging_control_flow() {
                        ControlFlow::Continue(new_paging_state) => {
                            paging_state = new_paging_state;
                        }
                        ControlFlow::Break(()) => {
                            // Reached the last query, shutdown
                            return Ok(proof);
                        }
                    }
                }
                NonErrorResponse::Result(_) => {
                    // We have most probably sent a modification statement (e.g. INSERT or UPDATE),
                    // so let's return an empty iterator as suggested in #631.

                    // We must attempt to send something because the iterator expects it.
                    let (proof, _) = self.sender.send_empty_page(response.tracing_id).await;
                    return Ok(proof);
                }
                _ => {
                    // Any other response kind is unexpected for a paged query.
                    return Err(RequestAttemptError::UnexpectedResponse(
                        response.response.to_response_kind(),
                    ));
                }
            }
        }
    }
}
559
/// An intermediate object that allows to construct a stream over a query
/// that is asynchronously paged in the background.
///
/// Before the results can be processed in a convenient way, the QueryPager
/// needs to be cast into a typed stream. This is done by use of `rows_stream()` method.
/// As the method is generic over the target type, the turbofish syntax
/// can come in handy there, e.g. `query_pager.rows_stream::<(i32, String, Uuid)>()`.
#[derive(Debug)]
pub struct QueryPager {
    // Rows of the page that is currently being iterated over.
    current_page: RawRowLendingIterator,
    // Channel with pages fetched in the background by a worker task.
    page_receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
    // Tracing ids collected from all pages received so far.
    tracing_ids: Vec<Uuid>,
}
573
574// QueryPager is not an iterator or a stream! However, it implements
575// a `next()` method that returns a [ColumnIterator], which can be used
576// to manually deserialize a row.
577// The `ColumnIterator` borrows from the `QueryPager`, and the [futures::Stream] trait
578// does not allow for such a pattern. Lending streams are not a thing yet.
579impl QueryPager {
580    /// Returns the next item (`ColumnIterator`) from the stream.
581    ///
582    /// This can be used with `type_check() for manual deserialization - see example below.
583    ///
584    /// This is not a part of the `Stream` interface because the returned iterator
585    /// borrows from self.
586    ///
587    /// This is cancel-safe.
588    async fn next(&mut self) -> Option<Result<ColumnIterator, NextRowError>> {
589        let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
590        match res {
591            Some(Ok(())) => {}
592            Some(Err(err)) => return Some(Err(err)),
593            None => return None,
594        }
595
596        // We are guaranteed here to have a non-empty page, so unwrap
597        Some(
598            self.current_page
599                .next()
600                .unwrap()
601                .map_err(NextRowError::RowDeserializationError),
602        )
603    }
604
605    /// Tries to acquire a non-empty page, if current page is exhausted.
606    fn poll_fill_page<'r>(
607        mut self: Pin<&'r mut Self>,
608        cx: &mut Context<'_>,
609    ) -> Poll<Option<Result<(), NextRowError>>> {
610        if !self.is_current_page_exhausted() {
611            return Poll::Ready(Some(Ok(())));
612        }
613        ready_some_ok!(self.as_mut().poll_next_page(cx));
614        if self.is_current_page_exhausted() {
615            // We most likely got a zero-sized page.
616            // Try again later.
617            cx.waker().wake_by_ref();
618            Poll::Pending
619        } else {
620            Poll::Ready(Some(Ok(())))
621        }
622    }
623
624    /// Makes an attempt to acquire the next page (which may be empty).
625    ///
626    /// On success, returns Some(Ok()).
627    /// On failure, returns Some(Err()).
628    /// If there are no more pages, returns None.
629    fn poll_next_page<'r>(
630        mut self: Pin<&'r mut Self>,
631        cx: &mut Context<'_>,
632    ) -> Poll<Option<Result<(), NextRowError>>> {
633        let mut s = self.as_mut();
634
635        let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
636
637        let raw_rows_with_deserialized_metadata =
638            received_page.rows.deserialize_metadata().map_err(|err| {
639                NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err))
640            })?;
641        s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata);
642
643        if let Some(tracing_id) = received_page.tracing_id {
644            s.tracing_ids.push(tracing_id);
645        }
646
647        Poll::Ready(Some(Ok(())))
648    }
649
650    /// Type-checks the iterator against given type.
651    ///
652    /// This is automatically called upon transforming [QueryPager] into [TypedRowStream].
653    // Can be used with `next()` for manual deserialization.
654    #[inline]
655    pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
656        &self,
657    ) -> Result<(), TypeCheckError> {
658        RowT::type_check(self.column_specs().as_slice())
659    }
660
661    /// Casts the iterator to a given row type, enabling [Stream]'ed operations
662    /// on rows, which deserialize them on-the-fly to that given type.
663    /// It only allows deserializing owned types, because [Stream] is not lending.
664    /// Begins with performing type check.
665    #[inline]
666    pub fn rows_stream<RowT: 'static + for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>>(
667        self,
668    ) -> Result<TypedRowStream<RowT>, TypeCheckError> {
669        TypedRowStream::<RowT>::new(self)
670    }
671
    /// Spawns a background [PagerWorker] executing an unprepared statement
    /// and returns a [QueryPager] that receives its pages through a channel.
    pub(crate) async fn new_for_query(
        statement: Statement,
        execution_profile: Arc<ExecutionProfileInner>,
        cluster_state: Arc<ClusterState>,
        #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
    ) -> Result<Self, NextPageError> {
        // Capacity of 1: the worker pre-fetches at most one page ahead of the consumer.
        let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);

        // Statement-level settings take precedence over the execution profile.
        let consistency = statement
            .config
            .consistency
            .unwrap_or(execution_profile.consistency);
        let serial_consistency = statement
            .config
            .serial_consistency
            .unwrap_or(execution_profile.serial_consistency);

        let page_size = statement.get_validated_page_size();

        // Unprepared statements carry no token/table information,
        // so routing info is limited to consistency settings.
        let routing_info = RoutingInfo {
            consistency,
            serial_consistency,
            ..Default::default()
        };

        let retry_session = statement
            .get_retry_policy()
            .map(|rp| &**rp)
            .unwrap_or(&*execution_profile.retry_policy)
            .new_session();

        let parent_span = tracing::Span::current();
        let worker_task = async move {
            let statement_ref = &statement;

            // Closure performing a single page fetch on a given connection.
            let page_query = |connection: Arc<Connection>,
                              consistency: Consistency,
                              paging_state: PagingState| {
                async move {
                    connection
                        .query_raw_with_consistency(
                            statement_ref,
                            consistency,
                            serial_consistency,
                            Some(page_size),
                            paging_state,
                        )
                        .await
                }
            };

            let query_ref = &statement;

            let span_creator = move || {
                let span = RequestSpan::new_query(&query_ref.contents);
                span.record_request_size(0);
                span
            };

            let worker = PagerWorker {
                sender: sender.into(),
                page_query,
                statement_info: routing_info,
                query_is_idempotent: statement.config.is_idempotent,
                query_consistency: consistency,
                retry_session,
                execution_profile,
                #[cfg(feature = "metrics")]
                metrics,
                paging_state: PagingState::start(),
                history_listener: statement.config.history_listener.clone(),
                current_request_id: None,
                current_attempt_id: None,
                parent_span,
                span_creator,
            };

            worker.work(cluster_state).await
        };

        Self::new_from_worker_future(worker_task, receiver).await
    }
754
    /// Creates a pager for a prepared statement, spawning a background worker
    /// that transparently fetches consecutive pages.
    ///
    /// Consistency, serial consistency and the retry policy are taken from the
    /// statement config when set, falling back to the execution profile.
    /// Routing info (token, table) is computed from the serialized values so
    /// that the load balancing policy can do token-aware routing.
    pub(crate) async fn new_for_prepared_statement(
        config: PreparedIteratorConfig,
    ) -> Result<Self, NextPageError> {
        // Capacity 1: the worker produces pages one at a time and the pager
        // consumes them as the stream is polled.
        let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);

        // Statement-level settings override the execution profile defaults.
        let consistency = config
            .prepared
            .config
            .consistency
            .unwrap_or(config.execution_profile.consistency);
        let serial_consistency = config
            .prepared
            .config
            .serial_consistency
            .unwrap_or(config.execution_profile.serial_consistency);

        let page_size = config.prepared.get_validated_page_size();

        // The statement's own retry policy (if any) takes precedence over the
        // profile's policy.
        let retry_session = config
            .prepared
            .get_retry_policy()
            .map(|rp| &**rp)
            .unwrap_or(&*config.execution_profile.retry_policy)
            .new_session();

        let parent_span = tracing::Span::current();
        let worker_task = async move {
            let prepared_ref = &config.prepared;
            let values_ref = &config.values;

            // Extract the partition key and compute its token for routing.
            // On failure, report the error through the channel and stop; the
            // returned proof witnesses that a send was attempted (see
            // new_from_worker_future, which relies on this).
            let (partition_key, token) = match prepared_ref
                .extract_partition_key_and_calculate_token(
                    prepared_ref.get_partitioner_name(),
                    values_ref,
                ) {
                Ok(res) => res.unzip(),
                Err(err) => {
                    let (proof, _res) = ProvingSender::from(sender)
                        .send(Err(NextPageError::PartitionKeyError(err)))
                        .await;
                    return proof;
                }
            };

            let table_spec = config.prepared.get_table_spec();
            let statement_info = RoutingInfo {
                consistency,
                serial_consistency,
                token,
                table: table_spec,
                is_confirmed_lwt: config.prepared.is_confirmed_lwt(),
            };

            // Closure used by the worker to fetch a single page on a given
            // connection, resuming from the provided paging state.
            let page_query = |connection: Arc<Connection>,
                              consistency: Consistency,
                              paging_state: PagingState| async move {
                connection
                    .execute_raw_with_consistency(
                        prepared_ref,
                        values_ref,
                        consistency,
                        serial_consistency,
                        Some(page_size),
                        paging_state,
                    )
                    .await
            };

            let serialized_values_size = config.values.buffer_size();

            // Precompute the replica set for request spans; only possible when
            // both the table and the token are known.
            let replicas: Option<smallvec::SmallVec<[_; 8]>> =
                if let (Some(table_spec), Some(token)) =
                    (statement_info.table, statement_info.token)
                {
                    Some(
                        config
                            .cluster_state
                            .get_token_endpoints_iter(table_spec, token)
                            .map(|(node, shard)| (node.clone(), shard))
                            .collect(),
                    )
                } else {
                    None
                };

            // Builds a fresh request span for every page request.
            let span_creator = move || {
                let span = RequestSpan::new_prepared(
                    partition_key.as_ref().map(|pk| pk.iter()),
                    token,
                    serialized_values_size,
                );
                if let Some(replicas) = replicas.as_ref() {
                    span.record_replicas(replicas.iter().map(|(node, shard)| (node, *shard)));
                }
                span
            };

            let worker = PagerWorker {
                sender: sender.into(),
                page_query,
                statement_info,
                query_is_idempotent: config.prepared.config.is_idempotent,
                query_consistency: consistency,
                retry_session,
                execution_profile: config.execution_profile,
                #[cfg(feature = "metrics")]
                metrics: config.metrics,
                paging_state: PagingState::start(),
                history_listener: config.prepared.config.history_listener.clone(),
                current_request_id: None,
                current_attempt_id: None,
                parent_span,
                span_creator,
            };

            worker.work(config.cluster_state).await
        };

        Self::new_from_worker_future(worker_task, receiver).await
    }
875
876    pub(crate) async fn new_for_connection_query_iter(
877        query: Statement,
878        connection: Arc<Connection>,
879        consistency: Consistency,
880        serial_consistency: Option<SerialConsistency>,
881    ) -> Result<Self, NextPageError> {
882        let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
883
884        let page_size = query.get_validated_page_size();
885
886        let worker_task = async move {
887            let worker = SingleConnectionPagerWorker {
888                sender: sender.into(),
889                fetcher: |paging_state| {
890                    connection.query_raw_with_consistency(
891                        &query,
892                        consistency,
893                        serial_consistency,
894                        Some(page_size),
895                        paging_state,
896                    )
897                },
898            };
899            worker.work().await
900        };
901
902        Self::new_from_worker_future(worker_task, receiver).await
903    }
904
905    pub(crate) async fn new_for_connection_execute_iter(
906        prepared: PreparedStatement,
907        values: SerializedValues,
908        connection: Arc<Connection>,
909        consistency: Consistency,
910        serial_consistency: Option<SerialConsistency>,
911    ) -> Result<Self, NextPageError> {
912        let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
913
914        let page_size = prepared.get_validated_page_size();
915
916        let worker_task = async move {
917            let worker = SingleConnectionPagerWorker {
918                sender: sender.into(),
919                fetcher: |paging_state| {
920                    connection.execute_raw_with_consistency(
921                        &prepared,
922                        &values,
923                        consistency,
924                        serial_consistency,
925                        Some(page_size),
926                        paging_state,
927                    )
928                },
929            };
930            worker.work().await
931        };
932
933        Self::new_from_worker_future(worker_task, receiver).await
934    }
935
936    async fn new_from_worker_future(
937        worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
938        mut receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
939    ) -> Result<Self, NextPageError> {
940        tokio::task::spawn(worker_task);
941
942        // This unwrap is safe because:
943        // - The future returned by worker.work sends at least one item
944        //   to the channel (the PageSendAttemptedProof helps enforce this)
945        // - That future is polled in a tokio::task which isn't going to be
946        //   cancelled
947        let page_received = receiver.recv().await.unwrap()?;
948        let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
949
950        Ok(Self {
951            current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
952            page_receiver: receiver,
953            tracing_ids: if let Some(tracing_id) = page_received.tracing_id {
954                vec![tracing_id]
955            } else {
956                Vec::new()
957            },
958        })
959    }
960
961    /// If tracing was enabled returns tracing ids of all finished page queries
962    #[inline]
963    pub fn tracing_ids(&self) -> &[Uuid] {
964        &self.tracing_ids
965    }
966
967    /// Returns specification of row columns
968    #[inline]
969    pub fn column_specs(&self) -> ColumnSpecs<'_, '_> {
970        ColumnSpecs::new(self.current_page.metadata().col_specs())
971    }
972
973    fn is_current_page_exhausted(&self) -> bool {
974        self.current_page.rows_remaining() == 0
975    }
976}
977
/// Returned by [QueryPager::rows_stream].
///
/// Implements [Stream], but only permits deserialization of owned types.
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
pub struct TypedRowStream<RowT: 'static> {
    // Underlying untyped pager; rows pulled from it are deserialized into
    // `RowT` on the fly by the Stream impl.
    raw_row_lending_stream: QueryPager,
    // Ties the stream to the target row type without storing a value of it.
    _phantom: std::marker::PhantomData<RowT>,
}
986
// Manual implementation not to depend on RowT implementing Debug.
// Explanation: automatic derive of Debug would impose the RowT: Debug
// constraint for the Debug impl.
impl<T> std::fmt::Debug for TypedRowStream<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `_phantom` carries no data, so it is deliberately left out of the
        // debug output.
        f.debug_struct("TypedRowStream")
            .field("raw_row_lending_stream", &self.raw_row_lending_stream)
            .finish()
    }
}
997
// Manual impl: makes TypedRowStream Unpin unconditionally, independent of
// RowT. NOTE(review): this asserts the type has no pinning-sensitive state —
// presumably sound since the fields are an owned QueryPager and a zero-sized
// PhantomData; confirm if QueryPager ever becomes !Unpin.
impl<RowT> Unpin for TypedRowStream<RowT> {}
999
1000impl<RowT> TypedRowStream<RowT>
1001where
1002    RowT: for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>,
1003{
1004    fn new(raw_stream: QueryPager) -> Result<Self, TypeCheckError> {
1005        raw_stream.type_check::<RowT>()?;
1006
1007        Ok(Self {
1008            raw_row_lending_stream: raw_stream,
1009            _phantom: Default::default(),
1010        })
1011    }
1012}
1013
1014impl<RowT> TypedRowStream<RowT> {
1015    /// If tracing was enabled, returns tracing ids of all finished page queries.
1016    #[inline]
1017    pub fn tracing_ids(&self) -> &[Uuid] {
1018        self.raw_row_lending_stream.tracing_ids()
1019    }
1020
1021    /// Returns specification of row columns
1022    #[inline]
1023    pub fn column_specs(&self) -> ColumnSpecs {
1024        self.raw_row_lending_stream.column_specs()
1025    }
1026}
1027
/// Stream implementation for TypedRowStream.
///
/// It only works with owned types! For example, &str is not supported.
impl<RowT> Stream for TypedRowStream<RowT>
where
    RowT: DeserializeOwnedRow,
{
    type Item = Result<RowT, NextRowError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Build a future that fetches the next raw row (possibly crossing a
        // page boundary) and deserializes it into `RowT`.
        let next_fut = async {
            self.raw_row_lending_stream.next().await.map(|res| {
                res.and_then(|column_iterator| {
                    <RowT as DeserializeRow>::deserialize(column_iterator)
                        .map_err(NextRowError::RowDeserializationError)
                })
            })
        };

        // Stack-pin the future and poll it once; `ready_some_ok!` propagates
        // Pending / None / Err early. A fresh future is created on every call
        // to poll_next — NOTE(review): this assumes QueryPager::next keeps its
        // paging progress in the pager itself rather than in the dropped
        // future; confirm against QueryPager::next.
        futures::pin_mut!(next_fut);
        let value = ready_some_ok!(next_fut.poll(cx));
        Poll::Ready(Some(Ok(value)))
    }
}
1052
/// An error that occurred while fetching the next page of a result.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum NextPageError {
    /// PK extraction and/or token calculation error. Applies only for prepared statements.
    #[error("Failed to extract PK and compute token required for routing: {0}")]
    PartitionKeyError(#[from] PartitionKeyError),

    /// Failed to run a request responsible for fetching new page.
    #[error(transparent)]
    RequestFailure(#[from] RequestError),

    /// Failed to deserialize result metadata associated with next page response.
    #[error("Failed to deserialize result metadata associated with next page response: {0}")]
    ResultMetadataParseError(#[from] ResultMetadataAndRowsCountParseError),
}
1069
1070/// An error returned by async iterator API.
1071#[derive(Error, Debug, Clone)]
1072#[non_exhaustive]
1073pub enum NextRowError {
1074    /// Failed to fetch next page of result.
1075    #[error("Failed to fetch next page of result: {0}")]
1076    NextPageError(#[from] NextPageError),
1077
1078    /// An error occurred during row deserialization.
1079    #[error("Row deserialization error: {0}")]
1080    RowDeserializationError(#[from] DeserializationError),
1081}