feat: add test pipeline api (#4667)
* chore: add test pipeline api

* chore: add test for test pipeline api

* chore: fix taplo check

* chore: change pipeline dryrun api path

* chore: add more info for pipeline dryrun api
paomian authored Sep 6, 2024
1 parent e884658 commit 5d9f8a3
Showing 6 changed files with 453 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/datatypes/Cargo.toml
@@ -15,6 +15,7 @@ workspace = true
arrow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
base64.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
@@ -23,6 +24,7 @@ common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
enum_dispatch = "0.3"
greptime-proto.workspace = true
num = "0.4"
num-traits = "0.2"
ordered-float = { version = "3.0", features = ["serde"] }
168 changes: 168 additions & 0 deletions src/datatypes/src/value.rs
@@ -18,6 +18,8 @@ use std::sync::Arc;

use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow_array::{Array, ListArray};
use base64::engine::general_purpose::URL_SAFE;
use base64::Engine as _;
use common_base::bytes::{Bytes, StringBytes};
use common_decimal::Decimal128;
use common_telemetry::error;
@@ -28,8 +30,10 @@ use common_time::time::Time;
use common_time::timestamp::{TimeUnit, Timestamp};
use common_time::{Duration, Interval, Timezone};
use datafusion_common::ScalarValue;
use greptime_proto::v1::value::ValueData;
pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize, Serializer};
use serde_json::{Number, Value as JsonValue};
use snafu::{ensure, ResultExt};

use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
@@ -1364,15 +1368,179 @@ impl<'a> ValueRef<'a> {
}
}

pub fn column_data_to_json(data: ValueData) -> JsonValue {
match data {
ValueData::BinaryValue(b) => JsonValue::String(URL_SAFE.encode(b)),
ValueData::BoolValue(b) => JsonValue::Bool(b),
ValueData::U8Value(i) => JsonValue::Number(i.into()),
ValueData::U16Value(i) => JsonValue::Number(i.into()),
ValueData::U32Value(i) => JsonValue::Number(i.into()),
ValueData::U64Value(i) => JsonValue::Number(i.into()),
ValueData::I8Value(i) => JsonValue::Number(i.into()),
ValueData::I16Value(i) => JsonValue::Number(i.into()),
ValueData::I32Value(i) => JsonValue::Number(i.into()),
ValueData::I64Value(i) => JsonValue::Number(i.into()),
ValueData::F32Value(f) => Number::from_f64(f as f64)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
ValueData::F64Value(f) => Number::from_f64(f)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
ValueData::StringValue(s) => JsonValue::String(s),
ValueData::DateValue(d) => JsonValue::String(Date::from(d).to_string()),
ValueData::DatetimeValue(d) => JsonValue::String(DateTime::from(d).to_string()),
ValueData::TimeSecondValue(d) => JsonValue::String(Time::new_second(d).to_iso8601_string()),
ValueData::TimeMillisecondValue(d) => {
JsonValue::String(Time::new_millisecond(d).to_iso8601_string())
}
ValueData::TimeMicrosecondValue(d) => {
JsonValue::String(Time::new_microsecond(d).to_iso8601_string())
}
ValueData::TimeNanosecondValue(d) => {
JsonValue::String(Time::new_nanosecond(d).to_iso8601_string())
}
ValueData::TimestampMicrosecondValue(d) => {
JsonValue::String(Timestamp::new_microsecond(d).to_iso8601_string())
}
ValueData::TimestampMillisecondValue(d) => {
JsonValue::String(Timestamp::new_millisecond(d).to_iso8601_string())
}
ValueData::TimestampNanosecondValue(d) => {
JsonValue::String(Timestamp::new_nanosecond(d).to_iso8601_string())
}
ValueData::TimestampSecondValue(d) => {
JsonValue::String(Timestamp::new_second(d).to_iso8601_string())
}
ValueData::IntervalYearMonthValue(d) => JsonValue::String(format!("interval year [{}]", d)),
ValueData::IntervalMonthDayNanoValue(d) => JsonValue::String(format!(
"interval month [{}][{}][{}]",
d.months, d.days, d.nanoseconds
)),
ValueData::IntervalDayTimeValue(d) => JsonValue::String(format!("interval day [{}]", d)),
ValueData::Decimal128Value(d) => {
JsonValue::String(format!("decimal128 [{}][{}]", d.hi, d.lo))
}
}
}

#[cfg(test)]
mod tests {
use arrow::datatypes::DataType as ArrowDataType;
use common_time::timezone::set_default_timezone;
use greptime_proto::v1::{Decimal128 as ProtoDecimal128, IntervalMonthDayNano};
use num_traits::Float;

use super::*;
use crate::vectors::ListVectorBuilder;

#[test]
fn test_column_data_to_json() {
assert_eq!(
column_data_to_json(ValueData::BinaryValue(b"hello".to_vec())),
JsonValue::String("aGVsbG8=".to_string())
);
assert_eq!(
column_data_to_json(ValueData::BoolValue(true)),
JsonValue::Bool(true)
);
assert_eq!(
column_data_to_json(ValueData::U8Value(1)),
JsonValue::Number(1.into())
);
assert_eq!(
column_data_to_json(ValueData::U16Value(2)),
JsonValue::Number(2.into())
);
assert_eq!(
column_data_to_json(ValueData::U32Value(3)),
JsonValue::Number(3.into())
);
assert_eq!(
column_data_to_json(ValueData::U64Value(4)),
JsonValue::Number(4.into())
);
assert_eq!(
column_data_to_json(ValueData::I8Value(5)),
JsonValue::Number(5.into())
);
assert_eq!(
column_data_to_json(ValueData::I16Value(6)),
JsonValue::Number(6.into())
);
assert_eq!(
column_data_to_json(ValueData::I32Value(7)),
JsonValue::Number(7.into())
);
assert_eq!(
column_data_to_json(ValueData::I64Value(8)),
JsonValue::Number(8.into())
);
assert_eq!(
column_data_to_json(ValueData::F32Value(9.0)),
JsonValue::Number(Number::from_f64(9.0_f64).unwrap())
);
assert_eq!(
column_data_to_json(ValueData::F64Value(10.0)),
JsonValue::Number(Number::from_f64(10.0_f64).unwrap())
);
assert_eq!(
column_data_to_json(ValueData::StringValue("hello".to_string())),
JsonValue::String("hello".to_string())
);
assert_eq!(
column_data_to_json(ValueData::DateValue(123)),
JsonValue::String("1970-05-04".to_string())
);
assert_eq!(
column_data_to_json(ValueData::DatetimeValue(456)),
JsonValue::String("1970-01-01 00:00:00.456+0000".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimeSecondValue(789)),
JsonValue::String("00:13:09+0000".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimeMillisecondValue(789)),
JsonValue::String("00:00:00.789+0000".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimeMicrosecondValue(789)),
JsonValue::String("00:00:00.000789+0000".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimestampMillisecondValue(1234567890)),
JsonValue::String("1970-01-15 06:56:07.890+0000".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimestampNanosecondValue(1234567890123456789)),
JsonValue::String("2009-02-13 23:31:30.123456789+0000".to_string())
);
assert_eq!(
column_data_to_json(ValueData::TimestampSecondValue(1234567890)),
JsonValue::String("2009-02-13 23:31:30+0000".to_string())
);
assert_eq!(
column_data_to_json(ValueData::IntervalYearMonthValue(12)),
JsonValue::String("interval year [12]".to_string())
);
assert_eq!(
column_data_to_json(ValueData::IntervalMonthDayNanoValue(IntervalMonthDayNano {
months: 1,
days: 2,
nanoseconds: 3,
})),
JsonValue::String("interval month [1][2][3]".to_string())
);
assert_eq!(
column_data_to_json(ValueData::IntervalDayTimeValue(4)),
JsonValue::String("interval day [4]".to_string())
);
assert_eq!(
column_data_to_json(ValueData::Decimal128Value(ProtoDecimal128 { hi: 5, lo: 6 })),
JsonValue::String("decimal128 [5][6]".to_string())
);
}

#[test]
fn test_try_from_scalar_value() {
assert_eq!(
1 change: 1 addition & 0 deletions src/servers/src/http.rs
@@ -753,6 +753,7 @@ impl HttpServer {
"/pipelines/:pipeline_name",
routing::delete(event::delete_pipeline),
)
.route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
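
For reference, a minimal client-side sketch of exercising the new route (Rust, using reqwest with its json feature and tokio — both assumptions, as is the http://localhost:4000/v1/events prefix; the diff above only establishes the relative path /pipelines/dryrun, while the required pipeline_name query parameter and the 10-row limit come from the handler shown below):

use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // POST a small batch of sample log rows against an existing pipeline.
    // The handler rejects payloads with more than 10 rows ("too many rows for dryrun").
    let resp: Value = client
        .post("http://localhost:4000/v1/events/pipelines/dryrun") // host and prefix are assumptions
        .query(&[("pipeline_name", "my_pipeline")]) // hypothetical pipeline name
        .header("content-type", "application/json")
        .body(r#"[{"message": "hello world"}]"#)
        .send()
        .await?
        .json()
        .await?;
    println!("{resp:#}");
    Ok(())
}
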
116 changes: 114 additions & 2 deletions src/servers/src/http/event.rs
@@ -23,15 +23,16 @@ use axum::headers::ContentType;
use axum::http::header::CONTENT_TYPE;
use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, TypedHeader};
use axum::{async_trait, BoxError, Extension, Json, TypedHeader};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::PipelineVersion;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Value};
use serde_json::{Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

@@ -230,6 +231,117 @@ fn transform_ndjson_array_factory(
})
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
) -> Result<Response> {
let handler = log_state.log_handler;
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;

let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;

let ignore_errors = query_params.ignore_errors.unwrap_or(false);

let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

if value.len() > 10 {
return Err(InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
.build());
}

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

let mut intermediate_state = pipeline.init_intermediate_state();

let mut results = Vec::with_capacity(value.len());
for v in value {
pipeline
.prepare(v, &mut intermediate_state)
.map_err(|reason| PipelineTransformSnafu { reason }.build())
.context(PipelineSnafu)?;
let r = pipeline
.exec_mut(&mut intermediate_state)
.map_err(|reason| PipelineTransformSnafu { reason }.build())
.context(PipelineSnafu)?;
results.push(r);
pipeline.reset_intermediate_state(&mut intermediate_state);
}

let colume_type_key = "colume_type";
let data_type_key = "data_type";
let name_key = "name";

let schema = pipeline
.schemas()
.iter()
.map(|cs| {
let mut map = Map::new();
map.insert(name_key.to_string(), Value::String(cs.column_name.clone()));
map.insert(
data_type_key.to_string(),
Value::String(cs.datatype().as_str_name().to_string()),
);
map.insert(
colume_type_key.to_string(),
Value::String(cs.semantic_type().as_str_name().to_string()),
);
map.insert(
"fulltext".to_string(),
Value::Bool(
cs.options
.clone()
.is_some_and(|x| x.options.contains_key("fulltext")),
),
);
Value::Object(map)
})
.collect::<Vec<_>>();
let rows = results
.into_iter()
.map(|row| {
let row = row
.values
.into_iter()
.enumerate()
.map(|(idx, v)| {
v.value_data
.map(|d| {
let mut map = Map::new();
map.insert("value".to_string(), column_data_to_json(d));
map.insert("key".to_string(), schema[idx][name_key].clone());
map.insert(
"semantic_type".to_string(),
schema[idx][colume_type_key].clone(),
);
map.insert("data_type".to_string(), schema[idx][data_type_key].clone());
Value::Object(map)
})
.unwrap_or(Value::Null)
})
.collect();
Value::Array(row)
})
.collect::<Vec<_>>();
let mut result = Map::new();
result.insert("schema".to_string(), Value::Array(schema));
result.insert("rows".to_string(), Value::Array(rows));
let result = Value::Object(result);
Ok(Json(result).into_response())
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
State(log_state): State<LogState>,
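
Judging from how pipeline_dryrun assembles its reply above, the response is a JSON object with a "schema" array describing the pipeline's output columns and a "rows" array holding one entry per dry-run input row. The sketch below reconstructs that shape with serde_json; the column names and type strings are illustrative placeholders, while the key names (name, data_type, colume_type, fulltext, key, semantic_type, value) are taken directly from the handler:

use serde_json::{json, Value};

// Illustrative shape of the dryrun response; actual values depend on the
// pipeline definition and the submitted sample rows.
fn example_dryrun_response() -> Value {
    json!({
        "schema": [
            { "name": "message", "data_type": "STRING", "colume_type": "FIELD", "fulltext": false },
            { "name": "ts", "data_type": "TIMESTAMP_NANOSECOND", "colume_type": "TIMESTAMP", "fulltext": false }
        ],
        "rows": [
            [
                { "key": "message", "data_type": "STRING", "semantic_type": "FIELD", "value": "hello world" },
                { "key": "ts", "data_type": "TIMESTAMP_NANOSECOND", "semantic_type": "TIMESTAMP", "value": "2024-09-06 00:00:00+0000" }
            ]
        ]
    })
}
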