Skip to content

Commit

Permalink
resolve remaining comments
Browse files Browse the repository at this point in the history
  • Loading branch information
drshika committed Jul 19, 2023
1 parent 7afc8a5 commit 54185f8
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 41 deletions.
37 changes: 19 additions & 18 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ pub mod options;

use std::{fmt::Debug, sync::Arc};

use futures_util::TryStreamExt;
#[cfg(feature = "in-use-encryption-unstable")]
use futures_util::stream::TryStreamExt;
use futures_util::TryStreamExt;

use crate::{
bson::{Bson, Document},
Expand All @@ -15,9 +15,9 @@ use crate::{
ChangeStream,
},
client::session::TransactionState,
cmap::{conn::PinnedConnectionHandle},
cmap::conn::PinnedConnectionHandle,
concern::{ReadConcern, WriteConcern},
cursor::{Cursor},
cursor::Cursor,
error::{Error, ErrorKind, Result},
gridfs::{options::GridFsBucketOptions, GridFsBucket},
operation::{
Expand Down Expand Up @@ -482,10 +482,14 @@ impl Database {
pub async fn run_cursor_command(
&self,
command: Document,
options: RunCursorCommandOptions,
options: impl Into<Option<RunCursorCommandOptions>>,
) -> Result<Cursor<Document>> {
let rcc = RunCommand::new(self.name().to_string(), command, options.selection_criteria.clone(), None)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let options: Option<RunCursorCommandOptions> = options.into().clone();
let selection_criteria = options
.as_ref()
.and_then(|options| options.selection_criteria.clone());
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let client = self.client();
client.execute_cursor_operation(rc_command).await
}
Expand All @@ -497,20 +501,17 @@ impl Database {
options: impl Into<Option<RunCursorCommandOptions>>,
session: &mut ClientSession,
) -> Result<SessionCursor<Document>> {
let mut options: Option<RunCursorCommandOptions> = options.into().clone();
let mut options: Option<RunCursorCommandOptions> = options.into();
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let selection_criteria = match options.clone() {
Some(options) => options.selection_criteria,
None => None,
};
let option = match options.clone() {
Some(options) => options,
None => RunCursorCommandOptions::default(),
};
let selection_criteria = options
.as_ref()
.and_then(|options| options.selection_criteria.clone());
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand::new(rcc, option)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let client = self.client();
client.execute_session_cursor_operation(rc_command, session).await
client
.execute_session_cursor_operation(rc_command, session)
.await
}

/// Runs a database-level command using the provided `ClientSession`.
Expand Down Expand Up @@ -650,4 +651,4 @@ impl Database {
pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket {
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
}
}
}
4 changes: 2 additions & 2 deletions src/db/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
options::{Collation, CursorType},
selection_criteria::{SelectionCriteria},
selection_criteria::SelectionCriteria,
serde_util,
};

Expand Down Expand Up @@ -331,4 +331,4 @@ pub struct RunCursorCommandOptions {
/// Optional BSON value. Use this value to configure the comment option sent on subsequent
/// getMore commands.
pub comment: Option<Bson>,
}
}
46 changes: 25 additions & 21 deletions src/operation/run_cursor_command.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,29 @@


#[cfg(feature = "in-use-encryption-unstable")]
use bson::doc;
use bson::RawDocumentBuf;


use crate::{
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
concern::{WriteConcern},
cursor::{CursorSpecification},
concern::WriteConcern,
cursor::CursorSpecification,
error::{Error, Result},
operation::{
Operation,
RunCommand,
},
options::{
RunCursorCommandOptions,
},
operation::{Operation, RunCommand},
options::RunCursorCommandOptions,
selection_criteria::SelectionCriteria,
};

#[derive(Debug, Clone)]
pub(crate) struct RunCursorCommand<'conn> {
run_command: RunCommand<'conn>,
options: RunCursorCommandOptions,
options: Option<RunCursorCommandOptions>,
}

impl<'conn> RunCursorCommand<'conn>{
pub(crate) fn new (
impl<'conn> RunCursorCommand<'conn> {
pub(crate) fn new(
run_command: RunCommand<'conn>,
options: RunCursorCommandOptions,
options: Option<RunCursorCommandOptions>,
) -> Result<Self> {
Ok(Self{
Ok(Self {
run_command,
options,
})
Expand Down Expand Up @@ -106,12 +98,24 @@ impl<'conn> Operation for RunCursorCommand<'conn> {
) -> Result<Self::O> {
let doc = Operation::handle_response(&self.run_command, response, description)?;
let cursor_info = bson::from_document(doc)?;
let batch_size = match &self.options {
Some(options) => options.batch_size.clone(),
None => None,
};
let max_time = match &self.options {
Some(options) => options.max_time.clone(),
None => None,
};
let comment = match &self.options {
Some(options) => options.comment.clone(),
None => None,
};
Ok(CursorSpecification::new(
cursor_info,
description.server_address.clone(),
self.options.batch_size,
self.options.max_time,
self.options.comment.clone(),
batch_size,
max_time,
comment,
))
}
}
}

0 comments on commit 54185f8

Please sign in to comment.