Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1588: Add RunCursorCommand #912

Merged
merged 15 commits into from
Jul 19, 2023
54 changes: 49 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ pub mod options;

use std::{fmt::Debug, sync::Arc};

use futures_util::TryStreamExt;
#[cfg(feature = "in-use-encryption-unstable")]
use bson::doc;
use futures_util::stream::TryStreamExt;

use crate::{
Expand All @@ -15,19 +15,28 @@ use crate::{
ChangeStream,
},
client::session::TransactionState,
cmap::conn::PinnedConnectionHandle,
cmap::{conn::PinnedConnectionHandle},
concern::{ReadConcern, WriteConcern},
cursor::Cursor,
cursor::{Cursor},
error::{Error, ErrorKind, Result},
gridfs::{options::GridFsBucketOptions, GridFsBucket},
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
operation::{
Aggregate,
AggregateTarget,
Create,
DropDatabase,
ListCollections,
RunCommand,
RunCursorCommand,
},
options::{
AggregateOptions,
CollectionOptions,
CreateCollectionOptions,
DatabaseOptions,
DropDatabaseOptions,
ListCollectionsOptions,
RunCursorCommandOptions,
},
results::CollectionSpecification,
selection_criteria::SelectionCriteria,
Expand Down Expand Up @@ -469,6 +478,41 @@ impl Database {
.await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command(
&self,
command: Document,
options: RunCursorCommandOptions,
drshika marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option type still needs to be updated here

) -> Result<Cursor<Document>> {
let rcc = RunCommand::new(self.name().to_string(), command, options.selection_criteria.clone(), None)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let client = self.client();
client.execute_cursor_operation(rc_command).await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command_with_session(
&self,
command: Document,
options: impl Into<Option<RunCursorCommandOptions>>,
session: &mut ClientSession,
) -> Result<SessionCursor<Document>> {
let mut options: Option<RunCursorCommandOptions> = options.into().clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cloning shouldn't be necessary here

resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let selection_criteria = match options.clone() {
Some(options) => options.selection_criteria,
None => None,
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than cloning the entire options struct, it would be more efficient only to clone the selection_criteria. You should be able to do this by using a reference to the options, which can be done either like this:

let selection_criteria = match options {
    Some(ref options) => options.selection_criteria.clone(),
    None => None,
};

or this:

let selection_criteria = match &options {
    Some(options) => options.selection_criteria.clone(),
    None => None,
}

(I slightly prefer the first one for no good reason; either is totally fine.)

You can also use and_then to achieve the same thing:

let selection_criteria = options
    .as_ref()
    .and_then(|options| options.selection_criteria.clone());

We use as_ref to get a reference to the struct inside the Option rather than an owned version of it so that we can continue to use it later. and_then does something called a flat map (you can read more about it here), which is useful when you want to avoid getting data that's wrapped in multiple layers of Options/Results.

All three of these options would work here, but it's generally considered best practice to use and_then rather than an explicit match in situations like these.

let option = match options.clone() {
Some(options) => options,
None => RunCursorCommandOptions::default(),
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than creating an empty options struct when the user hasn't provided any options, we can just make the options field on RunCursorCommand an Option<RunCursorCommandOptions>. This will save us an allocation when options is None. (You'll probably need to do similar calls to and_then in handle_response as I described above to extract fields from the options).

let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand::new(rcc, option)?;
let client = self.client();
client.execute_session_cursor_operation(rc_command, session).await
}

/// Runs a database-level command using the provided `ClientSession`.
///
/// If the `ClientSession` provided is currently in a transaction, `command` must not specify a
Expand Down Expand Up @@ -606,4 +650,4 @@ impl Database {
pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket {
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
}
}
}
24 changes: 22 additions & 2 deletions src/db/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use typed_builder::TypedBuilder;
use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
options::Collation,
selection_criteria::SelectionCriteria,
options::{Collation, CursorType},
selection_criteria::{SelectionCriteria},
serde_util,
};

Expand Down Expand Up @@ -312,3 +312,23 @@ pub struct ChangeStreamPreAndPostImages {
/// If `true`, change streams will be able to include pre- and post-images.
pub enabled: bool,
}

/// Specifies the options to a
/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation.
#[derive(Clone, Debug, Default, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct RunCursorCommandOptions {
drshika marked this conversation as resolved.
Show resolved Hide resolved
/// The default read preference for operations.
pub selection_criteria: Option<SelectionCriteria>,
/// The type of cursor to return.
pub cursor_type: Option<CursorType>,
/// Number of documents to return per batch.
pub batch_size: Option<u32>,
/// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent
/// on subsequent getMore commands.
pub max_time: Option<Duration>,
/// Optional BSON value. Use this value to configure the comment option sent on subsequent
/// getMore commands.
pub comment: Option<Bson>,
}
2 changes: 2 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod list_databases;
mod list_indexes;
mod raw_output;
mod run_command;
mod run_cursor_command;
mod update;

#[cfg(test)]
Expand Down Expand Up @@ -71,6 +72,7 @@ pub(crate) use list_indexes::ListIndexes;
#[cfg(feature = "in-use-encryption-unstable")]
pub(crate) use raw_output::RawOutput;
pub(crate) use run_command::RunCommand;
pub(crate) use run_cursor_command::RunCursorCommand;
pub(crate) use update::Update;

const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
Expand Down
117 changes: 117 additions & 0 deletions src/operation/run_cursor_command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@


#[cfg(feature = "in-use-encryption-unstable")]
use bson::doc;
use bson::RawDocumentBuf;


use crate::{
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
concern::{WriteConcern},
cursor::{CursorSpecification},
error::{Error, Result},
operation::{
Operation,
RunCommand,
},
options::{
RunCursorCommandOptions,
},
selection_criteria::SelectionCriteria,
};

#[derive(Debug, Clone)]
pub(crate) struct RunCursorCommand<'conn> {
run_command: RunCommand<'conn>,
options: RunCursorCommandOptions,
}

