Skip to content

Commit

Permalink
RUST-954 Pin connections for transactions when connected to a load ba…
Browse files Browse the repository at this point in the history
…lancer (mongodb#461)
  • Loading branch information
abr-egn authored and Andrew Witten committed Sep 21, 2021
1 parent 9e55062 commit 4406f78
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 39 deletions.
40 changes: 31 additions & 9 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,32 @@ impl Client {
T: DeserializeOwned + Unpin + Send + Sync,
{
let mut details = self.execute_operation_with_details(op, session).await?;
let pinned = self.pin_connection_for_cursor(&mut details.output)?;
let pinned = if details.output.connection.is_pinned() {
// Cursor operations on load-balanced transactions will be pinned via the transaction
// pin.
None
} else {
self.pin_connection_for_cursor(&mut details.output)?
};
Ok(SessionCursor::new(
self.clone(),
details.output.operation_output,
pinned,
))
}

fn is_load_balanced(&self) -> bool {
self.inner.options.load_balanced.unwrap_or(false)
}

fn pin_connection_for_cursor<Op, T>(
&self,
details: &mut ExecutionOutput<Op>,
) -> Result<Option<PinnedConnectionHandle>>
where
Op: Operation<O = CursorSpecification<T>>,
{
let is_load_balanced = self.inner.options.load_balanced.unwrap_or(false);
if is_load_balanced && details.operation_output.info.id != 0 {
if self.is_load_balanced() && details.operation_output.info.id != 0 {
Ok(Some(details.connection.pin()?))
} else {
Ok(None)
Expand Down Expand Up @@ -205,7 +214,7 @@ impl Client {

let selection_criteria = session
.as_ref()
.and_then(|s| s.transaction.pinned_mongos.as_ref())
.and_then(|s| s.transaction.pinned_mongos())
.or_else(|| op.selection_criteria());

let server = match self.select_server(selection_criteria).await {
Expand Down Expand Up @@ -317,9 +326,20 @@ impl Client {
}
};

let mut conn = match op.pinned_connection() {
Some(c) => c.take_connection().await?,
None => match server.pool.check_out().await {
let session_pinned = session
.as_ref()
.and_then(|s| s.transaction.pinned_connection());
let mut conn = match (session_pinned, op.pinned_connection()) {
(Some(c), None) | (None, Some(c)) => c.take_connection().await?,
(Some(c), Some(_)) => {
// An operation executing in a transaction should never have a pinned connection,
// but in case it does, prefer the transaction's pin.
if cfg!(debug_assertions) {
panic!("pinned operation executing in pinned transaction");
}
c.take_connection().await?
}
(None, None) => match server.pool.check_out().await {
Ok(c) => c,
Err(_) => return Err(first_error),
},
Expand Down Expand Up @@ -411,7 +431,9 @@ impl Client {
cmd.set_start_transaction();
cmd.set_autocommit();
cmd.set_txn_read_concern(*session);
if is_sharded {
if self.is_load_balanced() {
session.pin_connection(connection.pin()?);
} else if is_sharded {
session.pin_mongos(connection.address().clone());
}
session.transaction.state = TransactionState::InProgress;
Expand Down Expand Up @@ -816,7 +838,7 @@ impl Error {
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
{
session.unpin_mongos();
session.unpin();
}
}

Expand Down
61 changes: 48 additions & 13 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use uuid::Uuid;

use crate::{
bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
cmap::conn::PinnedConnectionHandle,
error::{ErrorKind, Result},
operation::{AbortTransaction, CommitTransaction, Operation},
options::{SessionOptions, TransactionOptions},
Expand Down Expand Up @@ -110,11 +111,11 @@ pub struct ClientSession {
pub(crate) snapshot_time: Option<Timestamp>,
}

#[derive(Clone, Debug)]
#[derive(Debug)]
pub(crate) struct Transaction {
pub(crate) state: TransactionState,
pub(crate) options: Option<TransactionOptions>,
pub(crate) pinned_mongos: Option<SelectionCriteria>,
pub(crate) pinned: Option<TransactionPin>,
pub(crate) recovery_token: Option<Document>,
}

Expand All @@ -132,23 +133,46 @@ impl Transaction {
pub(crate) fn abort(&mut self) {
self.state = TransactionState::Aborted;
self.options = None;
self.pinned_mongos = None;
self.pinned = None;
}

pub(crate) fn reset(&mut self) {
self.state = TransactionState::None;
self.options = None;
self.pinned_mongos = None;
self.pinned = None;
self.recovery_token = None;
}

pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
match &self.pinned {
Some(TransactionPin::Mongos(s)) => Some(s),
_ => None,
}
}

pub(crate) fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
match &self.pinned {
Some(TransactionPin::Connection(c)) => Some(c),
_ => None,
}
}

fn take(&mut self) -> Self {
Transaction {
state: self.state.clone(),
options: self.options.take(),
pinned: self.pinned.take(),
recovery_token: self.recovery_token.take(),
}
}
}

impl Default for Transaction {
fn default() -> Self {
Self {
state: TransactionState::None,
options: None,
pinned_mongos: None,
pinned: None,
recovery_token: None,
}
}
Expand All @@ -168,6 +192,12 @@ pub(crate) enum TransactionState {
Aborted,
}

#[derive(Debug)]
pub(crate) enum TransactionPin {
Mongos(SelectionCriteria),
Connection(PinnedConnectionHandle),
}

impl ClientSession {
/// Creates a new `ClientSession` wrapping the provided server session.
pub(crate) fn new(
Expand Down Expand Up @@ -256,13 +286,18 @@ impl ClientSession {

/// Pin mongos to session.
pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
self.transaction.pinned_mongos = Some(SelectionCriteria::Predicate(Arc::new(
move |server_info: &ServerInfo| *server_info.address() == address,
self.transaction.pinned = Some(TransactionPin::Mongos(SelectionCriteria::Predicate(
Arc::new(move |server_info: &ServerInfo| *server_info.address() == address),
)));
}

pub(crate) fn unpin_mongos(&mut self) {
self.transaction.pinned_mongos = None;
/// Pin the connection to the session.
pub(crate) fn pin_connection(&mut self, handle: PinnedConnectionHandle) {
self.transaction.pinned = Some(TransactionPin::Connection(handle));
}

pub(crate) fn unpin(&mut self) {
self.transaction.pinned = None;
}

/// Whether this session is dirty.
Expand Down Expand Up @@ -319,7 +354,7 @@ impl ClientSession {
.into());
}
TransactionState::Committed { .. } => {
self.unpin_mongos(); // Unpin session if previous transaction is committed.
self.unpin(); // Unpin session if previous transaction is committed.
}
_ => {}
}
Expand Down Expand Up @@ -495,8 +530,8 @@ impl ClientSession {
.as_ref()
.and_then(|options| options.write_concern.as_ref())
.cloned();
let selection_criteria = self.transaction.pinned_mongos.clone();
let abort_transaction = AbortTransaction::new(write_concern, selection_criteria);
let abort_transaction =
AbortTransaction::new(write_concern, self.transaction.pinned.take());
self.transaction.abort();
// Errors returned from running an abortTransaction command should be ignored.
let _result = self
Expand Down Expand Up @@ -549,7 +584,7 @@ impl Drop for ClientSession {
client: self.client.clone(),
is_implicit: self.is_implicit,
options: self.options.clone(),
transaction: self.transaction.clone(),
transaction: self.transaction.take(),
snapshot_time: self.snapshot_time,
};
RUNTIME.execute(async move {
Expand Down
5 changes: 5 additions & 0 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ impl Connection {
})
}

/// Whether this connection has a live `PinnedConnectionHandle`.
pub(crate) fn is_pinned(&self) -> bool {
self.pinned_sender.is_some()
}

/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
self.close(reason);
Expand Down
26 changes: 17 additions & 9 deletions src/operation/abort_transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use bson::Document;

use crate::{
bson::doc,
cmap::{Command, StreamDescription},
client::session::TransactionPin,
cmap::{conn::PinnedConnectionHandle, Command, StreamDescription},
error::Result,
operation::{Operation, Retryability},
options::WriteConcern,
Expand All @@ -13,17 +14,14 @@ use super::{CommandResponse, Response, WriteConcernOnlyBody};

pub(crate) struct AbortTransaction {
write_concern: Option<WriteConcern>,
selection_criteria: Option<SelectionCriteria>,
pinned: Option<TransactionPin>,
}

impl AbortTransaction {
pub(crate) fn new(
write_concern: Option<WriteConcern>,
selection_criteria: Option<SelectionCriteria>,
) -> Self {
pub(crate) fn new(write_concern: Option<WriteConcern>, pinned: Option<TransactionPin>) -> Self {
Self {
write_concern,
selection_criteria,
pinned,
}
}
}
Expand Down Expand Up @@ -59,7 +57,17 @@ impl Operation for AbortTransaction {
}

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.selection_criteria.as_ref()
match &self.pinned {
Some(TransactionPin::Mongos(s)) => Some(s),
_ => None,
}
}

fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
match &self.pinned {
Some(TransactionPin::Connection(h)) => Some(h),
_ => None,
}
}

fn write_concern(&self) -> Option<&WriteConcern> {
Expand All @@ -72,6 +80,6 @@ impl Operation for AbortTransaction {

fn update_for_retry(&mut self) {
// The session must be "unpinned" before server selection for a retry.
self.selection_criteria = None;
self.pinned = None;
}
}
8 changes: 4 additions & 4 deletions src/test/spec/unified_runner/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1077,8 +1077,8 @@ impl TestOperation for TargetedFailPoint {
let session = test_runner.get_session(&self.session);
let selection_criteria = session
.transaction
.pinned_mongos
.clone()
.pinned_mongos()
.cloned()
.unwrap_or_else(|| panic!("ClientSession not pinned"));
let fail_point_guard = test_runner
.internal_client
Expand Down Expand Up @@ -1312,7 +1312,7 @@ impl TestOperation for AssertSessionPinned {
assert!(test_runner
.get_session(&self.session)
.transaction
.pinned_mongos
.pinned_mongos()
.is_some());
}
.boxed()
Expand All @@ -1334,7 +1334,7 @@ impl TestOperation for AssertSessionUnpinned {
assert!(test_runner
.get_session(&self.session)
.transaction
.pinned_mongos
.pinned_mongos()
.is_none());
}
.boxed()
Expand Down
4 changes: 2 additions & 2 deletions src/test/spec/v2_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ pub async fn run_v2_test(test_file: TestFile) {
let selection_criteria = session
.unwrap()
.transaction
.pinned_mongos
.clone()
.pinned_mongos()
.cloned()
.unwrap_or_else(|| panic!("ClientSession is not pinned"));

fail_point_guards.push(
Expand Down
4 changes: 2 additions & 2 deletions src/test/spec/v2_runner/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ impl TestOperation for AssertSessionPinned {
session: &'a mut ClientSession,
) -> BoxFuture<'a, Result<Option<Bson>>> {
async move {
assert!(session.transaction.pinned_mongos.is_some());
assert!(session.transaction.pinned_mongos().is_some());
Ok(None)
}
.boxed()
Expand All @@ -1072,7 +1072,7 @@ impl TestOperation for AssertSessionUnpinned {
session: &'a mut ClientSession,
) -> BoxFuture<'a, Result<Option<Bson>>> {
async move {
assert!(session.transaction.pinned_mongos.is_none());
assert!(session.transaction.pinned_mongos().is_none());
Ok(None)
}
.boxed()
Expand Down

0 comments on commit 4406f78

Please sign in to comment.