diff --git a/Cargo.lock b/Cargo.lock index 991829a75813..1d3da6dff55d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7900,6 +7900,7 @@ dependencies = [ "futures", "greptime-proto", "itertools 0.10.5", + "jsonb", "lazy_static", "moka", "once_cell", diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml index 5b85b999feff..2402605f7fe8 100644 --- a/src/pipeline/Cargo.toml +++ b/src/pipeline/Cargo.toml @@ -40,6 +40,7 @@ enum_dispatch = "0.3" futures.workspace = true greptime-proto.workspace = true itertools.workspace = true +jsonb.workspace = true lazy_static.workspace = true moka = { workspace = true, features = ["sync"] } once_cell.workspace = true diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs index 3680053ba0d7..0a28cb7423dc 100644 --- a/src/pipeline/src/etl/error.rs +++ b/src/pipeline/src/etl/error.rs @@ -40,7 +40,7 @@ pub enum Error { location: Location, }, - #[snafu(display("{processor} processor: missing field: {field}"))] + #[snafu(display("Processor {processor}: missing field: {field}"))] ProcessorMissingField { processor: String, field: String, @@ -48,7 +48,7 @@ pub enum Error { location: Location, }, - #[snafu(display("{processor} processor: expect string value, but got {v:?}"))] + #[snafu(display("Processor {processor}: expect string value, but got {v:?}"))] ProcessorExpectString { processor: String, v: crate::etl::Value, @@ -56,7 +56,7 @@ pub enum Error { location: Location, }, - #[snafu(display("{processor} processor: unsupported value {val}"))] + #[snafu(display("Processor {processor}: unsupported value {val}"))] ProcessorUnsupportedValue { processor: &'static str, val: String, @@ -64,13 +64,13 @@ pub enum Error { location: Location, }, - #[snafu(display("processor key must be a string"))] + #[snafu(display("Processor key must be a string"))] ProcessorKeyMustBeString { #[snafu(implicit)] location: Location, }, - #[snafu(display("{kind} processor: failed to parse {value}"))] + #[snafu(display("Processor {kind}: failed to parse {value}"))] ProcessorFailedToParseString { kind: String, value: String, @@ -78,13 +78,13 @@ pub enum Error { location: Location, }, - #[snafu(display("processor must have a string key"))] + #[snafu(display("Processor must have a string key"))] ProcessorMustHaveStringKey { #[snafu(implicit)] location: Location, }, - #[snafu(display("unsupported {processor} processor"))] + #[snafu(display("Unsupported {processor} processor"))] UnsupportedProcessor { processor: String, #[snafu(implicit)] @@ -108,7 +108,7 @@ pub enum Error { location: Location, }, - #[snafu(display("failed to parse {key} as int: {value}"))] + #[snafu(display("Failed to parse {key} as int: {value}"))] FailedToParseIntKey { key: String, value: String, @@ -126,7 +126,7 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("failed to parse {key} as float: {value}"))] + #[snafu(display("Failed to parse {key} as float: {value}"))] FailedToParseFloatKey { key: String, value: String, @@ -136,7 +136,7 @@ pub enum Error { location: Location, }, - #[snafu(display("{kind} processor.{key} not found in intermediate keys"))] + #[snafu(display("Processor {kind}: {key} not found in intermediate keys"))] IntermediateKeyIndex { kind: String, key: String, @@ -144,41 +144,41 @@ pub enum Error { location: Location, }, - #[snafu(display("{k} missing value in {s}"))] + #[snafu(display("Cmcd {k} missing value in {s}"))] CmcdMissingValue { k: String, s: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("{part} missing key in {s}"))] + 
#[snafu(display("Part: {part} missing key in {s}"))] CmcdMissingKey { part: String, s: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("key must be a string, but got {k:?}"))] + #[snafu(display("Key must be a string, but got {k:?}"))] KeyMustBeString { k: yaml_rust::Yaml, #[snafu(implicit)] location: Location, }, - #[snafu(display("csv read error"))] + #[snafu(display("Csv read error"))] CsvRead { #[snafu(implicit)] location: Location, #[snafu(source)] error: csv::Error, }, - #[snafu(display("expected at least one record from csv format, but got none"))] + #[snafu(display("Expected at least one record from csv format, but got none"))] CsvNoRecord { #[snafu(implicit)] location: Location, }, - #[snafu(display("'{separator}' must be a single character, but got '{value}'"))] + #[snafu(display("Separator '{separator}' must be a single character, but got '{value}'"))] CsvSeparatorName { separator: &'static str, value: String, @@ -186,7 +186,7 @@ pub enum Error { location: Location, }, - #[snafu(display("'{quote}' must be a single character, but got '{value}'"))] + #[snafu(display("Quote '{quote}' must be a single character, but got '{value}'"))] CsvQuoteName { quote: &'static str, value: String, @@ -212,19 +212,19 @@ pub enum Error { location: Location, }, - #[snafu(display("failed to get local timezone"))] + #[snafu(display("Failed to get local timezone"))] DateFailedToGetLocalTimezone { #[snafu(implicit)] location: Location, }, - #[snafu(display("failed to get timestamp"))] + #[snafu(display("Failed to get timestamp"))] DateFailedToGetTimestamp { #[snafu(implicit)] location: Location, }, - #[snafu(display("{processor} processor: invalid format {s}"))] + #[snafu(display("Processor {processor}: invalid format {s}"))] DateInvalidFormat { s: String, processor: String, @@ -245,20 +245,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("'{split}' exceeds the input"))] + #[snafu(display("Split: '{split}' exceeds the input"))] DissectSplitExceedsInput { split: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("'{split}' does not match the input '{input}'"))] + #[snafu(display("Split: '{split}' does not match the input '{input}'"))] DissectSplitNotMatchInput { split: String, input: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("consecutive names are not allowed: '{name1}' '{name2}'"))] + #[snafu(display("Consecutive names are not allowed: '{name1}' '{name2}'"))] DissectConsecutiveNames { name1: String, name2: String, @@ -270,7 +270,7 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("'{m}' modifier already set, but found {modifier}"))] + #[snafu(display("Modifier '{m}' already set, but found {modifier}"))] DissectModifierAlreadySet { m: String, modifier: String, @@ -304,23 +304,23 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("invalid resolution: {resolution}"))] + #[snafu(display("Invalid resolution: {resolution}"))] EpochInvalidResolution { resolution: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("pattern is required"))] + #[snafu(display("Pattern is required"))] GsubPatternRequired { #[snafu(implicit)] location: Location, }, - #[snafu(display("replacement is required"))] + #[snafu(display("Replacement is required"))] GsubReplacementRequired { #[snafu(implicit)] location: Location, }, - #[snafu(display("invalid regex pattern: {pattern}"))] + #[snafu(display("Invalid regex pattern: {pattern}"))] Regex { #[snafu(source)] 
error: regex::Error, @@ -328,72 +328,72 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("separator is required"))] + #[snafu(display("Separator is required"))] JoinSeparatorRequired { #[snafu(implicit)] location: Location, }, - #[snafu(display("invalid method: {method}"))] + #[snafu(display("Invalid method: {method}"))] LetterInvalidMethod { method: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("no named group found in regex {origin}"))] + #[snafu(display("No named group found in regex {origin}"))] RegexNamedGroupNotFound { origin: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("no valid field found in {processor} processor"))] + #[snafu(display("No valid field found in {processor} processor"))] RegexNoValidField { processor: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("no valid pattern found in {processor} processor"))] + #[snafu(display("No valid pattern found in {processor} processor"))] RegexNoValidPattern { processor: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("invalid method: {s}"))] + #[snafu(display("Invalid method: {s}"))] UrlEncodingInvalidMethod { s: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("url decoding error"))] + #[snafu(display("Url decoding error"))] UrlEncodingDecode { #[snafu(source)] error: std::string::FromUtf8Error, #[snafu(implicit)] location: Location, }, - #[snafu(display("invalid transform on_failure value: {value}"))] + #[snafu(display("Invalid transform on_failure value: {value}"))] TransformOnFailureInvalidValue { value: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("transform element must be a map"))] + #[snafu(display("Transform element must be a map"))] TransformElementMustBeMap { #[snafu(implicit)] location: Location, }, - #[snafu(display("transform {fields:?} type MUST BE set before default {default}"))] + #[snafu(display("Transform {fields:?} type MUST BE set before default {default}"))] TransformTypeMustBeSet { fields: String, default: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("transform cannot be empty"))] + #[snafu(display("Transform cannot be empty"))] TransformEmpty { #[snafu(implicit)] location: Location, }, - #[snafu(display("column name must be unique, but got duplicated: {duplicates}"))] + #[snafu(display("Column name must be unique, but got duplicated: {duplicates}"))] TransformColumnNameMustBeUnique { duplicates: String, #[snafu(implicit)] @@ -407,7 +407,7 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("transform must have exactly one field specified as timestamp Index, but got {count}: {columns}"))] + #[snafu(display("Transform must have exactly one field specified as timestamp Index, but got {count}: {columns}"))] TransformTimestampIndexCount { count: usize, columns: String, @@ -425,13 +425,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("{ty} value not supported for Epoch"))] + #[snafu(display("Type: {ty} value not supported for Epoch"))] CoerceUnsupportedEpochType { ty: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("failed to coerce string value '{s}' to type '{ty}'"))] + #[snafu(display("Failed to coerce string value '{s}' to type '{ty}'"))] CoerceStringToType { s: String, ty: String, @@ -440,7 +440,7 @@ pub enum Error { }, #[snafu(display( - "invalid resolution: '{resolution}'. 
Available resolutions: {valid_resolution}" + "Invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}" ))] ValueInvalidResolution { resolution: String, @@ -449,14 +449,14 @@ pub enum Error { location: Location, }, - #[snafu(display("failed to parse type: '{t}'"))] + #[snafu(display("Failed to parse type: '{t}'"))] ValueParseType { t: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("failed to parse {ty}: {v}"))] + #[snafu(display("Failed to parse {ty}: {v}"))] ValueParseInt { ty: String, v: String, @@ -466,7 +466,7 @@ pub enum Error { location: Location, }, - #[snafu(display("failed to parse {ty}: {v}"))] + #[snafu(display("Failed to parse {ty}: {v}"))] ValueParseFloat { ty: String, v: String, @@ -476,7 +476,7 @@ pub enum Error { location: Location, }, - #[snafu(display("failed to parse {ty}: {v}"))] + #[snafu(display("Failed to parse {ty}: {v}"))] ValueParseBoolean { ty: String, v: String, @@ -485,19 +485,19 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("default value not unsupported for type {value}"))] + #[snafu(display("Default value not unsupported for type {value}"))] ValueDefaultValueUnsupported { value: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("unsupported number type: {value}"))] + #[snafu(display("Unsupported number type: {value}"))] ValueUnsupportedNumberType { value: serde_json::Number, #[snafu(implicit)] location: Location, }, - #[snafu(display("unsupported yaml type: {value:?}"))] + #[snafu(display("Unsupported yaml type: {value:?}"))] ValueUnsupportedYamlType { value: yaml_rust::Yaml, #[snafu(implicit)] @@ -531,12 +531,26 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("unsupported index type: {value}"))] + #[snafu(display("Unsupported index type: {value}"))] UnsupportedIndexType { value: String, #[snafu(implicit)] location: Location, }, + #[snafu(display("Unsupported number type: {value:?}"))] + UnsupportedNumberType { + value: serde_json::Number, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Column datatype mismatch. For column: {column}, expected datatype: {expected}, actual datatype: {actual}"))] + IdentifyPipelineColumnTypeMismatch { + column: String, + expected: String, + actual: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; diff --git a/src/pipeline/src/etl/transform/transformer.rs b/src/pipeline/src/etl/transform/transformer.rs index 28aacdcbfb2d..7a185246622a 100644 --- a/src/pipeline/src/etl/transform/transformer.rs +++ b/src/pipeline/src/etl/transform/transformer.rs @@ -13,3 +13,4 @@ // limitations under the License. 
pub mod greptime; +pub use greptime::identity_pipeline; diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 000a2ddc26f9..11d63c45032a 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -16,13 +16,20 @@ pub mod coerce; use std::collections::HashSet; +use ahash::HashMap; +use api::helper::proto_value_type; +use api::v1::column_data_type_extension::TypeExt; +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType}; use coerce::{coerce_columns, coerce_value}; use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; +use serde_json::{Map, Number}; use crate::etl::error::{ - Result, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu, - TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, + IdentifyPipelineColumnTypeMismatchSnafu, Result, TransformColumnNameMustBeUniqueSnafu, + TransformEmptySnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, + UnsupportedNumberTypeSnafu, }; use crate::etl::field::{InputFieldInfo, OneInputOneOutputField}; use crate::etl::transform::index::Index; @@ -120,6 +127,7 @@ impl Transformer for GreptimeTransformer { if let Some(idx) = transform.index { if idx == Index::Time { match transform.real_fields.len() { + // Safety: unwrap is fine here because we have checked the length of real_fields 1 => timestamp_columns .push(transform.real_fields.first().unwrap().input_name()), _ => { @@ -194,3 +202,304 @@ impl Transformer for GreptimeTransformer { &mut self.transforms } } + +/// Records the schema information seen so far, plus a sequential cache of field names. +/// It is updated as the user input JSON is traversed, +/// and ends up holding a superset of all user input schemas.
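// A small worked example of the superset behaviour described above (the two
// objects are illustrative): feeding
//
//     {"a": 1}           -> schema [a: INT64],            row [1]
//     {"a": 2, "b": "x"} -> schema [a: INT64, b: STRING], row [2, "x"]
//
// through json_value_to_row leaves SchemaInfo holding both columns, and
// identity_pipeline later back-fills earlier rows with nulls so every row
// matches the final column count.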
+#[derive(Debug, Default)] +struct SchemaInfo { + /// schema info + schema: Vec<ColumnSchema>, + /// index of the column name + index: HashMap<String, usize>, +} + +fn resolve_schema( + index: Option<usize>, + value_data: ValueData, + column_schema: ColumnSchema, + row: &mut Vec<GreptimeValue>, + schema_info: &mut SchemaInfo, +) -> Result<()> { + if let Some(index) = index { + let api_value = GreptimeValue { + value_data: Some(value_data), + }; + // Safety: unwrap is fine here because api_value is always valid + let value_column_data_type = proto_value_type(&api_value).unwrap(); + // Safety: unwrap is fine here because index is always valid + let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype(); + if value_column_data_type != schema_column_data_type { + IdentifyPipelineColumnTypeMismatchSnafu { + column: column_schema.column_name, + expected: schema_column_data_type.as_str_name(), + actual: value_column_data_type.as_str_name(), + } + .fail() + } else { + row[index] = api_value; + Ok(()) + } + } else { + let key = column_schema.column_name.clone(); + schema_info.schema.push(column_schema); + schema_info.index.insert(key, schema_info.schema.len() - 1); + let api_value = GreptimeValue { + value_data: Some(value_data), + }; + row.push(api_value); + Ok(()) + } +} + +fn resolve_number_schema( + n: Number, + column_name: String, + index: Option<usize>, + row: &mut Vec<GreptimeValue>, + schema_info: &mut SchemaInfo, +) -> Result<()> { + let (value, datatype, semantic_type) = if n.is_i64() { + ( + ValueData::I64Value(n.as_i64().unwrap()), + ColumnDataType::Int64 as i32, + SemanticType::Field as i32, + ) + } else if n.is_u64() { + ( + ValueData::U64Value(n.as_u64().unwrap()), + ColumnDataType::Uint64 as i32, + SemanticType::Field as i32, + ) + } else if n.is_f64() { + ( + ValueData::F64Value(n.as_f64().unwrap()), + ColumnDataType::Float64 as i32, + SemanticType::Field as i32, + ) + } else { + return UnsupportedNumberTypeSnafu { value: n }.fail(); + }; + resolve_schema( + index, + value, + ColumnSchema { + column_name, + datatype, + semantic_type, + datatype_extension: None, + options: None, + }, + row, + schema_info, + ) +} + +fn json_value_to_row( + schema_info: &mut SchemaInfo, + map: Map<String, serde_json::Value>, +) -> Result<Row> { + let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len()); + for _ in 0..schema_info.schema.len() { + row.push(GreptimeValue { value_data: None }); + } + for (column_name, value) in map { + if column_name == DEFAULT_GREPTIME_TIMESTAMP_COLUMN { + continue; + } + let index = schema_info.index.get(&column_name).copied(); + match value { + serde_json::Value::Null => { + // do nothing + } + serde_json::Value::String(s) => { + resolve_schema( + index, + ValueData::StringValue(s), + ColumnSchema { + column_name, + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + options: None, + }, + &mut row, + schema_info, + )?; + } + serde_json::Value::Bool(b) => { + resolve_schema( + index, + ValueData::BoolValue(b), + ColumnSchema { + column_name, + datatype: ColumnDataType::Boolean as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + options: None, + }, + &mut row, + schema_info, + )?; + } + serde_json::Value::Number(n) => { + resolve_number_schema(n, column_name, index, &mut row, schema_info)?; + } + serde_json::Value::Array(_) | serde_json::Value::Object(_) => { + resolve_schema( + index, + ValueData::BinaryValue(jsonb::Value::from(value).to_vec()), + ColumnSchema { + column_name, + datatype: ColumnDataType::Binary as i32, + semantic_type:
SemanticType::Field as i32, + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + options: None, + }, + &mut row, + schema_info, + )?; + } + } + } + Ok(Row { values: row }) +} + +/// Identity pipeline for Greptime. +/// This pipeline converts the input JSON array to Greptime Rows: +/// 1. It adds a default timestamp column to the schema. +/// 2. It does not resolve NULL values. +/// 3. It assumes the JSON format is fixed. +/// 4. It returns an error if the same column has mismatched datatypes across records. +/// 5. It analyzes the schema of each JSON record and merges them to get the final schema. +pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> { + let mut rows = Vec::with_capacity(array.len()); + + let mut schema = SchemaInfo::default(); + for value in array { + if let serde_json::Value::Object(map) = value { + let row = json_value_to_row(&mut schema, map)?; + rows.push(row); + } + } + let greptime_timestamp_schema = ColumnSchema { + column_name: DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(), + datatype: ColumnDataType::TimestampNanosecond as i32, + semantic_type: SemanticType::Timestamp as i32, + datatype_extension: None, + options: None, + }; + let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0); + let ts = GreptimeValue { + value_data: Some(ValueData::TimestampNanosecondValue(ns)), + }; + let column_count = schema.schema.len(); + for row in rows.iter_mut() { + let diff = column_count - row.values.len(); + for _ in 0..diff { + row.values.push(GreptimeValue { value_data: None }); + } + row.values.push(ts.clone()); + } + schema.schema.push(greptime_timestamp_schema); + Ok(Rows { + schema: schema.schema, + rows, + }) +} + +#[cfg(test)] +mod tests { + use crate::identity_pipeline; + + #[test] + fn test_identify_pipeline() { + { + let array = vec![ + serde_json::json!({ + "woshinull": null, + "name": "Alice", + "age": 20, + "is_student": true, + "score": 99.5, + "hobbies": "reading", + "address": "Beijing", + }), + serde_json::json!({ + "name": "Bob", + "age": 21, + "is_student": false, + "score": "88.5", + "hobbies": "swimming", + "address": "Shanghai", + "gaga": "gaga" + }), + ]; + let rows = identity_pipeline(array); + assert!(rows.is_err()); + assert_eq!( + rows.err().unwrap().to_string(), + "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(), + ); + } + { + let array = vec![ + serde_json::json!({ + "woshinull": null, + "name": "Alice", + "age": 20, + "is_student": true, + "score": 99.5, + "hobbies": "reading", + "address": "Beijing", + }), + serde_json::json!({ + "name": "Bob", + "age": 21, + "is_student": false, + "score": 88, + "hobbies": "swimming", + "address": "Shanghai", + "gaga": "gaga" + }), + ]; + let rows = identity_pipeline(array); + assert!(rows.is_err()); + assert_eq!( + rows.err().unwrap().to_string(), + "Column datatype mismatch.
For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(), + ); + } + { + let array = vec![ + serde_json::json!({ + "woshinull": null, + "name": "Alice", + "age": 20, + "is_student": true, + "score": 99.5, + "hobbies": "reading", + "address": "Beijing", + }), + serde_json::json!({ + "name": "Bob", + "age": 21, + "is_student": false, + "score": 88.5, + "hobbies": "swimming", + "address": "Shanghai", + "gaga": "gaga" + }), + ]; + let rows = identity_pipeline(array); + assert!(rows.is_ok()); + let rows = rows.unwrap(); + assert_eq!(rows.schema.len(), 8); + assert_eq!(rows.rows.len(), 2); + assert_eq!(8, rows.rows[0].values.len()); + assert_eq!(8, rows.rows[1].values.len()); + } + } +} diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 8fc72c584484..36ef3023f6cb 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -18,6 +18,7 @@ mod metrics; pub use etl::error::Result; pub use etl::processor::Processor; +pub use etl::transform::transformer::identity_pipeline; pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; pub use etl::{parse, Content, Pipeline}; diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 7877d2b84ab5..80f15ff21167 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -50,6 +50,9 @@ use crate::metrics::{ }; use crate::query_handler::LogHandlerRef; +const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_"; +const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity"; + #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct LogIngesterQueryParams { pub table: Option<String>, @@ -121,6 +124,12 @@ pub async fn add_pipeline( reason: "pipeline_name is required in path", } ); + ensure!( + !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX), + InvalidParameterSnafu { + reason: "pipeline_name cannot start with greptime_", + } + ); ensure!( !payload.is_empty(), InvalidParameterSnafu { @@ -425,46 +434,53 @@ async fn ingest_logs_inner( let db = query_ctx.get_db_string(); let exec_timer = std::time::Instant::now(); - let pipeline = state - .get_pipeline(&pipeline_name, version, query_ctx.clone()) - .await?; - - let transform_timer = std::time::Instant::now(); - let mut intermediate_state = pipeline.init_intermediate_state(); - let mut results = Vec::with_capacity(pipeline_data.len()); - - for v in pipeline_data { - pipeline - .prepare(v, &mut intermediate_state) - .inspect_err(|_| { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - }) + let transformed_data: Rows; + if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { + let rows = pipeline::identity_pipeline(pipeline_data) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; - let r = pipeline - .exec_mut(&mut intermediate_state) - .inspect_err(|_| { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - }) - .context(PipelineTransformSnafu) - .context(PipelineSnafu)?; - results.push(r); - pipeline.reset_intermediate_state(&mut intermediate_state); - } + transformed_data = rows; + } else { + let pipeline = state + .get_pipeline(&pipeline_name, version, query_ctx.clone()) + .await?; + + let transform_timer = std::time::Instant::now(); + let mut intermediate_state = pipeline.init_intermediate_state(); + + let mut results = Vec::with_capacity(pipeline_data.len()); + + for v in
pipeline_data { + pipeline + .prepare(v, &mut intermediate_state) + .inspect_err(|_| { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + }) + .context(PipelineTransformSnafu) + .context(PipelineSnafu)?; + let r = pipeline + .exec_mut(&mut intermediate_state) + .inspect_err(|_| { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + }) + .context(PipelineTransformSnafu) + .context(PipelineSnafu)?; + results.push(r); + pipeline.reset_intermediate_state(&mut intermediate_state); + } - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); - let transformed_data: Rows = Rows { - rows: results, - schema: pipeline.schemas().clone(), - }; + transformed_data = Rows { + rows: results, + schema: pipeline.schemas().clone(), + }; + } let insert_request = RowInsertRequest { rows: Some(transformed_data), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e11060fbbda5..5cdcecdb5183 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -86,6 +86,7 @@ macro_rules! http_tests { test_pipeline_api, test_test_pipeline_api, test_plain_text_ingestion, + test_identify_pipeline, test_otlp_metrics, test_otlp_traces, @@ -1072,6 +1073,21 @@ transform: "#; // 1. create pipeline + let res = client + .post("/v1/events/pipelines/greptime_guagua") + .header("Content-Type", "application/x-yaml") + .body(body) + .send() + .await; + + assert_eq!(res.status(), StatusCode::BAD_REQUEST); + assert_eq!( + res.json::<serde_json::Value>().await["error"] + .as_str() + .unwrap(), + "Invalid request parameter: pipeline_name cannot start with greptime_" + ); + let res = client .post("/v1/events/pipelines/test") .header("Content-Type", "application/x-yaml") .body(body) .send() .await; @@ -1157,6 +1173,61 @@ transform: guard.remove_all().await; } +pub async fn test_identify_pipeline(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await; + + // handshake + let client = TestClient::new(app); + let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"} +{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua"}"#; + let res = client + .post("/v1/events/logs?db=public&table=logs&pipeline_name=greptime_identity") + .header("Content-Type", "application/json") + .body(body) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + + let body: serde_json::Value = res.json().await; + + assert!(body.get("execution_time_ms").unwrap().is_number()); + assert_eq!(body["output"][0]["affectedrows"], 2); + + let res =
client.get("/v1/sql?sql=select * from logs").send().await; + + assert_eq!(res.status(), StatusCode::OK); + + let line1_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei",null]"#; + let line2_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java",null,null,null]"#; + let res = client.get("/v1/sql?sql=select * from logs").send().await; + assert_eq!(res.status(), StatusCode::OK); + let resp: serde_json::Value = res.json().await; + let result = resp["output"][0]["records"]["rows"].as_array().unwrap(); + assert_eq!(result.len(), 2); + let mut line1 = result[0].as_array().unwrap().clone(); + let mut line2 = result[1].as_array().unwrap().clone(); + assert!(line1.last().unwrap().is_i64()); + assert!(line2.last().unwrap().is_i64()); + *line1.last_mut().unwrap() = serde_json::Value::Null; + *line2.last_mut().unwrap() = serde_json::Value::Null; + + assert_eq!( + line1, + serde_json::from_str::>(line1_expected).unwrap() + ); + assert_eq!( + line2, + serde_json::from_str::>(line2_expected).unwrap() + ); + + let expected = r#"[["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"],["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"]]"#; + validate_data(&client, "desc logs", expected).await; + + guard.remove_all().await; +} + pub async fn test_test_pipeline_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;