Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-803 Conditionally use hello for monitoring #600

Merged
Merged
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion benchmarks/src/bench/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub async fn drop_database(uri: &str, database: &str) -> Result<()> {

let hello = client
.database("admin")
.run_command(doc! { "ismaster": true }, None)
.run_command(doc! { "hello": true }, None)
.await?;

client.database(&database).drop(None).await?;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bench/run_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Benchmark for RunCommandBenchmark {
Ok(RunCommandBenchmark {
db,
num_iter: options.num_iter,
cmd: doc! { "ismaster": true },
cmd: doc! { "hello": true },
kmahar marked this conversation as resolved.
Show resolved Hide resolved
uri: options.uri,
})
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async fn run_benchmarks(
let run_command =
bench::run_benchmark::<RunCommandBenchmark>(run_command_options).await?;

comp_score += score_test(run_command, RUN_COMMAND_BENCH, 0.16, more_info);
comp_score += score_test(run_command, RUN_COMMAND_BENCH, 0.13, more_info);
}

// Small doc insertOne
Expand Down
2 changes: 1 addition & 1 deletion src/client/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ impl Credential {
}

/// If the mechanism is missing, append the appropriate mechanism negotiation key-value-pair to
/// the provided isMaster command document.
/// the provided hello or legacy hello command document.
pub(crate) fn append_needed_mechanism_negotiation(&self, command: &mut Document) {
if let (Some(username), None) = (self.username.as_ref(), self.mechanism.as_ref()) {
command.insert(
Expand Down
3 changes: 2 additions & 1 deletion src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
UNKNOWN_TRANSACTION_COMMIT_RESULT,
},
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
hello::LEGACY_HELLO_COMMAND_NAME_LOWERCASE,
operation::{
AbortTransaction,
AggregateTarget,
Expand Down Expand Up @@ -70,7 +71,7 @@ lazy_static! {
pub(crate) static ref HELLO_COMMAND_NAMES: HashSet<&'static str> = {
let mut hash_set = HashSet::new();
hash_set.insert("hello");
hash_set.insert("ismaster");
hash_set.insert(LEGACY_HELLO_COMMAND_NAME_LOWERCASE);
hash_set
};
}
Expand Down
3 changes: 1 addition & 2 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,7 @@ pub struct ClientOptions {
#[builder(default)]
pub driver_info: Option<DriverInfo>,

/// The amount of time each monitoring thread should wait between sending an isMaster command
/// to its respective server.
/// The amount of time each monitoring thread should wait between performing server checks.
///
/// The default value is 10 seconds.
#[builder(default)]
Expand Down
8 changes: 4 additions & 4 deletions src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
bson::Document,
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
error::{Error, ErrorKind, Result},
is_master::{IsMasterCommandResponse, IsMasterReply},
hello::{HelloCommandResponse, HelloReply},
operation::{CommandErrorBody, CommandResponse},
options::{ReadConcern, ReadConcernInternal, ReadConcernLevel, ServerAddress},
selection_criteria::ReadPreference,
Expand Down Expand Up @@ -234,12 +234,12 @@ impl RawCommandResponse {
.map_err(|_| Error::invalid_authentication_response(mechanism_name))
}

pub(crate) fn to_is_master_response(&self, round_trip_time: Duration) -> Result<IsMasterReply> {
match self.body::<CommandResponse<IsMasterCommandResponse>>() {
pub(crate) fn to_hello_reply(&self, round_trip_time: Duration) -> Result<HelloReply> {
match self.body::<CommandResponse<HelloCommandResponse>>() {
Ok(response) if response.is_success() => {
let server_address = self.source_address().clone();
let cluster_time = response.cluster_time().cloned();
Ok(IsMasterReply {
Ok(HelloReply {
server_address,
command_response: response.body,
round_trip_time,
Expand Down
16 changes: 10 additions & 6 deletions src/cmap/conn/stream_description.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use crate::{client::options::ServerAddress, is_master::IsMasterReply, sdam::ServerType};
use crate::{client::options::ServerAddress, hello::HelloReply, sdam::ServerType};

/// Contains information about a given server in a format digestible by a connection.
#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -32,24 +32,27 @@ pub(crate) struct StreamDescription {
/// can be included in a write batch. If more than this number of writes are included, the
/// server cannot guarantee space in the response document to reply to the batch.
pub(crate) max_write_batch_size: i64,

/// Whether the server associated with this connection supports the `hello` command.
pub(crate) hello_ok: bool,
}

impl StreamDescription {
/// Constructs a new StreamDescription from an IsMasterReply.
pub(crate) fn from_is_master(reply: IsMasterReply) -> Self {
/// Constructs a new StreamDescription from a `HelloReply`.
pub(crate) fn from_hello_reply(reply: &HelloReply) -> Self {
Self {
server_address: reply.server_address,
server_address: reply.server_address.clone(),
initial_server_type: reply.command_response.server_type(),
max_wire_version: reply.command_response.max_wire_version,
min_wire_version: reply.command_response.min_wire_version,
sasl_supported_mechs: reply.command_response.sasl_supported_mechs,
// TODO RUST-204: Add "saslSupportedMechs" if applicable.
sasl_supported_mechs: reply.command_response.sasl_supported_mechs.clone(),
logical_session_timeout: reply
.command_response
.logical_session_timeout_minutes
.map(|mins| Duration::from_secs(mins as u64 * 60)),
max_bson_object_size: reply.command_response.max_bson_object_size,
max_write_batch_size: reply.command_response.max_write_batch_size,
hello_ok: reply.command_response.hello_ok.unwrap_or(false),
}
}

Expand Down Expand Up @@ -78,6 +81,7 @@ impl StreamDescription {
logical_session_timeout: Some(Duration::from_secs(30 * 60)),
max_bson_object_size: 16 * 1024 * 1024,
max_write_batch_size: 100_000,
hello_ok: false,
}
}
}
2 changes: 0 additions & 2 deletions src/cmap/conn/wire/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod header;
mod message;
#[cfg(test)]
mod test;
mod util;

pub(crate) use self::{message::Message, util::next_request_id};
50 changes: 0 additions & 50 deletions src/cmap/conn/wire/test.rs

This file was deleted.

28 changes: 16 additions & 12 deletions src/cmap/establish/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
compression::Compressor,
error::{ErrorKind, Result},
event::sdam::SdamEventHandler,
is_master::{is_master_command, run_is_master, IsMasterReply},
hello::{hello_command, run_hello, HelloReply},
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
sdam::Topology,
};
Expand Down Expand Up @@ -104,7 +104,7 @@ impl From<OsMetadata> for Bson {
lazy_static! {
/// Contains the basic handshake information that can be statically determined. This document
/// (potentially with additional fields added) can be cloned and put in the `client` field of
/// the `isMaster` command.
/// the `hello` or legacy hello command.
static ref BASE_CLIENT_METADATA: ClientMetadata = {
let mut metadata = ClientMetadata {
application: None,
Expand Down Expand Up @@ -143,10 +143,12 @@ lazy_static! {
/// Contains the logic needed to handshake a connection.
#[derive(Clone, Debug)]
pub(crate) struct Handshaker {
/// The `isMaster` command to send when handshaking. This will always be identical
/// The hello or legacy hello command to send when handshaking. This will always be identical
/// given the same pool options, so it can be created at the time the Handshaker is created.
command: Command,

credential: Option<Credential>,

// This field is not read without a compression feature flag turned on.
#[allow(dead_code)]
compressors: Option<Vec<Compressor>>,
Expand All @@ -160,8 +162,10 @@ impl Handshaker {

let mut compressors = None;

let mut command =
is_master_command(options.as_ref().and_then(|opts| opts.server_api.as_ref()));
let mut command = hello_command(
options.as_ref().and_then(|opts| opts.server_api.as_ref()),
None,
);

if let Some(options) = options {
if let Some(app_name) = options.app_name {
Expand Down Expand Up @@ -229,10 +233,10 @@ impl Handshaker {

let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;

let mut is_master_reply = run_is_master(conn, command, topology, handler).await?;
let mut hello_reply = run_hello(conn, command, topology, handler).await?;

if self.command.body.contains_key("loadBalanced")
&& is_master_reply.command_response.service_id.is_none()
&& hello_reply.command_response.service_id.is_none()
{
return Err(ErrorKind::IncompatibleServer {
message: "Driver attempted to initialize in load balancing mode, but the server \
Expand All @@ -241,12 +245,12 @@ impl Handshaker {
}
.into());
}
conn.stream_description = Some(StreamDescription::from_is_master(is_master_reply.clone()));
conn.stream_description = Some(StreamDescription::from_hello_reply(&hello_reply));

// Record the client's message and the server's response from speculative authentication if
// the server did send a response.
let first_round = client_first.and_then(|client_first| {
is_master_reply
hello_reply
.command_response
.speculative_authenticate
.take()
Expand All @@ -255,7 +259,7 @@ impl Handshaker {

// Check that master reply has a compressor list and unpack it
if let (Some(server_compressors), Some(client_compressors)) = (
is_master_reply.command_response.compressors.as_ref(),
hello_reply.command_response.compressors.as_ref(),
self.compressors.as_ref(),
) {
// Use the Client's first compressor choice that the server supports (comparing only on
Expand All @@ -275,7 +279,7 @@ impl Handshaker {
}

Ok(HandshakeResult {
is_master_reply,
hello_reply,
first_round,
})
}
Expand All @@ -288,7 +292,7 @@ impl Handshaker {
#[derive(Debug)]
pub(crate) struct HandshakeResult {
/// The response from the server.
pub(crate) is_master_reply: IsMasterReply,
pub(crate) hello_reply: HelloReply,

/// The first round of speculative authentication, if applicable.
pub(crate) first_round: Option<FirstRound>,
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/establish/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ConnectionEstablisher {
.handshake(&mut connection, None, &None)
.await
.map_err(|e| EstablishError::pre_hello(e, pool_gen.clone()))?;
let service_id = handshake.is_master_reply.command_response.service_id;
let service_id = handshake.hello_reply.command_response.service_id;

// If the handshake response had a `serviceId` field, this is a connection to a load
// balancer and must derive its generation from the service_generations map.
Expand Down
5 changes: 3 additions & 2 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
bson::{doc, Document},
cmap::{options::ConnectionPoolOptions, Command, ConnectionPool},
event::cmap::{CmapEventHandler, ConnectionClosedReason},
hello::LEGACY_HELLO_COMMAND_NAME,
operation::CommandResponse,
runtime,
sdam::ServerUpdateSender,
Expand Down Expand Up @@ -107,7 +108,7 @@ async fn concurrent_connections() {
let failpoint = doc! {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": { "failCommands": [ "isMaster" ], "blockConnection": true, "blockTimeMS": 1000 }
"data": { "failCommands": [LEGACY_HELLO_COMMAND_NAME, "hello"], "blockConnection": true, "blockTimeMS": 1000 }
};
client
.database("admin")
Expand Down Expand Up @@ -196,7 +197,7 @@ async fn connection_error_during_establishment() {

let options = FailCommandOptions::builder().error_code(1234).build();
let failpoint = FailPoint::fail_command(
&["isMaster", "hello"],
&[LEGACY_HELLO_COMMAND_NAME, "hello"],
FailPointMode::Times(10),
Some(options),
);
Expand Down
6 changes: 3 additions & 3 deletions src/event/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct CommandStartedEvent {
/// The name of the database the command is being run against.
pub db: String,

/// The type of command being run, e.g. "find" or "isMaster".
/// The type of command being run, e.g. "find" or "hello".
pub command_name: String,

/// The driver-generated identifier for the request. Applications can use this to identify the
Expand All @@ -45,7 +45,7 @@ pub struct CommandSucceededEvent {
/// The server's reply to the command.
pub reply: Document,

/// The type of command that was run, e.g. "find" or "isMaster".
/// The type of command that was run, e.g. "find" or "hello".
pub command_name: String,

/// The driver-generated identifier for the request. Applications can use this to identify the
Expand All @@ -67,7 +67,7 @@ pub struct CommandFailedEvent {
/// The total execution time of the command (including the network round-trip).
pub duration: Duration,

/// The type of command that was run, e.g. "find" or "isMaster".
/// The type of command that was run, e.g. "find" or "hello".
pub command_name: String,

/// The error that the driver returned due to the event failing.
Expand Down
Loading