Skip to content

Commit

Permalink
Add option to gzip compress request bodies (#239)
Browse files Browse the repository at this point in the history
  • Loading branch information
VimCommando authored Dec 17, 2024
1 parent e6e2f02 commit 5e95345
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 20 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions elasticsearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ bytes = "1"
dyn-clone = "1"
lazy_static = "1"
percent-encoding = "2"
reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] }
reqwest = { version = "0.12", default-features = false, features = [
"gzip",
"json",
] }
url = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand All @@ -40,6 +43,7 @@ serde_with = "3"

#tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }
void = "1"
flate2 = "^1.0.34"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "1.0"
Expand All @@ -48,14 +52,14 @@ features = ["macros", "net", "time", "rt-multi-thread"]

[dev-dependencies]
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4", features = ["env"]}
clap = { version = "4", features = ["env"] }
failure = "0.1"
futures = "0.3"
http = "1"
axum = "0.7"
#hyper = { version = "1", features = ["server", "http1"] }
os_type = "2"
regex="1"
regex = "1"
#sysinfo = "0.31"
textwrap = "0.16"
xml-rs = "0.8"
Expand Down
55 changes: 40 additions & 15 deletions elasticsearch/src/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
//! HTTP transport and connection components
#[cfg(all(target_arch = "wasm32", any(feature = "native-tls", feature = "rustls-tls")))]
#[cfg(all(
target_arch = "wasm32",
any(feature = "native-tls", feature = "rustls-tls")
))]
compile_error!("TLS features are not compatible with the wasm target");

#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
Expand All @@ -30,8 +33,8 @@ use crate::{
error::Error,
http::{
headers::{
HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE,
DEFAULT_ACCEPT, DEFAULT_CONTENT_TYPE, DEFAULT_USER_AGENT, USER_AGENT,
HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_ENCODING,
CONTENT_TYPE, DEFAULT_ACCEPT, DEFAULT_CONTENT_TYPE, DEFAULT_USER_AGENT, USER_AGENT,
},
request::Body,
response::Response,
Expand All @@ -40,6 +43,7 @@ use crate::{
};
use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::EncoderWriter, Engine};
use bytes::BytesMut;
use flate2::{write::GzEncoder, Compression};
use lazy_static::lazy_static;
use serde::Serialize;
use serde_json::Value;
Expand Down Expand Up @@ -147,6 +151,7 @@ pub struct TransportBuilder {
credentials: Option<Credentials>,
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
cert_validation: Option<CertificateValidation>,
request_body_compression: bool,
#[cfg(not(target_arch = "wasm32"))]
proxy: Option<Url>,
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -172,6 +177,7 @@ impl TransportBuilder {
credentials: None,
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
cert_validation: None,
request_body_compression: false,
#[cfg(not(target_arch = "wasm32"))]
proxy: None,
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -215,6 +221,12 @@ impl TransportBuilder {
self
}

/// Gzip compress the body of requests, adds the `Content-Encoding: gzip` header.
pub fn request_body_compression(mut self, enabled: bool) -> Self {
self.request_body_compression = enabled;
self
}

/// Validation applied to the certificate provided to establish a HTTPS connection.
/// By default, full validation is applied. When using a self-signed certificate,
/// different validation can be applied.
Expand Down Expand Up @@ -335,6 +347,7 @@ impl TransportBuilder {
Ok(Transport {
client,
conn_pool: self.conn_pool,
request_body_compression: self.request_body_compression,
credentials: self.credentials,
send_meta: self.meta_header,
})
Expand Down Expand Up @@ -381,6 +394,7 @@ impl Connection {
pub struct Transport {
client: reqwest::Client,
credentials: Option<Credentials>,
request_body_compression: bool,
conn_pool: Arc<dyn ConnectionPool>,
send_meta: bool,
}
Expand Down Expand Up @@ -481,8 +495,7 @@ impl Transport {
headers: HeaderMap,
query_string: Option<&Q>,
body: Option<B>,
#[allow(unused_variables)]
timeout: Option<Duration>,
#[allow(unused_variables)] timeout: Option<Duration>,
) -> Result<reqwest::RequestBuilder, Error>
where
B: Body,
Expand Down Expand Up @@ -552,7 +565,17 @@ impl Transport {
bytes_mut.split().freeze()
};

request_builder = request_builder.body(bytes);
match self.request_body_compression {
true => {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&bytes)?;
request_builder = request_builder.body(encoder.finish()?);
request_builder = request_builder.header(CONTENT_ENCODING, "gzip");
}
false => {
request_builder = request_builder.body(bytes);
}
}
};

if let Some(q) = query_string {
Expand Down Expand Up @@ -589,15 +612,17 @@ impl Transport {
let connection = self.conn_pool.next();

// Build node info request
let node_request = self.request_builder(
&connection,
Method::Get,
"_nodes/http?filter_path=nodes.*.http",
HeaderMap::default(),
None::<&()>,
None::<()>,
None,
).unwrap();
let node_request = self
.request_builder(
&connection,
Method::Get,
"_nodes/http?filter_path=nodes.*.http",
HeaderMap::default(),
None::<&()>,
None::<()>,
None,
)
.unwrap();

let scheme = connection.url.scheme();
let resp = node_request.send().await.unwrap();
Expand Down

0 comments on commit 5e95345

Please sign in to comment.