diff --git a/src/cursor.rs b/src/cursor.rs index 7603006c4..8dae51d2d 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -12,7 +12,7 @@ use bson::RawDocument; #[cfg(test)] use bson::RawDocumentBuf; -use futures_core::{future::BoxFuture, Stream}; +use futures_core::Stream; use serde::{de::DeserializeOwned, Deserialize}; #[cfg(test)] use tokio::sync::oneshot; @@ -21,13 +21,12 @@ use crate::{ change_stream::event::ResumeToken, client::{options::ServerAddress, AsyncDropToken}, cmap::conn::PinnedConnectionHandle, + cursor::common::ImplicitClientSessionHandle, error::{Error, Result}, - operation::GetMore, - results::GetMoreResult, Client, ClientSession, }; -use common::{kill_cursor, GenericCursor, GetMoreProvider, GetMoreProviderResult}; +use common::{kill_cursor, GenericCursor}; pub(crate) use common::{ stream_poll_next, BatchValue, @@ -119,16 +118,14 @@ impl Cursor { session: Option, pin: Option, ) -> Self { - let provider = ImplicitSessionGetMoreProvider::new(&spec, session); - Self { client: client.clone(), drop_token: client.register_async_drop(), - wrapped_cursor: Some(ImplicitSessionCursor::new( + wrapped_cursor: Some(ImplicitSessionCursor::with_implicit_session( client, spec, PinnedConnection::new(pin), - provider, + ImplicitClientSessionHandle(session), )), drop_address: None, #[cfg(test)] @@ -176,7 +173,7 @@ impl Cursor { pub(crate) fn take_implicit_session(&mut self) -> Option { self.wrapped_cursor .as_mut() - .and_then(|c| c.provider_mut().take_implicit_session()) + .and_then(|c| c.take_implicit_session()) } /// Move the cursor forward, potentially triggering requests to the database for more results @@ -358,121 +355,4 @@ impl Drop for Cursor { /// A `GenericCursor` that optionally owns its own sessions. /// This is to be used by cursors associated with implicit sessions. -type ImplicitSessionCursor = GenericCursor; - -struct ImplicitSessionGetMoreResult { - get_more_result: Result, - session: Option>, -} - -impl GetMoreProviderResult for ImplicitSessionGetMoreResult { - type Session = Option>; - - fn as_ref(&self) -> std::result::Result<&GetMoreResult, &Error> { - self.get_more_result.as_ref() - } - - fn into_parts(self) -> (Result, Self::Session) { - (self.get_more_result, self.session) - } -} - -/// A `GetMoreProvider` that optionally owns its own session. -/// This is to be used with cursors associated with implicit sessions. -enum ImplicitSessionGetMoreProvider { - Executing(BoxFuture<'static, ImplicitSessionGetMoreResult>), - Idle(Option>), - Done, -} - -impl ImplicitSessionGetMoreProvider { - fn new(spec: &CursorSpecification, session: Option) -> Self { - let session = session.map(Box::new); - if spec.id() == 0 { - Self::Done - } else { - Self::Idle(session) - } - } - - /// Extract the stored implicit session, if any. The provider cannot be started again after - /// this call. - fn take_implicit_session(&mut self) -> Option { - match self { - ImplicitSessionGetMoreProvider::Idle(session) => session.take().map(|s| *s), - _ => None, - } - } -} - -impl GetMoreProvider for ImplicitSessionGetMoreProvider { - type ResultType = ImplicitSessionGetMoreResult; - type GetMoreFuture = BoxFuture<'static, ImplicitSessionGetMoreResult>; - - fn executing_future(&mut self) -> Option<&mut Self::GetMoreFuture> { - match self { - Self::Executing(ref mut future) => Some(future), - Self::Idle { .. } | Self::Done => None, - } - } - - fn clear_execution(&mut self, session: Option>, exhausted: bool) { - // If cursor is exhausted, immediately return implicit session to the pool. - if exhausted { - *self = Self::Done; - } else { - *self = Self::Idle(session); - } - } - - fn start_execution( - &mut self, - info: CursorInformation, - client: Client, - pinned_connection: Option<&PinnedConnectionHandle>, - ) { - take_mut::take(self, |self_| match self_ { - Self::Idle(mut session) => { - let pinned_connection = pinned_connection.map(|c| c.replicate()); - let future = Box::pin(async move { - let get_more = GetMore::new(info, pinned_connection.as_ref()); - let get_more_result = client - .execute_operation(get_more, session.as_mut().map(|b| b.as_mut())) - .await; - ImplicitSessionGetMoreResult { - get_more_result, - session, - } - }); - Self::Executing(future) - } - Self::Executing(_) | Self::Done => self_, - }) - } - - fn execute( - &mut self, - info: CursorInformation, - client: Client, - pinned_connection: PinnedConnection, - ) -> BoxFuture<'_, Result> { - match self { - Self::Idle(ref mut session) => Box::pin(async move { - let get_more = GetMore::new(info, pinned_connection.handle()); - client - .execute_operation(get_more, session.as_mut().map(|b| b.as_mut())) - .await - }), - Self::Executing(_fut) => Box::pin(async { - Err(Error::internal( - "streaming the cursor was cancelled while a request was in progress and must \ - be continued before iterating manually", - )) - }), - Self::Done => { - // this should never happen - Box::pin(async { Err(Error::internal("cursor iterated after already exhausted")) }) - } - } - } -} +type ImplicitSessionCursor = GenericCursor<'static, ImplicitClientSessionHandle>; diff --git a/src/cursor/common.rs b/src/cursor/common.rs index d42e655b5..30d05ca0a 100644 --- a/src/cursor/common.rs +++ b/src/cursor/common.rs @@ -14,10 +14,10 @@ use tokio::sync::oneshot; use crate::{ bson::{Bson, Document}, change_stream::event::ResumeToken, - client::AsyncDropToken, + client::{session::ClientSession, AsyncDropToken}, cmap::conn::PinnedConnectionHandle, error::{Error, ErrorKind, Result}, - operation, + operation::{self, GetMore}, options::ServerAddress, results::GetMoreResult, Client, @@ -37,12 +37,9 @@ pub(super) enum AdvanceResult { /// An internal cursor that can be used in a variety of contexts depending on its `GetMoreProvider`. #[derive(Derivative)] #[derivative(Debug)] -pub(super) struct GenericCursor

