Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-803 Conditionally use hello for monitoring #600

Merged
Merged
  •  
  •  
  •  
8 changes: 4 additions & 4 deletions src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
bson::Document,
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
error::{Error, ErrorKind, Result},
is_master::{IsMasterCommandResponse, IsMasterReply},
hello::{HelloCommandResponse, HelloReply},
operation::{CommandErrorBody, CommandResponse},
options::{ReadConcern, ReadConcernInternal, ReadConcernLevel, ServerAddress},
selection_criteria::ReadPreference,
Expand Down Expand Up @@ -234,12 +234,12 @@ impl RawCommandResponse {
.map_err(|_| Error::invalid_authentication_response(mechanism_name))
}

pub(crate) fn to_is_master_response(&self, round_trip_time: Duration) -> Result<IsMasterReply> {
match self.body::<CommandResponse<IsMasterCommandResponse>>() {
pub(crate) fn to_hello_reply(&self, round_trip_time: Duration) -> Result<HelloReply> {
match self.body::<CommandResponse<HelloCommandResponse>>() {
Ok(response) if response.is_success() => {
let server_address = self.source_address().clone();
let cluster_time = response.cluster_time().cloned();
Ok(IsMasterReply {
Ok(HelloReply {
server_address,
command_response: response.body,
round_trip_time,
Expand Down
14 changes: 9 additions & 5 deletions src/cmap/conn/stream_description.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use crate::{client::options::ServerAddress, is_master::IsMasterReply, sdam::ServerType};
use crate::{client::options::ServerAddress, hello::HelloReply, sdam::ServerType};

/// Contains information about a given server in a format digestible by a connection.
#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -32,24 +32,27 @@ pub(crate) struct StreamDescription {
/// can be included in a write batch. If more than this number of writes are included, the
/// server cannot guarantee space in the response document to reply to the batch.
pub(crate) max_write_batch_size: i64,

/// Whether the server associated with this connection supports the `hello` command.
pub(crate) hello_ok: bool,
}

impl StreamDescription {
/// Constructs a new StreamDescription from an IsMasterReply.
pub(crate) fn from_is_master(reply: IsMasterReply) -> Self {
pub(crate) fn from_hello_reply(reply: &HelloReply) -> Self {
Self {
server_address: reply.server_address,
server_address: reply.server_address.clone(),
initial_server_type: reply.command_response.server_type(),
max_wire_version: reply.command_response.max_wire_version,
min_wire_version: reply.command_response.min_wire_version,
sasl_supported_mechs: reply.command_response.sasl_supported_mechs,
// TODO RUST-204: Add "saslSupportedMechs" if applicable.
sasl_supported_mechs: reply.command_response.sasl_supported_mechs.clone(),
logical_session_timeout: reply
.command_response
.logical_session_timeout_minutes
.map(|mins| Duration::from_secs(mins as u64 * 60)),
max_bson_object_size: reply.command_response.max_bson_object_size,
max_write_batch_size: reply.command_response.max_write_batch_size,
hello_ok: reply.command_response.hello_ok.unwrap_or(false),
}
}

Expand Down Expand Up @@ -78,6 +81,7 @@ impl StreamDescription {
logical_session_timeout: Some(Duration::from_secs(30 * 60)),
max_bson_object_size: 16 * 1024 * 1024,
max_write_batch_size: 100_000,
hello_ok: false,
}
}
}
26 changes: 15 additions & 11 deletions src/cmap/establish/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
compression::Compressor,
error::{ErrorKind, Result},
event::sdam::SdamEventHandler,
is_master::{is_master_command, run_is_master, IsMasterReply},
hello::{hello_command, run_hello, HelloReply},
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
sdam::Topology,
};
Expand Down Expand Up @@ -143,10 +143,12 @@ lazy_static! {
/// Contains the logic needed to handshake a connection.
#[derive(Clone, Debug)]
pub(crate) struct Handshaker {
/// The `isMaster` command to send when handshaking. This will always be identical
/// The hello or legacy hello command to send when handshaking. This will always be identical
/// given the same pool options, so it can be created at the time the Handshaker is created.
command: Command,

credential: Option<Credential>,

// This field is not read without a compression feature flag turned on.
#[allow(dead_code)]
compressors: Option<Vec<Compressor>>,
Expand All @@ -160,8 +162,10 @@ impl Handshaker {

let mut compressors = None;

let mut command =
is_master_command(options.as_ref().and_then(|opts| opts.server_api.as_ref()));
let mut command = hello_command(
options.as_ref().and_then(|opts| opts.server_api.as_ref()),
None,
);

if let Some(options) = options {
if let Some(app_name) = options.app_name {
Expand Down Expand Up @@ -229,10 +233,10 @@ impl Handshaker {

let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;

let mut is_master_reply = run_is_master(conn, command, topology, handler).await?;
let mut hello_reply = run_hello(conn, command, topology, handler).await?;

if self.command.body.contains_key("loadBalanced")
&& is_master_reply.command_response.service_id.is_none()
&& hello_reply.command_response.service_id.is_none()
{
return Err(ErrorKind::IncompatibleServer {
message: "Driver attempted to initialize in load balancing mode, but the server \
Expand All @@ -241,12 +245,12 @@ impl Handshaker {
}
.into());
}
conn.stream_description = Some(StreamDescription::from_is_master(is_master_reply.clone()));
conn.stream_description = Some(StreamDescription::from_hello_reply(&hello_reply));

// Record the client's message and the server's response from speculative authentication if
// the server did send a response.
let first_round = client_first.and_then(|client_first| {
is_master_reply
hello_reply
.command_response
.speculative_authenticate
.take()
Expand All @@ -255,7 +259,7 @@ impl Handshaker {

// Check that master reply has a compressor list and unpack it
if let (Some(server_compressors), Some(client_compressors)) = (
is_master_reply.command_response.compressors.as_ref(),
hello_reply.command_response.compressors.as_ref(),
self.compressors.as_ref(),
) {
// Use the Client's first compressor choice that the server supports (comparing only on
Expand All @@ -275,7 +279,7 @@ impl Handshaker {
}

Ok(HandshakeResult {
is_master_reply,
hello_reply,
first_round,
})
}
Expand All @@ -288,7 +292,7 @@ impl Handshaker {
#[derive(Debug)]
pub(crate) struct HandshakeResult {
/// The response from the server.
pub(crate) is_master_reply: IsMasterReply,
pub(crate) hello_reply: HelloReply,

/// The first round of speculative authentication, if applicable.
pub(crate) first_round: Option<FirstRound>,
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/establish/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ConnectionEstablisher {
.handshake(&mut connection, None, &None)
.await
.map_err(|e| EstablishError::pre_hello(e, pool_gen.clone()))?;
let service_id = handshake.is_master_reply.command_response.service_id;
let service_id = handshake.hello_reply.command_response.service_id;

// If the handshake response had a `serviceId` field, this is a connection to a load
// balancer and must derive its generation from the service_generations map.
Expand Down
Loading