diff --git a/Cargo.lock b/Cargo.lock index 861b3bfdab7b..faa82846152f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,7 +132,7 @@ dependencies = [ [[package]] name = "apache-avro" version = "0.15.0" -source = "git+https://github.com/risingwavelabs/avro?branch=waruto/modify-decimal#efcc7b9f42d90c96eb8ac71e785cdbba3d32b0dc" +source = "git+https://github.com/risingwavelabs/avro?branch=idx0dev/resolved_schema#89c2c128de93586465a7ea85b0a1c1a53082bba2" dependencies = [ "byteorder", "bzip2", diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index 7b58e13181a6..505e3cd101ff 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -560,7 +560,7 @@ select * from s14; query I select count(*) from s15 ---- -0 +2 query I select count(*) from s16 diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt index 7a32ecd55906..7376e4be1d74 100644 --- a/e2e_test/source/basic/nosim_kafka.slt +++ b/e2e_test/source/basic/nosim_kafka.slt @@ -25,7 +25,7 @@ row schema location confluent schema registry 'http://message_queue:8081' # but if we let the key as a column, there's no such requirement statement ok CREATE TABLE upsert_student_key_not_subset_of_value_avro_json ( - key_struct struct PRIMARY KEY + key_struct struct<"NOTEXIST" INT> PRIMARY KEY ) WITH ( connector = 'kafka', @@ -150,8 +150,6 @@ ORDER BY 4 Emma Brown 20 5.3 130 5 Michael Williams 22 6.2 190 6 Leah Davis 18 5.7 140 -7 Connor Wilson 19 5.9 160 -8 Ava Garcia 21 5.2 115 9 Jacob Anderson 20 5.8 155 query I diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 360e6bf18b6c..480038baa1b3 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -15,7 +15,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" -apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "waruto/modify-decimal", features = [ +apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0dev/resolved_schema", features = [ "snappy", "zstandard", "bzip", diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 16c6d7dda681..a9c4ada6a077 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; -use apache_avro::types::Value; use apache_avro::{from_avro_datum, Reader, Schema}; use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; use risingwave_common::error::{Result, RwError}; @@ -25,10 +24,12 @@ use risingwave_pb::plan_common::ColumnDesc; use url::Url; use super::schema_resolver::*; -use super::util::{extract_inner_field_schema, from_avro_value}; use crate::common::UpsertMessage; use crate::parser::avro::util::avro_field_to_column_desc; use crate::parser::schema_registry::{extract_schema_id, Client}; +use crate::parser::unified::avro::{AvroAccess, AvroParseOptions}; +use crate::parser::unified::upsert::UpsertChangeEvent; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::util::get_kafka_topic; use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -175,13 +176,7 @@ impl AvroParser { payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { - #[derive(Debug)] - enum Op { - Insert, - Delete, - } - - let (raw_key, raw_value, op) = if self.is_enable_upsert() { + let (raw_key, raw_value) = if self.is_enable_upsert() { let msg: UpsertMessage<'_> = bincode::deserialize(&payload).map_err(|e| { RwError::from(ProtocolError(format!( "extract payload err {:?}, you may need to check the 'upsert' parameter", @@ -189,12 +184,12 @@ impl AvroParser { ))) })?; if !msg.record.is_empty() { - (Some(msg.primary_key), Some(msg.record), Op::Insert) + (Some(msg.primary_key), Some(msg.record)) } else { - (Some(msg.primary_key), None, Op::Delete) + (Some(msg.primary_key), None) } } else { - (None, Some(Cow::from(&payload)), Op::Insert) + (None, Some(Cow::from(&payload))) }; let avro_value = if let Some(payload) = raw_value { @@ -257,77 +252,30 @@ impl AvroParser { None }; - // the avro can be a key or a value - if let Some(Value::Record(fields)) = avro_value { - let fill = |column: &SourceColumnDesc| { - let tuple = match fields.iter().find(|val| column.name.eq(&val.0)) { - None => { - if self.upsert_primary_key_column_name.as_ref() == Some(&column.name) { - if !(avro_key.is_some() && self.key_schema.is_some()) { - tracing::error!( - upsert_primary_key_column_name = - self.upsert_primary_key_column_name, - "Upsert mode is enabled, but key or key schema is absent.", - ); - } - return from_avro_value( - avro_key.as_ref().unwrap().clone(), - self.key_schema.as_ref().unwrap(), - ) - .map_err(|e| { - tracing::error!( - "failed to process value ({}): {}", - String::from_utf8_lossy(&payload), - e - ); - e - }); - } else { - return Ok(None); - } - } - Some(tup) => tup, - }; - - let field_schema = extract_inner_field_schema(&self.schema, Some(&column.name))?; - from_avro_value(tuple.1.clone(), field_schema).map_err(|e| { - tracing::error!( - "failed to process value ({}): {}", - String::from_utf8_lossy(&payload), - e - ); - e - }) - }; - match op { - Op::Insert => writer.insert(fill), - Op::Delete => writer.delete(fill), - } - } else if self.upsert_primary_key_column_name.is_some() && matches!(op, Op::Delete) { - writer.delete(|desc| { - if &desc.name != self.upsert_primary_key_column_name.as_ref().unwrap() { - Ok(None) - } else { - from_avro_value( - avro_key.as_ref().unwrap().clone(), - self.key_schema.as_deref().unwrap(), - ) - .map_err(|e| { - tracing::error!(upsert_primary_key_column_name=self.upsert_primary_key_column_name, - ?avro_value,op=?op, - "failed to process value ({}): {}", - String::from_utf8_lossy(&payload), - e - ); - e - }) - } - }) - } else { - Err(RwError::from(ProtocolError( - "avro parse unexpected value".to_string(), - ))) + let mut accessor: UpsertChangeEvent, AvroAccess<'_, '_>> = + UpsertChangeEvent::default(); + if let Some(key) = &avro_key { + accessor = accessor.with_key(AvroAccess::new( + key, + AvroParseOptions { + schema: self.key_schema.as_deref(), + ..Default::default() + }, + )); } + + if let Some(value) = &avro_value { + accessor = accessor.with_value(AvroAccess::new( + value, + AvroParseOptions::default().with_schema(&self.schema), + )); + } + + if let Some(pk) = &self.upsert_primary_key_column_name { + accessor = accessor.with_key_as_column_name(pk); + } + + apply_row_operation_on_stream_chunk_writer(accessor, &mut writer) } } diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index ec0981a28e68..4a9b1a89732f 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -16,12 +16,13 @@ use apache_avro::types::Value; use apache_avro::{Decimal as AvroDecimal, Schema}; use chrono::Datelike; use itertools::Itertools; -use risingwave_common::array::{ListValue, StructValue}; use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, Date, Datum, Interval, ScalarImpl, F32, F64}; +use risingwave_common::types::{DataType, Date, Datum}; use risingwave_pb::plan_common::ColumnDesc; +use crate::parser::unified::avro::AvroParseOptions; + const RW_DECIMAL_MAX_PRECISION: usize = 28; pub(crate) fn avro_field_to_column_desc( @@ -245,85 +246,23 @@ pub(crate) fn extract_inner_field_schema<'a>( /// - Date (the number of days from the unix epoch, 1970-1-1 UTC) /// - Timestamp (the number of milliseconds from the unix epoch, 1970-1-1 00:00:00.000 UTC) #[inline] -pub(crate) fn from_avro_value(value: Value, value_schema: &Schema) -> Result { - let v = match value { - Value::Null => { - return Ok(None); - } - Value::Boolean(b) => ScalarImpl::Bool(b), - Value::String(s) => ScalarImpl::Utf8(s.into_boxed_str()), - Value::Int(i) => ScalarImpl::Int32(i), - Value::Long(i) => ScalarImpl::Int64(i), - Value::Float(f) => ScalarImpl::Float32(F32::from(f)), - Value::Double(f) => ScalarImpl::Float64(F64::from(f)), - Value::Decimal(avro_decimal) => { - let (precision, scale) = match value_schema { - Schema::Decimal { - precision, scale, .. - } => (*precision, *scale), - _ => { - return Err(RwError::from(InternalError( - "avro value is and decimal but schema not".to_owned(), - ))); - } - }; - let decimal = avro_decimal_to_rust_decimal(avro_decimal, precision, scale)?; - ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal)) - } - Value::Date(days) => { - ScalarImpl::Date(Date::with_days(days + unix_epoch_days()).map_err(|e| { - let err_msg = format!("avro parse error.wrong date value {}, err {:?}", days, e); - RwError::from(InternalError(err_msg)) - })?) - } - Value::TimestampMicros(us) => ScalarImpl::Int64(us), - Value::TimestampMillis(ms) => ScalarImpl::Int64(ms.checked_mul(1000).ok_or_else(|| { - RwError::from(InternalError(format!( - "avro parse millis overflow, value: {}", - ms - ))) - })?), - Value::Duration(duration) => { - let months = u32::from(duration.months()) as i32; - let days = u32::from(duration.days()) as i32; - let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows - ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs)) - } - Value::Bytes(value) => ScalarImpl::Bytea(value.into_boxed_slice()), - Value::Enum(_, symbol) => ScalarImpl::Utf8(symbol.into_boxed_str()), - Value::Record(descs) => { - let rw_values = descs - .into_iter() - .map(|(field_name, field_value)| { - extract_inner_field_schema(value_schema, Some(&field_name)) - .and_then(|field_schema| from_avro_value(field_value, field_schema)) - }) - .collect::>>()?; - ScalarImpl::Struct(StructValue::new(rw_values)) - } - Value::Array(values) => { - let item_schema = extract_inner_field_schema(value_schema, None)?; - let rw_values = values - .into_iter() - .map(|item_value| from_avro_value(item_value, item_schema)) - .collect::>>()?; - ScalarImpl::List(ListValue::new(rw_values)) - } - Value::Union(_, value) => { - let inner_schema = extract_inner_field_schema(value_schema, None)?; - return from_avro_value(*value, inner_schema); - } - _ => { - let err_msg = format!("avro parse error.unsupported value {:?}", value); - return Err(RwError::from(InternalError(err_msg))); - } - }; - - Ok(Some(v)) +pub(crate) fn from_avro_value( + value: Value, + value_schema: &Schema, + shape: &DataType, +) -> Result { + AvroParseOptions { + schema: Some(value_schema), + relax_numeric: true, + } + .parse(&value, Some(shape)) + .map_err(|err| RwError::from(InternalError(format!("{:?}", err)))) } #[cfg(test)] mod tests { + use risingwave_common::types::{ScalarImpl, Timestamp}; + use super::*; #[test] fn test_convert_decimal() { @@ -339,16 +278,25 @@ mod tests { let rust_decimal = avro_decimal_to_rust_decimal(avro_decimal, 28, 1).unwrap(); assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap()); } - #[test] fn test_avro_timestamp_micros() { let v1 = Value::TimestampMicros(1620000000000); let v2 = Value::TimestampMillis(1620000000); let value_schema1 = Schema::TimestampMicros; let value_schema2 = Schema::TimestampMillis; - let datum1 = from_avro_value(v1, &value_schema1).unwrap(); - let datum2 = from_avro_value(v2, &value_schema2).unwrap(); - assert_eq!(datum1, Some(ScalarImpl::Int64(1620000000000))); - assert_eq!(datum2, Some(ScalarImpl::Int64(1620000000000))); + let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap(); + let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap(); + assert_eq!( + datum1, + Some(ScalarImpl::Timestamp(Timestamp::new( + "2021-05-03T00:00:00".parse().unwrap() + ))) + ); + assert_eq!( + datum2, + Some(ScalarImpl::Timestamp(Timestamp::new( + "2021-05-03T00:00:00".parse().unwrap() + ))) + ); } } diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index a36ab59c4dd9..4397f65e6242 100644 --- a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -12,21 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; -use risingwave_common::error::ErrorCode::ProtocolError; +use risingwave_common::error::ErrorCode::{self, ProtocolError}; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, Datum}; -use risingwave_common::util::iter_util::ZipEqFast; -use simd_json::{BorrowedValue, StaticNode, ValueAccess}; +use simd_json::{BorrowedValue, Mutable, ValueAccess}; use crate::parser::canal::operators::*; -use crate::parser::common::{do_parse_simd_json_value, json_object_smart_get_value}; -use crate::parser::util::at_least_one_ok; +use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; +use crate::parser::unified::ChangeEventOperation; use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceFormat}; +use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; -const AFTER: &str = "data"; -const BEFORE: &str = "old"; +const DATA: &str = "data"; const OP: &str = "type"; const IS_DDL: &str = "isDdl"; @@ -50,7 +47,7 @@ impl CanalJsonParser { mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { - let event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) + let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; let is_ddl = event.get(IS_DDL).and_then(|v| v.as_bool()).ok_or_else(|| { @@ -65,135 +62,48 @@ impl CanalJsonParser { ))); } - let op = event.get(OP).and_then(|v| v.as_str()).ok_or_else(|| { - RwError::from(ProtocolError("op field not found in canal json".to_owned())) - })?; - - match op { - CANAL_INSERT_EVENT => { - let inserted = event - .get(AFTER) - .and_then(|v| match v { - BorrowedValue::Array(array) => Some(array.iter()), - _ => None, - }) - .ok_or_else(|| { - RwError::from(ProtocolError( - "data is missing for creating event".to_string(), - )) - })?; - let results = inserted - .into_iter() - .map(|v| { - writer.insert(|column| { - cannal_simd_json_parse_value( - &column.data_type, - crate::parser::common::json_object_smart_get_value( - v, - (&column.name).into(), - ), - ) - }) - }) - .collect::>>(); - at_least_one_ok(results) - } - CANAL_UPDATE_EVENT => { - let after = event - .get(AFTER) - .and_then(|v| match v { - BorrowedValue::Array(array) => Some(array.iter()), - _ => None, - }) - .ok_or_else(|| { - RwError::from(ProtocolError( - "data is missing for updating event".to_string(), - )) - })?; - let before = event - .get(BEFORE) - .and_then(|v| match v { - BorrowedValue::Array(array) => Some(array.iter()), - _ => None, - }) - .ok_or_else(|| { - RwError::from(ProtocolError( - "old is missing for updating event".to_string(), - )) - })?; - - let results = before - .zip_eq_fast(after) - .map(|(before, after)| { - writer.update(|column| { - // in origin canal, old only contains the changed columns but data - // contains all columns. - // in ticdc, old contains all fields - let before_value = - json_object_smart_get_value(before, (&column.name).into()).or_else( - || json_object_smart_get_value(after, (&column.name).into()), - ); - let before = - cannal_simd_json_parse_value(&column.data_type, before_value)?; - let after = cannal_simd_json_parse_value( - &column.data_type, - json_object_smart_get_value(after, (&column.name).into()), - )?; - Ok((before, after)) - }) - }) - .collect::>>(); - at_least_one_ok(results) + let op = match event.get(OP).and_then(|v| v.as_str()) { + Some(CANAL_INSERT_EVENT | CANAL_UPDATE_EVENT) => ChangeEventOperation::Upsert, + Some(CANAL_DELETE_EVENT) => ChangeEventOperation::Delete, + _ => Err(RwError::from(ProtocolError( + "op field not found in canal json".to_owned(), + )))?, + }; + + let events = event + .get_mut(DATA) + .and_then(|v| match v { + BorrowedValue::Array(array) => Some(array), + _ => None, + }) + .ok_or_else(|| { + RwError::from(ProtocolError( + "'data' is missing for creating event".to_string(), + )) + })?; + let mut errors = Vec::new(); + let mut guard = None; + for event in events.drain(..) { + let accessor = JsonAccess::new_with_options(event, &JsonParseOptions::CANAL); + match apply_row_operation_on_stream_chunk_writer((op, accessor), &mut writer) { + Ok(this_guard) => guard = Some(this_guard), + Err(err) => errors.push(err), } - CANAL_DELETE_EVENT => { - let deleted = event - .get(AFTER) - .and_then(|v| match v { - BorrowedValue::Array(array) => Some(array.iter()), - _ => None, - }) - .ok_or_else(|| { - RwError::from(ProtocolError("old is missing for delete event".to_string())) - })?; - - let results = deleted - .into_iter() - .map(|v| { - writer.delete(|column| { - cannal_simd_json_parse_value( - &column.data_type, - json_object_smart_get_value(v, (&column.name).into()), - ) - }) - }) - .collect::>>(); - - at_least_one_ok(results) + } + if let Some(guard) = guard { + if !errors.is_empty() { + tracing::error!(?errors, "failed to parse some columns"); } - other => Err(RwError::from(ProtocolError(format!( - "unknown canal json op: {}", - other - )))), + Ok(guard) + } else { + Err(RwError::from(ErrorCode::InternalError(format!( + "failed to parse all columns: {:?}", + errors + )))) } } } -#[inline] -fn cannal_simd_json_parse_value( - dtype: &DataType, - value: Option<&BorrowedValue<'_>>, -) -> Result { - match value { - None | Some(BorrowedValue::Static(StaticNode::Null)) => Ok(None), - Some(v) => Ok(Some( - do_parse_simd_json_value(&SourceFormat::CanalJson, dtype, v).map_err(|e| { - tracing::warn!("failed to parse type '{}' from json: {}", dtype, e); - anyhow!("failed to parse type '{}' from json: {}", dtype, e) - })?, - )), - } -} - impl ByteStreamSourceParser for CanalJsonParser { fn columns(&self) -> &[SourceColumnDesc] { &self.rw_columns @@ -318,35 +228,7 @@ mod tests { { let (op, row) = rows.next().unwrap(); - assert_eq!(op, Op::UpdateDelete); - assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int64(1))); - assert_eq!( - row.datum_at(1).to_owned_datum(), - (Some(ScalarImpl::Utf8("mike".into()))) - ); - assert_eq!( - row.datum_at(2).to_owned_datum(), - (Some(ScalarImpl::Bool(false))) - ); - assert_eq!( - row.datum_at(3).to_owned_datum(), - (Some(Decimal::from_str("1000.62").unwrap().into())) - ); - assert_eq!( - row.datum_at(4).to_owned_datum(), - (Some(ScalarImpl::Timestamp( - str_to_timestamp("2018-01-01 00:00:01").unwrap() - ))) - ); - assert_eq!( - row.datum_at(5).to_owned_datum(), - (Some(ScalarImpl::Float64(0.65.into()))) - ); - } - - { - let (op, row) = rows.next().unwrap(); - assert_eq!(op, Op::UpdateInsert); + assert_eq!(op, Op::Insert); assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int64(1))); assert_eq!( row.datum_at(1).to_owned_datum(), diff --git a/src/connector/src/parser/common.rs b/src/connector/src/parser/common.rs index a28cde50acc6..e5adafde0c75 100644 --- a/src/connector/src/parser/common.rs +++ b/src/connector/src/parser/common.rs @@ -13,25 +13,9 @@ // limitations under the License. use std::borrow::Cow; -use std::str::FromStr; -use anyhow::{anyhow, Result}; -use base64::Engine as _; -use risingwave_common::array::{ListValue, StructValue}; -use risingwave_common::cast::{ - i64_to_timestamp, i64_to_timestamptz, str_to_bytea, str_to_date, str_to_time, str_to_timestamp, - str_with_time_zone_to_timestamptz, -}; -use risingwave_common::types::{ - DataType, Date, Datum, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, -}; -use simd_json::value::StaticNode; use simd_json::{BorrowedValue, ValueAccess}; -use crate::source::SourceFormat; -use crate::{ - ensure_i16, ensure_i32, ensure_i64, ensure_rust_type, ensure_str, simd_json_ensure_float, -}; pub(crate) fn json_object_smart_get_value<'a, 'b>( v: &'b simd_json::BorrowedValue<'a>, key: Cow<'b, str>, @@ -47,164 +31,3 @@ pub(crate) fn json_object_smart_get_value<'a, 'b>( } None } - -pub(crate) fn do_parse_simd_json_value( - format: &SourceFormat, - dtype: &DataType, - v: &BorrowedValue<'_>, -) -> Result { - let v = match (dtype, format) { - (DataType::Boolean, SourceFormat::CanalJson) => (ensure_rust_type!(v, i16) != 0).into(), - (DataType::Boolean, _) => match v { - BorrowedValue::Static(StaticNode::Bool(b)) => (*b).into(), - // debezium converts bool to int, false -> 0, true -> 1, for mysql and postgres - BorrowedValue::Static(v) => match v.as_i64() { - Some(0i64) => ScalarImpl::Bool(false), - Some(1i64) => ScalarImpl::Bool(true), - _ => anyhow::bail!("expect bool, but found {v}"), - }, - _ => anyhow::bail!("expect bool, but found {v}"), - }, - (DataType::Int16, SourceFormat::CanalJson) => ensure_rust_type!(v, i16).into(), - (DataType::Int16, _) => ensure_i16!(v, i16).into(), - (DataType::Int32, SourceFormat::CanalJson) => ensure_rust_type!(v, i32).into(), - (DataType::Int32, _) => ensure_i32!(v, i32).into(), - (DataType::Int64, SourceFormat::CanalJson) => ensure_rust_type!(v, i64).into(), - (DataType::Int64, _) => ensure_i64!(v, i64).into(), - (DataType::Int256, _) => Int256::from_str(ensure_str!(v, "quoted int256"))?.into(), - (DataType::Serial, _) => anyhow::bail!("serial should not be parsed"), - // if the value is too large, str parsing to f32 will fail - (DataType::Float32, SourceFormat::CanalJson) => ensure_rust_type!(v, f32).into(), - // when f32 overflows, the value is converted to `inf` which is inappropriate - (DataType::Float32, _) => { - let scalar_val = ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into()); - if let ScalarImpl::Float32(f) = scalar_val { - if f.0.is_infinite() { - anyhow::bail!("{v} is out of range for type f32"); - } - } - scalar_val - } - (DataType::Float64, SourceFormat::CanalJson) => ensure_rust_type!(v, f64).into(), - (DataType::Float64, _) => simd_json_ensure_float!(v, f64).into(), - (DataType::Decimal, SourceFormat::CanalJson) => Decimal::from_str(ensure_str!(v, "string")) - .map_err(|_| anyhow!("parse decimal from string err {}", v))? - .into(), - // FIXME: decimal should have more precision than f64 - (DataType::Decimal, _) => Decimal::try_from(simd_json_ensure_float!(v, Decimal)) - .map_err(|_| anyhow!("expect decimal"))? - .into(), - (DataType::Varchar, _) => ensure_str!(v, "varchar").to_string().into(), - (DataType::Bytea, _) => match format { - // debezium converts postgres bytea to base64 format - SourceFormat::DebeziumJson => ScalarImpl::Bytea( - base64::engine::general_purpose::STANDARD - .decode(ensure_str!(v, "bytea")) - .map_err(|e| anyhow!(e))? - .into(), - ), - _ => ScalarImpl::Bytea(str_to_bytea(ensure_str!(v, "bytea")).map_err(|e| anyhow!(e))?), - }, - (DataType::Date, _) => match v { - BorrowedValue::String(s) => str_to_date(s).map_err(|e| anyhow!(e))?.into(), - BorrowedValue::Static(_) => { - // debezium converts date to i32 for mysql and postgres - Date::with_days_since_unix_epoch(ensure_i32!(v, i32))?.into() - } - _ => anyhow::bail!("expect date, but found {v}"), - }, - (DataType::Time, _) => { - match v { - BorrowedValue::String(s) => str_to_time(s).map_err(|e| anyhow!(e))?.into(), - BorrowedValue::Static(_) => { - match format { - SourceFormat::DebeziumJson => { - // debezium converts time to i64 for mysql and postgres in microseconds - Time::with_micro(ensure_i64!(v, i64).try_into().map_err(|_| { - anyhow!("cannot cast i64 to time, value out of range") - })?)? - .into() - } - _ => Time::with_milli(ensure_i64!(v, i64).try_into().map_err(|_| { - anyhow!("cannot cast i64 to time, value out of range") - })?)? - .into(), - } - } - _ => anyhow::bail!("expect time, but found {v}"), - } - } - (DataType::Timestamp, _) => match v { - BorrowedValue::String(s) => str_to_timestamp(s).map_err(|e| anyhow!(e))?.into(), - BorrowedValue::Static(_) => i64_to_timestamp(ensure_i64!(v, i64)) - .map_err(|e| anyhow!(e))? - .into(), - _ => anyhow::bail!("expect timestamp, but found {v}"), - }, - (DataType::Timestamptz, _) => match v { - BorrowedValue::String(s) => str_with_time_zone_to_timestamptz(s) - .map_err(|e| anyhow!(e))? - .into(), - BorrowedValue::Static(_) => i64_to_timestamptz(ensure_i64!(v, i64)) - .map_err(|e| anyhow!(e))? - .into(), - _ => anyhow::bail!("expect timestamptz, but found {v}"), - }, - (DataType::Jsonb, _) => { - // jsonb will be output as a string in debezium format - if *format == SourceFormat::DebeziumJson { - ScalarImpl::Jsonb(JsonbVal::from_str(ensure_str!(v, "jsonb"))?) - } else { - let v: serde_json::Value = v.clone().try_into()?; - #[expect(clippy::disallowed_methods)] - ScalarImpl::Jsonb(JsonbVal::from_serde(v)) - } - } - (DataType::Struct(struct_type_info), _) => { - let fields: Vec> = struct_type_info - .iter() - .map(|(name, ty)| { - simd_json_parse_value(format, ty, json_object_smart_get_value(v, name.into())) - }) - .collect::>>()?; - ScalarImpl::Struct(StructValue::new(fields)) - } - (DataType::List(item_type), _) => { - if let BorrowedValue::Array(values) = v { - let values = values - .iter() - .map(|v| simd_json_parse_value(format, item_type, Some(v))) - .collect::>>()?; - ScalarImpl::List(ListValue::new(values)) - } else { - let err_msg = format!( - "json parse error.type incompatible, dtype {:?}, value {:?}", - dtype, v - ); - return Err(anyhow!(err_msg)); - } - } - (DataType::Interval, _) => match format { - SourceFormat::DebeziumJson => { - ScalarImpl::Interval(Interval::from_iso_8601(ensure_str!(v, "interval"))?) - } - _ => unimplemented!(), - }, - }; - Ok(v) -} - -#[inline] -pub(crate) fn simd_json_parse_value( - // column: &ColumnDesc, - format: &SourceFormat, - dtype: &DataType, - value: Option<&BorrowedValue<'_>>, -) -> Result { - match value { - None | Some(BorrowedValue::Static(StaticNode::Null)) => Ok(None), - Some(v) => Ok(Some(do_parse_simd_json_value(format, dtype, v).map_err( - |e| anyhow!("failed to parse type '{}' from json: {}", dtype, e), - )?)), - } -} diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index cb0a7d28a80e..b0e2770f4c3c 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; -use apache_avro::types::Value; use apache_avro::{from_avro_datum, Schema}; use itertools::Itertools; use reqwest::Url; @@ -24,14 +23,13 @@ use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_pb::plan_common::ColumnDesc; -use super::operators::*; use crate::common::UpsertMessage; use crate::parser::avro::schema_resolver::ConfluentSchemaResolver; -use crate::parser::avro::util::{ - avro_field_to_column_desc, extract_inner_field_schema, from_avro_value, - get_field_from_avro_value, -}; +use crate::parser::avro::util::avro_field_to_column_desc; use crate::parser::schema_registry::{extract_schema_id, Client}; +use crate::parser::unified::avro::{AvroAccess, AvroParseOptions}; +use crate::parser::unified::debezium::DebeziumChangeEvent; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::util::get_kafka_topic; use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -209,83 +207,32 @@ impl DebeziumAvroParser { let key_schema = self.schema_resolver.get(schema_id).await?; let key = from_avro_datum(key_schema.as_ref(), &mut raw_payload, None) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - return writer.delete(|column| { - let field_schema = - extract_inner_field_schema(&self.inner_schema, Some(&column.name))?; - match get_field_from_avro_value(&key, column.name.as_str()) { - Ok(value) => from_avro_value(value.clone(), field_schema), - Err(err) => { - tracing::error!(?err); - Ok(None) - } - } - }); - } - let (schema_id, mut raw_payload) = extract_schema_id(&payload)?; - let writer_schema = self.schema_resolver.get(schema_id).await?; - let avro_value = from_avro_datum(writer_schema.as_ref(), &mut raw_payload, None) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - let op = get_field_from_avro_value(&avro_value, OP)?; - - if let Value::String(op_str) = op { - match op_str.as_str() { - DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP | DEBEZIUM_READ_OP => { - // - If debezium op == CREATE, emit INSERT to downstream using the after field - // in the debezium value as the INSERT row. - // - If debezium op == UPDATE, emit INSERT to downstream using the after field - // in the debezium value as the INSERT row. - - let after = get_field_from_avro_value(&avro_value, AFTER)?; - if *after == Value::Null { - return Err(RwError::from(ProtocolError(format!( - "after is null for {} event", - op_str - )))); - } - writer.insert(|column| { - let field_schema = - extract_inner_field_schema(&self.inner_schema, Some(&column.name))?; - from_avro_value( - get_field_from_avro_value(after, column.name.as_str())?.clone(), - field_schema, - ) - }) - } - DEBEZIUM_DELETE_OP => { - // If debezium op == DELETE, emit DELETE to downstream using the before field as - // the DELETE row. - - let before = get_field_from_avro_value(&avro_value, BEFORE) - .map_err(|_| { - RwError::from(ProtocolError( - "before is missing for the Debezium delete op. If you are using postgres, you may want to try ALTER TABLE $TABLE_NAME REPLICA IDENTITY FULL;".to_string(), - )) - })?; - if *before == Value::Null { - return Err(RwError::from(ProtocolError( - "before is null for DELETE event".to_string(), - ))); - } + let row_op = DebeziumChangeEvent::with_key(AvroAccess::new( + &key, + AvroParseOptions::default().with_schema(&key_schema), + )); - writer.delete(|column| { - let field_schema = - extract_inner_field_schema(&self.inner_schema, Some(&column.name))?; - from_avro_value( - get_field_from_avro_value(before, column.name.as_str())?.clone(), - field_schema, - ) - }) - } - _ => Err(RwError::from(ProtocolError(format!( - "unknown debezium op: {}", - op_str - )))), - } + apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) } else { - Err(RwError::from(ProtocolError( - "payload op is not a string ".to_owned(), - ))) + let (schema_id, mut raw_payload) = extract_schema_id(&payload)?; + let writer_schema = self.schema_resolver.get(schema_id).await?; + let avro_value = from_avro_datum(writer_schema.as_ref(), &mut raw_payload, None) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + + let resolver = apache_avro::schema::ResolvedSchema::try_from(&*self.outer_schema) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + // todo: to_resolved may cause stackoverflow if there's a loop in the schema + let schema = resolver + .to_resolved(&self.outer_schema) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + + let row_op = DebeziumChangeEvent::with_value(AvroAccess::new( + &avro_value, + AvroParseOptions::default().with_schema(&schema), + )); + + apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) } } } diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 71c22e807c4e..77455a155731 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -16,26 +16,15 @@ use std::fmt::Debug; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, Datum, ScalarImpl}; -use simd_json::{BorrowedValue, StaticNode, ValueAccess}; +use risingwave_common::types::DataType; +use simd_json::{BorrowedValue, Mutable}; -use super::operators::*; +use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjeciton}; +use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; -const BEFORE: &str = "before"; -const AFTER: &str = "after"; -const OP: &str = "op"; - -#[inline] -fn ensure_not_null<'a, 'b: 'a>(value: &'a BorrowedValue<'b>) -> Option<&'a BorrowedValue<'b>> { - if let BorrowedValue::Static(StaticNode::Null) = value { - None - } else { - Some(value) - } -} - #[derive(Debug)] pub struct DebeziumMongoJsonParser { pub(crate) rw_columns: Vec, @@ -44,71 +33,6 @@ pub struct DebeziumMongoJsonParser { source_ctx: SourceContextRef, } -fn parse_bson_value( - id_type: &DataType, - payload_type: &DataType, /* Only `DataType::Jsonb` is supported now. But it can be extended - * in the future. */ - value: &BorrowedValue<'_>, -) -> anyhow::Result<(Datum, Datum)> { - let bson_str = value.as_str().unwrap_or_default(); - let bson_value: serde_json::Value = serde_json::from_str(bson_str)?; - let bson_doc = bson_value.as_object().ok_or_else(|| { - RwError::from(ProtocolError( - "Debezuim Mongo requires payload is a document".into(), - )) - })?; - let id_field = bson_doc - .get("_id") - .ok_or_else(|| { - RwError::from(ProtocolError( - "Debezuim Mongo requires document has a `_id` field".into(), - )) - })? - .clone(); - let id: Datum = match id_type { - DataType::Jsonb => ScalarImpl::Jsonb(id_field.into()).into(), - DataType::Varchar => match id_field { - serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.into())), - serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8( - obj["$oid"].as_str().to_owned().unwrap_or_default().into(), - )), - _ => Err(RwError::from(ProtocolError(format!( - "Can not convert bson {:?} to {:?}", - id_field, id_type - ))))?, - }, - DataType::Int32 => { - if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberInt") { - let int_str = obj["$numberInt"].as_str().unwrap_or_default(); - Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default())) - } else { - Err(RwError::from(ProtocolError(format!( - "Can not convert bson {:?} to {:?}", - id_field, id_type - ))))? - } - } - DataType::Int64 => { - if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberLong") - { - let int_str = obj["$numberLong"].as_str().unwrap_or_default(); - Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default())) - } else { - Err(RwError::from(ProtocolError(format!( - "Can not convert bson {:?} to {:?}", - id_field, id_type - ))))? - } - } - _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."), - }; - let payload: Datum = match payload_type { - DataType::Jsonb => ScalarImpl::Jsonb(bson_value.into()).into(), - _ => unreachable!("DebeziumMongoJsonParser::new must ensure payload column datatypes."), - }; - - Ok((id, payload)) -} impl DebeziumMongoJsonParser { pub fn new(rw_columns: Vec, source_ctx: SourceContextRef) -> Result { let id_column = rw_columns @@ -158,114 +82,23 @@ impl DebeziumMongoJsonParser { mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { - let event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) + let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; // Event can be configured with and without the "payload" field present. // See https://github.com/risingwavelabs/risingwave/issues/10178 - let payload = ensure_not_null(event.get("payload").unwrap_or(&event)); - let op = payload.get(OP).and_then(|v| v.as_str()).ok_or_else(|| { - RwError::from(ProtocolError( - "op field not found in debezium json".to_owned(), - )) - })?; - - match op { - DEBEZIUM_UPDATE_OP => { - let before = payload - .get(BEFORE) - .and_then(ensure_not_null) - .ok_or_else(|| { - RwError::from(ProtocolError( - "before is missing for updating event.".to_string(), - )) - })?; - let after = payload - .get(AFTER) - .and_then(ensure_not_null) - .ok_or_else(|| { - RwError::from(ProtocolError( - "after is missing for updating event".to_string(), - )) - })?; - let (before_id, before_payload) = parse_bson_value( - &self.id_column.data_type, - &self.payload_column.data_type, - before, - )?; - let (after_id, after_payload) = parse_bson_value( - &self.id_column.data_type, - &self.payload_column.data_type, - after, - )?; + let payload = if let Some(payload) = event.get_mut("payload") { + std::mem::take(payload) + } else { + event + }; - writer.update(|column| { - if column.name == self.id_column.name { - Ok((before_id.clone(), after_id.clone())) - } else if column.name == self.payload_column.name { - Ok((before_payload.clone(), after_payload.clone())) - } else { - unreachable!("writer.update must not pass columns more than required") - } - }) - } - DEBEZIUM_CREATE_OP | DEBEZIUM_READ_OP => { - let after = payload - .get(AFTER) - .and_then(ensure_not_null) - .ok_or_else(|| { - RwError::from(ProtocolError( - "after is missing for creating event".to_string(), - )) - })?; + let accessor = JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM); - let (after_id, after_payload) = parse_bson_value( - &self.id_column.data_type, - &self.payload_column.data_type, - after, - )?; + let row_op = DebeziumChangeEvent::with_value(MongoProjeciton::new(accessor)); - writer.insert(|column| { - if column.name == self.id_column.name { - Ok(after_id.clone()) - } else if column.name == self.payload_column.name { - Ok(after_payload.clone()) - } else { - unreachable!("writer.insert must not pass columns more than required") - } - }) - } - DEBEZIUM_DELETE_OP => { - let before = payload - .get(BEFORE) - .and_then(ensure_not_null) - .ok_or_else(|| { - RwError::from(ProtocolError( - "before is missing for delete event".to_string(), - )) - })?; - let (before_id, before_payload) = parse_bson_value( - &self.id_column.data_type, - &self.payload_column.data_type, - before, - )?; - - writer.delete(|column| { - if column.name == self.id_column.name { - Ok(before_id.clone()) - } else if column.name == self.payload_column.name { - Ok(before_payload.clone()) - } else { - unreachable!("writer.delete must not pass columns more than required") - } - }) - } - _ => Err(RwError::from(ProtocolError(format!( - "unknown debezium op: {}", - op - )))), - } + apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) } } @@ -292,50 +125,33 @@ mod tests { use risingwave_common::array::Op; use risingwave_common::catalog::ColumnId; use risingwave_common::row::Row; - use risingwave_common::types::ToOwnedDatum; + use risingwave_common::types::{ScalarImpl, ToOwnedDatum}; use super::*; + use crate::parser::unified::debezium::extract_bson_id; use crate::parser::SourceStreamChunkBuilder; #[test] fn test_parse_bson_value_id_int() { let data = r#"{"_id":{"$numberInt":"2345"}}"#; let pld: serde_json::Value = serde_json::from_str(data).unwrap(); - let (a, b) = parse_bson_value( - &DataType::Int32, - &DataType::Jsonb, - &simd_json::value::borrowed::Value::String(data.into()), - ) - .unwrap(); + let a = extract_bson_id(&DataType::Int32, &pld).unwrap(); assert_eq!(a, Some(ScalarImpl::Int32(2345))); - assert_eq!(b, Some(ScalarImpl::Jsonb(pld.into()))) } #[test] fn test_parse_bson_value_id_long() { let data = r#"{"_id":{"$numberLong":"22423434544"}}"#; let pld: serde_json::Value = serde_json::from_str(data).unwrap(); - let (a, b) = parse_bson_value( - &DataType::Int64, - &DataType::Jsonb, - &simd_json::value::borrowed::Value::String(data.into()), - ) - .unwrap(); + let a = extract_bson_id(&DataType::Int64, &pld).unwrap(); assert_eq!(a, Some(ScalarImpl::Int64(22423434544))); - assert_eq!(b, Some(ScalarImpl::Jsonb(pld.into()))) } #[test] fn test_parse_bson_value_id_oid() { let data = r#"{"_id":{"$oid":"5d505646cf6d4fe581014ab2"}}"#; let pld: serde_json::Value = serde_json::from_str(data).unwrap(); - let (a, b) = parse_bson_value( - &DataType::Varchar, - &DataType::Jsonb, - &simd_json::value::borrowed::Value::String(data.into()), - ) - .unwrap(); + let a = extract_bson_id(&DataType::Varchar, &pld).unwrap(); assert_eq!(a, Some(ScalarImpl::Utf8("5d505646cf6d4fe581014ab2".into()))); - assert_eq!(b, Some(ScalarImpl::Jsonb(pld.into()))) } fn get_columns() -> Vec { let descs = vec![ diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 5fb6079d414f..8191de608027 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -16,25 +16,13 @@ use std::fmt::Debug; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use simd_json::{BorrowedValue, StaticNode, ValueAccess}; +use simd_json::{BorrowedValue, Mutable}; -use super::operators::*; -use crate::parser::common::{json_object_smart_get_value, simd_json_parse_value}; +use crate::parser::unified::debezium::DebeziumChangeEvent; +use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceFormat}; - -const BEFORE: &str = "before"; -const AFTER: &str = "after"; -const OP: &str = "op"; - -#[inline] -fn ensure_not_null<'a, 'b: 'a>(value: &'a BorrowedValue<'b>) -> Option<&'a BorrowedValue<'b>> { - if let BorrowedValue::Static(StaticNode::Null) = value { - None - } else { - Some(value) - } -} +use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; #[derive(Debug)] pub struct DebeziumJsonParser { @@ -56,94 +44,20 @@ impl DebeziumJsonParser { mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { - let event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) + let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - // Event can be configured with and without the "payload" field present. - // See https://github.com/risingwavelabs/risingwave/issues/10178 - let payload = ensure_not_null(event.get("payload").unwrap_or(&event)); - let op = payload.get(OP).and_then(|v| v.as_str()).ok_or_else(|| { - RwError::from(ProtocolError( - "op field not found in debezium json".to_owned(), - )) - })?; - - let format = SourceFormat::DebeziumJson; - match op { - DEBEZIUM_UPDATE_OP => { - let before = payload.get(BEFORE).and_then(ensure_not_null).ok_or_else(|| { - RwError::from(ProtocolError( - "before is missing for updating event. If you are using postgres, you may want to try ALTER TABLE $TABLE_NAME REPLICA IDENTITY FULL;".to_string(), - )) - })?; - - let after = payload - .get(AFTER) - .and_then(ensure_not_null) - .ok_or_else(|| { - RwError::from(ProtocolError( - "after is missing for updating event".to_string(), - )) - })?; - - writer.update(|column| { - let before = simd_json_parse_value( - &format, - &column.data_type, - json_object_smart_get_value(before, (&column.name).into()), - )?; - let after = simd_json_parse_value( - &format, - &column.data_type, - json_object_smart_get_value(after, (&column.name).into()), - )?; - - Ok((before, after)) - }) - } - DEBEZIUM_CREATE_OP | DEBEZIUM_READ_OP => { - let after = payload - .get(AFTER) - .and_then(ensure_not_null) - .ok_or_else(|| { - RwError::from(ProtocolError( - "after is missing for creating event".to_string(), - )) - })?; - - writer.insert(|column| { - simd_json_parse_value( - &format, - &column.data_type, - json_object_smart_get_value(after, (&column.name).into()), - ) - .map_err(Into::into) - }) - } - DEBEZIUM_DELETE_OP => { - let before = payload - .get(BEFORE) - .and_then(ensure_not_null) - .ok_or_else(|| { - RwError::from(ProtocolError( - "before is missing for delete event".to_string(), - )) - })?; - - writer.delete(|column| { - simd_json_parse_value( - &format, - &column.data_type, - json_object_smart_get_value(before, (&column.name).into()), - ) - .map_err(Into::into) - }) - } - _ => Err(RwError::from(ProtocolError(format!( - "unknown debezium op: {}", - op - )))), - } + let payload = if let Some(payload) = event.get_mut("payload") { + std::mem::take(payload) + } else { + event + }; + + let accessor = JsonAccess::new_with_options(payload, &JsonParseOptions::DEBEZIUM); + + let row_op = DebeziumChangeEvent::with_value(accessor); + + apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) } } @@ -342,57 +256,20 @@ mod tests { for data in input { let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); - let [(op1, row1), (op2, row2)]: [_; 2] = parse_one(parser, columns.clone(), data) + let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data) .await .try_into() .unwrap(); - assert_eq!(op1, Op::UpdateDelete); - assert_eq!(op2, Op::UpdateInsert); - - assert!(row1[0].eq(&Some(ScalarImpl::Int32(102)))); - assert!(row1[1].eq(&Some(ScalarImpl::Utf8("car battery".into())))); - assert!(row1[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into())))); - assert!(row1[3].eq(&Some(ScalarImpl::Float64(8.1.into())))); - - assert!(row2[0].eq(&Some(ScalarImpl::Int32(102)))); - assert!(row2[1].eq(&Some(ScalarImpl::Utf8("car battery".into())))); - assert!(row2[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into())))); - assert!(row2[3].eq(&Some(ScalarImpl::Float64(9.1.into())))); - } - } - - #[tokio::test] - async fn test1_update_with_before_null() { - // the test case it identical with test_debezium_json_parser_insert but op is 'u' - // "before": null, - // "after": { - // "id": 102, - // "name": "car battery", - // "description": "12V car battery", - // "weight": 8.1 - // }, - let input = vec![ - // data with payload field - br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551564960,"transaction":null}}"#.to_vec(), - // data without payload field - br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551564960,"transaction":null}"#.to_vec()]; - - let columns = get_test1_columns(); - for data in input { - let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); + assert_eq!(op, Op::Insert); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 2); - let writer = builder.row_writer(); - if let Err(e) = parser.parse_inner(data, writer).await { - println!("{:?}", e.to_string()); - } else { - panic!("the test case is expected to be failed"); - } + assert!(row[0].eq(&Some(ScalarImpl::Int32(102)))); + assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into())))); + assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into())))); + assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into())))); } } } - // test2 covers read/insert/update/delete event on the following MySQL table for debezium json: // CREATE TABLE IF NOT EXISTS orders ( // O_KEY BIGINT NOT NULL, @@ -545,57 +422,34 @@ mod tests { let columns = get_test2_columns(); let parser = DebeziumJsonParser::new(columns.clone(), Default::default()).unwrap(); - let [(op1, row1), (op2, row2)]: [_; 2] = parse_one(parser, columns, data.to_vec()) + let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec()) .await .try_into() .unwrap(); - assert_eq!(op1, Op::UpdateDelete); - assert_eq!(op2, Op::UpdateInsert); - - assert!(row1[0].eq(&Some(ScalarImpl::Int64(111)))); - assert!(row1[1].eq(&Some(ScalarImpl::Bool(true)))); - assert!(row1[2].eq(&Some(ScalarImpl::Int16(-1)))); - assert!(row1[3].eq(&Some(ScalarImpl::Int32(-1111)))); - assert!(row1[4].eq(&Some(ScalarImpl::Float32((-11.11).into())))); - assert!(row1[5].eq(&Some(ScalarImpl::Float64((-111.11111).into())))); - assert!(row1[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap())))); - assert!(row1[7].eq(&Some(ScalarImpl::Utf8("yes please".into())))); - assert!(row1[8].eq(&Some(ScalarImpl::Date(Date::new( - NaiveDate::from_ymd_opt(1000, 1, 1).unwrap() - ))))); - assert!(row1[9].eq(&Some(ScalarImpl::Time(Time::new( - NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap() - ))))); - assert!(row1[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( - "1970-01-01T00:00:00".parse().unwrap() - ))))); - assert!(row1[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( - "1970-01-01T00:00:01".parse().unwrap() - ))))); - assert_json_eq(&row1[12], "{\"k1\": \"v1\", \"k2\": 11}"); - - assert!(row2[0].eq(&Some(ScalarImpl::Int64(111)))); - assert!(row2[1].eq(&Some(ScalarImpl::Bool(false)))); - assert!(row2[2].eq(&Some(ScalarImpl::Int16(3)))); - assert!(row2[3].eq(&Some(ScalarImpl::Int32(3333)))); - assert!(row2[4].eq(&Some(ScalarImpl::Float32((33.33).into())))); - assert!(row2[5].eq(&Some(ScalarImpl::Float64((333.33333).into())))); - assert!(row2[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap())))); - assert!(row2[7].eq(&Some(ScalarImpl::Utf8("no thanks".into())))); - assert!(row2[8].eq(&Some(ScalarImpl::Date(Date::new( + assert_eq!(op, Op::Insert); + + assert!(row[0].eq(&Some(ScalarImpl::Int64(111)))); + assert!(row[1].eq(&Some(ScalarImpl::Bool(false)))); + assert!(row[2].eq(&Some(ScalarImpl::Int16(3)))); + assert!(row[3].eq(&Some(ScalarImpl::Int32(3333)))); + assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into())))); + assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into())))); + assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap())))); + assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into())))); + assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new( NaiveDate::from_ymd_opt(9999, 12, 31).unwrap() ))))); - assert!(row2[9].eq(&Some(ScalarImpl::Time(Time::new( + assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new( NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap() ))))); - assert!(row2[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( + assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( "5138-11-16T09:46:39".parse().unwrap() ))))); - assert!(row2[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( + assert!(row[11].eq(&Some(ScalarImpl::Timestamp(Timestamp::new( "2038-01-09T03:14:07".parse().unwrap() ))))); - assert_json_eq(&row2[12], "{\"k1\": \"v1_updated\", \"k2\": 33}"); + assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}"); } #[tokio::test] diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index a060b2dd9fb0..5bd5ab722f0c 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -12,17 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; -use risingwave_common::error::ErrorCode::ProtocolError; +use risingwave_common::error::ErrorCode::{self, ProtocolError}; use risingwave_common::error::{Result, RwError}; -use simd_json::BorrowedValue; use super::ByteStreamSourceParser; use crate::common::UpsertMessage; -use crate::parser::common::{json_object_smart_get_value, simd_json_parse_value}; -use crate::parser::util::at_least_one_ok; +use crate::parser::unified::json::JsonAccess; +use crate::parser::unified::upsert::UpsertChangeEvent; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceFormat}; +use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; /// Parser for JSON format #[derive(Debug)] @@ -60,72 +59,65 @@ impl JsonParser { }) } - #[inline(always)] - fn parse_single_value( - value: &BorrowedValue<'_>, - writer: &mut SourceStreamChunkRowWriter<'_>, - ) -> Result { - writer.insert(|desc| { - simd_json_parse_value( - &SourceFormat::Json, - &desc.data_type, - json_object_smart_get_value(value, desc.name.as_str().into()), - ) - .map_err(|e| { - tracing::error!("failed to process value ({}): {}", value, e); - e.into() - }) - }) - } - #[allow(clippy::unused_async)] pub async fn parse_inner( &self, - payload: Vec, + mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result { - enum Op { - Insert, - Delete, - } - - let (mut payload_mut, op) = if self.enable_upsert { + if self.enable_upsert { let msg: UpsertMessage<'_> = bincode::deserialize(&payload) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - if !msg.record.is_empty() { - (msg.record.to_vec(), Op::Insert) + + let mut primary_key = msg.primary_key.to_vec(); + let mut record = msg.record.to_vec(); + let key_decoded = simd_json::to_borrowed_value(&mut primary_key) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + + let value_decoded = if record.is_empty() { + None } else { - (msg.primary_key.to_vec(), Op::Delete) + Some( + simd_json::to_borrowed_value(&mut record) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?, + ) + }; + + let mut accessor = UpsertChangeEvent::default().with_key(JsonAccess::new(key_decoded)); + if let Some(value) = value_decoded { + accessor = accessor.with_value(JsonAccess::new(value)); } + apply_row_operation_on_stream_chunk_writer(accessor, &mut writer) } else { - (payload, Op::Insert) - }; - - let value: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload_mut) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - - if let BorrowedValue::Array(ref objects) = value && matches!(op, Op::Insert) { - at_least_one_ok( - objects - .iter() - .map(|obj| Self::parse_single_value(obj, &mut writer)) - .collect_vec(), - ) - } else { - let fill_fn = |desc: &SourceColumnDesc| { - simd_json_parse_value( - &SourceFormat::Json, - &desc.data_type, - json_object_smart_get_value(&value, desc.name.as_str().into()), - ) - .map_err(|e| { - tracing::error!("failed to process value: {}", e); - e.into() - }) + let value = simd_json::to_borrowed_value(&mut payload) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + let values = if let simd_json::BorrowedValue::Array(arr) = value { + arr + } else { + vec![value] }; - match op { - Op::Insert => writer.insert(fill_fn), - Op::Delete => writer.delete(fill_fn), + let mut errors = Vec::new(); + let mut guard = None; + for value in values { + let accessor: UpsertChangeEvent, JsonAccess<'_, '_>> = + UpsertChangeEvent::default().with_value(JsonAccess::new(value)); + + match apply_row_operation_on_stream_chunk_writer(accessor, &mut writer) { + Ok(this_guard) => guard = Some(this_guard), + Err(err) => errors.push(err), + } + } + + if let Some(guard) = guard { + if !errors.is_empty() { + tracing::error!(?errors, "failed to parse some columns"); + } + Ok(guard) + } else { + Err(RwError::from(ErrorCode::InternalError(format!( + "failed to parse all columns: {:?}", + errors + )))) } } } @@ -186,10 +178,10 @@ mod tests { SourceColumnDesc::simple("i64", DataType::Int64, 4.into()), SourceColumnDesc::simple("f32", DataType::Float32, 5.into()), SourceColumnDesc::simple("f64", DataType::Float64, 6.into()), - SourceColumnDesc::simple("Varchar", DataType::Varchar, 7.into()), - SourceColumnDesc::simple("Date", DataType::Date, 8.into()), - SourceColumnDesc::simple("Timestamp", DataType::Timestamp, 9.into()), - SourceColumnDesc::simple("Decimal", DataType::Decimal, 10.into()), + SourceColumnDesc::simple("varchar", DataType::Varchar, 7.into()), + SourceColumnDesc::simple("date", DataType::Date, 8.into()), + SourceColumnDesc::simple("timestamp", DataType::Timestamp, 9.into()), + SourceColumnDesc::simple("decimal", DataType::Decimal, 10.into()), ]; let parser = JsonParser::new(descs.clone(), Default::default()).unwrap(); @@ -278,7 +270,7 @@ mod tests { async fn test_json_parse_object_top_level() { test_json_parser(get_payload).await; } - + #[ignore] #[tokio::test] async fn test_json_parse_array_top_level() { test_json_parser(get_array_top_level_payload).await; diff --git a/src/connector/src/parser/macros.rs b/src/connector/src/parser/macros.rs deleted file mode 100644 index bd46b85f47b4..000000000000 --- a/src/connector/src/parser/macros.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#[cfg(not(any( - target_feature = "sse4.2", - target_feature = "avx2", - target_feature = "neon", - target_feature = "simd128" -)))] -#[macro_export] -macro_rules! ensure_float { - ($v:ident, $t:ty) => { - $v.as_f64() - .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))? - }; -} - -#[macro_export] -macro_rules! simd_json_ensure_float { - ($v:ident, $t:ty) => { - $v.cast_f64() - .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))? - }; -} - -#[macro_export] -macro_rules! ensure_i16 { - ($v:ident, $t:ty) => { - $v.as_i16() - .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))? - }; -} - -#[macro_export] -macro_rules! ensure_i32 { - ($v:ident, $t:ty) => { - $v.as_i32() - .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))? - }; -} - -#[macro_export] -macro_rules! ensure_i64 { - ($v:ident, $t:ty) => { - $v.as_i64() - .ok_or_else(|| anyhow!(concat!("expect ", stringify!($t), ", but found {}"), $v))? - }; -} - -#[macro_export] -macro_rules! ensure_str { - ($v:ident, $t:literal) => { - $v.as_str() - .ok_or_else(|| anyhow!(concat!("expect ", $t, ", but found {}"), $v))? - }; -} - -#[macro_export] -macro_rules! ensure_rust_type { - ($v:ident, $t:ty) => { - $crate::ensure_str!($v, "string") - .parse::<$t>() - .map_err(|_| anyhow!("failed parse {} from {}", stringify!($t), $v))? - }; -} diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs index 6b84499b24bb..963f19a26ff5 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -16,12 +16,13 @@ use std::fmt::Debug; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use simd_json::{BorrowedValue, ValueAccess}; +use simd_json::BorrowedValue; -use super::operators::*; -use crate::parser::common::{json_object_smart_get_value, simd_json_parse_value}; +use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; +use crate::parser::unified::maxwell::MaxwellChangeEvent; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceFormat}; +use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; const AFTER: &str = "data"; const BEFORE: &str = "old"; @@ -50,73 +51,11 @@ impl MaxwellParser { let event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - let op = event.get(OP).and_then(|v| v.as_str()).ok_or_else(|| { - RwError::from(ProtocolError( - "op field not found in maxwell json".to_owned(), - )) - })?; + let accessor = JsonAccess::new_with_options(event, &JsonParseOptions::DEFAULT); - let format = SourceFormat::Maxwell; - match op { - MAXWELL_INSERT_OP => { - let after = event.get(AFTER).ok_or_else(|| { - RwError::from(ProtocolError( - "data is missing for creating event".to_string(), - )) - })?; - writer.insert(|column| { - simd_json_parse_value( - &format, - &column.data_type, - json_object_smart_get_value(after, column.name.as_str().into()), - ) - .map_err(Into::into) - }) - } - MAXWELL_UPDATE_OP => { - let after = event.get(AFTER).ok_or_else(|| { - RwError::from(ProtocolError( - "data is missing for updating event".to_string(), - )) - })?; - let before = event.get(BEFORE).ok_or_else(|| { - RwError::from(ProtocolError( - "old is missing for updating event".to_string(), - )) - })?; + let row_op = MaxwellChangeEvent::new(accessor); - writer.update(|column| { - // old only contains the changed columns but data contains all columns. - let col_name_lc = column.name.as_str(); - let before_value = json_object_smart_get_value(before, col_name_lc.into()) - .or_else(|| json_object_smart_get_value(after, col_name_lc.into())); - let before = simd_json_parse_value(&format, &column.data_type, before_value)?; - let after = simd_json_parse_value( - &format, - &column.data_type, - json_object_smart_get_value(after, col_name_lc.into()), - )?; - Ok((before, after)) - }) - } - MAXWELL_DELETE_OP => { - let before = event.get(AFTER).ok_or_else(|| { - RwError::from(ProtocolError("old is missing for delete event".to_string())) - })?; - writer.delete(|column| { - simd_json_parse_value( - &format, - &column.data_type, - json_object_smart_get_value(before, column.name.as_str().into()), - ) - .map_err(Into::into) - }) - } - other => Err(RwError::from(ProtocolError(format!( - "unknown Maxwell op: {}", - other - )))), - } + apply_row_operation_on_stream_chunk_writer(row_op, &mut writer) } } @@ -216,27 +155,7 @@ mod tests { { let (op, row) = rows.next().unwrap(); - assert_eq!(op, Op::UpdateDelete); - assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(2))); - assert_eq!( - row.datum_at(1).to_owned_datum(), - (Some(ScalarImpl::Utf8("alex".into()))) - ); - assert_eq!( - row.datum_at(2).to_owned_datum(), - (Some(ScalarImpl::Int16(1))) - ); - assert_eq!( - row.datum_at(3).to_owned_datum(), - (Some(ScalarImpl::Timestamp( - str_to_timestamp("1999-12-31 16:00:01").unwrap() - ))) - ) - } - - { - let (op, row) = rows.next().unwrap(); - assert_eq!(op, Op::UpdateInsert); + assert_eq!(op, Op::Insert); assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(2))); assert_eq!( row.datum_at(1).to_owned_datum(), diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 28dd99980a58..9268ce40d621 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -45,10 +45,10 @@ mod common; mod csv_parser; mod debezium; mod json_parser; -mod macros; mod maxwell; mod protobuf; mod schema_registry; +mod unified; mod util; /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`]. pub struct SourceStreamChunkBuilder { diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs new file mode 100644 index 000000000000..fa07e18b0621 --- /dev/null +++ b/src/connector/src/parser/unified/avro.rs @@ -0,0 +1,286 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::str::FromStr; + +use apache_avro::types::Value; +use apache_avro::Schema; +use itertools::Itertools; +use risingwave_common::array::{ListValue, StructValue}; +use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz}; +use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl}; +use risingwave_common::util::iter_util::ZipEqFast; + +use super::{Access, AccessError, AccessResult}; +use crate::parser::avro::util::{ + avro_decimal_to_rust_decimal, extract_inner_field_schema, unix_epoch_days, +}; +#[derive(Clone)] +/// Options for parsing an `AvroValue` into Datum, with an optional avro schema. +pub struct AvroParseOptions<'a> { + pub schema: Option<&'a Schema>, + /// Strict Mode + /// If strict mode is disabled, an int64 can be parsed from an AvroInt (int32) value. + pub relax_numeric: bool, +} + +impl<'a> Default for AvroParseOptions<'a> { + fn default() -> Self { + Self { + schema: None, + relax_numeric: true, + } + } +} + +impl<'a> AvroParseOptions<'a> { + pub fn with_schema(mut self, schema: &'a Schema) -> Self { + self.schema = Some(schema); + self + } + + fn extract_inner_schema(&self, key: Option<&'a str>) -> Option<&'a Schema> { + self.schema + .map(|schema| extract_inner_field_schema(schema, key)) + .transpose() + .map_err(|_err| tracing::error!("extract sub-schema")) + .ok() + .flatten() + } + + /// Parse an avro value into expected type. + /// 3 kinds of type info are used to parsing things. + /// - `type_expected`. The type that we expect the value is. + /// - value type. The type info together with the value argument. + /// - schema. The `AvroSchema` provided in option. + /// If both `type_expected` and schema are provided, it will check both strictly. + /// If only `type_expected` is provided, it will try to match the value type and the + /// `type_expected`, converting the value if possible. If only value is provided (without + /// schema and `type_expected`), the `DateType` will be inferred. + pub fn parse<'b>(&self, value: &'b Value, type_expected: Option<&'b DataType>) -> AccessResult + where + 'b: 'a, + { + let create_error = || AccessError::TypeError { + expected: format!("{:?}", type_expected), + got: format!("{:?}", value), + value: String::new(), + }; + + let v: ScalarImpl = match (type_expected, value) { + (_, Value::Null) => return Ok(None), + (_, Value::Union(_, v)) => { + let schema = self.extract_inner_schema(None); + return Self { + schema, + relax_numeric: self.relax_numeric, + } + .parse(v, type_expected); + } + // ---- Boolean ----- + (Some(DataType::Boolean) | None, Value::Boolean(b)) => (*b).into(), + // ---- Int16 ----- + (Some(DataType::Int16), Value::Int(i)) if self.relax_numeric => (*i as i16).into(), + (Some(DataType::Int16), Value::Long(i)) if self.relax_numeric => (*i as i16).into(), + + // ---- Int32 ----- + (Some(DataType::Int32) | None, Value::Int(i)) => (*i).into(), + (Some(DataType::Int32), Value::Long(i)) if self.relax_numeric => (*i as i32).into(), + // ---- Int64 ----- + (Some(DataType::Int64) | None, Value::Long(i)) => (*i).into(), + (Some(DataType::Int64), Value::Int(i)) if self.relax_numeric => (*i as i64).into(), + // ---- Float32 ----- + (Some(DataType::Float32) | None, Value::Float(i)) => (*i).into(), + (Some(DataType::Float32), Value::Double(i)) => (*i as f32).into(), + // ---- Float64 ----- + (Some(DataType::Float64) | None, Value::Double(i)) => (*i).into(), + (Some(DataType::Float64), Value::Float(i)) => (*i as f64).into(), + // ---- Decimal ----- + (Some(DataType::Decimal) | None, Value::Decimal(avro_decimal)) => { + let (precision, scale) = match self.schema { + Some(Schema::Decimal { + precision, scale, .. + }) => (*precision, *scale), + _ => Err(create_error())?, + }; + let decimal = avro_decimal_to_rust_decimal(avro_decimal.clone(), precision, scale) + .map_err(|_| create_error())?; + ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal)) + } + + // ---- Date ----- + (Some(DataType::Date) | None, Value::Date(days)) => { + Date::with_days(days + unix_epoch_days()) + .map_err(|_| create_error())? + .into() + } + // ---- Varchar ----- + (Some(DataType::Varchar) | None, Value::Enum(_, symbol)) => { + symbol.clone().into_boxed_str().into() + } + (Some(DataType::Varchar) | None, Value::String(s)) => s.clone().into_boxed_str().into(), + // ---- Timestamp ----- + (Some(DataType::Timestamp) | None, Value::TimestampMillis(ms)) => { + i64_to_timestamp(*ms).map_err(|_| create_error())?.into() + } + (Some(DataType::Timestamp) | None, Value::TimestampMicros(us)) => { + i64_to_timestamp(*us).map_err(|_| create_error())?.into() + } + + // ---- TimestampTz ----- + (Some(DataType::Timestamptz), Value::TimestampMillis(ms)) => { + i64_to_timestamptz(*ms).map_err(|_| create_error())?.into() + } + (Some(DataType::Timestamptz), Value::TimestampMicros(us)) => { + i64_to_timestamptz(*us).map_err(|_| create_error())?.into() + } + + // ---- Interval ----- + (Some(DataType::Interval) | None, Value::Duration(duration)) => { + let months = u32::from(duration.months()) as i32; + let days = u32::from(duration.days()) as i32; + let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows + ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs)) + } + // ---- Struct ----- + (Some(DataType::Struct(struct_type_info)), Value::Record(descs)) => StructValue::new( + struct_type_info + .names() + .zip_eq_fast(struct_type_info.types()) + .map(|(field_name, field_type)| { + let maybe_value = descs.iter().find(|(k, _v)| k == field_name); + if let Some((_, value)) = maybe_value { + let schema = self.extract_inner_schema(Some(field_name)); + Ok(Self { + schema, + relax_numeric: self.relax_numeric, + } + .parse(value, Some(field_type))?) + } else { + Ok(None) + } + }) + .collect::>()?, + ) + .into(), + (None, Value::Record(descs)) => { + let rw_values = descs + .iter() + .map(|(field_name, field_value)| { + let schema = self.extract_inner_schema(Some(field_name)); + Self { + schema, + relax_numeric: self.relax_numeric, + } + .parse(field_value, None) + }) + .collect::, AccessError>>()?; + ScalarImpl::Struct(StructValue::new(rw_values)) + } + // ---- List ----- + (Some(DataType::List(item_type)), Value::Array(arr)) => ListValue::new( + arr.iter() + .map(|v| { + let schema = self.extract_inner_schema(None); + Self { + schema, + relax_numeric: self.relax_numeric, + } + .parse(v, Some(item_type)) + }) + .collect::, AccessError>>()?, + ) + .into(), + (None, Value::Array(arr)) => ListValue::new( + arr.iter() + .map(|v| { + let schema = self.extract_inner_schema(None); + Self { + schema, + relax_numeric: self.relax_numeric, + } + .parse(v, None) + }) + .collect::, AccessError>>()?, + ) + .into(), + // ---- Bytea ----- + (Some(DataType::Bytea) | None, Value::Bytes(value)) => { + value.clone().into_boxed_slice().into() + } + // ---- Jsonb ----- + (Some(DataType::Jsonb), Value::String(s)) => { + JsonbVal::from_str(s).map_err(|_| create_error())?.into() + } + + (_expected, _got) => Err(create_error())?, + }; + Ok(Some(v)) + } +} + +pub struct AvroAccess<'a, 'b> { + value: &'a Value, + options: AvroParseOptions<'b>, +} + +impl<'a, 'b> AvroAccess<'a, 'b> { + pub fn new(value: &'a Value, options: AvroParseOptions<'b>) -> Self { + Self { value, options } + } +} + +impl<'a, 'b> Access for AvroAccess<'a, 'b> +where + 'a: 'b, +{ + fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult { + let mut value = self.value; + let mut options: AvroParseOptions<'_> = self.options.clone(); + + let mut i = 0; + while i < path.len() { + let key = path[i]; + let create_error = || AccessError::Undefined { + name: key.to_string(), + path: path.iter().take(i).join("."), + }; + match value { + Value::Union(_, v) => { + value = v; + options.schema = options.extract_inner_schema(None); + continue; + } + Value::Map(fields) if fields.contains_key(key) => { + value = fields.get(key).unwrap(); + options.schema = None; + i += 1; + continue; + } + Value::Record(fields) => { + if let Some((_, v)) = fields.iter().find(|(k, _)| k == key) { + value = v; + options.schema = options.extract_inner_schema(Some(key)); + i += 1; + continue; + } + } + _ => (), + } + Err(create_error())?; + } + + options.parse(value, type_expected) + } +} diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs new file mode 100644 index 000000000000..e0ec9c8559a7 --- /dev/null +++ b/src/connector/src/parser/unified/debezium.rs @@ -0,0 +1,180 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{DataType, Datum, ScalarImpl}; + +use super::{Access, ChangeEvent, ChangeEventOperation}; + +pub struct DebeziumChangeEvent { + value_accessor: Option, + key_accessor: Option, +} + +const BEFORE: &str = "before"; +const AFTER: &str = "after"; +const OP: &str = "op"; + +pub const DEBEZIUM_READ_OP: &str = "r"; +pub const DEBEZIUM_CREATE_OP: &str = "c"; +pub const DEBEZIUM_UPDATE_OP: &str = "u"; +pub const DEBEZIUM_DELETE_OP: &str = "d"; + +impl DebeziumChangeEvent +where + A: Access, +{ + pub fn with_value(value_accessor: A) -> Self { + Self::new(None, Some(value_accessor)) + } + + pub fn with_key(key_accessor: A) -> Self { + Self::new(Some(key_accessor), None) + } + + /// Panic: one of the `key_accessor` or `value_accessor` must be provided. + fn new(key_accessor: Option, value_accessor: Option) -> Self { + assert!(key_accessor.is_some() || value_accessor.is_some()); + Self { + value_accessor, + key_accessor, + } + } +} + +impl ChangeEvent for DebeziumChangeEvent +where + A: Access, +{ + fn access_field( + &self, + name: &str, + type_expected: &risingwave_common::types::DataType, + ) -> super::AccessResult { + match self.op()? { + ChangeEventOperation::Delete => { + if let Some(va) = self.value_accessor.as_ref() { + va.access(&[BEFORE, name], Some(type_expected)) + } else { + self.key_accessor + .as_ref() + .unwrap() + .access(&[name], Some(type_expected)) + } + } + + // value should not be None. + ChangeEventOperation::Upsert => self + .value_accessor + .as_ref() + .unwrap() + .access(&[AFTER, name], Some(type_expected)), + } + } + + fn op(&self) -> std::result::Result { + if let Some(accessor) = &self.value_accessor { + if let Some(ScalarImpl::Utf8(op)) = accessor.access(&[OP], Some(&DataType::Varchar))? { + match op.as_ref() { + DEBEZIUM_READ_OP | DEBEZIUM_CREATE_OP | DEBEZIUM_UPDATE_OP => { + return Ok(ChangeEventOperation::Upsert) + } + DEBEZIUM_DELETE_OP => return Ok(ChangeEventOperation::Delete), + _ => (), + } + } + Err(super::AccessError::Undefined { + name: "op".into(), + path: Default::default(), + }) + } else { + Ok(ChangeEventOperation::Delete) + } + } +} + +pub struct MongoProjeciton { + accessor: A, +} + +pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> anyhow::Result { + let id_field = bson_doc + .get("_id") + .ok_or_else(|| anyhow::format_err!("Debezuim Mongo requires document has a `_id` field"))?; + let id: Datum = match id_type { + DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(), + DataType::Varchar => match id_field { + serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())), + serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8( + obj["$oid"].as_str().to_owned().unwrap_or_default().into(), + )), + _ => anyhow::bail!( + "Can not convert bson {:?} to {:?}", + id_field, id_type + ), + }, + DataType::Int32 => { + if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberInt") { + let int_str = obj["$numberInt"].as_str().unwrap_or_default(); + Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default())) + } else { + anyhow::bail!( + "Can not convert bson {:?} to {:?}", + id_field, id_type + ) + } + } + DataType::Int64 => { + if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberLong") + { + let int_str = obj["$numberLong"].as_str().unwrap_or_default(); + Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default())) + } else { + anyhow::bail!( + "Can not convert bson {:?} to {:?}", + id_field, id_type + ) + } + } + _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."), +}; + Ok(id) +} +impl MongoProjeciton { + pub fn new(accessor: A) -> Self { + Self { accessor } + } +} + +impl Access for MongoProjeciton +where + A: Access, +{ + fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> super::AccessResult { + match path { + ["after" | "before", "_id"] => { + let payload = self.access(&[path[0]], Some(&DataType::Jsonb))?; + if let Some(ScalarImpl::Jsonb(bson_doc)) = payload { + Ok(extract_bson_id( + type_expected.unwrap_or(&DataType::Jsonb), + &bson_doc.take(), + )?) + } else { + unreachable!("the result of access must match the type_expected") + } + } + ["after" | "before", "payload"] => self.access(&[path[0]], Some(&DataType::Jsonb)), + _ => self.accessor.access(path, type_expected), + } + } +} diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs new file mode 100644 index 000000000000..294f987c6955 --- /dev/null +++ b/src/connector/src/parser/unified/json.rs @@ -0,0 +1,498 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::str::FromStr; + +use base64::Engine; +use itertools::Itertools; +use risingwave_common::array::{ListValue, StructValue}; +use risingwave_common::cast::{ + i64_to_timestamp, i64_to_timestamptz, str_to_bytea, str_to_date, str_to_time, str_to_timestamp, + str_with_time_zone_to_timestamptz, +}; +use risingwave_common::types::{ + DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, +}; +use risingwave_common::util::iter_util::ZipEqFast; +use simd_json::{BorrowedValue, TryTypeError, ValueAccess, ValueType}; + +use super::{Access, AccessError, AccessResult}; +use crate::parser::common::json_object_smart_get_value; +#[derive(Clone, Debug)] +pub enum ByteaHandling { + Standard, + // debezium converts postgres bytea to base64 format + Base64, +} +#[derive(Clone, Debug)] +pub enum TimeHandling { + Milli, + Micro, +} +#[derive(Clone, Debug)] +pub enum JsonValueHandling { + AsValue, + AsString, +} +#[derive(Clone, Debug)] +pub enum NumericHandling { + Strict, + // should integer be parsed to float + Relax { + // should "3.14" be parsed to 3.14 in float + string_parsing: bool, + }, +} +#[derive(Clone, Debug)] +pub enum BooleanHandling { + Strict, + // should integer 1,0 be parsed to boolean (debezium) + Relax { + // should "True" "False" be parsed to true or false in boolean + string_parsing: bool, + // should string "1" "0" be paesed to boolean (cannal + mysql) + string_integer_parsing: bool, + }, +} + +#[derive(Clone, Debug)] +pub struct JsonParseOptions { + pub bytea_handling: ByteaHandling, + pub time_handling: TimeHandling, + pub json_value_handling: JsonValueHandling, + pub numeric_handling: NumericHandling, + pub boolean_handing: BooleanHandling, + pub ignoring_keycase: bool, +} + +impl Default for JsonParseOptions { + fn default() -> Self { + Self::DEFAULT.clone() + } +} + +impl JsonParseOptions { + pub const CANAL: JsonParseOptions = JsonParseOptions { + bytea_handling: ByteaHandling::Standard, + time_handling: TimeHandling::Micro, + json_value_handling: JsonValueHandling::AsValue, + numeric_handling: NumericHandling::Relax { + string_parsing: true, + }, + boolean_handing: BooleanHandling::Relax { + string_parsing: true, + string_integer_parsing: true, + }, + ignoring_keycase: true, + }; + pub const DEBEZIUM: JsonParseOptions = JsonParseOptions { + bytea_handling: ByteaHandling::Base64, + time_handling: TimeHandling::Micro, + json_value_handling: JsonValueHandling::AsString, + numeric_handling: NumericHandling::Relax { + string_parsing: false, + }, + boolean_handing: BooleanHandling::Relax { + string_parsing: false, + string_integer_parsing: false, + }, + ignoring_keycase: false, + }; + pub const DEFAULT: JsonParseOptions = JsonParseOptions { + bytea_handling: ByteaHandling::Standard, + time_handling: TimeHandling::Micro, + json_value_handling: JsonValueHandling::AsValue, + numeric_handling: NumericHandling::Relax { + string_parsing: false, + }, + boolean_handing: BooleanHandling::Strict, + ignoring_keycase: true, + }; + + pub fn parse( + &self, + value: &BorrowedValue<'_>, + type_expected: Option<&DataType>, + ) -> AccessResult { + let create_error = || AccessError::TypeError { + expected: format!("{:?}", type_expected), + got: value.value_type().to_string(), + value: value.to_string(), + }; + let v: ScalarImpl = match (type_expected, value.value_type()) { + (_, ValueType::Null) => return Ok(None), + // ---- Boolean ----- + (Some(DataType::Boolean) | None, ValueType::Bool) => value.as_bool().unwrap().into(), + + ( + Some(DataType::Boolean), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) if matches!(self.boolean_handing, BooleanHandling::Relax { .. }) + && matches!(value.as_i64(), Some(0i64) | Some(1i64)) => + { + (value.as_i64() == Some(1i64)).into() + } + + (Some(DataType::Boolean), ValueType::String) + if matches!( + self.boolean_handing, + BooleanHandling::Relax { + string_parsing: true, + .. + } + ) => + { + match value.as_str().unwrap().to_lowercase().as_str() { + "true" => true.into(), + "false" => false.into(), + c @ ("1" | "0") + if matches!( + self.boolean_handing, + BooleanHandling::Relax { + string_parsing: true, + string_integer_parsing: true + } + ) => + { + if c == "1" { + true.into() + } else { + false.into() + } + } + _ => Err(create_error())?, + } + } + // ---- Int16 ----- + ( + Some(DataType::Int16), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => value.try_as_i16()?.into(), + + (Some(DataType::Int16), ValueType::String) + if matches!( + self.numeric_handling, + NumericHandling::Relax { + string_parsing: true + } + ) => + { + value + .as_str() + .unwrap() + .parse::() + .map_err(|_| create_error())? + .into() + } + // ---- Int32 ----- + ( + Some(DataType::Int32), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => value.try_as_i32()?.into(), + + (Some(DataType::Int32), ValueType::String) + if matches!( + self.numeric_handling, + NumericHandling::Relax { + string_parsing: true + } + ) => + { + value + .as_str() + .unwrap() + .parse::() + .map_err(|_| create_error())? + .into() + } + // ---- Int64 ----- + (None, ValueType::I64 | ValueType::U64) => value.try_as_i64()?.into(), + ( + Some(DataType::Int64), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => value.try_as_i64()?.into(), + + (Some(DataType::Int64), ValueType::String) + if matches!( + self.numeric_handling, + NumericHandling::Relax { + string_parsing: true + } + ) => + { + value + .as_str() + .unwrap() + .parse::() + .map_err(|_| create_error())? + .into() + } + // ---- Float32 ----- + ( + Some(DataType::Float32), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => { + (value.try_as_i64()? as f32).into() + } + (Some(DataType::Float32), ValueType::String) + if matches!( + self.numeric_handling, + NumericHandling::Relax { + string_parsing: true + } + ) => + { + value + .as_str() + .unwrap() + .parse::() + .map_err(|_| create_error())? + .into() + } + (Some(DataType::Float32), ValueType::F64) => value.try_as_f32()?.into(), + // ---- Float64 ----- + ( + Some(DataType::Float64), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) if matches!(self.numeric_handling, NumericHandling::Relax { .. }) => { + (value.try_as_i64()? as f64).into() + } + (Some(DataType::Float64), ValueType::String) + if matches!( + self.numeric_handling, + NumericHandling::Relax { + string_parsing: true + } + ) => + { + value + .as_str() + .unwrap() + .parse::() + .map_err(|_| create_error())? + .into() + } + (Some(DataType::Float64) | None, ValueType::F64) => value.try_as_f64()?.into(), + // ---- Decimal ----- + (Some(DataType::Decimal) | None, ValueType::I128 | ValueType::U128) => { + Decimal::from_str(&value.try_as_i128()?.to_string()) + .map_err(|_| create_error())? + .into() + } + (Some(DataType::Decimal), ValueType::I64 | ValueType::U64) => { + Decimal::from(value.try_as_i64()?).into() + } + + (Some(DataType::Decimal), ValueType::F64) => Decimal::try_from(value.try_as_f64()?) + .map_err(|_| create_error())? + .into(), + + (Some(DataType::Decimal), ValueType::String) => ScalarImpl::Decimal( + Decimal::from_str(value.as_str().unwrap()).map_err(|_err| create_error())?, + ), + // ---- Date ----- + ( + Some(DataType::Date), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => Date::with_days_since_unix_epoch(value.try_as_i32()?) + .map_err(|_| create_error())? + .into(), + (Some(DataType::Date), ValueType::String) => str_to_date(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into(), + // ---- Varchar ----- + (Some(DataType::Varchar) | None, ValueType::String) => value.as_str().unwrap().into(), + // ---- Time ----- + (Some(DataType::Time), ValueType::String) => str_to_time(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into(), + ( + Some(DataType::Time), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => value + .as_i64() + .map(|i| match self.time_handling { + TimeHandling::Milli => Time::with_milli(i as u32), + TimeHandling::Micro => Time::with_micro(i as u64), + }) + .unwrap() + .map_err(|_| create_error())? + .into(), + // ---- Timestamp ----- + (Some(DataType::Timestamp), ValueType::String) => { + str_to_timestamp(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into() + } + ( + Some(DataType::Timestamp), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => i64_to_timestamp(value.as_i64().unwrap()) + .map_err(|_| create_error())? + .into(), + // ---- Timestamptz ----- + (Some(DataType::Timestamptz), ValueType::String) => { + str_with_time_zone_to_timestamptz(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into() + } + ( + Some(DataType::Timestamptz), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => i64_to_timestamptz(value.as_i64().unwrap()) + .map_err(|_| create_error())? + .into(), + // ---- Interval ----- + (Some(DataType::Interval), ValueType::String) => { + Interval::from_iso_8601(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into() + } + // ---- Struct ----- + (Some(DataType::Struct(struct_type_info)), ValueType::Object) => StructValue::new( + struct_type_info + .names() + .zip_eq_fast(struct_type_info.types()) + .map(|(field_name, field_type)| { + self.parse( + json_object_smart_get_value(value, field_name.into()) + .unwrap_or(&BorrowedValue::Static(simd_json::StaticNode::Null)), + Some(field_type), + ) + }) + .collect::>()?, + ) + .into(), + + (None, ValueType::Object) => StructValue::new( + value + .as_object() + .unwrap() + .iter() + .map(|(_field_name, field_value)| self.parse(field_value, None)) + .collect::>()?, + ) + .into(), + // ---- List ----- + (Some(DataType::List(item_type)), ValueType::Array) => ListValue::new( + value + .as_array() + .unwrap() + .iter() + .map(|v| self.parse(v, Some(item_type))) + .collect::, _>>()?, + ) + .into(), + (None, ValueType::Array) => ListValue::new( + value + .as_array() + .unwrap() + .iter() + .map(|v| self.parse(v, None)) + .collect::, _>>()?, + ) + .into(), + // ---- Bytea ----- + (Some(DataType::Bytea), ValueType::String) => match self.bytea_handling { + ByteaHandling::Standard => str_to_bytea(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into(), + ByteaHandling::Base64 => base64::engine::general_purpose::STANDARD + .decode(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into_boxed_slice() + .into(), + }, + // ---- Jsonb ----- + (Some(DataType::Jsonb), ValueType::String) + if matches!(self.json_value_handling, JsonValueHandling::AsString) => + { + JsonbVal::from_str(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into() + } + (Some(DataType::Jsonb), _) + if matches!(self.json_value_handling, JsonValueHandling::AsValue) => + { + let value: serde_json::Value = + value.clone().try_into().map_err(|_| create_error())?; + JsonbVal::from(value).into() + } + // ---- Int256 ----- + ( + Some(DataType::Int256), + ValueType::I64 | ValueType::I128 | ValueType::U64 | ValueType::U128, + ) => Int256::from(value.try_as_i64()?).into(), + + (Some(DataType::Int256), ValueType::String) => { + Int256::from_str(value.as_str().unwrap()) + .map_err(|_| create_error())? + .into() + } + + (_expected, _got) => Err(create_error())?, + }; + Ok(Some(v)) + } +} + +pub struct JsonAccess<'a, 'b> { + value: BorrowedValue<'b>, + options: &'a JsonParseOptions, +} + +impl<'a, 'b> JsonAccess<'a, 'b> { + pub fn new_with_options(value: BorrowedValue<'b>, options: &'a JsonParseOptions) -> Self { + Self { value, options } + } + + pub fn new(value: BorrowedValue<'b>) -> Self { + Self::new_with_options(value, &JsonParseOptions::DEFAULT) + } +} + +impl<'a, 'b> Access for JsonAccess<'a, 'b> +where + 'a: 'b, +{ + fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult { + let mut value = &self.value; + for (idx, key) in path.iter().enumerate() { + if let Some(sub_value) = if self.options.ignoring_keycase { + json_object_smart_get_value(value, (*key).into()) + } else { + value.get(*key) + } { + value = sub_value; + } else { + Err(AccessError::Undefined { + name: key.to_string(), + path: path.iter().take(idx).join("."), + })?; + } + } + + self.options.parse(value, type_expected) + } +} + +impl From for AccessError { + fn from(value: TryTypeError) -> Self { + AccessError::TypeError { + expected: value.expected.to_string(), + got: value.expected.to_string(), + value: Default::default(), + } + } +} diff --git a/src/connector/src/parser/unified/maxwell.rs b/src/connector/src/parser/unified/maxwell.rs new file mode 100644 index 000000000000..1ccb83353f03 --- /dev/null +++ b/src/connector/src/parser/unified/maxwell.rs @@ -0,0 +1,55 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{DataType, ScalarImpl}; + +use super::{Access, ChangeEvent}; +use crate::parser::unified::ChangeEventOperation; + +pub const MAXWELL_INSERT_OP: &str = "insert"; +pub const MAXWELL_UPDATE_OP: &str = "update"; +pub const MAXWELL_DELETE_OP: &str = "delete"; + +pub struct MaxwellChangeEvent(A); + +impl MaxwellChangeEvent { + pub fn new(accessor: A) -> Self { + Self(accessor) + } +} + +impl ChangeEvent for MaxwellChangeEvent +where + A: Access, +{ + fn op(&self) -> std::result::Result { + const OP: &str = "type"; + if let Some(ScalarImpl::Utf8(op)) = self.0.access(&[OP], Some(&DataType::Varchar))? { + match op.as_ref() { + MAXWELL_INSERT_OP | MAXWELL_UPDATE_OP => return Ok(ChangeEventOperation::Upsert), + MAXWELL_DELETE_OP => return Ok(ChangeEventOperation::Delete), + _ => (), + } + } + Err(super::AccessError::Undefined { + name: "op".into(), + path: Default::default(), + }) + } + + fn access_field(&self, name: &str, type_expected: &DataType) -> super::AccessResult { + const DATA: &str = "data"; + self.0.access(&[DATA, name], Some(type_expected)) + } +} diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs new file mode 100644 index 000000000000..3dfdac121ac4 --- /dev/null +++ b/src/connector/src/parser/unified/mod.rs @@ -0,0 +1,73 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Unified parsers for both normal events or CDC events of multiple message formats + +use risingwave_common::types::{DataType, Datum}; +use thiserror::Error; + +pub mod avro; +pub mod debezium; +pub mod json; +pub mod maxwell; +pub mod upsert; +pub mod util; + +pub type AccessResult = std::result::Result; + +/// Access a certain field in an object according to the path +pub trait Access { + fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult; +} + +#[derive(Debug, Clone, Copy)] +pub enum ChangeEventOperation { + Upsert, // Insert or Update + Delete, +} + +/// Methods to access a CDC event. +pub trait ChangeEvent { + /// Access the operation type. + fn op(&self) -> std::result::Result; + /// Access the field after the operation. + fn access_field(&self, name: &str, type_expected: &DataType) -> AccessResult; +} + +impl ChangeEvent for (ChangeEventOperation, A) +where + A: Access, +{ + fn op(&self) -> std::result::Result { + Ok(self.0) + } + + fn access_field(&self, name: &str, type_expected: &DataType) -> AccessResult { + self.1.access(&[name], Some(type_expected)) + } +} + +#[derive(Error, Debug)] +pub enum AccessError { + #[error("Undefined {name} at {path}")] + Undefined { name: String, path: String }, + #[error("TypeError {expected} expected, got {got} {value}")] + TypeError { + expected: String, + got: String, + value: String, + }, + #[error(transparent)] + Other(#[from] anyhow::Error), +} diff --git a/src/connector/src/parser/unified/upsert.rs b/src/connector/src/parser/unified/upsert.rs new file mode 100644 index 000000000000..a0be4f050b9c --- /dev/null +++ b/src/connector/src/parser/unified/upsert.rs @@ -0,0 +1,123 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; + +use super::{Access, ChangeEvent, ChangeEventOperation}; +use crate::parser::unified::AccessError; + +/// `UpsertAccess` wraps a key-value message format into an upsert source. +/// A key accessor and a value accessor are required. +pub struct UpsertChangeEvent { + key_accessor: Option, + value_accessor: Option, + key_as_column_name: Option, +} + +impl Default for UpsertChangeEvent { + fn default() -> Self { + Self { + key_accessor: None, + value_accessor: None, + key_as_column_name: None, + } + } +} + +impl UpsertChangeEvent { + pub fn with_key(mut self, key: K) -> Self + where + K: Access, + { + self.key_accessor = Some(key); + self + } + + pub fn with_value(mut self, value: V) -> Self + where + V: Access, + { + self.value_accessor = Some(value); + self + } + + pub fn with_key_as_column_name(mut self, name: impl ToString) -> Self { + self.key_as_column_name = Some(name.to_string()); + self + } +} + +impl Access for UpsertChangeEvent +where + K: Access, + V: Access, +{ + fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> super::AccessResult { + let create_error = |name: String| AccessError::Undefined { + name, + path: String::new(), + }; + match path.first() { + Some(&"key") => { + if let Some(ka) = &self.key_accessor { + ka.access(&path[1..], type_expected) + } else { + Err(create_error("key".to_string())) + } + } + Some(&"value") => { + if let Some(va) = &self.value_accessor { + va.access(&path[1..], type_expected) + } else { + Err(create_error("value".to_string())) + } + } + None => Ok(None), + Some(other) => Err(create_error(other.to_string())), + } + } +} + +impl ChangeEvent for UpsertChangeEvent +where + K: Access, + V: Access, +{ + fn op(&self) -> std::result::Result { + if let Ok(Some(_)) = self.access(&["value"], None) { + Ok(ChangeEventOperation::Upsert) + } else { + Ok(ChangeEventOperation::Delete) + } + } + + fn access_field(&self, name: &str, type_expected: &DataType) -> super::AccessResult { + // access value firstly + match self.access(&["value", name], Some(type_expected)) { + Err(AccessError::Undefined { .. }) => (), // fallthrough + other => return other, + }; + + match self.access(&["key", name], Some(type_expected)) { + Err(AccessError::Undefined { .. }) => (), // fallthrough + other => return other, + }; + + if let Some(key_as_column_name) = &self.key_as_column_name && name == key_as_column_name { + return self.access(&["key"], Some(type_expected)); + } + + Ok(None) + } +} diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs new file mode 100644 index 000000000000..d961a683c77f --- /dev/null +++ b/src/connector/src/parser/unified/util.rs @@ -0,0 +1,52 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::error::{ErrorCode, RwError}; + +use super::{AccessError, ChangeEvent}; +use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; + +pub fn apply_row_operation_on_stream_chunk_writer( + row_op: impl ChangeEvent, + writer: &mut SourceStreamChunkRowWriter<'_>, +) -> std::result::Result { + match row_op.op()? { + super::ChangeEventOperation::Upsert => writer.insert(|column| { + let res = row_op.access_field(&column.name, &column.data_type); + tracing::debug!( + "inserted {:?} {:?} {:?}", + &column.name, + &column.data_type, + res + ); + Ok(res?) + }), + super::ChangeEventOperation::Delete => writer.delete(|column| { + let res = row_op.access_field(&column.name, &column.data_type); + match res { + Ok(datum) => Ok(datum), + Err(e) => { + tracing::error!(name=?column.name, data_type=?&column.data_type, err=?e, "delete column error"); + Ok(None) + } + } + }), + } +} + +impl From for RwError { + fn from(val: AccessError) -> Self { + ErrorCode::InternalError(format!("AccessError: {:?}", val)).into() + } +}