Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1588: Add RunCursorCommand #912

Merged
merged 15 commits into from
Jul 19, 2023
141 changes: 138 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc};

#[cfg(feature = "in-use-encryption-unstable")]
use bson::doc;
use bson::RawDocumentBuf;
use futures_util::stream::TryStreamExt;

use crate::{
Expand All @@ -15,19 +16,28 @@ use crate::{
ChangeStream,
},
client::session::TransactionState,
cmap::conn::PinnedConnectionHandle,
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
concern::{ReadConcern, WriteConcern},
cursor::Cursor,
cursor::{Cursor, CursorSpecification},
error::{Error, ErrorKind, Result},
gridfs::{options::GridFsBucketOptions, GridFsBucket},
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
operation::{
Aggregate,
AggregateTarget,
Create,
DropDatabase,
ListCollections,
Operation,
RunCommand,
},
options::{
AggregateOptions,
CollectionOptions,
CreateCollectionOptions,
DatabaseOptions,
DropDatabaseOptions,
ListCollectionsOptions,
RunCursorCommandOptions,
},
results::CollectionSpecification,
selection_criteria::SelectionCriteria,
Expand Down Expand Up @@ -469,6 +479,48 @@ impl Database {
.await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command(
&self,
command: Document,
options: RunCursorCommandOptions,
drshika marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option type still needs to be updated here

) -> Result<Cursor<Document>> {
let rcc = RunCommand::new(self.name().to_string(), command, None, None)?;
let rc_command = RunCursorCommand {
run_command: rcc,
options,
};
let client = self.client();
client.execute_cursor_operation(rc_command).await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command_with_session(
&self,
command: Document,
options: RunCursorCommandOptions,
drshika marked this conversation as resolved.
Show resolved Hide resolved
session: &mut ClientSession,
) -> Result<SessionCursor<Document>> {
match session.transaction.state {
TransactionState::Starting | TransactionState::InProgress => {
if command.contains_key("readConcern") {
drshika marked this conversation as resolved.
Show resolved Hide resolved
return Err(ErrorKind::InvalidArgument {
message: "Cannot set read concern after starting a transaction".into(),
}
.into());
}
}
_ => {}
}
let rcc = RunCommand::new(self.name().to_string(), command, options.read_preference.clone(), None)?;
let rc_command = RunCursorCommand {
run_command: rcc,
options,
};
let client = self.client();
client.execute_session_cursor_operation(rc_command, session).await
}

/// Runs a database-level command using the provided `ClientSession`.
///
/// If the `ClientSession` provided is currently in a transaction, `command` must not specify a
Expand Down Expand Up @@ -607,3 +659,86 @@ impl Database {
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
}
}

pub(super) struct RunCursorCommand<'conn> {
drshika marked this conversation as resolved.
Show resolved Hide resolved
run_command: RunCommand<'conn>,
options: RunCursorCommandOptions,
}

impl<'conn> Operation for RunCursorCommand<'conn> {
type O = CursorSpecification;
type Command = RawDocumentBuf;

const NAME: &'static str = "run_cursor_command";

fn build(&mut self, _description: &StreamDescription) -> Result<Command<Self::Command>> {
drshika marked this conversation as resolved.
Show resolved Hide resolved
drshika marked this conversation as resolved.
Show resolved Hide resolved
self.run_command.build(_description)
}

fn serialize_command(&mut self, cmd: Command<Self::Command>) -> Result<Vec<u8>> {
self.run_command.serialize_command(cmd)
}

fn extract_at_cluster_time(
&self,
_response: &bson::RawDocument,
) -> Result<Option<bson::Timestamp>> {
self.run_command.extract_at_cluster_time(_response)
}

fn handle_error(&self, error: Error) -> Result<Self::O> {
Err(error)
}

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.run_command.selection_criteria()
}

fn is_acknowledged(&self) -> bool {
self.run_command.is_acknowledged()
}

fn write_concern(&self) -> Option<&WriteConcern> {
self.run_command.write_concern()
}

fn supports_read_concern(&self, _description: &StreamDescription) -> bool {
self.run_command.supports_read_concern(_description)
}

fn supports_sessions(&self) -> bool {
self.run_command.supports_sessions()
}

fn retryability(&self) -> crate::operation::Retryability {
self.run_command.retryability()
}

fn update_for_retry(&mut self) {
self.run_command.update_for_retry()
}

fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
self.run_command.pinned_connection()
}

fn name(&self) -> &str {
self.run_command.name()
}

fn handle_response(
&self,
response: RawCommandResponse,
description: &StreamDescription,
) -> Result<Self::O> {
let doc = Operation::handle_response(&self.run_command, response, description)?;
let cursor_info = bson::from_document(doc)?;
Ok(CursorSpecification::new(
cursor_info,
description.server_address.clone(),
self.options.batch_size,
self.options.max_time_ms,
self.options.comment.clone(),
))
}
}
36 changes: 35 additions & 1 deletion src/db/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use typed_builder::TypedBuilder;
use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
options::Collation,
options::{Collation, CursorType},
selection_criteria::SelectionCriteria,
serde_util,
};
Expand Down Expand Up @@ -312,3 +312,37 @@ pub struct ChangeStreamPreAndPostImages {
/// If `true`, change streams will be able to include pre- and post-images.
pub enabled: bool,
}

/// Determines the lifetime of a cursor.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum TimeoutMode {
/// Times out after one Iteration.
Iteration,
/// Times out after the end of the Cursor Lifetime.
CursorLifetime,
}
drshika marked this conversation as resolved.
Show resolved Hide resolved

/// Specifies the options to a
/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation.
#[derive(Clone, Debug, Default, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct RunCursorCommandOptions {
drshika marked this conversation as resolved.
Show resolved Hide resolved
/// Optional string enum value, one of 'iteration' | 'cursorLifetime'.
pub timeout_mode: Option<TimeoutMode>,
/// The default read preference for operations.
pub read_preference: Option<SelectionCriteria>,
drshika marked this conversation as resolved.
Show resolved Hide resolved
drshika marked this conversation as resolved.
Show resolved Hide resolved
/// Optional string enum value, one of 'tailable' | 'tailableAwait' | 'nonTailable'.
drshika marked this conversation as resolved.
Show resolved Hide resolved
pub cursor_type: Option<CursorType>,
/// Number of documents to return per batch.
pub batch_size: Option<u32>,
/// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent
/// on subsequent getMore commands.
pub max_time_ms: Option<Duration>,
drshika marked this conversation as resolved.
Show resolved Hide resolved
/// Optional BSON value. Use this value to configure the comment option sent on subsequent
/// getMore commands.
pub comment: Option<Bson>,
/// The session to run this command with.
drshika marked this conversation as resolved.
Show resolved Hide resolved
pub session: Option<String>,
}
2 changes: 1 addition & 1 deletion src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub(crate) use run_command::RunCommand;
pub(crate) use update::Update;

const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
pub const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
drshika marked this conversation as resolved.
Show resolved Hide resolved

/// A trait modeling the behavior of a server side operation.
///
Expand Down
2 changes: 1 addition & 1 deletion src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
bson::{serde_helpers, Bson, Document},
change_stream::event::ResumeToken,
db::options::CreateCollectionOptions,
Namespace,
serde_util,
Namespace,
};

use bson::{Binary, RawDocumentBuf};
Expand Down