Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iterator: delete TypedRowLendingStream as unusable #1122

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 18 additions & 77 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ impl QueryPager {

/// Type-checks the iterator against given type.
///
/// This is automatically called upon transforming [QueryPager] into [TypedRowLendingStream].
/// This is automatically called upon transforming [QueryPager] into [TypedRowStream].
/// Can be used with `next()` for manual deserialization. See `next()` for an example.
#[inline]
pub fn type_check<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
Expand All @@ -643,21 +643,6 @@ impl QueryPager {
RowT::type_check(self.column_specs().inner())
}

/// Casts the iterator to a given row type, enabling Stream'ed operations
/// on rows, which deserialize them on-the-fly to that given type.
/// It allows deserializing borrowed types, but hence cannot implement [Stream]
/// (because [Stream] is not lending).
/// Begins with performing type check.
#[inline]
pub fn rows_lending_stream<'frame, 'metadata, RowT: DeserializeRow<'frame, 'metadata>>(
self,
) -> Result<TypedRowLendingStream<RowT>, TypeCheckError>
where
'frame: 'metadata,
{
TypedRowLendingStream::<RowT>::new(self)
}

/// Casts the iterator to a given row type, enabling [Stream]'ed operations
/// on rows, which deserialize them on-the-fly to that given type.
/// It only allows deserializing owned types, because [Stream] is not lending.
Expand All @@ -666,9 +651,7 @@ impl QueryPager {
pub fn rows_stream<RowT: 'static + for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>>(
self,
) -> Result<TypedRowStream<RowT>, TypeCheckError> {
TypedRowLendingStream::<RowT>::new(self).map(|typed_row_lending_stream| TypedRowStream {
typed_row_lending_stream,
})
TypedRowStream::<RowT>::new(self)
}

/// Converts this iterator into an iterator over rows parsed as given type,
Expand Down Expand Up @@ -982,35 +965,20 @@ impl QueryPager {
}
}

/// Returned by [QueryPager::rows_lending_stream].
/// Returned by [QueryPager::rows_stream].
///
/// Does not implement [Stream], but permits deserialization of borrowed types.
/// Implements [Stream], but only permits deserialization of owned types.
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
pub struct TypedRowLendingStream<RowT> {
pub struct TypedRowStream<RowT: 'static> {
raw_row_lending_stream: QueryPager,
_phantom: std::marker::PhantomData<RowT>,
}

impl<RowT> Unpin for TypedRowLendingStream<RowT> {}

impl<RowT> TypedRowLendingStream<RowT> {
/// If tracing was enabled, returns tracing ids of all finished page queries.
#[inline]
pub fn tracing_ids(&self) -> &[Uuid] {
self.raw_row_lending_stream.tracing_ids()
}

/// Returns specification of row columns
#[inline]
pub fn column_specs(&self) -> ColumnSpecs {
self.raw_row_lending_stream.column_specs()
}
}
impl<RowT> Unpin for TypedRowStream<RowT> {}

impl<'frame, 'metadata, RowT> TypedRowLendingStream<RowT>
impl<RowT> TypedRowStream<RowT>
where
'frame: 'metadata,
RowT: DeserializeRow<'frame, 'metadata>,
RowT: for<'frame, 'metadata> DeserializeRow<'frame, 'metadata>,
{
fn new(raw_stream: QueryPager) -> Result<Self, TypeCheckError> {
raw_stream.type_check::<RowT>()?;
Expand All @@ -1020,52 +988,19 @@ where
_phantom: Default::default(),
})
}

/// Stream-like next() implementation for TypedRowLendingStream.
///
/// It also works with borrowed types! For example, &str is supported.
/// However, this is not a Stream. To create a Stream, use `into_stream()`.
#[inline]
pub async fn next(&'frame mut self) -> Option<Result<RowT, QueryError>> {
self.raw_row_lending_stream.next().await.map(|res| {
res.and_then(|column_iterator| {
<RowT as DeserializeRow>::deserialize(column_iterator)
.map_err(|err| RowsParseError::from(err).into())
})
})
}

/// Stream-like try_next() implementation for TypedRowLendingStream.
///
/// It also works with borrowed types! For example, &str is supported.
/// However, this is not a Stream. To create a Stream, use `into_stream()`.
#[inline]
pub async fn try_next(&'frame mut self) -> Result<Option<RowT>, QueryError> {
self.next().await.transpose()
}
}

/// Returned by [QueryPager::rows_stream].
///
/// Implements [Stream], but only permits deserialization of owned types.
/// To use [Stream] API (only accessible for owned types), use [QueryPager::rows_stream].
pub struct TypedRowStream<RowT: 'static> {
typed_row_lending_stream: TypedRowLendingStream<RowT>,
}

impl<RowT> Unpin for TypedRowStream<RowT> {}

impl<RowT> TypedRowStream<RowT> {
/// If tracing was enabled, returns tracing ids of all finished page queries.
#[inline]
pub fn tracing_ids(&self) -> &[Uuid] {
self.typed_row_lending_stream.tracing_ids()
self.raw_row_lending_stream.tracing_ids()
}

/// Returns specification of row columns
#[inline]
pub fn column_specs(&self) -> ColumnSpecs {
self.typed_row_lending_stream.column_specs()
self.raw_row_lending_stream.column_specs()
}
}

Expand All @@ -1079,9 +1014,15 @@ where
type Item = Result<RowT, QueryError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut s = self.as_mut();
let next_fut = async {
self.raw_row_lending_stream.next().await.map(|res| {
res.and_then(|column_iterator| {
<RowT as DeserializeRow>::deserialize(column_iterator)
.map_err(|err| RowsParseError::from(err).into())
})
})
};

let next_fut = s.typed_row_lending_stream.next();
futures::pin_mut!(next_fut);
let value = ready_some_ok!(next_fut.poll(cx));
Poll::Ready(Some(Ok(value)))
Expand Down
Loading