Skip to content

Commit

Permalink
feat: upgrade pgwire apis
Browse files Browse the repository at this point in the history
  • Loading branch information
sunng87 committed Sep 19, 2024
1 parent 8f0a6c2 commit 2649aa3
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
35 changes: 31 additions & 4 deletions src/servers/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use std::sync::Arc;
use ::auth::UserProviderRef;
use derive_builder::Builder;
use pgwire::api::auth::ServerParameterProvider;
use pgwire::api::ClientInfo;
use pgwire::api::copy::NoopCopyHandler;
use pgwire::api::{ClientInfo, PgWireHandlerFactory};
pub use server::PostgresServer;
use session::context::Channel;
use session::Session;
Expand Down Expand Up @@ -68,7 +69,7 @@ impl ServerParameterProvider for GreptimeDBStartupParameters {
}
}

pub struct PostgresServerHandler {
pub struct PostgresServerHandlerInner {
query_handler: ServerSqlQueryHandlerRef,
login_verifier: PgLoginVerifier,
force_tls: bool,
Expand All @@ -87,17 +88,43 @@ pub(crate) struct MakePostgresServerHandler {
force_tls: bool,
}

pub(crate) struct PostgresServerHandler(Arc<PostgresServerHandlerInner>);

impl PgWireHandlerFactory for PostgresServerHandler {
type StartupHandler = PostgresServerHandlerInner;
type SimpleQueryHandler = PostgresServerHandlerInner;
type ExtendedQueryHandler = PostgresServerHandlerInner;
type CopyHandler = NoopCopyHandler;

fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
self.0.clone()
}

fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
self.0.clone()
}

fn startup_handler(&self) -> Arc<Self::StartupHandler> {
self.0.clone()
}

fn copy_handler(&self) -> Arc<Self::CopyHandler> {
Arc::new(NoopCopyHandler)
}
}

impl MakePostgresServerHandler {
fn make(&self, addr: Option<SocketAddr>) -> PostgresServerHandler {
let session = Arc::new(Session::new(addr, Channel::Postgres, Default::default()));
PostgresServerHandler {
let handler = PostgresServerHandlerInner {
query_handler: self.query_handler.clone(),
login_verifier: PgLoginVerifier::new(self.user_provider.clone()),
force_tls: self.force_tls,
param_provider: self.param_provider.clone(),

session: session.clone(),
query_parser: Arc::new(DefaultQueryParser::new(self.query_handler.clone(), session)),
}
};
PostgresServerHandler(Arc::new(handler))
}
}
4 changes: 2 additions & 2 deletions src/servers/src/postgres/auth_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use pgwire::messages::{PgWireBackendMessage, PgWireFrontendMessage};
use session::Session;
use snafu::IntoError;

use super::PostgresServerHandler;
use super::PostgresServerHandlerInner;
use crate::error::{AuthSnafu, Result};
use crate::metrics::METRIC_AUTH_FAILURE;
use crate::postgres::types::PgErrorCode;
Expand Down Expand Up @@ -127,7 +127,7 @@ where
}

#[async_trait]
impl StartupHandler for PostgresServerHandler {
impl StartupHandler for PostgresServerHandlerInner {
async fn on_startup<C>(
&self,
client: &mut C,
Expand Down
6 changes: 3 additions & 3 deletions src/servers/src/postgres/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use sql::dialect::PostgreSqlDialect;
use sql::parser::{ParseOptions, ParserContext};

use super::types::*;
use super::{fixtures, PostgresServerHandler};
use super::{fixtures, PostgresServerHandlerInner};
use crate::error::Result;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::SqlPlan;

#[async_trait]
impl SimpleQueryHandler for PostgresServerHandler {
impl SimpleQueryHandler for PostgresServerHandlerInner {
#[tracing::instrument(skip_all, fields(protocol = "postgres"))]
async fn do_query<'a, C>(
&self,
Expand Down Expand Up @@ -237,7 +237,7 @@ impl QueryParser for DefaultQueryParser {
}

#[async_trait]
impl ExtendedQueryHandler for PostgresServerHandler {
impl ExtendedQueryHandler for PostgresServerHandlerInner {
type Statement = SqlPlan;
type QueryParser = DefaultQueryParser;

Expand Down
10 changes: 2 additions & 8 deletions src/servers/src/postgres/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,8 @@ impl PostgresServer {
let _handle = io_runtime.spawn(async move {
crate::metrics::METRIC_POSTGRES_CONNECTIONS.inc();
let pg_handler = Arc::new(handler_maker.make(addr));
let r = process_socket(
io_stream,
tls_acceptor.clone(),
pg_handler.clone(),
pg_handler.clone(),
pg_handler,
)
.await;
let r =
process_socket(io_stream, tls_acceptor.clone(), pg_handler).await;
crate::metrics::METRIC_POSTGRES_CONNECTIONS.dec();
r
});
Expand Down

0 comments on commit 2649aa3

Please sign in to comment.