Skip to content

Commit

Permalink
adapter: expose Materialize version in mz_version parameter
Browse files Browse the repository at this point in the history
This commit exposes the Materialize-specific information in a new
`mz_version` parameter, as in:

    materialize=> show mz_version;
           mz_version
    -------------------------
     v0.39.0-dev (73f6bed)
    (1 row)

The `mz_version` parameter is additionally added to the parameter set
that is automatically sent to the client during part of startup. This
approach comes from CockroachDB, and allows clients to easily detect
whether they're talking to Materialize or PostgreSQL without incurring
an additional roundtrip.

I already have a PR out to sqlx [0] that uses this feature to
automatically disable PostgreSQL-specific features that Materialize does
not support.

The version string matches the output of the `mz_version()` function.

The implementation is somewhat irritating, as it requires plumbing the
`BuildInfo` from the adapter into each session. Doesn't turn out too
complicated, though, now that it's all written out.

[0]: launchbadge/sqlx#2282
  • Loading branch information
benesch committed Jan 9, 2023
1 parent e490ebd commit 5fcb756
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 43 deletions.
23 changes: 20 additions & 3 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::{mpsc, oneshot, watch};
use tracing::error;
use uuid::Uuid;

use mz_build_info::BuildInfo;
use mz_ore::collections::CollectionExt;
use mz_ore::id_gen::IdAllocator;
use mz_ore::task::{AbortOnDropHandle, JoinHandleExt};
Expand All @@ -29,7 +30,7 @@ use crate::catalog::INTROSPECTION_USER;
use crate::command::{Canceled, Command, ExecuteResponse, Response, StartupResponse};
use crate::error::AdapterError;
use crate::metrics::Metrics;
use crate::session::{EndTransactionAction, PreparedStatement, Session, TransactionId};
use crate::session::{EndTransactionAction, PreparedStatement, Session, TransactionId, User};
use crate::PeekResponseUnary;

