Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support compression on gRPC server #3961

Merged
merged 14 commits into from
May 20, 2024
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ tokio = { version = "1.36", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.11", features = ["tls"] }
tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] }
uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ config-docs: ## Generate configuration documentation from toml files.
docker run --rm \
-v ${PWD}:/greptimedb \
-w /greptimedb/config \
toml2docs/toml2docs:latest \
toml2docs/toml2docs:v0.1.1 \
-p '##' \
-t ./config-docs-template.md \
-o ./config.md
Expand Down
29 changes: 20 additions & 9 deletions src/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::ChannelManager;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use crate::load_balance::{LoadBalance, Loadbalancer};
Expand Down Expand Up @@ -151,24 +152,34 @@ impl Client {

pub fn make_flight_client(&self) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?;
Ok(FlightClient {
addr,
client: FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()),
})

let client = FlightServiceClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);

Ok(FlightClient { addr, client })
}

pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PbRegionClient::new(channel)
let client = PbRegionClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size()))
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
Ok(client)
}

pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
let (_, channel) = self.find_channel()?;
Ok(PrometheusGatewayClient::new(channel))
let client = PrometheusGatewayClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);
Ok(client)
}

pub async fn health_check(&self) -> Result<()> {
Expand Down
9 changes: 8 additions & 1 deletion src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod mock {
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use tokio::sync::mpsc;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tower::service_fn;

Expand Down Expand Up @@ -57,7 +58,13 @@ pub mod mock {

tokio::spawn(async move {
Server::builder()
.add_service(RegionServer::new(handler))
.add_service(
RegionServer::new(handler)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
.await
});
Expand Down
33 changes: 24 additions & 9 deletions src/servers/src/grpc/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_runtime::Runtime;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use tokio::sync::Mutex;
use tonic::codec::CompressionEncoding;
use tonic::transport::server::RoutesBuilder;
use tower::ServiceBuilder;

Expand All @@ -45,11 +46,15 @@ macro_rules! add_service {
let max_recv_message_size = $builder.config().max_recv_message_size;
let max_send_message_size = $builder.config().max_send_message_size;

$builder.routes_builder_mut().add_service(
$service
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size),
)
let service_builder = $service
.max_decoding_message_size(max_recv_message_size)
.max_encoding_message_size(max_send_message_size)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

$builder.routes_builder_mut().add_service(service_builder);
};
}

Expand Down Expand Up @@ -123,16 +128,26 @@ impl GrpcServerBuilder {
otlp_handler: OpenTelemetryProtocolHandlerRef,
user_provider: Option<UserProviderRef>,
) -> Self {
let tracing_service = TraceServiceServer::new(OtlpService::new(otlp_handler.clone()))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

let trace_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider.clone()))
.service(TraceServiceServer::new(OtlpService::new(
otlp_handler.clone(),
)));
.service(tracing_service);
self.routes_builder.add_service(trace_server);

let metrics_service = MetricsServiceServer::new(OtlpService::new(otlp_handler))
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd);

let metrics_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(user_provider))
.service(MetricsServiceServer::new(OtlpService::new(otlp_handler)));
.service(metrics_service);
self.routes_builder.add_service(metrics_server);

self
Expand Down
5 changes: 5 additions & 0 deletions src/servers/tests/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use table::TableRef;
use tests_integration::database::Database;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::codec::CompressionEncoding;

use crate::{create_testing_grpc_query_handler, LOCALHOST_WITH_0};

Expand Down Expand Up @@ -64,6 +65,10 @@ impl MockGrpcServer {
)
.into();
FlightServiceServer::new(service)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd)
}
}

Expand Down
17 changes: 15 additions & 2 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use servers::grpc::region_server::RegionServerRequestHandler;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tempfile::TempDir;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tower::service_fn;
use uuid::Uuid;
Expand Down Expand Up @@ -436,8 +437,20 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {

let _handle = tokio::spawn(async move {
Server::builder()
.add_service(FlightServiceServer::new(flight_handler))
.add_service(RegionServer::new(region_server_handler))
.add_service(
FlightServiceServer::new(flight_handler)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
.add_service(
RegionServer::new(region_server_handler)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
Expand Down
20 changes: 20 additions & 0 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ macro_rules! grpc_tests {
test_insert_and_select,
test_dbname,
test_grpc_message_size_ok,
test_grpc_zstd_compression,
test_grpc_message_size_limit_recv,
test_grpc_message_size_limit_send,
test_grpc_auth,
Expand Down Expand Up @@ -142,6 +143,25 @@ pub async fn test_grpc_message_size_ok(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_grpc_zstd_compression(store_type: StorageType) {
// server and client both support gzip
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
};
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;

let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new_with_dbname(
format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
grpc_client,
);
db.sql("show tables;").await.unwrap();
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_grpc_message_size_limit_send(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
Expand Down