impl<'conn> RunCursorCommand<'conn>{
pub(crate) fn new (
run_command: RunCommand<'conn>,
options: RunCursorCommandOptions,
) -> Result<Self> {
Ok(Self{
run_command,
options,
})
}
}

impl<'conn> Operation for RunCursorCommand<'conn> {
type O = CursorSpecification;
type Command = RawDocumentBuf;

const NAME: &'static str = "run_cursor_command";

fn build(&mut self, description: &StreamDescription) -> Result<Command<Self::Command>> {
self.run_command.build(description)
}

fn serialize_command(&mut self, cmd: Command<Self::Command>) -> Result<Vec<u8>> {
self.run_command.serialize_command(cmd)
}

fn extract_at_cluster_time(
&self,
response: &bson::RawDocument,
) -> Result<Option<bson::Timestamp>> {
self.run_command.extract_at_cluster_time(response)
}

fn handle_error(&self, error: Error) -> Result<Self::O> {
Err(error)
}

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.run_command.selection_criteria()
}

fn is_acknowledged(&self) -> bool {
self.run_command.is_acknowledged()
}

fn write_concern(&self) -> Option<&WriteConcern> {
self.run_command.write_concern()
}

fn supports_read_concern(&self, description: &StreamDescription) -> bool {
self.run_command.supports_read_concern(description)
}

fn supports_sessions(&self) -> bool {
self.run_command.supports_sessions()
}

fn retryability(&self) -> crate::operation::Retryability {
self.run_command.retryability()
}

fn update_for_retry(&mut self) {
self.run_command.update_for_retry()
}

fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
self.run_command.pinned_connection()
}

fn name(&self) -> &str {
self.run_command.name()
}

fn handle_response(
&self,
response: RawCommandResponse,
description: &StreamDescription,
) -> Result<Self::O> {
let doc = Operation::handle_response(&self.run_command, response, description)?;
let cursor_info = bson::from_document(doc)?;
Ok(CursorSpecification::new(
cursor_info,
description.server_address.clone(),
self.options.batch_size,
self.options.max_time,
self.options.comment.clone(),
))
}
}
2 changes: 1 addition & 1 deletion src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
bson::{serde_helpers, Bson, Document},
change_stream::event::ResumeToken,
db::options::CreateCollectionOptions,
Namespace,
serde_util,
Namespace,
};

use bson::{Binary, RawDocumentBuf};
Expand Down