From ad173b5f85e97f581e5e84e1b6e63b20bcbb1eb3 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 11 Sep 2024 15:19:46 +0800 Subject: [PATCH 1/2] chore: add auto-decompression for http request --- src/frontend/src/instance/otlp.rs | 2 +- src/servers/src/http.rs | 5 +++++ src/servers/src/http/otlp.rs | 32 ++++++++----------------------- 3 files changed, 14 insertions(+), 25 deletions(-) diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 8d707d12bd1f..09335af0804e 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -87,7 +87,7 @@ impl OpenTelemetryProtocolHandler for Instance { OTLP_TRACES_ROWS.inc_by(rows as u64); - self.handle_row_inserts(requests, ctx) + self.handle_log_inserts(requests, ctx) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 5ac52157ead1..2313d19bbeb5 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -866,6 +866,11 @@ impl HttpServer { Router::new() .route("/v1/metrics", routing::post(otlp::metrics)) .route("/v1/traces", routing::post(otlp::traces)) + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_error)) + .layer(RequestDecompressionLayer::new()), + ) .with_state(otlp_handler) } diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index a04d1d42a0a9..3efdaeec96d4 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -14,12 +14,12 @@ use std::sync::Arc; -use axum::extract::{RawBody, State}; +use axum::extract::State; use axum::http::header; use axum::response::IntoResponse; use axum::Extension; +use bytes::Bytes; use common_telemetry::tracing; -use hyper::Body; use opentelemetry_proto::tonic::collector::metrics::v1::{ ExportMetricsServiceRequest, ExportMetricsServiceResponse, }; @@ -39,7 +39,7 @@ use crate::query_handler::OpenTelemetryProtocolHandlerRef; pub async fn metrics( State(handler): State, Extension(mut query_ctx): Extension, - RawBody(body): RawBody, + bytes: Bytes, ) -> Result { let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); @@ -47,7 +47,8 @@ pub async fn metrics( let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); - let request = parse_metrics_body(body).await?; + let request = + ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; handler .metrics(request, query_ctx) @@ -60,15 +61,6 @@ pub async fn metrics( }) } -async fn parse_metrics_body(body: Body) -> Result { - hyper::body::to_bytes(body) - .await - .context(error::HyperSnafu) - .and_then(|buf| { - ExportMetricsServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu) - }) -} - pub struct OtlpMetricsResponse { resp_body: ExportMetricsServiceResponse, write_cost: usize, @@ -88,7 +80,7 @@ impl IntoResponse for OtlpMetricsResponse { pub async fn traces( State(handler): State, Extension(mut query_ctx): Extension, - RawBody(body): RawBody, + bytes: Bytes, ) -> Result { let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); @@ -96,7 +88,8 @@ pub async fn traces( let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); - let request = parse_traces_body(body).await?; + let request = + ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; handler .traces(request, query_ctx) .await @@ -108,15 +101,6 @@ pub async fn traces( }) } -async fn parse_traces_body(body: Body) -> Result { - hyper::body::to_bytes(body) - .await - .context(error::HyperSnafu) - .and_then(|buf| { - ExportTraceServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu) - }) -} - pub struct OtlpTracesResponse { resp_body: ExportTraceServiceResponse, write_cost: usize, From beeb95ed24fe88fd48bfa9d62f6fca449b14ceab Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 18 Sep 2024 12:16:09 +0800 Subject: [PATCH 2/2] test: otlp --- Cargo.lock | 3 + Cargo.toml | 1 + tests-integration/Cargo.toml | 1 + tests-integration/src/test_util.rs | 1 + tests-integration/tests/http.rs | 152 ++++++++++++++++++++++++++--- 5 files changed, 144 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c5b8fc016a5b..0e9db0052712 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7228,9 +7228,11 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" dependencies = [ + "hex", "opentelemetry 0.22.0", "opentelemetry_sdk 0.22.1", "prost 0.12.6", + "serde", "tonic 0.11.0", ] @@ -11775,6 +11777,7 @@ dependencies = [ "datanode", "datatypes", "dotenv", + "flate2", "flow", "frontend", "futures", diff --git a/Cargo.toml b/Cargo.toml index d412bf7e978e..672067140297 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,7 @@ opentelemetry-proto = { version = "0.5", features = [ "gen-tonic", "metrics", "trace", + "with-serde", ] } parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index aa5f74540d14..997214bca00c 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -40,6 +40,7 @@ common-wal.workspace = true datanode = { workspace = true } datatypes.workspace = true dotenv.workspace = true +flate2 = "1.0" flow.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 07237c8bc1b3..cf125a577634 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -425,6 +425,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( Some(instance.instance.clone()), ) .with_log_ingest_handler(instance.instance.clone(), None) + .with_otlp_handler(instance.instance.clone()) .with_greptime_config_options(instance.opts.to_toml().unwrap()); if let Some(user_provider) = user_provider { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fe28387cd6a1..d467e42dd4dd 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -13,11 +13,17 @@ // limitations under the License. use std::collections::BTreeMap; +use std::io::Write; use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; use axum::http::{HeaderName, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; +use flate2::write::GzEncoder; +use flate2::Compression; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; use prost::Message; use serde_json::{json, Value}; use servers::http::error_result::ErrorResponse; @@ -26,7 +32,7 @@ use servers::http::handler::HealthResponse; use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME}; use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response}; use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse}; -use servers::http::test_helpers::TestClient; +use servers::http::test_helpers::{TestClient, TestResponse}; use servers::http::GreptimeQueryOutput; use servers::prom_store; use tests_integration::test_util::{ @@ -80,6 +86,9 @@ macro_rules! http_tests { test_pipeline_api, test_test_pipeline_api, test_plain_text_ingestion, + + test_otlp_metrics, + test_otlp_traces, ); )* }; @@ -1391,19 +1400,7 @@ transform: assert_eq!(res.status(), StatusCode::OK); let resp = res.text().await; - let resp: Value = serde_json::from_str(&resp).unwrap(); - let v = resp - .get("output") - .unwrap() - .as_array() - .unwrap() - .first() - .unwrap() - .get("records") - .unwrap() - .get("rows") - .unwrap() - .to_string(); + let v = get_rows_from_output(&resp); assert_eq!( v, @@ -1412,3 +1409,130 @@ transform: guard.remove_all().await; } + +pub async fn test_otlp_metrics(store_type: StorageType) { + // init + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_metrics").await; + + let content = r#" +{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeMetrics":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"metrics":[{"name":"gen","description":"","unit":"","data":{"gauge":{"dataPoints":[{"attributes":[],"startTimeUnixNano":0,"timeUnixNano":1726053452870391000,"exemplars":[],"flags":0,"value":{"asInt":9471}}]}}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.13.0"} + "#; + + let metrics: ResourceMetrics = serde_json::from_str(content).unwrap(); + let req = ExportMetricsServiceRequest { + resource_metrics: vec![metrics], + }; + let body = req.encode_to_vec(); + + // handshake + let client = TestClient::new(app); + + // write metrics data + let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await; + assert_eq!(StatusCode::OK, res.status()); + + // select metrics data + let expected = r#"[[1726053452870391000,9471.0]]"#; + validate_data(&client, "select * from gen;", expected).await; + + // drop table + let res = client.get("/v1/sql?sql=drop table gen;").send().await; + assert_eq!(res.status(), StatusCode::OK); + + // write metrics data with gzip + let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), true).await; + assert_eq!(StatusCode::OK, res.status()); + + // select metrics data again + validate_data(&client, "select * from gen;", expected).await; + + guard.remove_all().await; +} + +pub async fn test_otlp_traces(store_type: StorageType) { + // init + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; + + let content = r#" +{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"b5e5fb572cf0a3335dd194a14145fef5","spanId":"74c82efa6f628e80","traceState":"","parentSpanId":"3364d2da58c9fd2b","flags":0,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":1726631197820927000,"endTimeUnixNano":1726631197821050000,"attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"b5e5fb572cf0a3335dd194a14145fef5","spanId":"3364d2da58c9fd2b","traceState":"","parentSpanId":"","flags":0,"name":"lets-go","kind":3,"startTimeUnixNano":1726631197820927000,"endTimeUnixNano":1726631197821050000,"attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]} + "#; + + let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + + // handshake + let client = TestClient::new(app); + + // write traces data + let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await; + assert_eq!(StatusCode::OK, res.status()); + + // select traces data + let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-server\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-client\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#; + validate_data(&client, "select * from traces_preview_v01;", expected).await; + + // drop table + let res = client + .get("/v1/sql?sql=drop table traces_preview_v01;") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // write metrics data with gzip + let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), true).await; + assert_eq!(StatusCode::OK, res.status()); + + // select metrics data again + validate_data(&client, "select * from traces_preview_v01;", expected).await; + + guard.remove_all().await; +} + +async fn validate_data(client: &TestClient, sql: &str, expected: &str) { + let res = client + .get(format!("/v1/sql?sql={sql}").as_str()) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let resp = res.text().await; + let v = get_rows_from_output(&resp); + + assert_eq!(v, expected); +} + +async fn send_req(client: &TestClient, path: &str, body: Vec, with_gzip: bool) -> TestResponse { + let mut req = client + .post(path) + .header("content-type", "application/x-protobuf"); + + let mut len = body.len(); + + if with_gzip { + let encoded = compress_vec_with_gzip(body); + len = encoded.len(); + req = req.header("content-encoding", "gzip").body(encoded); + } else { + req = req.body(body); + } + + req.header("content-length", len).send().await +} + +fn get_rows_from_output(output: &str) -> String { + let resp: Value = serde_json::from_str(output).unwrap(); + resp.get("output") + .and_then(Value::as_array) + .and_then(|v| v.first()) + .and_then(|v| v.get("records")) + .and_then(|v| v.get("rows")) + .unwrap() + .to_string() +} + +fn compress_vec_with_gzip(data: Vec) -> Vec { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&data).unwrap(); + encoder.finish().unwrap() +}