Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add auto-decompression layer for otlp http request #4723

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ opentelemetry-proto = { version = "0.5", features = [
"gen-tonic",
"metrics",
"trace",
"with-serde",
] }
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl OpenTelemetryProtocolHandler for Instance {

OTLP_TRACES_ROWS.inc_by(rows as u64);

self.handle_row_inserts(requests, ctx)
self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
Expand Down
5 changes: 5 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,11 @@ impl HttpServer {
Router::new()
.route("/v1/metrics", routing::post(otlp::metrics))
.route("/v1/traces", routing::post(otlp::traces))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(RequestDecompressionLayer::new()),
)
.with_state(otlp_handler)
}

Expand Down
32 changes: 8 additions & 24 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

use std::sync::Arc;

use axum::extract::{RawBody, State};
use axum::extract::State;
use axum::http::header;
use axum::response::IntoResponse;
use axum::Extension;
use bytes::Bytes;
use common_telemetry::tracing;
use hyper::Body;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
Expand All @@ -39,15 +39,16 @@ use crate::query_handler::OpenTelemetryProtocolHandlerRef;
pub async fn metrics(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(mut query_ctx): Extension<QueryContext>,
RawBody(body): RawBody,
bytes: Bytes,
) -> Result<OtlpMetricsResponse> {
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = parse_metrics_body(body).await?;
let request =
ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;

handler
.metrics(request, query_ctx)
Expand All @@ -60,15 +61,6 @@ pub async fn metrics(
})
}

async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
.and_then(|buf| {
ExportMetricsServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
})
}

pub struct OtlpMetricsResponse {
resp_body: ExportMetricsServiceResponse,
write_cost: usize,
Expand All @@ -88,15 +80,16 @@ impl IntoResponse for OtlpMetricsResponse {
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(mut query_ctx): Extension<QueryContext>,
RawBody(body): RawBody,
bytes: Bytes,
) -> Result<OtlpTracesResponse> {
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = parse_traces_body(body).await?;
let request =
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
handler
.traces(request, query_ctx)
.await
Expand All @@ -108,15 +101,6 @@ pub async fn traces(
})
}

async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
.and_then(|buf| {
ExportTraceServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
})
}

pub struct OtlpTracesResponse {
resp_body: ExportTraceServiceResponse,
write_cost: usize,
Expand Down
1 change: 1 addition & 0 deletions tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ common-wal.workspace = true
datanode = { workspace = true }
datatypes.workspace = true
dotenv.workspace = true
flate2 = "1.0"
flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
Some(instance.instance.clone()),
)
.with_log_ingest_handler(instance.instance.clone(), None)
.with_otlp_handler(instance.instance.clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());

