Skip to content

Commit

Permalink
RUST-803 Conditionally use hello for monitoring (#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickfreed authored Mar 24, 2022
1 parent c7947cd commit d55c079
Show file tree
Hide file tree
Showing 419 changed files with 8,416 additions and 1,102 deletions.
2 changes: 1 addition & 1 deletion benchmarks/src/bench/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub async fn drop_database(uri: &str, database: &str) -> Result<()> {

let hello = client
.database("admin")
.run_command(doc! { "ismaster": true }, None)
.run_command(doc! { "hello": true }, None)
.await?;

client.database(&database).drop(None).await?;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bench/run_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Benchmark for RunCommandBenchmark {
Ok(RunCommandBenchmark {
db,
num_iter: options.num_iter,
cmd: doc! { "ismaster": true },
cmd: doc! { "hello": true },
uri: options.uri,
})
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async fn run_benchmarks(
let run_command =
bench::run_benchmark::<RunCommandBenchmark>(run_command_options).await?;

comp_score += score_test(run_command, RUN_COMMAND_BENCH, 0.16, more_info);
comp_score += score_test(run_command, RUN_COMMAND_BENCH, 0.13, more_info);
}

// Small doc insertOne
Expand Down
2 changes: 1 addition & 1 deletion src/client/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ impl Credential {
}

/// If the mechanism is missing, append the appropriate mechanism negotiation key-value-pair to
/// the provided isMaster command document.
/// the provided hello or legacy hello command document.
pub(crate) fn append_needed_mechanism_negotiation(&self, command: &mut Document) {
if let (Some(username), None) = (self.username.as_ref(), self.mechanism.as_ref()) {
command.insert(
Expand Down
3 changes: 2 additions & 1 deletion src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
UNKNOWN_TRANSACTION_COMMIT_RESULT,
},
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
hello::LEGACY_HELLO_COMMAND_NAME_LOWERCASE,
operation::{
AbortTransaction,
AggregateTarget,
Expand Down Expand Up @@ -70,7 +71,7 @@ lazy_static! {
pub(crate) static ref HELLO_COMMAND_NAMES: HashSet<&'static str> = {
let mut hash_set = HashSet::new();
hash_set.insert("hello");
hash_set.insert("ismaster");
hash_set.insert(LEGACY_HELLO_COMMAND_NAME_LOWERCASE);
hash_set
};
}
Expand Down
3 changes: 1 addition & 2 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,7 @@ pub struct ClientOptions {
#[builder(default)]
pub driver_info: Option<DriverInfo>,

/// The amount of time each monitoring thread should wait between sending an isMaster command
/// to its respective server.
/// The amount of time each monitoring thread should wait between performing server checks.
///
/// The default value is 10 seconds.
#[builder(default)]
Expand Down
8 changes: 4 additions & 4 deletions src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
bson::Document,
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
error::{Error, ErrorKind, Result},
is_master::{IsMasterCommandResponse, IsMasterReply},
hello::{HelloCommandResponse, HelloReply},
operation::{CommandErrorBody, CommandResponse},
options::{ReadConcern, ReadConcernInternal, ReadConcernLevel, ServerAddress},
selection_criteria::ReadPreference,
Expand Down Expand Up @@ -244,12 +244,12 @@ impl RawCommandResponse {
.map_err(|_| Error::invalid_authentication_response(mechanism_name))
}

pub(crate) fn to_is_master_response(&self, round_trip_time: Duration) -> Result<IsMasterReply> {
match self.body::<CommandResponse<IsMasterCommandResponse>>() {
pub(crate) fn to_hello_reply(&self, round_trip_time: Duration) -> Result<HelloReply> {
match self.body::<CommandResponse<HelloCommandResponse>>() {
Ok(response) if response.is_success() => {
let server_address = self.source_address().clone();
let cluster_time = response.cluster_time().cloned();
Ok(IsMasterReply {
Ok(HelloReply {
server_address,
command_response: response.body,
round_trip_time,
Expand Down
16 changes: 10 additions & 6 deletions src/cmap/conn/stream_description.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use crate::{client::options::ServerAddress, is_master::IsMasterReply, sdam::ServerType};
use crate::{client::options::ServerAddress, hello::HelloReply, sdam::ServerType};

/// Contains information about a given server in a format digestible by a connection.
#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -32,24 +32,27 @@ pub(crate) struct StreamDescription {
/// can be included in a write batch. If more than this number of writes are included, the
/// server cannot guarantee space in the response document to reply to the batch.
pub(crate) max_write_batch_size: i64,

/// Whether the server associated with this connection supports the `hello` command.
pub(crate) hello_ok: bool,
}

impl StreamDescription {
/// Constructs a new StreamDescription from an IsMasterReply.
pub(crate) fn from_is_master(reply: IsMasterReply) -> Self {
/// Constructs a new StreamDescription from a `HelloReply`.
pub(crate) fn from_hello_reply(reply: &HelloReply) -> Self {
Self {
server_address: reply.server_address,
server_address: reply.server_address.clone(),
initial_server_type: reply.command_response.server_type(),
max_wire_version: reply.command_response.max_wire_version,
min_wire_version: reply.command_response.min_wire_version,
sasl_supported_mechs: reply.command_response.sasl_supported_mechs,
// TODO RUST-204: Add "saslSupportedMechs" if applicable.
sasl_supported_mechs: reply.command_response.sasl_supported_mechs.clone(),
logical_session_timeout: reply
.command_response
.logical_session_timeout_minutes
.map(|mins| Duration::from_secs(mins as u64 * 60)),
max_bson_object_size: reply.command_response.max_bson_object_size,
max_write_batch_size: reply.command_response.max_write_batch_size,
hello_ok: reply.command_response.hello_ok.unwrap_or(false),
}
}

Expand Down Expand Up @@ -78,6 +81,7 @@ impl StreamDescription {
logical_session_timeout: Some(Duration::from_secs(30 * 60)),
max_bson_object_size: 16 * 1024 * 1024,
max_write_batch_size: 100_000,
hello_ok: false,
}
}
}
2 changes: 0 additions & 2 deletions src/cmap/conn/wire/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod header;
mod message;
#[cfg(test)]
mod test;
mod util;

pub(crate) use self::{message::Message, util::next_request_id};
50 changes: 0 additions & 50 deletions src/cmap/conn/wire/test.rs

This file was deleted.

28 changes: 16 additions & 12 deletions src/cmap/establish/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
compression::Compressor,
error::{ErrorKind, Result},
event::sdam::SdamEventHandler,
is_master::{is_master_command, run_is_master, IsMasterReply},
hello::{hello_command, run_hello, HelloReply},
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
sdam::Topology,
};
Expand Down Expand Up @@ -104,7 +104,7 @@ impl From<OsMetadata> for Bson {
lazy_static! {
/// Contains the basic handshake information that can be statically determined. This document
/// (potentially with additional fields added) can be cloned and put in the `client` field of
/// the `isMaster` command.
/// the `hello` or legacy hello command.
static ref BASE_CLIENT_METADATA: ClientMetadata = {
let mut metadata = ClientMetadata {
application: None,
Expand Down Expand Up @@ -143,10 +143,12 @@ lazy_static! {
/// Contains the logic needed to handshake a connection.
#[derive(Clone, Debug)]
pub(crate) struct Handshaker {
/// The `isMaster` command to send when handshaking. This will always be identical
/// The hello or legacy hello command to send when handshaking. This will always be identical
/// given the same pool options, so it can be created at the time the Handshaker is created.
command: Command,

credential: Option<Credential>,

// This field is not read without a compression feature flag turned on.
#[allow(dead_code)]
compressors: Option<Vec<Compressor>>,
Expand All @@ -160,8 +162,10 @@ impl Handshaker {

let mut compressors = None;

let mut command =
is_master_command(options.as_ref().and_then(|opts| opts.server_api.as_ref()));
let mut command = hello_command(
options.as_ref().and_then(|opts| opts.server_api.as_ref()),
None,
);

if let Some(options) = options {
if let Some(app_name) = options.app_name {
Expand Down Expand Up @@ -229,10 +233,10 @@ impl Handshaker {

let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;

let mut is_master_reply = run_is_master(conn, command, topology, handler).await?;
let mut hello_reply = run_hello(conn, command, topology, handler).await?;

if self.command.body.contains_key("loadBalanced")
&& is_master_reply.command_response.service_id.is_none()
&& hello_reply.command_response.service_id.is_none()
{
return Err(ErrorKind::IncompatibleServer {
message: "Driver attempted to initialize in load balancing mode, but the server \
Expand All @@ -241,12 +245,12 @@ impl Handshaker {
}
.into());
}
conn.stream_description = Some(StreamDescription::from_is_master(is_master_reply.clone()));
conn.stream_description = Some(StreamDescription::from_hello_reply(&hello_reply));

// Record the client's message and the server's response from speculative authentication if
// the server did send a response.
let first_round = client_first.and_then(|client_first| {
is_master_reply
hello_reply
.command_response
.speculative_authenticate
.take()
Expand All @@ -255,7 +259,7 @@ impl Handshaker {

// Check that master reply has a compressor list and unpack it
if let (Some(server_compressors), Some(client_compressors)) = (
is_master_reply.command_response.compressors.as_ref(),
hello_reply.command_response.compressors.as_ref(),
self.compressors.as_ref(),
) {
// Use the Client's first compressor choice that the server supports (comparing only on
Expand All @@ -275,7 +279,7 @@ impl Handshaker {
}

Ok(HandshakeResult {
is_master_reply,
hello_reply,
first_round,
})
}
Expand All @@ -288,7 +292,7 @@ impl Handshaker {
#[derive(Debug)]
pub(crate) struct HandshakeResult {
/// The response from the server.
pub(crate) is_master_reply: IsMasterReply,
pub(crate) hello_reply: HelloReply,

/// The first round of speculative authentication, if applicable.
pub(crate) first_round: Option<FirstRound>,
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/establish/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ConnectionEstablisher {
.handshake(&mut connection, None, &None)
.await
.map_err(|e| EstablishError::pre_hello(e, pool_gen.clone()))?;
let service_id = handshake.is_master_reply.command_response.service_id;
let service_id = handshake.hello_reply.command_response.service_id;

// If the handshake response had a `serviceId` field, this is a connection to a load
// balancer and must derive its generation from the service_generations map.
Expand Down
5 changes: 3 additions & 2 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
bson::{doc, Document},
cmap::{options::ConnectionPoolOptions, Command, ConnectionPool},
event::cmap::{CmapEventHandler, ConnectionClosedReason},
hello::LEGACY_HELLO_COMMAND_NAME,
operation::CommandResponse,
runtime,
sdam::ServerUpdateSender,
Expand Down Expand Up @@ -107,7 +108,7 @@ async fn concurrent_connections() {
let failpoint = doc! {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": { "failCommands": [ "isMaster" ], "blockConnection": true, "blockTimeMS": 1000 }
"data": { "failCommands": [LEGACY_HELLO_COMMAND_NAME, "hello"], "blockConnection": true, "blockTimeMS": 1000 }
};
client
.database("admin")
Expand Down Expand Up @@ -196,7 +197,7 @@ async fn connection_error_during_establishment() {

let options = FailCommandOptions::builder().error_code(1234).build();
let failpoint = FailPoint::fail_command(
&["isMaster", "hello"],
&[LEGACY_HELLO_COMMAND_NAME, "hello"],
FailPointMode::Times(10),
Some(options),
);
Expand Down
6 changes: 3 additions & 3 deletions src/event/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct CommandStartedEvent {
/// The name of the database the command is being run against.
pub db: String,

/// The type of command being run, e.g. "find" or "isMaster".
/// The type of command being run, e.g. "find" or "hello".
pub command_name: String,

/// The driver-generated identifier for the request. Applications can use this to identify the
Expand All @@ -45,7 +45,7 @@ pub struct CommandSucceededEvent {
/// The server's reply to the command.
pub reply: Document,

/// The type of command that was run, e.g. "find" or "isMaster".
/// The type of command that was run, e.g. "find" or "hello".
pub command_name: String,

/// The driver-generated identifier for the request. Applications can use this to identify the
Expand All @@ -67,7 +67,7 @@ pub struct CommandFailedEvent {
/// The total execution time of the command (including the network round-trip).
pub duration: Duration,

/// The type of command that was run, e.g. "find" or "isMaster".
/// The type of command that was run, e.g. "find" or "hello".
pub command_name: String,

/// The error that the driver returned due to the event failing.
Expand Down
Loading

0 comments on commit d55c079

Please sign in to comment.