diff --git a/Cargo.lock b/Cargo.lock
index 6391920f4748..d483ec7088b7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3156,6 +3156,7 @@ dependencies = [
  "arrow",
  "arrow-array",
  "arrow-schema",
+ "base64 0.21.7",
  "common-base",
  "common-decimal",
  "common-error",
@@ -3164,6 +3165,7 @@ dependencies = [
  "common-time",
  "datafusion-common",
  "enum_dispatch",
+ "greptime-proto",
  "num",
  "num-traits",
  "ordered-float 3.9.2",
diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml
index 281057ce80c9..b10ea682dd47 100644
--- a/src/datatypes/Cargo.toml
+++ b/src/datatypes/Cargo.toml
@@ -15,6 +15,7 @@ workspace = true
 arrow.workspace = true
 arrow-array.workspace = true
 arrow-schema.workspace = true
+base64.workspace = true
 common-base.workspace = true
 common-decimal.workspace = true
 common-error.workspace = true
@@ -23,6 +24,7 @@ common-telemetry.workspace = true
 common-time.workspace = true
 datafusion-common.workspace = true
 enum_dispatch = "0.3"
+greptime-proto.workspace = true
 num = "0.4"
 num-traits = "0.2"
 ordered-float = { version = "3.0", features = ["serde"] }
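Two new workspace dependencies back the conversion code below: `base64` provides the URL-safe encoding used for binary column values, and `greptime-proto` provides the `ValueData` enum being converted. A minimal standalone sketch of the base64 0.21 `Engine` API as it is used in this patch:

```rust
use base64::engine::general_purpose::URL_SAFE;
use base64::Engine as _; // the Engine trait must be in scope for encode/decode

fn main() {
    // Binary column values are rendered as URL-safe base64 strings.
    let encoded = URL_SAFE.encode(b"hello");
    assert_eq!(encoded, "aGVsbG8=");

    // Decoding recovers the original bytes.
    let decoded = URL_SAFE.decode(&encoded).unwrap();
    assert_eq!(decoded, b"hello");
}
```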
diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index 15aa028f4fc7..6c49154e4058 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -18,6 +18,8 @@ use std::sync::Arc;
 
 use arrow::datatypes::{DataType as ArrowDataType, Field};
 use arrow_array::{Array, ListArray};
+use base64::engine::general_purpose::URL_SAFE;
+use base64::Engine as _;
 use common_base::bytes::{Bytes, StringBytes};
 use common_decimal::Decimal128;
 use common_telemetry::error;
@@ -28,8 +30,10 @@ use common_time::time::Time;
 use common_time::timestamp::{TimeUnit, Timestamp};
 use common_time::{Duration, Interval, Timezone};
 use datafusion_common::ScalarValue;
+use greptime_proto::v1::value::ValueData;
 pub use ordered_float::OrderedFloat;
 use serde::{Deserialize, Serialize, Serializer};
+use serde_json::{Number, Value as JsonValue};
 use snafu::{ensure, ResultExt};
 
 use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
@@ -1364,15 +1368,179 @@ impl<'a> ValueRef<'a> {
     }
 }
 
+pub fn column_data_to_json(data: ValueData) -> JsonValue {
+    match data {
+        ValueData::BinaryValue(b) => JsonValue::String(URL_SAFE.encode(b)),
+        ValueData::BoolValue(b) => JsonValue::Bool(b),
+        ValueData::U8Value(i) => JsonValue::Number(i.into()),
+        ValueData::U16Value(i) => JsonValue::Number(i.into()),
+        ValueData::U32Value(i) => JsonValue::Number(i.into()),
+        ValueData::U64Value(i) => JsonValue::Number(i.into()),
+        ValueData::I8Value(i) => JsonValue::Number(i.into()),
+        ValueData::I16Value(i) => JsonValue::Number(i.into()),
+        ValueData::I32Value(i) => JsonValue::Number(i.into()),
+        ValueData::I64Value(i) => JsonValue::Number(i.into()),
+        ValueData::F32Value(f) => Number::from_f64(f as f64)
+            .map(JsonValue::Number)
+            .unwrap_or(JsonValue::Null),
+        ValueData::F64Value(f) => Number::from_f64(f)
+            .map(JsonValue::Number)
+            .unwrap_or(JsonValue::Null),
+        ValueData::StringValue(s) => JsonValue::String(s),
+        ValueData::DateValue(d) => JsonValue::String(Date::from(d).to_string()),
+        ValueData::DatetimeValue(d) => JsonValue::String(DateTime::from(d).to_string()),
+        ValueData::TimeSecondValue(d) => JsonValue::String(Time::new_second(d).to_iso8601_string()),
+        ValueData::TimeMillisecondValue(d) => {
+            JsonValue::String(Time::new_millisecond(d).to_iso8601_string())
+        }
+        ValueData::TimeMicrosecondValue(d) => {
+            JsonValue::String(Time::new_microsecond(d).to_iso8601_string())
+        }
+        ValueData::TimeNanosecondValue(d) => {
+            JsonValue::String(Time::new_nanosecond(d).to_iso8601_string())
+        }
+        ValueData::TimestampMicrosecondValue(d) => {
+            JsonValue::String(Timestamp::new_microsecond(d).to_iso8601_string())
+        }
+        ValueData::TimestampMillisecondValue(d) => {
+            JsonValue::String(Timestamp::new_millisecond(d).to_iso8601_string())
+        }
+        ValueData::TimestampNanosecondValue(d) => {
+            JsonValue::String(Timestamp::new_nanosecond(d).to_iso8601_string())
+        }
+        ValueData::TimestampSecondValue(d) => {
+            JsonValue::String(Timestamp::new_second(d).to_iso8601_string())
+        }
+        ValueData::IntervalYearMonthValue(d) => JsonValue::String(format!("interval year [{}]", d)),
+        ValueData::IntervalMonthDayNanoValue(d) => JsonValue::String(format!(
+            "interval month [{}][{}][{}]",
+            d.months, d.days, d.nanoseconds
+        )),
+        ValueData::IntervalDayTimeValue(d) => JsonValue::String(format!("interval day [{}]", d)),
+        ValueData::Decimal128Value(d) => {
+            JsonValue::String(format!("decimal128 [{}][{}]", d.hi, d.lo))
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::DataType as ArrowDataType;
     use common_time::timezone::set_default_timezone;
+    use greptime_proto::v1::{Decimal128 as ProtoDecimal128, IntervalMonthDayNano};
     use num_traits::Float;
 
     use super::*;
     use crate::vectors::ListVectorBuilder;
 
+    #[test]
+    fn test_column_data_to_json() {
+        assert_eq!(
+            column_data_to_json(ValueData::BinaryValue(b"hello".to_vec())),
+            JsonValue::String("aGVsbG8=".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::BoolValue(true)),
+            JsonValue::Bool(true)
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::U8Value(1)),
+            JsonValue::Number(1.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::U16Value(2)),
+            JsonValue::Number(2.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::U32Value(3)),
+            JsonValue::Number(3.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::U64Value(4)),
+            JsonValue::Number(4.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::I8Value(5)),
+            JsonValue::Number(5.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::I16Value(6)),
+            JsonValue::Number(6.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::I32Value(7)),
+            JsonValue::Number(7.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::I64Value(8)),
+            JsonValue::Number(8.into())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::F32Value(9.0)),
+            JsonValue::Number(Number::from_f64(9.0_f64).unwrap())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::F64Value(10.0)),
+            JsonValue::Number(Number::from_f64(10.0_f64).unwrap())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::StringValue("hello".to_string())),
+            JsonValue::String("hello".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::DateValue(123)),
+            JsonValue::String("1970-05-04".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::DatetimeValue(456)),
+            JsonValue::String("1970-01-01 00:00:00.456+0000".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::TimeSecondValue(789)),
+            JsonValue::String("00:13:09+0000".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::TimeMillisecondValue(789)),
+            JsonValue::String("00:00:00.789+0000".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::TimeMicrosecondValue(789)),
+            JsonValue::String("00:00:00.000789+0000".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::TimestampMillisecondValue(1234567890)),
+            JsonValue::String("1970-01-15 06:56:07.890+0000".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::TimestampNanosecondValue(1234567890123456789)),
+            JsonValue::String("2009-02-13 23:31:30.123456789+0000".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::TimestampSecondValue(1234567890)),
+            JsonValue::String("2009-02-13 23:31:30+0000".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::IntervalYearMonthValue(12)),
+            JsonValue::String("interval year [12]".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::IntervalMonthDayNanoValue(IntervalMonthDayNano {
+                months: 1,
+                days: 2,
+                nanoseconds: 3,
+            })),
+            JsonValue::String("interval month [1][2][3]".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::IntervalDayTimeValue(4)),
+            JsonValue::String("interval day [4]".to_string())
+        );
+        assert_eq!(
+            column_data_to_json(ValueData::Decimal128Value(ProtoDecimal128 { hi: 5, lo: 6 })),
+            JsonValue::String("decimal128 [5][6]".to_string())
+        );
+    }
+
     #[test]
     fn test_try_from_scalar_value() {
         assert_eq!(
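A quick sketch of how the new helper composes, mirroring how the dryrun handler below renders each row (the row values are illustrative; the expected outputs follow the unit tests above and assume the `datatypes` and `greptime-proto` workspace crates):

```rust
use datatypes::value::column_data_to_json;
use greptime_proto::v1::value::ValueData;
use serde_json::json;

fn main() {
    // Each proto column value maps to a plain serde_json value that can be
    // embedded directly into an HTTP response body.
    let row = vec![
        ValueData::StringValue("INTERACT.MANAGER".to_string()),
        ValueData::I32Value(2436),
        ValueData::BinaryValue(b"hello".to_vec()),
    ];
    let rendered: Vec<_> = row.into_iter().map(column_data_to_json).collect();
    assert_eq!(
        rendered,
        vec![json!("INTERACT.MANAGER"), json!(2436), json!("aGVsbG8=")]
    );
}
```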
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index ad4ff5222561..5ac52157ead1 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -753,6 +753,7 @@ impl HttpServer {
                 "/pipelines/:pipeline_name",
                 routing::delete(event::delete_pipeline),
             )
+            .route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
             .layer(
                 ServiceBuilder::new()
                     .layer(HandleErrorLayer::new(handle_error))
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index fb436142fc86..dbd7f1232a1b 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -23,15 +23,16 @@ use axum::headers::ContentType;
 use axum::http::header::CONTENT_TYPE;
 use axum::http::{Request, StatusCode};
 use axum::response::{IntoResponse, Response};
-use axum::{async_trait, BoxError, Extension, TypedHeader};
+use axum::{async_trait, BoxError, Extension, Json, TypedHeader};
 use common_query::{Output, OutputData};
 use common_telemetry::{error, warn};
+use datatypes::value::column_data_to_json;
 use pipeline::error::PipelineTransformSnafu;
 use pipeline::util::to_pipeline_version;
 use pipeline::PipelineVersion;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use serde_json::{Deserializer, Value};
+use serde_json::{Deserializer, Map, Value};
 use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
@@ -230,6 +231,117 @@ fn transform_ndjson_array_factory(
     })
 }
 
+#[axum_macros::debug_handler]
+pub async fn pipeline_dryrun(
+    State(log_state): State<LogState>,
+    Query(query_params): Query<LogIngesterQueryParams>,
+    Extension(mut query_ctx): Extension<QueryContext>,
+    TypedHeader(content_type): TypedHeader<ContentType>,
+    payload: String,
+) -> Result<Response> {
+    let handler = log_state.log_handler;
+    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
+        reason: "pipeline_name is required",
+    })?;
+
+    let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
+
+    let ignore_errors = query_params.ignore_errors.unwrap_or(false);
+
+    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
+
+    if value.len() > 10 {
+        return Err(InvalidParameterSnafu {
+            reason: "too many rows for dryrun",
+        }
+        .build());
+    }
+
+    query_ctx.set_channel(Channel::Http);
+    let query_ctx = Arc::new(query_ctx);
+
+    let pipeline = handler
+        .get_pipeline(&pipeline_name, version, query_ctx.clone())
+        .await?;
+
+    let mut intermediate_state = pipeline.init_intermediate_state();
+
+    let mut results = Vec::with_capacity(value.len());
+    for v in value {
+        pipeline
+            .prepare(v, &mut intermediate_state)
+            .map_err(|reason| PipelineTransformSnafu { reason }.build())
+            .context(PipelineSnafu)?;
+        let r = pipeline
+            .exec_mut(&mut intermediate_state)
+            .map_err(|reason| PipelineTransformSnafu { reason }.build())
+            .context(PipelineSnafu)?;
+        results.push(r);
+        pipeline.reset_intermediate_state(&mut intermediate_state);
+    }
+
+    let column_type_key = "column_type";
+    let data_type_key = "data_type";
+    let name_key = "name";
+
+    let schema = pipeline
+        .schemas()
+        .iter()
+        .map(|cs| {
+            let mut map = Map::new();
+            map.insert(name_key.to_string(), Value::String(cs.column_name.clone()));
+            map.insert(
+                data_type_key.to_string(),
+                Value::String(cs.datatype().as_str_name().to_string()),
+            );
+            map.insert(
+                column_type_key.to_string(),
+                Value::String(cs.semantic_type().as_str_name().to_string()),
+            );
+            map.insert(
+                "fulltext".to_string(),
+                Value::Bool(
+                    cs.options
+                        .clone()
+                        .is_some_and(|x| x.options.contains_key("fulltext")),
+                ),
+            );
+            Value::Object(map)
+        })
+        .collect::<Vec<Value>>();
+    let rows = results
+        .into_iter()
+        .map(|row| {
+            let row = row
+                .values
+                .into_iter()
+                .enumerate()
+                .map(|(idx, v)| {
+                    v.value_data
+                        .map(|d| {
+                            let mut map = Map::new();
+                            map.insert("value".to_string(), column_data_to_json(d));
+                            map.insert("key".to_string(), schema[idx][name_key].clone());
+                            map.insert(
+                                "semantic_type".to_string(),
+                                schema[idx][column_type_key].clone(),
+                            );
+                            map.insert("data_type".to_string(), schema[idx][data_type_key].clone());
+                            Value::Object(map)
+                        })
+                        .unwrap_or(Value::Null)
+                })
+                .collect();
+            Value::Array(row)
+        })
+        .collect::<Vec<Value>>();
+    let mut result = Map::new();
+    result.insert("schema".to_string(), Value::Array(schema));
+    result.insert("rows".to_string(), Value::Array(rows));
+    let result = Value::Object(result);
+    Ok(Json(result).into_response())
+}
+
 #[axum_macros::debug_handler]
 pub async fn log_ingester(
     State(log_state): State<LogState>,
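Taken together, the route and handler expose a non-destructive dryrun endpoint: up to 10 rows are run through the named pipeline and the inferred schema plus transformed rows come back as JSON, with nothing written to storage. A hypothetical client invocation (assumes a local server on the default HTTP port 4000 and the `reqwest`/`tokio` crates with reqwest's `json` feature; neither crate is part of this diff):

```rust
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A single sample event; the handler rejects payloads of more than 10 rows.
    let body = r#"[{"id1": "2436", "id2": "2528", "logger": "INTERACT.MANAGER",
        "type": "I", "time": "2024-05-25 20:16:37.217",
        "log": "ClusterAdapter:enter sendTextDataToCluster\n"}]"#;

    let resp: Value = reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/events/pipelines/dryrun?pipeline_name=test")
        .header("Content-Type", "application/json")
        .body(body)
        .send()
        .await?
        .json()
        .await?;

    // The response carries the inferred schema and the transformed rows.
    println!("schema: {}", resp["schema"]);
    println!("rows:   {}", resp["rows"]);
    Ok(())
}
```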
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 497ea4969c4b..56307e0427e1 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -78,6 +78,7 @@ macro_rules! http_tests {
                 test_vm_proto_remote_write,
 
                 test_pipeline_api,
+                test_test_pipeline_api,
                 test_plain_text_ingestion,
             );
         )*
@@ -1146,6 +1147,171 @@ transform:
     guard.remove_all().await;
 }
 
+pub async fn test_test_pipeline_api(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;
+
+    // handshake
+    let client = TestClient::new(app);
+
+    let body = r#"
+processors:
+  - date:
+      field: time
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+      ignore_missing: true
+
+transform:
+  - fields:
+      - id1
+      - id2
+    type: int32
+  - fields:
+      - type
+      - log
+      - logger
+    type: string
+  - field: time
+    type: time
+    index: timestamp
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/test")
+        .header("Content-Type", "application/x-yaml")
+        .body(body)
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let content = res.text().await;
+
+    let content = serde_json::from_str(&content);
+    assert!(content.is_ok());
+    // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]}
+    let content: Value = content.unwrap();
+
+    let execution_time = content.get("execution_time_ms");
+    assert!(execution_time.unwrap().is_number());
+    let pipelines = content.get("pipelines");
+    let pipelines = pipelines.unwrap().as_array().unwrap();
+    assert_eq!(pipelines.len(), 1);
+    let pipeline = pipelines.first().unwrap();
+    assert_eq!(pipeline.get("name").unwrap(), "test");
+
+    // 2. write data
+    let data_body = r#"
+    [
+      {
+        "id1": "2436",
+        "id2": "2528",
+        "logger": "INTERACT.MANAGER",
+        "type": "I",
+        "time": "2024-05-25 20:16:37.217",
+        "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+      }
+    ]
+    "#;
+    let res = client
+        .post("/v1/events/pipelines/dryrun?pipeline_name=test")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let body: serde_json::Value = res.json().await;
+    let schema = &body["schema"];
+    let rows = &body["rows"];
+    assert_eq!(
+        schema,
+        &json!([
+            {
+                "column_type": "FIELD",
+                "data_type": "INT32",
+                "fulltext": false,
+                "name": "id1"
+            },
+            {
+                "column_type": "FIELD",
+                "data_type": "INT32",
+                "fulltext": false,
+                "name": "id2"
+            },
+            {
+                "column_type": "FIELD",
+                "data_type": "STRING",
+                "fulltext": false,
+                "name": "type"
+            },
+            {
+                "column_type": "FIELD",
+                "data_type": "STRING",
+                "fulltext": false,
+                "name": "log"
+            },
+            {
+                "column_type": "FIELD",
+                "data_type": "STRING",
+                "fulltext": false,
+                "name": "logger"
+            },
+            {
+                "column_type": "TIMESTAMP",
+                "data_type": "TIMESTAMP_NANOSECOND",
+                "fulltext": false,
+                "name": "time"
+            }
+        ])
+    );
+    assert_eq!(
+        rows,
+        &json!([
+            [
+                {
+                    "data_type": "INT32",
+                    "key": "id1",
+                    "semantic_type": "FIELD",
+                    "value": 2436
+                },
+                {
+                    "data_type": "INT32",
+                    "key": "id2",
+                    "semantic_type": "FIELD",
+                    "value": 2528
+                },
+                {
+                    "data_type": "STRING",
+                    "key": "type",
+                    "semantic_type": "FIELD",
+                    "value": "I"
+                },
+                {
+                    "data_type": "STRING",
+                    "key": "log",
+                    "semantic_type": "FIELD",
+                    "value": "ClusterAdapter:enter sendTextDataToCluster\\n"
+                },
+                {
+                    "data_type": "STRING",
+                    "key": "logger",
+                    "semantic_type": "FIELD",
+                    "value": "INTERACT.MANAGER"
+                },
+                {
+                    "data_type": "TIMESTAMP_NANOSECOND",
+                    "key": "time",
+                    "semantic_type": "TIMESTAMP",
+                    "value": "2024-05-25 20:16:37.217+0000"
+                }
+            ]
+        ])
+    );
+    guard.remove_all().await;
+}
+
 pub async fn test_plain_text_ingestion(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;