diff --git a/src/db.rs b/src/db.rs index cb5738eaa..9fb870584 100644 --- a/src/db.rs +++ b/src/db.rs @@ -20,7 +20,15 @@ use crate::{ cursor::Cursor, error::{Error, ErrorKind, Result}, gridfs::{options::GridFsBucketOptions, GridFsBucket}, - operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, + operation::{ + Aggregate, + AggregateTarget, + Create, + DropDatabase, + ListCollections, + RunCommand, + RunCursorCommand, + }, options::{ AggregateOptions, CollectionOptions, @@ -28,6 +36,7 @@ use crate::{ DatabaseOptions, DropDatabaseOptions, ListCollectionsOptions, + RunCursorCommandOptions, }, results::CollectionSpecification, selection_criteria::SelectionCriteria, @@ -469,6 +478,42 @@ impl Database { .await } + /// Runs a database-level command and returns a cursor to the response. + pub async fn run_cursor_command( + &self, + command: Document, + options: impl Into>, + ) -> Result> { + let options: Option = options.into(); + let selection_criteria = options + .as_ref() + .and_then(|options| options.selection_criteria.clone()); + let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; + let rc_command = RunCursorCommand::new(rcc, options)?; + let client = self.client(); + client.execute_cursor_operation(rc_command).await + } + + /// Runs a database-level command and returns a cursor to the response. + pub async fn run_cursor_command_with_session( + &self, + command: Document, + options: impl Into>, + session: &mut ClientSession, + ) -> Result> { + let mut options: Option = options.into(); + resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?; + let selection_criteria = options + .as_ref() + .and_then(|options| options.selection_criteria.clone()); + let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; + let rc_command = RunCursorCommand::new(rcc, options)?; + let client = self.client(); + client + .execute_session_cursor_operation(rc_command, session) + .await + } + /// Runs a database-level command using the provided `ClientSession`. /// /// If the `ClientSession` provided is currently in a transaction, `command` must not specify a diff --git a/src/db/options.rs b/src/db/options.rs index 5cdb1cbb6..13302a714 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -8,7 +8,7 @@ use typed_builder::TypedBuilder; use crate::{ bson::{Bson, Document}, concern::{ReadConcern, WriteConcern}, - options::Collation, + options::{Collation, CursorType}, selection_criteria::SelectionCriteria, serde_util, }; @@ -312,3 +312,23 @@ pub struct ChangeStreamPreAndPostImages { /// If `true`, change streams will be able to include pre- and post-images. pub enabled: bool, } + +/// Specifies the options to a +/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation. +#[derive(Clone, Debug, Default, TypedBuilder)] +#[builder(field_defaults(default, setter(into)))] +#[non_exhaustive] +pub struct RunCursorCommandOptions { + /// The default read preference for operations. + pub selection_criteria: Option, + /// The type of cursor to return. + pub cursor_type: Option, + /// Number of documents to return per batch. + pub batch_size: Option, + /// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent + /// on subsequent getMore commands. + pub max_time: Option, + /// Optional BSON value. Use this value to configure the comment option sent on subsequent + /// getMore commands. + pub comment: Option, +} diff --git a/src/operation.rs b/src/operation.rs index f3e8fbd55..22d1e6a9f 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -19,6 +19,7 @@ mod list_databases; mod list_indexes; mod raw_output; mod run_command; +mod run_cursor_command; mod update; #[cfg(test)] @@ -71,6 +72,7 @@ pub(crate) use list_indexes::ListIndexes; #[cfg(feature = "in-use-encryption-unstable")] pub(crate) use raw_output::RawOutput; pub(crate) use run_command::RunCommand; +pub(crate) use run_cursor_command::RunCursorCommand; pub(crate) use update::Update; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs new file mode 100644 index 000000000..7547a374c --- /dev/null +++ b/src/operation/run_cursor_command.rs @@ -0,0 +1,121 @@ +#[cfg(feature = "in-use-encryption-unstable")] +use bson::doc; +use bson::RawDocumentBuf; + +use crate::{ + cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, + concern::WriteConcern, + cursor::CursorSpecification, + error::{Error, Result}, + operation::{Operation, RunCommand}, + options::RunCursorCommandOptions, + selection_criteria::SelectionCriteria, +}; + +#[derive(Debug, Clone)] +pub(crate) struct RunCursorCommand<'conn> { + run_command: RunCommand<'conn>, + options: Option, +} + +impl<'conn> RunCursorCommand<'conn> { + pub(crate) fn new( + run_command: RunCommand<'conn>, + options: Option, + ) -> Result { + Ok(Self { + run_command, + options, + }) + } +} + +impl<'conn> Operation for RunCursorCommand<'conn> { + type O = CursorSpecification; + type Command = RawDocumentBuf; + + const NAME: &'static str = "run_cursor_command"; + + fn build(&mut self, description: &StreamDescription) -> Result> { + self.run_command.build(description) + } + + fn serialize_command(&mut self, cmd: Command) -> Result> { + self.run_command.serialize_command(cmd) + } + + fn extract_at_cluster_time( + &self, + response: &bson::RawDocument, + ) -> Result> { + self.run_command.extract_at_cluster_time(response) + } + + fn handle_error(&self, error: Error) -> Result { + Err(error) + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.run_command.selection_criteria() + } + + fn is_acknowledged(&self) -> bool { + self.run_command.is_acknowledged() + } + + fn write_concern(&self) -> Option<&WriteConcern> { + self.run_command.write_concern() + } + + fn supports_read_concern(&self, description: &StreamDescription) -> bool { + self.run_command.supports_read_concern(description) + } + + fn supports_sessions(&self) -> bool { + self.run_command.supports_sessions() + } + + fn retryability(&self) -> crate::operation::Retryability { + self.run_command.retryability() + } + + fn update_for_retry(&mut self) { + self.run_command.update_for_retry() + } + + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + self.run_command.pinned_connection() + } + + fn name(&self) -> &str { + self.run_command.name() + } + + fn handle_response( + &self, + response: RawCommandResponse, + description: &StreamDescription, + ) -> Result { + let doc = Operation::handle_response(&self.run_command, response, description)?; + let cursor_info = bson::from_document(doc)?; + let batch_size = match &self.options { + Some(options) => options.batch_size.clone(), + None => None, + }; + let max_time = match &self.options { + Some(options) => options.max_time.clone(), + None => None, + }; + let comment = match &self.options { + Some(options) => options.comment.clone(), + None => None, + }; + Ok(CursorSpecification::new( + cursor_info, + description.server_address.clone(), + batch_size, + max_time, + comment, + )) + } +} diff --git a/src/results.rs b/src/results.rs index dc4782147..f15e5c852 100644 --- a/src/results.rs +++ b/src/results.rs @@ -6,8 +6,8 @@ use crate::{ bson::{serde_helpers, Bson, Document}, change_stream::event::ResumeToken, db::options::CreateCollectionOptions, - Namespace, serde_util, + Namespace, }; use bson::{Binary, RawDocumentBuf};