Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1588: Add RunCursorCommand #912

Merged
merged 15 commits into from
Jul 19, 2023
146 changes: 143 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc};

#[cfg(feature = "in-use-encryption-unstable")]
use bson::doc;
use bson::RawDocumentBuf;
use futures_util::stream::TryStreamExt;

use crate::{
Expand All @@ -15,19 +16,28 @@ use crate::{
ChangeStream,
},
client::session::TransactionState,
cmap::conn::PinnedConnectionHandle,
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
concern::{ReadConcern, WriteConcern},
cursor::Cursor,
cursor::{Cursor, CursorSpecification},
error::{Error, ErrorKind, Result},
gridfs::{options::GridFsBucketOptions, GridFsBucket},
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
operation::{
Aggregate,
AggregateTarget,
Create,
DropDatabase,
ListCollections,
Operation,
RunCommand,
},
options::{
AggregateOptions,
CollectionOptions,
CreateCollectionOptions,
DatabaseOptions,
DropDatabaseOptions,
ListCollectionsOptions,
RunCursorCommandOptions,
},
results::CollectionSpecification,
selection_criteria::SelectionCriteria,
Expand Down Expand Up @@ -469,6 +479,53 @@ impl Database {
.await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command(
&self,
command: Document,
options: RunCursorCommandOptions,
drshika marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option type still needs to be updated here

) -> Result<Cursor<Document>> {
let rcc = RunCommand::new(self.name().to_string(), command, None, None)?;
let rc_command = RunCursorCommand {
run_command: rcc,
options,
};
let client = self.client();
client.execute_cursor_operation(rc_command).await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command_with_session(
&self,
command: Document,
options: RunCursorCommandOptions,
drshika marked this conversation as resolved.
Show resolved Hide resolved
session: &mut ClientSession,
) -> Result<SessionCursor<Document>> {
let mut selection_criteria = SelectionCriteria::ReadPreference(options.read_preference.clone().unwrap()).into();
drshika marked this conversation as resolved.
Show resolved Hide resolved
match session.transaction.state {
TransactionState::Starting | TransactionState::InProgress => {
selection_criteria = match selection_criteria {
Some(selection_criteria) => Some(selection_criteria),
None => {
if let Some(ref options) = session.transaction.options {
options.selection_criteria.clone()
} else {
None
}
}
};
}
_ => {}
}
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand {
run_command: rcc,
options,
};
let client = self.client();
client.execute_session_cursor_operation(rc_command, session).await
}

/// Runs a database-level command using the provided `ClientSession`.
///
/// If the `ClientSession` provided is currently in a transaction, `command` must not specify a
Expand Down Expand Up @@ -607,3 +664,86 @@ impl Database {
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
}
}

pub(super) struct RunCursorCommand<'conn> {
drshika marked this conversation as resolved.
Show resolved Hide resolved
run_command: RunCommand<'conn>,
options: RunCursorCommandOptions,
}

impl<'conn> Operation for RunCursorCommand<'conn> {
type O = CursorSpecification;
type Command = RawDocumentBuf;

const NAME: &'static str = "run_cursor_command";

fn build(&mut self, _description: &StreamDescription) -> Result<Command<Self::Command>> {
drshika marked this conversation as resolved.
Show resolved Hide resolved
drshika marked this conversation as resolved.
Show resolved Hide resolved
self.run_command.build(_description)
}

fn serialize_command(&mut self, cmd: Command<Self::Command>) -> Result<Vec<u8>> {
self.run_command.serialize_command(cmd)
}

fn extract_at_cluster_time(
&self,
_response: &bson::RawDocument,
) -> Result<Option<bson::Timestamp>> {
self.run_command.extract_at_cluster_time(_response)
}

fn handle_error(&self, error: Error) -> Result<Self::O> {
Err(error)
}

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.run_command.selection_criteria()
}

fn is_acknowledged(&self) -> bool {
self.run_command.is_acknowledged()
}

fn write_concern(&self) -> Option<&WriteConcern> {
self.run_command.write_concern()
}

fn supports_read_concern(&self, _description: &StreamDescription) -> bool {
self.run_command.supports_read_concern(_description)
}

fn supports_sessions(&self) -> bool {
self.run_command.supports_sessions()
}

fn retryability(&self) -> crate::operation::Retryability {
self.run_command.retryability()
}

fn update_for_retry(&mut self) {
self.run_command.update_for_retry()
}

fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
self.run_command.pinned_connection()
}

fn name(&self) -> &str {
self.run_command.name()
}

fn handle_response(
&self,
response: RawCommandResponse,
description: &StreamDescription,
) -> Result<Self::O> {
let doc = Operation::handle_response(&self.run_command, response, description)?;
let cursor_info = bson::from_document(doc)?;
Ok(CursorSpecification::new(
cursor_info,
description.server_address.clone(),
self.options.batch_size,
self.options.max_time_ms,
self.options.comment.clone(),
))
}
}
36 changes: 34 additions & 2 deletions src/db/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use typed_builder::TypedBuilder;
use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
options::Collation,
selection_criteria::SelectionCriteria,
options::{Collation, CursorType},
selection_criteria::{SelectionCriteria, ReadPreference},
serde_util,
};

Expand Down Expand Up @@ -312,3 +312,35 @@ pub struct ChangeStreamPreAndPostImages {
/// If `true`, change streams will be able to include pre- and post-images.
pub enabled: bool,
}

/// Determines the lifetime of a cursor.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum TimeoutMode {
/// Times out after one Iteration.
Iteration,
/// Times out after the end of the Cursor Lifetime.
CursorLifetime,
}
drshika marked this conversation as resolved.
Show resolved Hide resolved

/// Specifies the options to a
/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation.
#[derive(Clone, Debug, Default, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct RunCursorCommandOptions {
drshika marked this conversation as resolved.
Show resolved Hide resolved
/// Optional string enum value, one of 'iteration' | 'cursorLifetime'.
pub timeout_mode: Option<TimeoutMode>,
/// The default read preference for operations.
pub read_preference: Option<ReadPreference>,
drshika marked this conversation as resolved.
Show resolved Hide resolved
/// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'.
drshika marked this conversation as resolved.
Show resolved Hide resolved
pub cursor_type: Option<CursorType>,
/// Number of documents to return per batch.
pub batch_size: Option<u32>,
/// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent
/// on subsequent getMore commands.
pub max_time_ms: Option<Duration>,
drshika marked this conversation as resolved.
Show resolved Hide resolved
/// Optional BSON value. Use this value to configure the comment option sent on subsequent
/// getMore commands.
pub comment: Option<Bson>,
}
2 changes: 1 addition & 1 deletion src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
bson::{serde_helpers, Bson, Document},
change_stream::event::ResumeToken,
db::options::CreateCollectionOptions,
Namespace,
serde_util,
Namespace,
};

use bson::{Binary, RawDocumentBuf};
Expand Down