From 4efc673ec7b3379263f6b9bb8e5e306c2798c0ea Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Mon, 10 Jul 2023 13:57:39 -0400 Subject: [PATCH 01/15] RUST-1636: Add RunCursorCommand --- src/coll/options.rs | 2 +- src/db.rs | 116 ++++++++++++++++++++++++++++++++++++++++++-- src/db/options.rs | 21 +++++++- src/operation.rs | 2 +- src/results.rs | 2 +- 5 files changed, 136 insertions(+), 7 deletions(-) diff --git a/src/coll/options.rs b/src/coll/options.rs index 140e6b910..4ff6d9445 100644 --- a/src/coll/options.rs +++ b/src/coll/options.rs @@ -78,7 +78,7 @@ impl Hint { } /// Specifies the type of cursor to return from a find operation. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Deserialize)] #[non_exhaustive] pub enum CursorType { /// Default; close the cursor after the last document is received from the server. diff --git a/src/db.rs b/src/db.rs index cb5738eaa..2150430b0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc}; #[cfg(feature = "in-use-encryption-unstable")] use bson::doc; +use bson::RawDocumentBuf; use futures_util::stream::TryStreamExt; use crate::{ @@ -15,12 +16,20 @@ use crate::{ ChangeStream, }, client::session::TransactionState, - cmap::conn::PinnedConnectionHandle, + cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, concern::{ReadConcern, WriteConcern}, - cursor::Cursor, + cursor::{Cursor, CursorSpecification}, error::{Error, ErrorKind, Result}, gridfs::{options::GridFsBucketOptions, GridFsBucket}, - operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, + operation::{ + Aggregate, + AggregateTarget, + Create, + DropDatabase, + ListCollections, + Operation, + RunCommand, + }, options::{ AggregateOptions, CollectionOptions, @@ -28,6 +37,7 @@ use crate::{ DatabaseOptions, DropDatabaseOptions, ListCollectionsOptions, + RunCursorCommandOptions, }, results::CollectionSpecification, selection_criteria::SelectionCriteria, @@ -468,6 +478,21 @@ impl Database { self.run_command_common(command, selection_criteria, None, None) .await } + /// Runs a database-level command and returns a cursor to the response. + pub async fn run_cursor_command( + &self, + command: Document, + options: RunCursorCommandOptions, + // operation: impl Operation, + ) -> Result> { + let rcc = RunCommand::new(self.name().to_string(), command, None, None)?; + let rc_command = RunCursorCommand { + run_command: rcc, + options: options, + }; + let client = self.client(); + client.execute_cursor_operation(rc_command).await + } /// Runs a database-level command using the provided `ClientSession`. /// @@ -607,3 +632,88 @@ impl Database { GridFsBucket::new(self.clone(), options.into().unwrap_or_default()) } } + +pub(super) struct RunCursorCommand<'conn> { + run_command: RunCommand<'conn>, + options: RunCursorCommandOptions, +} + +impl<'conn> Operation for RunCursorCommand<'conn> { + type O = CursorSpecification; + type Command = RawDocumentBuf; + + const NAME: &'static str = "run_cursor_command"; + + fn build(&mut self, _description: &StreamDescription) -> Result> { + Operation::build(&mut self.run_command, _description) + } + + fn serialize_command(&mut self, cmd: Command) -> Result> { + Operation::serialize_command(&mut self.run_command, cmd) + } + + fn extract_at_cluster_time( + &self, + _response: &bson::RawDocument, + ) -> Result> { + Operation::extract_at_cluster_time(&self.run_command, _response) + } + + fn handle_error(&self, error: Error) -> Result { + Err(error) + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + Operation::selection_criteria(&self.run_command) + } + + fn is_acknowledged(&self) -> bool { + Operation::is_acknowledged(&self.run_command) + } + + fn write_concern(&self) -> Option<&WriteConcern> { + Operation::write_concern(&self.run_command) + } + + fn supports_read_concern(&self, _description: &StreamDescription) -> bool { + Operation::supports_read_concern(&self.run_command, _description) + } + + fn supports_sessions(&self) -> bool { + Operation::supports_sessions(&self.run_command) + } + + fn retryability(&self) -> crate::operation::Retryability { + Operation::retryability(&self.run_command) + } + + fn update_for_retry(&mut self) { + Operation::update_for_retry(&mut self.run_command) + } + + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + Operation::pinned_connection(&self.run_command) + } + + fn name(&self) -> &str { + Operation::name(&self.run_command) + } + + fn handle_response( + &self, + response: RawCommandResponse, + description: &StreamDescription, + ) -> Result { + let doc = Operation::handle_response(&self.run_command, response, description)?; + let cursor_info = bson::from_document(doc)?; + // let response: CursorBody = response.body()?; + // let temp = bson::from_bson(self.options.comment.to_owned().unwrap()); + Ok(CursorSpecification::new( + cursor_info, + description.server_address.clone(), + self.options.batch_size, + self.options.max_time_ms, + self.options.comment.clone(), + )) + } +} diff --git a/src/db/options.rs b/src/db/options.rs index 5cdb1cbb6..a0fd05252 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -8,7 +8,7 @@ use typed_builder::TypedBuilder; use crate::{ bson::{Bson, Document}, concern::{ReadConcern, WriteConcern}, - options::Collation, + options::{Collation, CursorType}, selection_criteria::SelectionCriteria, serde_util, }; @@ -312,3 +312,22 @@ pub struct ChangeStreamPreAndPostImages { /// If `true`, change streams will be able to include pre- and post-images. pub enabled: bool, } + +#[derive(Clone, Debug, Deserialize)] +#[non_exhaustive] +pub enum TimeoutMode { + Iteration, + CursorLifetime, +} + +#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] +#[serde(rename_all = "camelCase")] +#[builder(field_defaults(default, setter(into)))] +#[non_exhaustive] +pub struct RunCursorCommandOptions { + pub timeout_mode: Option, + pub cursor_type: Option, + pub batch_size: Option, + pub max_time_ms: Option, + pub comment: Option, +} diff --git a/src/operation.rs b/src/operation.rs index f3e8fbd55..bffea0b10 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -74,7 +74,7 @@ pub(crate) use run_command::RunCommand; pub(crate) use update::Update; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; -const SERVER_4_4_0_WIRE_VERSION: i32 = 9; +pub const SERVER_4_4_0_WIRE_VERSION: i32 = 9; /// A trait modeling the behavior of a server side operation. /// diff --git a/src/results.rs b/src/results.rs index dc4782147..f15e5c852 100644 --- a/src/results.rs +++ b/src/results.rs @@ -6,8 +6,8 @@ use crate::{ bson::{serde_helpers, Bson, Document}, change_stream::event::ResumeToken, db::options::CreateCollectionOptions, - Namespace, serde_util, + Namespace, }; use bson::{Binary, RawDocumentBuf}; From e6626f5bf7e0a98c8e64a1836faefee0bb4f87ab Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Mon, 10 Jul 2023 14:29:46 -0400 Subject: [PATCH 02/15] docs --- src/db/options.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/db/options.rs b/src/db/options.rs index a0fd05252..d2cd834b2 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -313,21 +313,33 @@ pub struct ChangeStreamPreAndPostImages { pub enabled: bool, } +/// Determines the lifetime of a cursor. #[derive(Clone, Debug, Deserialize)] #[non_exhaustive] pub enum TimeoutMode { + /// Times out after one Iteration. Iteration, + /// Times out after the end of the Cursor Lifetime. CursorLifetime, } +/// Specifies the options to a +/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation. #[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] #[serde(rename_all = "camelCase")] #[builder(field_defaults(default, setter(into)))] #[non_exhaustive] pub struct RunCursorCommandOptions { + /// Optional string enum value, one of 'iteration' | 'cursorLifetime'. pub timeout_mode: Option, + /// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'. pub cursor_type: Option, + /// Number of documents to return per batch. pub batch_size: Option, + /// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent + /// on subsequent getMore commands. pub max_time_ms: Option, + /// Optional BSON value. Use this value to configure the comment option sent on subsequent + /// getMore commands. pub comment: Option, } From dcee793163f642bade08825c302f4a2e3a42e0e2 Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Tue, 11 Jul 2023 17:09:08 -0400 Subject: [PATCH 03/15] add run_cursor_command_with_session --- src/coll/options.rs | 2 +- src/db.rs | 144 +++++++++++++++++++++++++++++++++++++++----- src/db/options.rs | 34 ++++++++++- 3 files changed, 160 insertions(+), 20 deletions(-) diff --git a/src/coll/options.rs b/src/coll/options.rs index 4ff6d9445..140e6b910 100644 --- a/src/coll/options.rs +++ b/src/coll/options.rs @@ -78,7 +78,7 @@ impl Hint { } /// Specifies the type of cursor to return from a find operation. -#[derive(Debug, Clone, Copy, Deserialize)] +#[derive(Debug, Clone, Copy)] #[non_exhaustive] pub enum CursorType { /// Default; close the cursor after the last document is received from the server. diff --git a/src/db.rs b/src/db.rs index 2150430b0..2afe94526 100644 --- a/src/db.rs +++ b/src/db.rs @@ -48,6 +48,8 @@ use crate::{ SessionCursor, }; +use self::options::RunCursorCommandWithSessionOptions; + /// `Database` is the client-side abstraction of a MongoDB database. It can be used to perform /// database-level operations or to obtain handles to specific collections within the database. A /// `Database` can only be obtained through a [`Client`](struct.Client.html) by calling either @@ -478,17 +480,45 @@ impl Database { self.run_command_common(command, selection_criteria, None, None) .await } + /// Runs a database-level command and returns a cursor to the response. pub async fn run_cursor_command( &self, command: Document, options: RunCursorCommandOptions, - // operation: impl Operation, ) -> Result> { let rcc = RunCommand::new(self.name().to_string(), command, None, None)?; let rc_command = RunCursorCommand { run_command: rcc, - options: options, + options, + }; + let client = self.client(); + client.execute_cursor_operation(rc_command).await + } + + /// Runs a database-level command and returns a cursor to the response. + pub async fn run_cursor_command_with_session( + &self, + command: Document, + options: RunCursorCommandWithSessionOptions, + session: &mut ClientSession, + ) -> Result> { + match session.transaction.state { + TransactionState::Starting | TransactionState::InProgress => { + if command.contains_key("readConcern") { + return Err(ErrorKind::InvalidArgument { + message: "Cannot set read concern after starting a transaction".into(), + } + .into()); + } + } + _ => {} + } + let rcc = RunCommand::new(self.name().to_string(), command, options.read_preference.clone(), None)?; + let rc_command = RunCursorCommandWithSession { + run_command: rcc, + options, + session, }; let client = self.client(); client.execute_cursor_operation(rc_command).await @@ -645,18 +675,18 @@ impl<'conn> Operation for RunCursorCommand<'conn> { const NAME: &'static str = "run_cursor_command"; fn build(&mut self, _description: &StreamDescription) -> Result> { - Operation::build(&mut self.run_command, _description) + self.run_command.build(_description) } fn serialize_command(&mut self, cmd: Command) -> Result> { - Operation::serialize_command(&mut self.run_command, cmd) + self.run_command.serialize_command(cmd) } fn extract_at_cluster_time( &self, _response: &bson::RawDocument, ) -> Result> { - Operation::extract_at_cluster_time(&self.run_command, _response) + self.run_command.extract_at_cluster_time(_response) } fn handle_error(&self, error: Error) -> Result { @@ -664,39 +694,39 @@ impl<'conn> Operation for RunCursorCommand<'conn> { } fn selection_criteria(&self) -> Option<&SelectionCriteria> { - Operation::selection_criteria(&self.run_command) + self.run_command.selection_criteria() } fn is_acknowledged(&self) -> bool { - Operation::is_acknowledged(&self.run_command) + self.run_command.is_acknowledged() } fn write_concern(&self) -> Option<&WriteConcern> { - Operation::write_concern(&self.run_command) + self.run_command.write_concern() } fn supports_read_concern(&self, _description: &StreamDescription) -> bool { - Operation::supports_read_concern(&self.run_command, _description) + self.run_command.supports_read_concern(_description) } fn supports_sessions(&self) -> bool { - Operation::supports_sessions(&self.run_command) + self.run_command.supports_sessions() } fn retryability(&self) -> crate::operation::Retryability { - Operation::retryability(&self.run_command) + self.run_command.retryability() } fn update_for_retry(&mut self) { - Operation::update_for_retry(&mut self.run_command) + self.run_command.update_for_retry() } fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { - Operation::pinned_connection(&self.run_command) + self.run_command.pinned_connection() } fn name(&self) -> &str { - Operation::name(&self.run_command) + self.run_command.name() } fn handle_response( @@ -706,8 +736,6 @@ impl<'conn> Operation for RunCursorCommand<'conn> { ) -> Result { let doc = Operation::handle_response(&self.run_command, response, description)?; let cursor_info = bson::from_document(doc)?; - // let response: CursorBody = response.body()?; - // let temp = bson::from_bson(self.options.comment.to_owned().unwrap()); Ok(CursorSpecification::new( cursor_info, description.server_address.clone(), @@ -717,3 +745,87 @@ impl<'conn> Operation for RunCursorCommand<'conn> { )) } } + +pub(super) struct RunCursorCommandWithSession<'conn> { + run_command: RunCommand<'conn>, + options: RunCursorCommandWithSessionOptions, + session: &'conn mut ClientSession, +} + +impl<'conn> Operation for RunCursorCommandWithSession<'conn> { + type O = CursorSpecification; + type Command = RawDocumentBuf; + + const NAME: &'static str = "run_cursor_command"; + + fn build(&mut self, _description: &StreamDescription) -> Result> { + self.run_command.build(_description) + } + + fn serialize_command(&mut self, cmd: Command) -> Result> { + self.run_command.serialize_command(cmd) + } + + fn extract_at_cluster_time( + &self, + _response: &bson::RawDocument, + ) -> Result> { + self.run_command.extract_at_cluster_time(_response) + } + + fn handle_error(&self, error: Error) -> Result { + Err(error) + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.run_command.selection_criteria() + } + + fn is_acknowledged(&self) -> bool { + self.run_command.is_acknowledged() + } + + fn write_concern(&self) -> Option<&WriteConcern> { + self.run_command.write_concern() + } + + fn supports_read_concern(&self, _description: &StreamDescription) -> bool { + self.run_command.supports_read_concern(_description) + } + + fn supports_sessions(&self) -> bool { + self.run_command.supports_sessions() + } + + fn retryability(&self) -> crate::operation::Retryability { + self.run_command.retryability() + } + + fn update_for_retry(&mut self) { + self.run_command.update_for_retry() + } + + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + self.run_command.pinned_connection() + } + + fn name(&self) -> &str { + self.run_command.name() + } + + fn handle_response( + &self, + response: RawCommandResponse, + description: &StreamDescription, + ) -> Result { + let doc = Operation::handle_response(&self.run_command, response, description)?; + let cursor_info = bson::from_document(doc)?; + Ok(CursorSpecification::new( + cursor_info, + description.server_address.clone(), + self.options.batch_size, + self.options.max_time_ms, + self.options.comment.clone(), + )) + } +} \ No newline at end of file diff --git a/src/db/options.rs b/src/db/options.rs index d2cd834b2..81b53502a 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -314,7 +314,7 @@ pub struct ChangeStreamPreAndPostImages { } /// Determines the lifetime of a cursor. -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug)] #[non_exhaustive] pub enum TimeoutMode { /// Times out after one Iteration. @@ -325,13 +325,14 @@ pub enum TimeoutMode { /// Specifies the options to a /// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation. -#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Default, TypedBuilder)] #[builder(field_defaults(default, setter(into)))] #[non_exhaustive] pub struct RunCursorCommandOptions { /// Optional string enum value, one of 'iteration' | 'cursorLifetime'. pub timeout_mode: Option, + /// The default read preference for operations. + pub read_preference: Option, /// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'. pub cursor_type: Option, /// Number of documents to return per batch. @@ -342,4 +343,31 @@ pub struct RunCursorCommandOptions { /// Optional BSON value. Use this value to configure the comment option sent on subsequent /// getMore commands. pub comment: Option, + /// The session to run this command with. + pub session: Option, } + + +/// Specifies the options to a +/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command_with_session) operation. +#[derive(Clone, Debug, Default, TypedBuilder)] +#[builder(field_defaults(default, setter(into)))] +#[non_exhaustive] +pub struct RunCursorCommandWithSessionOptions { + /// Optional string enum value, one of 'iteration' | 'cursorLifetime'. + pub timeout_mode: Option, + /// The default read preference for operations. + pub read_preference: Option, + /// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'. + pub cursor_type: Option, + /// Number of documents to return per batch. + pub batch_size: Option, + /// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent + /// on subsequent getMore commands. + pub max_time_ms: Option, + /// Optional BSON value. Use this value to configure the comment option sent on subsequent + /// getMore commands. + pub comment: Option, + /// The session to run this command with. + pub session: Option, +} \ No newline at end of file From 9f9737f7ce8d86413b499cc9b34a4c6fde8c9147 Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Wed, 12 Jul 2023 13:08:53 -0400 Subject: [PATCH 04/15] remove RunCursorCommandWithSession Enum --- src/db.rs | 95 ++--------------------------------------------- src/db/options.rs | 25 ------------- 2 files changed, 4 insertions(+), 116 deletions(-) diff --git a/src/db.rs b/src/db.rs index 2afe94526..38c49acf3 100644 --- a/src/db.rs +++ b/src/db.rs @@ -48,8 +48,6 @@ use crate::{ SessionCursor, }; -use self::options::RunCursorCommandWithSessionOptions; - /// `Database` is the client-side abstraction of a MongoDB database. It can be used to perform /// database-level operations or to obtain handles to specific collections within the database. A /// `Database` can only be obtained through a [`Client`](struct.Client.html) by calling either @@ -500,9 +498,9 @@ impl Database { pub async fn run_cursor_command_with_session( &self, command: Document, - options: RunCursorCommandWithSessionOptions, + options: RunCursorCommandOptions, session: &mut ClientSession, - ) -> Result> { + ) -> Result> { match session.transaction.state { TransactionState::Starting | TransactionState::InProgress => { if command.contains_key("readConcern") { @@ -515,13 +513,12 @@ impl Database { _ => {} } let rcc = RunCommand::new(self.name().to_string(), command, options.read_preference.clone(), None)?; - let rc_command = RunCursorCommandWithSession { + let rc_command = RunCursorCommand { run_command: rcc, options, - session, }; let client = self.client(); - client.execute_cursor_operation(rc_command).await + client.execute_session_cursor_operation(rc_command, session).await } /// Runs a database-level command using the provided `ClientSession`. @@ -729,90 +726,6 @@ impl<'conn> Operation for RunCursorCommand<'conn> { self.run_command.name() } - fn handle_response( - &self, - response: RawCommandResponse, - description: &StreamDescription, - ) -> Result { - let doc = Operation::handle_response(&self.run_command, response, description)?; - let cursor_info = bson::from_document(doc)?; - Ok(CursorSpecification::new( - cursor_info, - description.server_address.clone(), - self.options.batch_size, - self.options.max_time_ms, - self.options.comment.clone(), - )) - } -} - -pub(super) struct RunCursorCommandWithSession<'conn> { - run_command: RunCommand<'conn>, - options: RunCursorCommandWithSessionOptions, - session: &'conn mut ClientSession, -} - -impl<'conn> Operation for RunCursorCommandWithSession<'conn> { - type O = CursorSpecification; - type Command = RawDocumentBuf; - - const NAME: &'static str = "run_cursor_command"; - - fn build(&mut self, _description: &StreamDescription) -> Result> { - self.run_command.build(_description) - } - - fn serialize_command(&mut self, cmd: Command) -> Result> { - self.run_command.serialize_command(cmd) - } - - fn extract_at_cluster_time( - &self, - _response: &bson::RawDocument, - ) -> Result> { - self.run_command.extract_at_cluster_time(_response) - } - - fn handle_error(&self, error: Error) -> Result { - Err(error) - } - - fn selection_criteria(&self) -> Option<&SelectionCriteria> { - self.run_command.selection_criteria() - } - - fn is_acknowledged(&self) -> bool { - self.run_command.is_acknowledged() - } - - fn write_concern(&self) -> Option<&WriteConcern> { - self.run_command.write_concern() - } - - fn supports_read_concern(&self, _description: &StreamDescription) -> bool { - self.run_command.supports_read_concern(_description) - } - - fn supports_sessions(&self) -> bool { - self.run_command.supports_sessions() - } - - fn retryability(&self) -> crate::operation::Retryability { - self.run_command.retryability() - } - - fn update_for_retry(&mut self) { - self.run_command.update_for_retry() - } - - fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { - self.run_command.pinned_connection() - } - - fn name(&self) -> &str { - self.run_command.name() - } - fn handle_response( &self, response: RawCommandResponse, diff --git a/src/db/options.rs b/src/db/options.rs index 81b53502a..7772518e7 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -345,29 +345,4 @@ pub struct RunCursorCommandOptions { pub comment: Option, /// The session to run this command with. pub session: Option, -} - - -/// Specifies the options to a -/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command_with_session) operation. -#[derive(Clone, Debug, Default, TypedBuilder)] -#[builder(field_defaults(default, setter(into)))] -#[non_exhaustive] -pub struct RunCursorCommandWithSessionOptions { - /// Optional string enum value, one of 'iteration' | 'cursorLifetime'. - pub timeout_mode: Option, - /// The default read preference for operations. - pub read_preference: Option, - /// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'. - pub cursor_type: Option, - /// Number of documents to return per batch. - pub batch_size: Option, - /// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent - /// on subsequent getMore commands. - pub max_time_ms: Option, - /// Optional BSON value. Use this value to configure the comment option sent on subsequent - /// getMore commands. - pub comment: Option, - /// The session to run this command with. - pub session: Option, } \ No newline at end of file From 9e17b35f781f3766fcbaeeab9876e46f1355c6e3 Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Wed, 12 Jul 2023 15:41:57 -0400 Subject: [PATCH 05/15] more comment fixes --- src/db.rs | 17 +++++++++++------ src/db/options.rs | 6 ++---- src/operation.rs | 2 +- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/db.rs b/src/db.rs index 38c49acf3..e30d92abe 100644 --- a/src/db.rs +++ b/src/db.rs @@ -501,18 +501,23 @@ impl Database { options: RunCursorCommandOptions, session: &mut ClientSession, ) -> Result> { + let mut selection_criteria = SelectionCriteria::ReadPreference(options.read_preference.clone().unwrap()).into(); match session.transaction.state { TransactionState::Starting | TransactionState::InProgress => { - if command.contains_key("readConcern") { - return Err(ErrorKind::InvalidArgument { - message: "Cannot set read concern after starting a transaction".into(), + selection_criteria = match selection_criteria { + Some(selection_criteria) => Some(selection_criteria), + None => { + if let Some(ref options) = session.transaction.options { + options.selection_criteria.clone() + } else { + None + } } - .into()); - } + }; } _ => {} } - let rcc = RunCommand::new(self.name().to_string(), command, options.read_preference.clone(), None)?; + let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; let rc_command = RunCursorCommand { run_command: rcc, options, diff --git a/src/db/options.rs b/src/db/options.rs index 7772518e7..511b22a95 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -9,7 +9,7 @@ use crate::{ bson::{Bson, Document}, concern::{ReadConcern, WriteConcern}, options::{Collation, CursorType}, - selection_criteria::SelectionCriteria, + selection_criteria::{SelectionCriteria, ReadPreference}, serde_util, }; @@ -332,7 +332,7 @@ pub struct RunCursorCommandOptions { /// Optional string enum value, one of 'iteration' | 'cursorLifetime'. pub timeout_mode: Option, /// The default read preference for operations. - pub read_preference: Option, + pub read_preference: Option, /// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'. pub cursor_type: Option, /// Number of documents to return per batch. @@ -343,6 +343,4 @@ pub struct RunCursorCommandOptions { /// Optional BSON value. Use this value to configure the comment option sent on subsequent /// getMore commands. pub comment: Option, - /// The session to run this command with. - pub session: Option, } \ No newline at end of file diff --git a/src/operation.rs b/src/operation.rs index bffea0b10..f3e8fbd55 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -74,7 +74,7 @@ pub(crate) use run_command::RunCommand; pub(crate) use update::Update; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; -pub const SERVER_4_4_0_WIRE_VERSION: i32 = 9; +const SERVER_4_4_0_WIRE_VERSION: i32 = 9; /// A trait modeling the behavior of a server side operation. /// From 5d74bfabdb2e4aca961b6a319bb088672bde10eb Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Fri, 14 Jul 2023 13:56:45 -0400 Subject: [PATCH 06/15] more comment fixes --- src/db.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index e30d92abe..aa8546c2e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -501,7 +501,9 @@ impl Database { options: RunCursorCommandOptions, session: &mut ClientSession, ) -> Result> { - let mut selection_criteria = SelectionCriteria::ReadPreference(options.read_preference.clone().unwrap()).into(); + let mut selection_criteria = options.read_preference + .clone() + .map(SelectionCriteria::ReadPreference); match session.transaction.state { TransactionState::Starting | TransactionState::InProgress => { selection_criteria = match selection_criteria { From 69c8d5138dab3bd2ca2b9050ae2f71e981b972b5 Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Mon, 17 Jul 2023 15:58:02 -0400 Subject: [PATCH 07/15] all pr fixes but moving files --- src/db.rs | 7 +++++-- src/db/options.rs | 16 ++-------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/src/db.rs b/src/db.rs index aa8546c2e..073cf269f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -485,7 +485,10 @@ impl Database { command: Document, options: RunCursorCommandOptions, ) -> Result> { - let rcc = RunCommand::new(self.name().to_string(), command, None, None)?; + let selection_criteria = options.read_preference + .clone() + .map(SelectionCriteria::ReadPreference); + let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; let rc_command = RunCursorCommand { run_command: rcc, options, @@ -744,7 +747,7 @@ impl<'conn> Operation for RunCursorCommand<'conn> { cursor_info, description.server_address.clone(), self.options.batch_size, - self.options.max_time_ms, + self.options.max_time, self.options.comment.clone(), )) } diff --git a/src/db/options.rs b/src/db/options.rs index 511b22a95..fcfcdfcc8 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -313,33 +313,21 @@ pub struct ChangeStreamPreAndPostImages { pub enabled: bool, } -/// Determines the lifetime of a cursor. -#[derive(Clone, Debug)] -#[non_exhaustive] -pub enum TimeoutMode { - /// Times out after one Iteration. - Iteration, - /// Times out after the end of the Cursor Lifetime. - CursorLifetime, -} - /// Specifies the options to a /// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation. #[derive(Clone, Debug, Default, TypedBuilder)] #[builder(field_defaults(default, setter(into)))] #[non_exhaustive] pub struct RunCursorCommandOptions { - /// Optional string enum value, one of 'iteration' | 'cursorLifetime'. - pub timeout_mode: Option, /// The default read preference for operations. pub read_preference: Option, - /// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'. + /// The type of cursor to return. pub cursor_type: Option, /// Number of documents to return per batch. pub batch_size: Option, /// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent /// on subsequent getMore commands. - pub max_time_ms: Option, + pub max_time: Option, /// Optional BSON value. Use this value to configure the comment option sent on subsequent /// getMore commands. pub comment: Option, From ee5492a9fcdffb855cd90226ba4c86e913da145b Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Mon, 17 Jul 2023 16:50:26 -0400 Subject: [PATCH 08/15] moved files --- src/db.rs | 102 ++---------------------- src/operation.rs | 2 + src/operation/run_cursor_command.rs | 117 ++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 96 deletions(-) create mode 100644 src/operation/run_cursor_command.rs diff --git a/src/db.rs b/src/db.rs index 073cf269f..d363f8cb1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,9 +2,8 @@ pub mod options; use std::{fmt::Debug, sync::Arc}; +use futures_util::TryStreamExt; #[cfg(feature = "in-use-encryption-unstable")] -use bson::doc; -use bson::RawDocumentBuf; use futures_util::stream::TryStreamExt; use crate::{ @@ -16,9 +15,9 @@ use crate::{ ChangeStream, }, client::session::TransactionState, - cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, + cmap::{conn::PinnedConnectionHandle}, concern::{ReadConcern, WriteConcern}, - cursor::{Cursor, CursorSpecification}, + cursor::{Cursor}, error::{Error, ErrorKind, Result}, gridfs::{options::GridFsBucketOptions, GridFsBucket}, operation::{ @@ -27,8 +26,8 @@ use crate::{ Create, DropDatabase, ListCollections, - Operation, RunCommand, + RunCursorCommand, }, options::{ AggregateOptions, @@ -489,10 +488,7 @@ impl Database { .clone() .map(SelectionCriteria::ReadPreference); let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; - let rc_command = RunCursorCommand { - run_command: rcc, - options, - }; + let rc_command = RunCursorCommand::new(rcc, options)?; let client = self.client(); client.execute_cursor_operation(rc_command).await } @@ -523,10 +519,7 @@ impl Database { _ => {} } let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; - let rc_command = RunCursorCommand { - run_command: rcc, - options, - }; + let rc_command = RunCursorCommand::new(rcc, options)?; let client = self.client(); client.execute_session_cursor_operation(rc_command, session).await } @@ -668,87 +661,4 @@ impl Database { pub fn gridfs_bucket(&self, options: impl Into>) -> GridFsBucket { GridFsBucket::new(self.clone(), options.into().unwrap_or_default()) } -} - -pub(super) struct RunCursorCommand<'conn> { - run_command: RunCommand<'conn>, - options: RunCursorCommandOptions, -} - -impl<'conn> Operation for RunCursorCommand<'conn> { - type O = CursorSpecification; - type Command = RawDocumentBuf; - - const NAME: &'static str = "run_cursor_command"; - - fn build(&mut self, _description: &StreamDescription) -> Result> { - self.run_command.build(_description) - } - - fn serialize_command(&mut self, cmd: Command) -> Result> { - self.run_command.serialize_command(cmd) - } - - fn extract_at_cluster_time( - &self, - _response: &bson::RawDocument, - ) -> Result> { - self.run_command.extract_at_cluster_time(_response) - } - - fn handle_error(&self, error: Error) -> Result { - Err(error) - } - - fn selection_criteria(&self) -> Option<&SelectionCriteria> { - self.run_command.selection_criteria() - } - - fn is_acknowledged(&self) -> bool { - self.run_command.is_acknowledged() - } - - fn write_concern(&self) -> Option<&WriteConcern> { - self.run_command.write_concern() - } - - fn supports_read_concern(&self, _description: &StreamDescription) -> bool { - self.run_command.supports_read_concern(_description) - } - - fn supports_sessions(&self) -> bool { - self.run_command.supports_sessions() - } - - fn retryability(&self) -> crate::operation::Retryability { - self.run_command.retryability() - } - - fn update_for_retry(&mut self) { - self.run_command.update_for_retry() - } - - fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { - self.run_command.pinned_connection() - } - - fn name(&self) -> &str { - self.run_command.name() - } - - fn handle_response( - &self, - response: RawCommandResponse, - description: &StreamDescription, - ) -> Result { - let doc = Operation::handle_response(&self.run_command, response, description)?; - let cursor_info = bson::from_document(doc)?; - Ok(CursorSpecification::new( - cursor_info, - description.server_address.clone(), - self.options.batch_size, - self.options.max_time, - self.options.comment.clone(), - )) - } } \ No newline at end of file diff --git a/src/operation.rs b/src/operation.rs index f3e8fbd55..22d1e6a9f 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -19,6 +19,7 @@ mod list_databases; mod list_indexes; mod raw_output; mod run_command; +mod run_cursor_command; mod update; #[cfg(test)] @@ -71,6 +72,7 @@ pub(crate) use list_indexes::ListIndexes; #[cfg(feature = "in-use-encryption-unstable")] pub(crate) use raw_output::RawOutput; pub(crate) use run_command::RunCommand; +pub(crate) use run_cursor_command::RunCursorCommand; pub(crate) use update::Update; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs new file mode 100644 index 000000000..6985ad465 --- /dev/null +++ b/src/operation/run_cursor_command.rs @@ -0,0 +1,117 @@ + + +#[cfg(feature = "in-use-encryption-unstable")] +use bson::doc; +use bson::RawDocumentBuf; + + +use crate::{ + cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, + concern::{WriteConcern}, + cursor::{CursorSpecification}, + error::{Error, Result}, + operation::{ + Operation, + RunCommand, + }, + options::{ + RunCursorCommandOptions, + }, + selection_criteria::SelectionCriteria, +}; + +#[derive(Debug, Clone)] +pub(crate) struct RunCursorCommand<'conn> { + run_command: RunCommand<'conn>, + options: RunCursorCommandOptions, +} + +impl<'conn> RunCursorCommand<'conn>{ + pub(crate) fn new ( + run_command: RunCommand<'conn>, + options: RunCursorCommandOptions, + ) -> Result { + Ok(Self{ + run_command, + options, + }) + } +} + +impl<'conn> Operation for RunCursorCommand<'conn> { + type O = CursorSpecification; + type Command = RawDocumentBuf; + + const NAME: &'static str = "run_cursor_command"; + + fn build(&mut self, _description: &StreamDescription) -> Result> { + self.run_command.build(_description) + } + + fn serialize_command(&mut self, cmd: Command) -> Result> { + self.run_command.serialize_command(cmd) + } + + fn extract_at_cluster_time( + &self, + _response: &bson::RawDocument, + ) -> Result> { + self.run_command.extract_at_cluster_time(_response) + } + + fn handle_error(&self, error: Error) -> Result { + Err(error) + } + + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.run_command.selection_criteria() + } + + fn is_acknowledged(&self) -> bool { + self.run_command.is_acknowledged() + } + + fn write_concern(&self) -> Option<&WriteConcern> { + self.run_command.write_concern() + } + + fn supports_read_concern(&self, _description: &StreamDescription) -> bool { + self.run_command.supports_read_concern(_description) + } + + fn supports_sessions(&self) -> bool { + self.run_command.supports_sessions() + } + + fn retryability(&self) -> crate::operation::Retryability { + self.run_command.retryability() + } + + fn update_for_retry(&mut self) { + self.run_command.update_for_retry() + } + + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + self.run_command.pinned_connection() + } + + fn name(&self) -> &str { + self.run_command.name() + } + + fn handle_response( + &self, + response: RawCommandResponse, + description: &StreamDescription, + ) -> Result { + let doc = Operation::handle_response(&self.run_command, response, description)?; + let cursor_info = bson::from_document(doc)?; + Ok(CursorSpecification::new( + cursor_info, + description.server_address.clone(), + self.options.batch_size, + self.options.max_time, + self.options.comment.clone(), + )) + } +} \ No newline at end of file From 960c80019c257158b360574a74ea2af40f572617 Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Tue, 18 Jul 2023 11:26:49 -0400 Subject: [PATCH 09/15] resolve remaining comments --- src/db.rs | 9 ++------- src/db/options.rs | 2 +- src/operation/run_cursor_command.rs | 12 ++++++------ 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/db.rs b/src/db.rs index d363f8cb1..89b95c81c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -484,10 +484,7 @@ impl Database { command: Document, options: RunCursorCommandOptions, ) -> Result> { - let selection_criteria = options.read_preference - .clone() - .map(SelectionCriteria::ReadPreference); - let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; + let rcc = RunCommand::new(self.name().to_string(), command, options.read_preference.clone(), None)?; let rc_command = RunCursorCommand::new(rcc, options)?; let client = self.client(); client.execute_cursor_operation(rc_command).await @@ -500,9 +497,7 @@ impl Database { options: RunCursorCommandOptions, session: &mut ClientSession, ) -> Result> { - let mut selection_criteria = options.read_preference - .clone() - .map(SelectionCriteria::ReadPreference); + let mut selection_criteria = options.read_preference.clone(); match session.transaction.state { TransactionState::Starting | TransactionState::InProgress => { selection_criteria = match selection_criteria { diff --git a/src/db/options.rs b/src/db/options.rs index fcfcdfcc8..4b3e4facb 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -320,7 +320,7 @@ pub struct ChangeStreamPreAndPostImages { #[non_exhaustive] pub struct RunCursorCommandOptions { /// The default read preference for operations. - pub read_preference: Option, + pub read_preference: Option, /// The type of cursor to return. pub cursor_type: Option, /// Number of documents to return per batch. diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs index 6985ad465..65f58bcac 100644 --- a/src/operation/run_cursor_command.rs +++ b/src/operation/run_cursor_command.rs @@ -44,8 +44,8 @@ impl<'conn> Operation for RunCursorCommand<'conn> { const NAME: &'static str = "run_cursor_command"; - fn build(&mut self, _description: &StreamDescription) -> Result> { - self.run_command.build(_description) + fn build(&mut self, description: &StreamDescription) -> Result> { + self.run_command.build(description) } fn serialize_command(&mut self, cmd: Command) -> Result> { @@ -54,9 +54,9 @@ impl<'conn> Operation for RunCursorCommand<'conn> { fn extract_at_cluster_time( &self, - _response: &bson::RawDocument, + response: &bson::RawDocument, ) -> Result> { - self.run_command.extract_at_cluster_time(_response) + self.run_command.extract_at_cluster_time(response) } fn handle_error(&self, error: Error) -> Result { @@ -75,8 +75,8 @@ impl<'conn> Operation for RunCursorCommand<'conn> { self.run_command.write_concern() } - fn supports_read_concern(&self, _description: &StreamDescription) -> bool { - self.run_command.supports_read_concern(_description) + fn supports_read_concern(&self, description: &StreamDescription) -> bool { + self.run_command.supports_read_concern(description) } fn supports_sessions(&self) -> bool { From 6fac64b236847dcd1502d4e500ee38d228f38dab Mon Sep 17 00:00:00 2001 From: Drshika A <67125579+drshika@users.noreply.github.com> Date: Tue, 18 Jul 2023 12:41:57 -0400 Subject: [PATCH 10/15] Update src/db/options.rs Co-authored-by: Isabel Atkinson --- src/db/options.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/options.rs b/src/db/options.rs index 4b3e4facb..3ae0989de 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -320,7 +320,7 @@ pub struct ChangeStreamPreAndPostImages { #[non_exhaustive] pub struct RunCursorCommandOptions { /// The default read preference for operations. - pub read_preference: Option, + pub selection_criteria: Option, /// The type of cursor to return. pub cursor_type: Option, /// Number of documents to return per batch. From 5eeafec196d5a287f58e4317d16639e269302c79 Mon Sep 17 00:00:00 2001 From: Drshika A <67125579+drshika@users.noreply.github.com> Date: Tue, 18 Jul 2023 12:42:05 -0400 Subject: [PATCH 11/15] Update src/db.rs Co-authored-by: Isabel Atkinson --- src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index 89b95c81c..912c59e6b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -482,7 +482,7 @@ impl Database { pub async fn run_cursor_command( &self, command: Document, - options: RunCursorCommandOptions, + options: impl Into>, ) -> Result> { let rcc = RunCommand::new(self.name().to_string(), command, options.read_preference.clone(), None)?; let rc_command = RunCursorCommand::new(rcc, options)?; From 7afc8a574edea70f49d976f422cc039bc916e330 Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Wed, 19 Jul 2023 12:30:43 -0400 Subject: [PATCH 12/15] resolve remaining comments --- src/db.rs | 36 +++++++++++++++--------------------- src/db/options.rs | 2 +- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/src/db.rs b/src/db.rs index 912c59e6b..c17eab749 100644 --- a/src/db.rs +++ b/src/db.rs @@ -482,9 +482,9 @@ impl Database { pub async fn run_cursor_command( &self, command: Document, - options: impl Into>, + options: RunCursorCommandOptions, ) -> Result> { - let rcc = RunCommand::new(self.name().to_string(), command, options.read_preference.clone(), None)?; + let rcc = RunCommand::new(self.name().to_string(), command, options.selection_criteria.clone(), None)?; let rc_command = RunCursorCommand::new(rcc, options)?; let client = self.client(); client.execute_cursor_operation(rc_command).await @@ -494,29 +494,23 @@ impl Database { pub async fn run_cursor_command_with_session( &self, command: Document, - options: RunCursorCommandOptions, + options: impl Into>, session: &mut ClientSession, ) -> Result> { - let mut selection_criteria = options.read_preference.clone(); - match session.transaction.state { - TransactionState::Starting | TransactionState::InProgress => { - selection_criteria = match selection_criteria { - Some(selection_criteria) => Some(selection_criteria), - None => { - if let Some(ref options) = session.transaction.options { - options.selection_criteria.clone() - } else { - None - } - } - }; - } - _ => {} - } + let mut options: Option = options.into().clone(); + resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?; + let selection_criteria = match options.clone() { + Some(options) => options.selection_criteria, + None => None, + }; + let option = match options.clone() { + Some(options) => options, + None => RunCursorCommandOptions::default(), + }; let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; - let rc_command = RunCursorCommand::new(rcc, options)?; + let rc_command = RunCursorCommand::new(rcc, option)?; let client = self.client(); - client.execute_session_cursor_operation(rc_command, session).await + client.execute_session_cursor_operation(rc_command, session).await } /// Runs a database-level command using the provided `ClientSession`. diff --git a/src/db/options.rs b/src/db/options.rs index 3ae0989de..66c46cd4c 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -9,7 +9,7 @@ use crate::{ bson::{Bson, Document}, concern::{ReadConcern, WriteConcern}, options::{Collation, CursorType}, - selection_criteria::{SelectionCriteria, ReadPreference}, + selection_criteria::{SelectionCriteria}, serde_util, }; From 54185f8879625569838c0fda3cd91d54fd3dcd3f Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Wed, 19 Jul 2023 14:58:33 -0400 Subject: [PATCH 13/15] resolve remaining comments --- src/db.rs | 37 ++++++++++++----------- src/db/options.rs | 4 +-- src/operation/run_cursor_command.rs | 46 ++++++++++++++++------------- 3 files changed, 46 insertions(+), 41 deletions(-) diff --git a/src/db.rs b/src/db.rs index c17eab749..785b0669a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,9 +2,9 @@ pub mod options; use std::{fmt::Debug, sync::Arc}; -use futures_util::TryStreamExt; #[cfg(feature = "in-use-encryption-unstable")] use futures_util::stream::TryStreamExt; +use futures_util::TryStreamExt; use crate::{ bson::{Bson, Document}, @@ -15,9 +15,9 @@ use crate::{ ChangeStream, }, client::session::TransactionState, - cmap::{conn::PinnedConnectionHandle}, + cmap::conn::PinnedConnectionHandle, concern::{ReadConcern, WriteConcern}, - cursor::{Cursor}, + cursor::Cursor, error::{Error, ErrorKind, Result}, gridfs::{options::GridFsBucketOptions, GridFsBucket}, operation::{ @@ -482,10 +482,14 @@ impl Database { pub async fn run_cursor_command( &self, command: Document, - options: RunCursorCommandOptions, + options: impl Into>, ) -> Result> { - let rcc = RunCommand::new(self.name().to_string(), command, options.selection_criteria.clone(), None)?; - let rc_command = RunCursorCommand::new(rcc, options)?; + let options: Option = options.into().clone(); + let selection_criteria = options + .as_ref() + .and_then(|options| options.selection_criteria.clone()); + let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; + let rc_command = RunCursorCommand::new(rcc, options)?; let client = self.client(); client.execute_cursor_operation(rc_command).await } @@ -497,20 +501,17 @@ impl Database { options: impl Into>, session: &mut ClientSession, ) -> Result> { - let mut options: Option = options.into().clone(); + let mut options: Option = options.into(); resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?; - let selection_criteria = match options.clone() { - Some(options) => options.selection_criteria, - None => None, - }; - let option = match options.clone() { - Some(options) => options, - None => RunCursorCommandOptions::default(), - }; + let selection_criteria = options + .as_ref() + .and_then(|options| options.selection_criteria.clone()); let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; - let rc_command = RunCursorCommand::new(rcc, option)?; + let rc_command = RunCursorCommand::new(rcc, options)?; let client = self.client(); - client.execute_session_cursor_operation(rc_command, session).await + client + .execute_session_cursor_operation(rc_command, session) + .await } /// Runs a database-level command using the provided `ClientSession`. @@ -650,4 +651,4 @@ impl Database { pub fn gridfs_bucket(&self, options: impl Into>) -> GridFsBucket { GridFsBucket::new(self.clone(), options.into().unwrap_or_default()) } -} \ No newline at end of file +} diff --git a/src/db/options.rs b/src/db/options.rs index 66c46cd4c..13302a714 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -9,7 +9,7 @@ use crate::{ bson::{Bson, Document}, concern::{ReadConcern, WriteConcern}, options::{Collation, CursorType}, - selection_criteria::{SelectionCriteria}, + selection_criteria::SelectionCriteria, serde_util, }; @@ -331,4 +331,4 @@ pub struct RunCursorCommandOptions { /// Optional BSON value. Use this value to configure the comment option sent on subsequent /// getMore commands. pub comment: Option, -} \ No newline at end of file +} diff --git a/src/operation/run_cursor_command.rs b/src/operation/run_cursor_command.rs index 65f58bcac..7547a374c 100644 --- a/src/operation/run_cursor_command.rs +++ b/src/operation/run_cursor_command.rs @@ -1,37 +1,29 @@ - - #[cfg(feature = "in-use-encryption-unstable")] use bson::doc; use bson::RawDocumentBuf; - use crate::{ cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, - concern::{WriteConcern}, - cursor::{CursorSpecification}, + concern::WriteConcern, + cursor::CursorSpecification, error::{Error, Result}, - operation::{ - Operation, - RunCommand, - }, - options::{ - RunCursorCommandOptions, - }, + operation::{Operation, RunCommand}, + options::RunCursorCommandOptions, selection_criteria::SelectionCriteria, }; #[derive(Debug, Clone)] pub(crate) struct RunCursorCommand<'conn> { run_command: RunCommand<'conn>, - options: RunCursorCommandOptions, + options: Option, } -impl<'conn> RunCursorCommand<'conn>{ - pub(crate) fn new ( +impl<'conn> RunCursorCommand<'conn> { + pub(crate) fn new( run_command: RunCommand<'conn>, - options: RunCursorCommandOptions, + options: Option, ) -> Result { - Ok(Self{ + Ok(Self { run_command, options, }) @@ -106,12 +98,24 @@ impl<'conn> Operation for RunCursorCommand<'conn> { ) -> Result { let doc = Operation::handle_response(&self.run_command, response, description)?; let cursor_info = bson::from_document(doc)?; + let batch_size = match &self.options { + Some(options) => options.batch_size.clone(), + None => None, + }; + let max_time = match &self.options { + Some(options) => options.max_time.clone(), + None => None, + }; + let comment = match &self.options { + Some(options) => options.comment.clone(), + None => None, + }; Ok(CursorSpecification::new( cursor_info, description.server_address.clone(), - self.options.batch_size, - self.options.max_time, - self.options.comment.clone(), + batch_size, + max_time, + comment, )) } -} \ No newline at end of file +} From 0836a45a4c1069aefed839bbaa550e92837b60a2 Mon Sep 17 00:00:00 2001 From: drshika <67125579+drshika@users.noreply.github.com> Date: Wed, 19 Jul 2023 16:29:51 -0400 Subject: [PATCH 14/15] fix failing test cases --- src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index 785b0669a..ab3e683b8 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,8 +3,8 @@ pub mod options; use std::{fmt::Debug, sync::Arc}; #[cfg(feature = "in-use-encryption-unstable")] +use bson::doc; use futures_util::stream::TryStreamExt; -use futures_util::TryStreamExt; use crate::{ bson::{Bson, Document}, From 9c690a63704dea95cf925b8ec5f7c004dfae8a07 Mon Sep 17 00:00:00 2001 From: Drshika A <67125579+drshika@users.noreply.github.com> Date: Wed, 19 Jul 2023 17:28:57 -0400 Subject: [PATCH 15/15] Update src/db.rs Co-authored-by: Isabel Atkinson --- src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index ab3e683b8..9fb870584 100644 --- a/src/db.rs +++ b/src/db.rs @@ -484,7 +484,7 @@ impl Database { command: Document, options: impl Into>, ) -> Result> { - let options: Option = options.into().clone(); + let options: Option = options.into(); let selection_criteria = options .as_ref() .and_then(|options| options.selection_criteria.clone());