if let Some(user_provider) = user_provider {
Expand Down
152 changes: 138 additions & 14 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::io::Write;

use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
use axum::http::{HeaderName, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics;
use prost::Message;
use serde_json::{json, Value};
use servers::http::error_result::ErrorResponse;
Expand All @@ -26,7 +32,7 @@ use servers::http::handler::HealthResponse;
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::test_helpers::TestClient;
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use tests_integration::test_util::{
Expand Down Expand Up @@ -80,6 +86,9 @@ macro_rules! http_tests {
test_pipeline_api,
test_test_pipeline_api,
test_plain_text_ingestion,

test_otlp_metrics,
test_otlp_traces,
);
)*
};
Expand Down Expand Up @@ -1391,19 +1400,7 @@ transform:
assert_eq!(res.status(), StatusCode::OK);
let resp = res.text().await;

let resp: Value = serde_json::from_str(&resp).unwrap();
let v = resp
.get("output")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.get("records")
.unwrap()
.get("rows")
.unwrap()
.to_string();
let v = get_rows_from_output(&resp);

assert_eq!(
v,
Expand All @@ -1412,3 +1409,130 @@ transform:

guard.remove_all().await;
}

/// Posts one encoded `ExportMetricsServiceRequest` to `/v1/otlp/v1/metrics` twice —
/// first as-is, then gzip-compressed — and after each write checks that
/// `select * from gen;` returns the same expected rows.
///
/// The `gen` table is dropped between the two rounds.
pub async fn test_otlp_metrics(store_type: StorageType) {
    // init
    common_telemetry::init_default_ut_logging();
    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_metrics").await;

    // NOTE: the raw string below is kept at column 0 on purpose; its bytes are the payload.
    let content = r#"
{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeMetrics":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"metrics":[{"name":"gen","description":"","unit":"","data":{"gauge":{"dataPoints":[{"attributes":[],"startTimeUnixNano":0,"timeUnixNano":1726053452870391000,"exemplars":[],"flags":0,"value":{"asInt":9471}}]}}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.13.0"}
"#;

    let metrics: ResourceMetrics = serde_json::from_str(content).unwrap();
    let req = ExportMetricsServiceRequest {
        resource_metrics: vec![metrics],
    };
    let body = req.encode_to_vec();

    // create the test client
    let client = TestClient::new(app);

    // write metrics data (uncompressed); cloned because `body` is sent again below
    let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await;
    assert_eq!(StatusCode::OK, res.status());

    // select metrics data
    let expected = r#"[[1726053452870391000,9471.0]]"#;
    validate_data(&client, "select * from gen;", expected).await;

    // drop table before the gzip round
    let res = client.get("/v1/sql?sql=drop table gen;").send().await;
    assert_eq!(res.status(), StatusCode::OK);

    // write metrics data with gzip; this is the last use of `body`, so move it
    let res = send_req(&client, "/v1/otlp/v1/metrics", body, true).await;
    assert_eq!(StatusCode::OK, res.status());

    // select metrics data again
    validate_data(&client, "select * from gen;", expected).await;

    guard.remove_all().await;
}

/// Posts one encoded `ExportTraceServiceRequest` to `/v1/otlp/v1/traces` twice —
/// first as-is, then gzip-compressed — and after each write checks that
/// `select * from traces_preview_v01;` returns the same expected rows.
///
/// The `traces_preview_v01` table is dropped between the two rounds.
pub async fn test_otlp_traces(store_type: StorageType) {
    // init
    common_telemetry::init_default_ut_logging();
    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;

    // NOTE: the raw string below is kept at column 0 on purpose; its bytes are the payload.
    let content = r#"
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"b5e5fb572cf0a3335dd194a14145fef5","spanId":"74c82efa6f628e80","traceState":"","parentSpanId":"3364d2da58c9fd2b","flags":0,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":1726631197820927000,"endTimeUnixNano":1726631197821050000,"attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"b5e5fb572cf0a3335dd194a14145fef5","spanId":"3364d2da58c9fd2b","traceState":"","parentSpanId":"","flags":0,"name":"lets-go","kind":3,"startTimeUnixNano":1726631197820927000,"endTimeUnixNano":1726631197821050000,"attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
"#;

    let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
    let body = req.encode_to_vec();

    // create the test client
    let client = TestClient::new(app);

    // write traces data (uncompressed); cloned because `body` is sent again below
    let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await;
    assert_eq!(StatusCode::OK, res.status());

    // select traces data
    let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-server\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-client\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#;
    validate_data(&client, "select * from traces_preview_v01;", expected).await;

    // drop table before the gzip round
    let res = client
        .get("/v1/sql?sql=drop table traces_preview_v01;")
        .send()
        .await;
    assert_eq!(res.status(), StatusCode::OK);

    // write traces data with gzip; this is the last use of `body`, so move it
    let res = send_req(&client, "/v1/otlp/v1/traces", body, true).await;
    assert_eq!(StatusCode::OK, res.status());

    // select traces data again
    validate_data(&client, "select * from traces_preview_v01;", expected).await;

    guard.remove_all().await;
}

/// Runs `sql` through the `/v1/sql` HTTP endpoint and asserts that the request
/// returns `200 OK` and that the rows extracted from the response body equal
/// `expected`.
///
/// # Panics
///
/// Panics when the status is not `OK`, when the response body does not have the
/// shape `get_rows_from_output` expects, or when the rows differ from `expected`.
async fn validate_data(client: &TestClient, sql: &str, expected: &str) {
    let url = format!("/v1/sql?sql={sql}");
    let response = client.get(url.as_str()).send().await;
    assert_eq!(response.status(), StatusCode::OK);

    let payload = response.text().await;
    let rows = get_rows_from_output(&payload);

    assert_eq!(rows, expected);
}

/// POSTs `body` to `path` with `content-type: application/x-protobuf` and returns
/// the response.
///
/// When `with_gzip` is true the body is gzip-compressed first and a
/// `content-encoding: gzip` header is added. The `content-length` header always
/// carries the length of the bytes actually sent.
async fn send_req(client: &TestClient, path: &str, body: Vec<u8>, with_gzip: bool) -> TestResponse {
    let builder = client
        .post(path)
        .header("content-type", "application/x-protobuf");

    // Pick the payload and the matching builder in one step so the length below
    // is always taken from what is really transmitted.
    let (builder, payload_len) = if with_gzip {
        let compressed = compress_vec_with_gzip(body);
        let compressed_len = compressed.len();
        (
            builder.header("content-encoding", "gzip").body(compressed),
            compressed_len,
        )
    } else {
        let plain_len = body.len();
        (builder.body(body), plain_len)
    };

    builder.header("content-length", payload_len).send().await
}

/// Parses `output` as JSON and returns the value at
/// `output[0].records.rows`, serialized back to a string.
///
/// # Panics
///
/// Panics when `output` is not valid JSON or when any step of that path is
/// missing (including `output` not being an array or being empty).
fn get_rows_from_output(output: &str) -> String {
    // Walks `output` -> first array element -> `records` -> `rows`.
    fn rows_of(root: &Value) -> Option<&Value> {
        let first_entry = root.get("output")?.as_array()?.first()?;
        first_entry.get("records")?.get("rows")
    }

    let parsed: Value = serde_json::from_str(output).unwrap();
    rows_of(&parsed).unwrap().to_string()
}

/// Gzip-compresses `data` with the default compression level and returns the
/// compressed bytes.
///
/// # Panics
///
/// Panics if writing to or finishing the encoder reports an error.
fn compress_vec_with_gzip(data: Vec<u8>) -> Vec<u8> {
    let sink = Vec::new();
    let mut gz = GzEncoder::new(sink, Compression::default());
    gz.write_all(data.as_slice()).unwrap();
    gz.finish().unwrap()
}