/// An abstraction allowing us to name different connections.
Expand Down Expand Up @@ -71,14 +72,20 @@ impl Handle {
/// outstanding clients have dropped.
#[derive(Debug, Clone)]
pub struct Client {
build_info: &'static BuildInfo,
inner_cmd_tx: mpsc::UnboundedSender<Command>,
id_alloc: Arc<IdAllocator<ConnectionId>>,
metrics: Metrics,
}

impl Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>, metrics: Metrics) -> Client {
pub(crate) fn new(
build_info: &'static BuildInfo,
cmd_tx: mpsc::UnboundedSender<Command>,
metrics: Metrics,
) -> Client {
Client {
build_info,
inner_cmd_tx: cmd_tx,
id_alloc: Arc::new(IdAllocator::new(1, 1 << 16)),
metrics,
Expand All @@ -88,6 +95,7 @@ impl Client {
/// Allocates a client for an incoming connection.
pub fn new_conn(&self) -> Result<ConnClient, AdapterError> {
Ok(ConnClient {
build_info: self.build_info,
conn_id: self
.id_alloc
.alloc()
Expand All @@ -101,7 +109,7 @@ impl Client {
pub async fn introspection_execute_one(&self, sql: &str) -> Result<Vec<Row>, anyhow::Error> {
// Connect to the coordinator.
let conn_client = self.new_conn()?;
let session = Session::new(conn_client.conn_id(), INTROSPECTION_USER.clone());
let session = conn_client.new_session(INTROSPECTION_USER.clone());
let (mut session_client, _) = conn_client.startup(session, false).await?;

// Parse the SQL statement.
Expand Down Expand Up @@ -146,11 +154,20 @@ impl Client {
/// See also [`Client`].
#[derive(Debug)]
pub struct ConnClient {
build_info: &'static BuildInfo,
conn_id: ConnectionId,
inner: Client,
}

impl ConnClient {
/// Creates a new session associated with this connection for the given
/// user.
///
/// It is the caller's responsibility to have authenticated the user.
pub fn new_session(&self, user: User) -> Session {
Session::new(self.build_info, self.conn_id, user)
}

/// Returns the ID of the connection associated with this client.
pub fn conn_id(&self) -> ConnectionId {
self.conn_id
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/config/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use mz_repr::Row;
use mz_sql::ast::{Ident, Raw, ShowStatement, ShowVariableStatement, Statement};

use crate::catalog::SYSTEM_USER;
use crate::session::{EndTransactionAction, Session};
use crate::session::EndTransactionAction;
use crate::{AdapterError, Client, ExecuteResponse, PeekResponseUnary, SessionClient};

use super::SynchronizedParameters;
Expand All @@ -27,7 +27,7 @@ pub struct SystemParameterBackend {
impl SystemParameterBackend {
pub async fn new(client: Client) -> Result<Self, AdapterError> {
let conn_client = client.new_conn()?;
let session = Session::new(conn_client.conn_id(), SYSTEM_USER.clone());
let session = conn_client.new_session(SYSTEM_USER.clone());
let (session_client, _) = conn_client.startup(session, true).await?;
Ok(Self { session_client })
}
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ pub async fn serve<S: Append + 'static>(
start_instant,
_thread: thread.join_on_drop(),
};
let client = Client::new(cmd_tx.clone(), metrics_clone);
let client = Client::new(build_info, cmd_tx.clone(), metrics_clone);
Ok((handle, client))
}
Err(e) => Err(e),
Expand Down
23 changes: 16 additions & 7 deletions src/adapter/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::OwnedMutexGuard;
use uuid::Uuid;

use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
use mz_pgrepr::Format;
use mz_repr::{Datum, Diff, GlobalId, Row, ScalarType, TimestampManipulation};
use mz_sql::ast::{Raw, Statement, TransactionAccessMode};
Expand Down Expand Up @@ -95,25 +96,33 @@ pub struct Session<T = mz_repr::Timestamp> {

impl<T: TimestampManipulation> Session<T> {
/// Creates a new session for the specified connection ID.
pub fn new(conn_id: ConnectionId, user: User) -> Session<T> {
pub(crate) fn new(
build_info: &'static BuildInfo,
conn_id: ConnectionId,
user: User,
) -> Session<T> {
assert_ne!(conn_id, DUMMY_CONNECTION_ID);
Self::new_internal(conn_id, user)
Self::new_internal(build_info, conn_id, user)
}

/// Creates a new dummy session.
///
/// Dummy sessions are intended for use when executing queries on behalf of
/// the system itself, rather than on behalf of a user.
pub fn dummy() -> Session<T> {
Self::new_internal(DUMMY_CONNECTION_ID, SYSTEM_USER.clone())
Self::new_internal(&DUMMY_BUILD_INFO, DUMMY_CONNECTION_ID, SYSTEM_USER.clone())
}

fn new_internal(conn_id: ConnectionId, user: User) -> Session<T> {
fn new_internal(
build_info: &'static BuildInfo,
conn_id: ConnectionId,
user: User,
) -> Session<T> {
let (notices_tx, notices_rx) = mpsc::unbounded_channel();
let vars = if INTERNAL_USER_NAMES.contains(&user.name) {
SessionVars::for_cluster(&user.name)
SessionVars::for_cluster(build_info, &user.name)
} else {
SessionVars::default()
SessionVars::new(build_info)
};
Session {
conn_id,
Expand Down Expand Up @@ -593,7 +602,7 @@ impl<T: TimestampManipulation> Session<T> {
pub fn reset(&mut self) {
let _ = self.clear_transaction();
self.prepared_statements.clear();
self.vars = SessionVars::default();
self.vars = SessionVars::new(self.vars.build_info());
}

/// Returns the user who owns this session.
Expand Down
70 changes: 57 additions & 13 deletions src/adapter/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use once_cell::sync::Lazy;
use serde::Serialize;
use uncased::UncasedStr;

use mz_build_info::BuildInfo;
use mz_ore::cast;
use mz_sql::ast::{Ident, SetVariableValue, Value as AstValue};
use mz_sql::DEFAULT_SCHEMA;
Expand Down Expand Up @@ -125,6 +126,8 @@ const INTERVAL_STYLE: ServerVar<str> = ServerVar {
internal: false,
};

const MZ_VERSION_NAME: &UncasedStr = UncasedStr::new("mz_version");

const QGM_OPTIMIZATIONS: ServerVar<bool> = ServerVar {
name: UncasedStr::new("qgm_optimizations_experimental"),
value: &false,
Expand Down Expand Up @@ -399,6 +402,7 @@ static EMIT_TRACE_ID_NOTICE: ServerVar<bool> = ServerVar {
#[derive(Debug)]
pub struct SessionVars {
application_name: SessionVar<str>,
build_info: &'static BuildInfo,
client_encoding: ServerVar<str>,
client_min_messages: SessionVar<ClientSeverity>,
cluster: SessionVar<str>,
Expand All @@ -424,10 +428,12 @@ pub struct SessionVars {
emit_trace_id_notice: SessionVar<bool>,
}

impl Default for SessionVars {
fn default() -> SessionVars {
impl SessionVars {
/// Creates a new [`SessionVars`].
pub fn new(build_info: &'static BuildInfo) -> SessionVars {
SessionVars {
application_name: SessionVar::new(&APPLICATION_NAME),
build_info,
client_encoding: CLIENT_ENCODING,
client_min_messages: SessionVar::new(&CLIENT_MIN_MESSAGES),
cluster: SessionVar::new(&CLUSTER),
Expand Down Expand Up @@ -455,24 +461,20 @@ impl Default for SessionVars {
emit_trace_id_notice: SessionVar::new(&EMIT_TRACE_ID_NOTICE),
}
}
}

impl SessionVars {
/// Returns a new SessionVars with the cluster variable set to `cluster`.
pub fn for_cluster(cluster_name: &str) -> Self {
let mut cluster = SessionVar::new(&CLUSTER);
cluster.session_value = Some(cluster_name.into());
Self {
cluster,
..Default::default()
}
pub fn for_cluster(build_info: &'static BuildInfo, cluster_name: &str) -> Self {
let mut vars = SessionVars::new(build_info);
vars.cluster.session_value = Some(cluster_name.into());
vars
}

/// Returns an iterator over the configuration parameters and their current
/// values for this session.
pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
let vars: [&dyn Var; 24] = [
let vars: [&dyn Var; 25] = [
&self.application_name,
self.build_info,
&self.client_encoding,
&self.client_min_messages,
&self.cluster,
Expand Down Expand Up @@ -504,7 +506,7 @@ impl SessionVars {
/// values for this session) that are expected to be sent to the client when
/// a new connection is established or when their value changes.
pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
let vars: [&dyn Var; 8] = [
let vars: [&dyn Var; 9] = [
&self.application_name,
&self.client_encoding,
&self.date_style,
Expand All @@ -513,6 +515,13 @@ impl SessionVars {
&self.standard_conforming_strings,
&self.timezone,
&self.interval_style,
// Including `mz_version` in the notify set is a Materialize
// extension. Doing so allows applications to detect whether they
// are talking to Materialize or PostgreSQL without an additional
// network roundtrip. This is known to be safe because CockroachDB
// has an analogous extension [0].
// [0]: https://github.com/cockroachdb/cockroach/blob/369c4057a/pkg/sql/pgwire/conn.go#L1840
self.build_info,
];
vars.into_iter()
}
Expand Down Expand Up @@ -550,6 +559,8 @@ impl SessionVars {
Ok(&self.integer_datetimes)
} else if name == INTERVAL_STYLE.name {
Ok(&self.interval_style)
} else if name == MZ_VERSION_NAME {
Ok(self.build_info)
} else if name == QGM_OPTIMIZATIONS.name {
Ok(&self.qgm_optimizations)
} else if name == SEARCH_PATH.name {
Expand Down Expand Up @@ -789,6 +800,7 @@ impl SessionVars {
// call to `end_transaction` below.
let SessionVars {
application_name,
build_info: _,
client_encoding: _,
client_min_messages,
cluster,
Expand Down Expand Up @@ -836,6 +848,11 @@ impl SessionVars {
self.application_name.value()
}

/// Returns the build info.
pub fn build_info(&self) -> &'static BuildInfo {
self.build_info
}

/// Returns the value of the `client_encoding` configuration parameter.
pub fn client_encoding(&self) -> &'static str {
self.client_encoding.value
Expand Down Expand Up @@ -881,6 +898,11 @@ impl SessionVars {
self.interval_style.value
}

/// Returns the value of the `mz_version` configuration parameter.
pub fn mz_version(&self) -> String {
self.build_info.value()
}

/// Returns the value of the `qgm_optimizations` configuration parameter.
pub fn qgm_optimizations(&self) -> bool {
*self.qgm_optimizations.value()
Expand Down Expand Up @@ -1589,6 +1611,28 @@ where
}
}

impl Var for BuildInfo {
fn name(&self) -> &'static str {
"mz_version"
}

fn value(&self) -> String {
self.human_version()
}

fn description(&self) -> &'static str {
"Shows the Materialize server version (Materialize)."
}

fn type_name(&self) -> &'static str {
str::TYPE_NAME
}

fn visible(&self, _: &User) -> bool {
true
}
}

/// A value that can be stored in a session or server variable.
pub trait Value: ToOwned + Send + Sync {
/// The name of the value type.
Expand Down
4 changes: 2 additions & 2 deletions src/environmentd/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tracing::{error, warn};

use mz_adapter::catalog::{HTTP_DEFAULT_USER, SYSTEM_USER};
use mz_adapter::session::{ExternalUserMetadata, Session, User};
use mz_adapter::session::{ExternalUserMetadata, User};
use mz_adapter::{AdapterError, Client, SessionClient};
use mz_frontegg_auth::{FronteggAuthentication, FronteggError};
use mz_ore::metrics::MetricsRegistry;
Expand Down Expand Up @@ -282,7 +282,7 @@ impl AuthedClient {
create_if_not_exists,
} = user;
let adapter_client = adapter_client.new_conn()?;
let session = Session::new(adapter_client.conn_id(), user);
let session = adapter_client.new_session(user);
let (adapter_client, _) = adapter_client
.startup(session, create_if_not_exists)
.await?;
Expand Down
5 changes: 0 additions & 5 deletions src/pgwire/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ where
}
}

/// Returns the ID of this connection.
pub fn id(&self) -> u32 {
self.conn_id
}

/// Reads and decodes one frontend message from the client.
///
/// Blocks until the client sends a complete message. If the client
Expand Down
13 changes: 5 additions & 8 deletions src/pgwire/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use mz_adapter::catalog::INTERNAL_USER_NAMES;
use mz_adapter::session::User;
use mz_adapter::session::{
EndTransactionAction, ExternalUserMetadata, InProgressRows, Portal, PortalState,
RowBatchStream, Session, TransactionStatus,
RowBatchStream, TransactionStatus,
};
use mz_adapter::{ExecuteResponse, PeekResponseUnary, RowsFuture};
use mz_frontegg_auth::FronteggAuthentication;
Expand Down Expand Up @@ -224,13 +224,10 @@ where
};

// Construct session.
let mut session = Session::new(
conn.id(),
User {
name: user,
external_metadata,
},
);
let mut session = adapter_client.new_session(User {
name: user,
external_metadata,
});
for (name, value) in params {
let local = false;
let _ = session.vars_mut().set(&name, &value, local);
Expand Down
Loading

0 comments on commit 5fcb756

Please sign in to comment.