-where - P: GetMoreProvider, -{ +pub(super) struct GenericCursor<'s, S> { #[derivative(Debug = "ignore")] - provider: P, + provider: GetMoreProvider<'s, S>, client: Client, info: CursorInformation, /// This is an `Option` to allow it to be "taken" when the cursor is no longer needed @@ -50,20 +47,21 @@ where state: Option, } -impl

GenericCursor

-where - P: GetMoreProvider, -{ - pub(super) fn new( +impl GenericCursor<'static, ImplicitClientSessionHandle> { + pub(super) fn with_implicit_session( client: Client, spec: CursorSpecification, pinned_connection: PinnedConnection, - get_more_provider: P, + session: ImplicitClientSessionHandle, ) -> Self { let exhausted = spec.id() == 0; Self { client, - provider: get_more_provider, + provider: if exhausted { + GetMoreProvider::Done + } else { + GetMoreProvider::Idle(Box::new(session)) + }, info: spec.info, state: Some(CursorState { buffer: CursorBuffer::new(spec.initial_buffer), @@ -74,20 +72,29 @@ where } } - pub(super) fn from_state( + /// Extracts the stored implicit [`ClientSession`], if any. + pub(super) fn take_implicit_session(&mut self) -> Option { + self.provider.take_implicit_session() + } +} + +impl<'s> GenericCursor<'s, ExplicitClientSessionHandle<'s>> { + pub(super) fn with_explicit_session( state: CursorState, client: Client, info: CursorInformation, - provider: P, + session: ExplicitClientSessionHandle<'s>, ) -> Self { Self { - provider, + provider: GetMoreProvider::Idle(Box::new(session)), client, info, state: state.into(), } } +} +impl<'s, S: ClientSessionHandle<'s>> GenericCursor<'s, S> { pub(super) fn current(&self) -> Option<&RawDocument> { self.state().buffer.current() } @@ -210,10 +217,6 @@ where } } } - - pub(super) fn provider_mut(&mut self) -> &mut P { - &mut self.provider - } } pub(crate) trait CursorStream { @@ -226,20 +229,18 @@ pub(crate) enum BatchValue { Exhausted, } -impl

CursorStream for GenericCursor

-where - P: GetMoreProvider, -{ +impl<'s, S: ClientSessionHandle<'s>> CursorStream for GenericCursor<'s, S> { fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll> { // If there is a get more in flight, check on its status. if let Some(future) = self.provider.executing_future() { match Pin::new(future).poll(cx) { // If a result is ready, retrieve the buffer and update the exhausted status. - Poll::Ready(get_more_result) => { - let (result, session) = get_more_result.into_parts(); - let output = self.handle_get_more_result(result); - self.provider - .clear_execution(session, self.state().exhausted); + Poll::Ready(get_more_result_and_session) => { + let output = self.handle_get_more_result(get_more_result_and_session.result); + self.provider.clear_execution( + get_more_result_and_session.session, + self.state().exhausted, + ); output?; } Poll::Pending => return Poll::Pending, @@ -308,52 +309,105 @@ where } } -/// A trait implemented by objects that can provide batches of documents to a cursor via the getMore -/// command. -pub(super) trait GetMoreProvider: Unpin { - /// The result type that the future running the getMore evaluates to. - type ResultType: GetMoreProviderResult; +/// Provides batches of documents to a cursor via the `getMore` command. +enum GetMoreProvider<'s, S> { + Executing(BoxFuture<'s, GetMoreResultAndSession>), + // `Box` is used to make the size of `Idle` similar to that of the other variants. + Idle(Box), + Done, +} - /// The type of future created by this provider when running a getMore. - type GetMoreFuture: Future + Unpin; +impl GetMoreProvider<'static, ImplicitClientSessionHandle> { + /// Extracts the stored implicit [`ClientSession`], if any. + /// The provider cannot be started again after this call. + fn take_implicit_session(&mut self) -> Option { + match self { + Self::Idle(session) => session.take_implicit_session(), + Self::Executing(..) | Self::Done => None, + } + } +} +impl<'s, S: ClientSessionHandle<'s>> GetMoreProvider<'s, S> { /// Get the future being evaluated, if there is one. - fn executing_future(&mut self) -> Option<&mut Self::GetMoreFuture>; + fn executing_future(&mut self) -> Option<&mut BoxFuture<'s, GetMoreResultAndSession>> { + if let Self::Executing(future) = self { + Some(future) + } else { + None + } + } - /// Clear out any state remaining from previous getMore executions. - fn clear_execution( - &mut self, - session: ::Session, - exhausted: bool, - ); + /// Clear out any state remaining from previous `getMore` executions. + fn clear_execution(&mut self, session: S, exhausted: bool) { + if exhausted && session.is_implicit() { + *self = Self::Done + } else { + *self = Self::Idle(Box::new(session)) + } + } - /// Start executing a new getMore if one isn't already in flight. + /// Start executing a new `getMore` if one is not already in flight. fn start_execution( &mut self, - spec: CursorInformation, + info: CursorInformation, client: Client, pinned_connection: Option<&PinnedConnectionHandle>, - ); + ) { + take_mut::take(self, |self_| { + if let Self::Idle(mut session) = self_ { + let pinned_connection = pinned_connection.map(|c| c.replicate()); + let future = Box::pin(async move { + let get_more = GetMore::new(info, pinned_connection.as_ref()); + let get_more_result = client + .execute_operation(get_more, session.borrow_mut()) + .await; + GetMoreResultAndSession { + result: get_more_result, + session: *session, + } + }); + Self::Executing(future) + } else { + self_ + } + }) + } - /// Return a future that will execute the getMore when polled. - /// This is useful in async functions that can await the entire getMore process. - /// `start_execution` and `clear_execution` should be used for contexts where the futures - /// need to be polled manually. + /// Return a future that will execute the `getMore` when polled. + /// This is useful in `async` functions that can `.await` the entire `getMore` process. + /// [`GetMoreProvider::start_execution`] and [`GetMoreProvider::clear_execution`] + /// should be used for contexts where the futures need to be [`poll`](Future::poll)ed manually. fn execute( &mut self, - _spec: CursorInformation, - _client: Client, - _pinned_conn: PinnedConnection, - ) -> BoxFuture<'_, Result>; + info: CursorInformation, + client: Client, + pinned_connection: PinnedConnection, + ) -> BoxFuture<'_, Result> { + match self { + Self::Idle(ref mut session) => Box::pin(async move { + let get_more = GetMore::new(info, pinned_connection.handle()); + client + .execute_operation(get_more, session.borrow_mut()) + .await + }), + Self::Executing(_fut) => Box::pin(async { + Err(Error::internal( + "streaming the cursor was cancelled while a request was in progress and must \ + be continued before iterating manually", + )) + }), + Self::Done => { + // this should never happen + Box::pin(async { Err(Error::internal("cursor iterated after already exhausted")) }) + } + } + } } -/// Trait describing results returned from a `GetMoreProvider`. -pub(crate) trait GetMoreProviderResult { - type Session; - - fn as_ref(&self) -> std::result::Result<&GetMoreResult, &Error>; - - fn into_parts(self) -> (Result, Self::Session); +struct GetMoreResultAndSession { + result: Result, + session: S, } /// Specification used to create a new cursor. @@ -569,3 +623,39 @@ fn test_buffer() { assert!(!buffer.advance()); assert_eq!(buffer.current(), None); } + +pub(super) struct ImplicitClientSessionHandle(pub(super) Option); + +impl ImplicitClientSessionHandle { + fn take_implicit_session(&mut self) -> Option { + self.0.take() + } +} + +impl ClientSessionHandle<'_> for ImplicitClientSessionHandle { + fn is_implicit(&self) -> bool { + true + } + + fn borrow_mut(&mut self) -> Option<&mut ClientSession> { + self.0.as_mut() + } +} + +pub(super) struct ExplicitClientSessionHandle<'a>(pub(super) &'a mut ClientSession); + +impl<'a> ClientSessionHandle<'a> for ExplicitClientSessionHandle<'a> { + fn is_implicit(&self) -> bool { + false + } + + fn borrow_mut(&mut self) -> Option<&mut ClientSession> { + Some(self.0) + } +} + +pub(super) trait ClientSessionHandle<'a>: Send + 'a { + fn is_implicit(&self) -> bool; + + fn borrow_mut(&mut self) -> Option<&mut ClientSession>; +} diff --git a/src/cursor/session.rs b/src/cursor/session.rs index f1b25782d..5f350ba68 100644 --- a/src/cursor/session.rs +++ b/src/cursor/session.rs @@ -5,7 +5,7 @@ use std::{ }; use bson::RawDocument; -use futures_core::{future::BoxFuture, Stream}; +use futures_core::Stream; use futures_util::StreamExt; use serde::{de::DeserializeOwned, Deserialize}; #[cfg(test)] @@ -18,8 +18,6 @@ use super::{ CursorInformation, CursorState, GenericCursor, - GetMoreProvider, - GetMoreProviderResult, PinnedConnection, }, stream_poll_next, @@ -31,10 +29,8 @@ use crate::{ change_stream::event::ResumeToken, client::{options::ServerAddress, AsyncDropToken}, cmap::conn::PinnedConnectionHandle, - cursor::CursorSpecification, + cursor::{common::ExplicitClientSessionHandle, CursorSpecification}, error::{Error, Result}, - operation::GetMore, - results::GetMoreResult, Client, ClientSession, }; @@ -195,16 +191,14 @@ impl SessionCursor { &mut self, session: &'session mut ClientSession, ) -> SessionCursorStream<'_, 'session, T> { - let get_more_provider = ExplicitSessionGetMoreProvider::new(session); - // Pass the state into this cursor handle for iteration. // It will be returned in the handle's `Drop` implementation. SessionCursorStream { - generic_cursor: ExplicitSessionCursor::from_state( + generic_cursor: ExplicitSessionCursor::with_explicit_session( self.take_state(), self.client.clone(), self.info.clone(), - get_more_provider, + ExplicitClientSessionHandle(session), ), session_cursor: self, } @@ -387,7 +381,8 @@ impl Drop for SessionCursor { /// A `GenericCursor` that borrows its session. /// This is to be used with cursors associated with explicit sessions borrowed from the user. -type ExplicitSessionCursor<'session> = GenericCursor>; +type ExplicitSessionCursor<'session> = + GenericCursor<'session, ExplicitClientSessionHandle<'session>>; /// A type that implements [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) which can be used to /// stream the results of a [`SessionCursor`]. Returned from [`SessionCursor::stream`]. @@ -438,113 +433,3 @@ impl<'cursor, 'session, T> Drop for SessionCursorStream<'cursor, 'session, T> { self.session_cursor.state = Some(self.generic_cursor.take_state()); } } - -/// Enum determining whether a `SessionCursorHandle` is excuting a getMore or not. -/// In charge of maintaining ownership of the session reference. -enum ExplicitSessionGetMoreProvider<'session> { - /// The handle is currently executing a getMore via the future. - /// - /// This future owns the reference to the session and will return it on completion. - Executing(BoxFuture<'session, ExecutionResult<'session>>), - - /// No future is being executed. - /// - /// This variant needs a `MutableSessionReference` struct that can be moved in order to - /// transition to `Executing` via `take_mut`. - Idle(MutableSessionReference<'session>), -} - -impl<'session> ExplicitSessionGetMoreProvider<'session> { - fn new(session: &'session mut ClientSession) -> Self { - Self::Idle(MutableSessionReference { reference: session }) - } -} - -impl<'session> GetMoreProvider for ExplicitSessionGetMoreProvider<'session> { - type ResultType = ExecutionResult<'session>; - type GetMoreFuture = BoxFuture<'session, ExecutionResult<'session>>; - - fn executing_future(&mut self) -> Option<&mut Self::GetMoreFuture> { - match self { - Self::Executing(future) => Some(future), - Self::Idle { .. } => None, - } - } - - fn clear_execution(&mut self, session: &'session mut ClientSession, _exhausted: bool) { - *self = Self::Idle(MutableSessionReference { reference: session }) - } - - fn start_execution( - &mut self, - info: CursorInformation, - client: Client, - pinned_connection: Option<&PinnedConnectionHandle>, - ) { - take_mut::take(self, |self_| { - if let ExplicitSessionGetMoreProvider::Idle(session) = self_ { - let pinned_connection = pinned_connection.map(|c| c.replicate()); - let future = Box::pin(async move { - let get_more = GetMore::new(info, pinned_connection.as_ref()); - let get_more_result = client - .execute_operation(get_more, Some(&mut *session.reference)) - .await; - ExecutionResult { - get_more_result, - session: session.reference, - } - }); - return ExplicitSessionGetMoreProvider::Executing(future); - } - self_ - }); - } - - fn execute( - &mut self, - info: CursorInformation, - client: Client, - pinned_connection: PinnedConnection, - ) -> BoxFuture<'_, Result> { - match self { - Self::Idle(ref mut session) => Box::pin(async move { - let get_more = GetMore::new(info, pinned_connection.handle()); - client - .execute_operation(get_more, Some(&mut *session.reference)) - .await - }), - Self::Executing(_fut) => Box::pin(async { - Err(Error::internal( - "streaming the cursor was cancelled while a request was in progress and must \ - be continued before iterating manually", - )) - }), - } - } -} - -/// Struct returned from awaiting on a `GetMoreFuture` containing the result of the getMore as -/// well as the reference to the `ClientSession` used for the getMore. -struct ExecutionResult<'session> { - get_more_result: Result, - session: &'session mut ClientSession, -} - -impl<'session> GetMoreProviderResult for ExecutionResult<'session> { - type Session = &'session mut ClientSession; - - fn as_ref(&self) -> std::result::Result<&GetMoreResult, &Error> { - self.get_more_result.as_ref() - } - - fn into_parts(self) -> (Result, Self::Session) { - (self.get_more_result, self.session) - } -} - -/// Wrapper around a mutable reference to a `ClientSession` that provides move semantics. -/// This is used to prevent re-borrowing of the session and forcing it to be moved instead -/// by moving the wrapping struct. -struct MutableSessionReference<'a> { - reference: &'a mut ClientSession, -}