Skip to content

Commit

Permalink
RUST-1676 Simplify GenericCursor by refactoring the `GetMoreProvide…
Browse files Browse the repository at this point in the history
…r` trait into a generic struct (#983)
  • Loading branch information
stIncMale authored Oct 31, 2023
1 parent 6d4e316 commit 4f7402d
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 310 deletions.
134 changes: 7 additions & 127 deletions src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bson::RawDocument;

#[cfg(test)]
use bson::RawDocumentBuf;
use futures_core::{future::BoxFuture, Stream};
use futures_core::Stream;
use serde::{de::DeserializeOwned, Deserialize};
#[cfg(test)]
use tokio::sync::oneshot;
Expand All @@ -21,13 +21,12 @@ use crate::{
change_stream::event::ResumeToken,
client::{options::ServerAddress, AsyncDropToken},
cmap::conn::PinnedConnectionHandle,
cursor::common::ImplicitClientSessionHandle,
error::{Error, Result},
operation::GetMore,
results::GetMoreResult,
Client,
ClientSession,
};
use common::{kill_cursor, GenericCursor, GetMoreProvider, GetMoreProviderResult};
use common::{kill_cursor, GenericCursor};
pub(crate) use common::{
stream_poll_next,
BatchValue,
Expand Down Expand Up @@ -119,16 +118,14 @@ impl<T> Cursor<T> {
session: Option<ClientSession>,
pin: Option<PinnedConnectionHandle>,
) -> Self {
let provider = ImplicitSessionGetMoreProvider::new(&spec, session);

Self {
client: client.clone(),
drop_token: client.register_async_drop(),
wrapped_cursor: Some(ImplicitSessionCursor::new(
wrapped_cursor: Some(ImplicitSessionCursor::with_implicit_session(
client,
spec,
PinnedConnection::new(pin),
provider,
ImplicitClientSessionHandle(session),
)),
drop_address: None,
#[cfg(test)]
Expand Down Expand Up @@ -176,7 +173,7 @@ impl<T> Cursor<T> {
pub(crate) fn take_implicit_session(&mut self) -> Option<ClientSession> {
self.wrapped_cursor
.as_mut()
.and_then(|c| c.provider_mut().take_implicit_session())
.and_then(|c| c.take_implicit_session())
}

/// Move the cursor forward, potentially triggering requests to the database for more results
Expand Down Expand Up @@ -358,121 +355,4 @@ impl<T> Drop for Cursor<T> {

/// A `GenericCursor` that optionally owns its own sessions.
/// This is to be used by cursors associated with implicit sessions.
type ImplicitSessionCursor = GenericCursor<ImplicitSessionGetMoreProvider>;

struct ImplicitSessionGetMoreResult {
get_more_result: Result<GetMoreResult>,
session: Option<Box<ClientSession>>,
}

impl GetMoreProviderResult for ImplicitSessionGetMoreResult {
type Session = Option<Box<ClientSession>>;

fn as_ref(&self) -> std::result::Result<&GetMoreResult, &Error> {
self.get_more_result.as_ref()
}

fn into_parts(self) -> (Result<GetMoreResult>, Self::Session) {
(self.get_more_result, self.session)
}
}

/// A `GetMoreProvider` that optionally owns its own session.
/// This is to be used with cursors associated with implicit sessions.
enum ImplicitSessionGetMoreProvider {
Executing(BoxFuture<'static, ImplicitSessionGetMoreResult>),
Idle(Option<Box<ClientSession>>),
Done,
}

impl ImplicitSessionGetMoreProvider {
fn new(spec: &CursorSpecification, session: Option<ClientSession>) -> Self {
let session = session.map(Box::new);
if spec.id() == 0 {
Self::Done
} else {
Self::Idle(session)
}
}

/// Extract the stored implicit session, if any. The provider cannot be started again after
/// this call.
fn take_implicit_session(&mut self) -> Option<ClientSession> {
match self {
ImplicitSessionGetMoreProvider::Idle(session) => session.take().map(|s| *s),
_ => None,
}
}
}

impl GetMoreProvider for ImplicitSessionGetMoreProvider {
type ResultType = ImplicitSessionGetMoreResult;
type GetMoreFuture = BoxFuture<'static, ImplicitSessionGetMoreResult>;

fn executing_future(&mut self) -> Option<&mut Self::GetMoreFuture> {
match self {
Self::Executing(ref mut future) => Some(future),
Self::Idle { .. } | Self::Done => None,
}
}

fn clear_execution(&mut self, session: Option<Box<ClientSession>>, exhausted: bool) {
// If cursor is exhausted, immediately return implicit session to the pool.
if exhausted {
*self = Self::Done;
} else {
*self = Self::Idle(session);
}
}

fn start_execution(
&mut self,
info: CursorInformation,
client: Client,
pinned_connection: Option<&PinnedConnectionHandle>,
) {
take_mut::take(self, |self_| match self_ {
Self::Idle(mut session) => {
let pinned_connection = pinned_connection.map(|c| c.replicate());
let future = Box::pin(async move {
let get_more = GetMore::new(info, pinned_connection.as_ref());
let get_more_result = client
.execute_operation(get_more, session.as_mut().map(|b| b.as_mut()))
.await;
ImplicitSessionGetMoreResult {
get_more_result,
session,
}
});
Self::Executing(future)
}
Self::Executing(_) | Self::Done => self_,
})
}

fn execute(
&mut self,
info: CursorInformation,
client: Client,
pinned_connection: PinnedConnection,
) -> BoxFuture<'_, Result<GetMoreResult>> {
match self {
Self::Idle(ref mut session) => Box::pin(async move {
let get_more = GetMore::new(info, pinned_connection.handle());
client
.execute_operation(get_more, session.as_mut().map(|b| b.as_mut()))
.await
}),
Self::Executing(_fut) => Box::pin(async {
Err(Error::internal(
"streaming the cursor was cancelled while a request was in progress and must \
be continued before iterating manually",
))
}),
Self::Done => {
// this should never happen
Box::pin(async { Err(Error::internal("cursor iterated after already exhausted")) })
}
}
}
}
type ImplicitSessionCursor = GenericCursor<'static, ImplicitClientSessionHandle>;
Loading

0 comments on commit 4f7402d

Please sign in to comment.