From 590faea40533edfa149d1b142f3546c20e4b5100 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 27 Sep 2023 13:15:11 +0800 Subject: [PATCH] refactor(connector): remove `WriteGuard` and `fulfill_meta_column` from parser (#12542) Signed-off-by: Bugen Zhao --- src/connector/benches/parser.rs | 2 +- src/connector/src/lib.rs | 1 + .../src/parser/canal/simd_json_parser.rs | 8 +- src/connector/src/parser/common.rs | 15 +- src/connector/src/parser/csv_parser.rs | 8 +- .../src/parser/debezium/debezium_parser.rs | 14 +- .../src/parser/debezium/mongo_json_parser.rs | 6 +- src/connector/src/parser/json_parser.rs | 35 +-- .../src/parser/maxwell/maxwell_parser.rs | 6 +- src/connector/src/parser/mod.rs | 261 +++++++++--------- src/connector/src/parser/plain_parser.rs | 6 +- src/connector/src/parser/unified/json.rs | 10 +- src/connector/src/parser/unified/util.rs | 14 +- src/connector/src/parser/upsert_parser.rs | 6 +- src/connector/src/parser/util.rs | 3 +- src/source/benches/json_parser.rs | 4 +- 16 files changed, 200 insertions(+), 199 deletions(-) diff --git a/src/connector/benches/parser.rs b/src/connector/benches/parser.rs index 2c3b666b91d62..81bda6fccb395 100644 --- a/src/connector/benches/parser.rs +++ b/src/connector/benches/parser.rs @@ -91,7 +91,7 @@ async fn parse(parser: JsonParser, column_desc: Vec, input: Ve SourceStreamChunkBuilder::with_capacity(column_desc.clone(), input_inner.len()); for payload in input_inner { let row_writer = builder.row_writer(); - parser.parse_inner(Some(payload), row_writer).await.unwrap(); + parser.parse_inner(payload, row_writer).await.unwrap(); } builder.finish(); } diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 0d1a015f97663..1fba061555f44 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -31,6 +31,7 @@ #![feature(associated_type_defaults)] #![feature(impl_trait_in_assoc_type)] #![feature(iter_from_generator)] +#![feature(if_let_guard)] use std::time::Duration; diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index 40289c11612c8..e9d15368cbf7c 100644 --- a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -21,9 +21,7 @@ use crate::parser::canal::operators::*; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::unified::ChangeEventOperation; -use crate::parser::{ - ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter, WriteGuard, -}; +use crate::parser::{ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; const DATA: &str = "data"; @@ -55,7 +53,7 @@ impl CanalJsonParser { &self, mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..]) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; @@ -128,7 +126,7 @@ impl ByteStreamSourceParser for CanalJsonParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/common.rs b/src/connector/src/parser/common.rs index 168b13e1b2e22..5a288dfd80b8d 100644 --- a/src/connector/src/parser/common.rs +++ b/src/connector/src/parser/common.rs @@ -12,21 
+12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; - use simd_json::{BorrowedValue, ValueAccess}; -pub(crate) fn json_object_smart_get_value<'a, 'b>( +/// Get a value from a json object by key, case insensitive. +/// +/// Returns `None` if the given json value is not an object, or the key is not found. +pub(crate) fn json_object_get_case_insensitive<'a, 'b>( v: &'b simd_json::BorrowedValue<'a>, - key: Cow<'b, str>, + key: &'b str, ) -> Option<&'b BorrowedValue<'a>> { let obj = v.as_object()?; - let value = obj.get(key.as_ref()); + let value = obj.get(key); if value.is_some() { - return value; + return value; // fast path } for (k, v) in obj { - if k.eq_ignore_ascii_case(key.as_ref()) { + if k.eq_ignore_ascii_case(key) { return Some(v); } } diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index f1d140370bc2c..238595f82c1f1 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -21,7 +21,7 @@ use risingwave_common::types::{Date, Datum, Decimal, ScalarImpl, Time, Timestamp use super::{ByteStreamSourceParser, CsvProperties}; use crate::only_parse_payload; -use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; +use crate::parser::SourceStreamChunkRowWriter; use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef}; macro_rules! to_rust_type { @@ -107,14 +107,14 @@ impl CsvParser { &mut self, payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut fields = self.read_row(&payload)?; if let Some(headers) = &mut self.headers { if headers.is_empty() { *headers = fields; // Here we want a row, but got nothing. So it's an error for the `parse_inner` but // has no bad impact on the system. - return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string()))); + return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string()))); } writer.insert(|desc| { if let Some(i) = headers.iter().position(|name| name == &desc.name) { @@ -157,7 +157,7 @@ impl ByteStreamSourceParser for CsvParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 8fd0f6f2ba4f6..e98a57650d308 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use itertools::Either; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; @@ -23,8 +22,7 @@ use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties, - ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig, TransactionControl, - WriteGuard, + ParseResult, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -93,7 +91,7 @@ impl DebeziumParser { key: Option>, payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result> { + ) -> Result { // tombetone messages are handled implicitly by these accessors let key_accessor = match key { None => None, @@ -106,12 +104,12 @@ impl DebeziumParser { let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor); match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) { - Ok(guard) => Ok(Either::Left(guard)), + Ok(_) => Ok(ParseResult::Rows), Err(err) => { // Only try to access transaction control message if the row operation access failed // to make it a fast path. if let Ok(transaction_control) = row_op.transaction_control() { - Ok(Either::Right(transaction_control)) + Ok(ParseResult::TransactionControl(transaction_control)) } else { Err(err) } @@ -135,7 +133,7 @@ impl ByteStreamSourceParser for DebeziumParser { _key: Option>, _payload: Option>, _writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { unreachable!("should call `parse_one_with_txn` instead") } @@ -144,7 +142,7 @@ impl ByteStreamSourceParser for DebeziumParser { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result> { + ) -> Result { self.parse_inner(key, payload, writer).await } } diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 910b5a3e0131d..4b6478f3d73c1 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -23,7 +23,7 @@ use crate::only_parse_payload; use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjeciton}; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; -use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard}; +use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; #[derive(Debug)] @@ -82,7 +82,7 @@ impl DebeziumMongoJsonParser { &self, mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; @@ -117,7 +117,7 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 53559480818a2..b3e108a242ab7 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -25,14 +25,13 @@ use 
super::avro::schema_resolver::ConfluentSchemaResolver; use super::schema_registry::Client; use super::util::{get_kafka_topic, read_schema_from_http, read_schema_from_local}; use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig}; +use crate::only_parse_payload; use crate::parser::avro::util::avro_schema_to_column_descs; use crate::parser::schema_registry::handle_sr_list; use crate::parser::unified::json::{JsonAccess, JsonParseOptions}; use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer; use crate::parser::unified::AccessImpl; -use crate::parser::{ - AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard, -}; +use crate::parser::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; #[derive(Debug)] @@ -106,17 +105,11 @@ impl JsonParser { #[allow(clippy::unused_async)] pub async fn parse_inner( &self, - mut payload: Option>, + mut payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { - if payload.is_none() { - return Err(RwError::from(ErrorCode::InternalError( - "Empty payload with nonempty key for non-upsert".into(), - ))); - } - let value = - simd_json::to_borrowed_value(&mut payload.as_mut().unwrap()[self.payload_start_idx..]) - .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; + ) -> Result<()> { + let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..]) + .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; let values = if let simd_json::BorrowedValue::Array(arr) = value { arr } else { @@ -194,8 +187,8 @@ impl ByteStreamSourceParser for JsonParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { - self.parse_inner(payload, writer).await + ) -> Result<()> { + only_parse_payload!(self, payload, writer) } } @@ -255,7 +248,7 @@ mod tests { for payload in get_payload() { let writer = builder.row_writer(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); @@ -358,7 +351,7 @@ mod tests { { let writer = builder.row_writer(); let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } // Parse an incorrect record. @@ -367,14 +360,14 @@ mod tests { // `v2` overflowed. let payload = br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(); // ignored the error, and fill None at v2. - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } // Parse a correct record. 
{ let writer = builder.row_writer(); let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); @@ -444,7 +437,7 @@ mod tests { let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1); { let writer = builder.row_writer(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); let (op, row) = chunk.rows().next().unwrap(); @@ -504,7 +497,7 @@ mod tests { let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1); { let writer = builder.row_writer(); - parser.parse_inner(Some(payload), writer).await.unwrap(); + parser.parse_inner(payload, writer).await.unwrap(); } let chunk = builder.finish(); let (op, row) = chunk.rows().next().unwrap(); diff --git a/src/connector/src/parser/maxwell/maxwell_parser.rs b/src/connector/src/parser/maxwell/maxwell_parser.rs index fff28092fa451..0980472021bff 100644 --- a/src/connector/src/parser/maxwell/maxwell_parser.rs +++ b/src/connector/src/parser/maxwell/maxwell_parser.rs @@ -19,7 +19,7 @@ use crate::parser::unified::maxwell::MaxwellChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, - SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard, + SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -57,7 +57,7 @@ impl MaxwellParser { &mut self, payload: Vec, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let payload_accessor = self.payload_builder.generate_accessor(payload).await?; let row_op = MaxwellChangeEvent::new(payload_accessor); @@ -79,7 +79,7 @@ impl ByteStreamSourceParser for MaxwellParser { _key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { // restrict the behaviours since there is no corresponding // key/value test for maxwell yet. 
only_parse_payload!(self, payload, writer) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 987d049a11f0c..8c7914fe16354 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -22,14 +22,13 @@ use csv_parser::CsvParser; pub use debezium::*; use futures::{Future, TryFutureExt}; use futures_async_stream::try_stream; -use itertools::{Either, Itertools}; pub use json_parser::*; pub use protobuf::*; use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk}; use risingwave_common::catalog::KAFKA_TIMESTAMP_COLUMN_NAME; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::Datum; +use risingwave_common::types::{Datum, Scalar}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::{ SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo, @@ -47,8 +46,9 @@ use self::util::get_kafka_topic; use crate::aws_auth::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; use crate::source::{ - BoxSourceStream, SourceColumnDesc, SourceContext, SourceContextRef, SourceEncode, SourceFormat, - SourceMeta, SourceStruct, SourceWithStateStream, SplitId, StreamChunkWithState, + BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, SourceContextRef, + SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream, SplitId, + StreamChunkWithState, }; mod avro; @@ -93,6 +93,7 @@ impl SourceStreamChunkBuilder { descs: &self.descs, builders: &mut self.builders, op_builder: &mut self.op_builder, + row_meta: None, } } @@ -125,24 +126,58 @@ impl SourceStreamChunkBuilder { } } -/// `SourceStreamChunkRowWriter` is responsible to write one row (Insert/Delete) or two rows -/// (Update) to the [`StreamChunk`]. +/// `SourceStreamChunkRowWriter` is responsible to write one or more records to the [`StreamChunk`], +/// where each contains either one row (Insert/Delete) or two rows (Update) that can be written atomically. pub struct SourceStreamChunkRowWriter<'a> { descs: &'a [SourceColumnDesc], builders: &'a mut [ArrayBuilderImpl], op_builder: &'a mut Vec, + + /// An optional meta data of the original message. + /// + /// When this is set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`]. + row_meta: Option, } -/// `WriteGuard` can't be constructed directly in other mods due to a private field, so it can be -/// used to ensure that all methods on [`SourceStreamChunkRowWriter`] are called at least once in -/// the `SourceParser::parse` implementation. -#[derive(Debug)] -pub struct WriteGuard(()); +/// The meta data of the original message for a row writer. +/// +/// Extracted from the `SourceMessage`. +pub struct MessageMeta { + meta: SourceMeta, + offset: String, +} + +impl MessageMeta { + /// Extract the value for the given column. + /// + /// Returns `None` if the column is not a meta column. + fn value_for_column(&self, desc: &SourceColumnDesc) -> Option { + match desc.column_type { + // Row id columns are filled with `NULL` here and will be filled with the real + // row id generated by `RowIdGenExecutor` later. + SourceColumnType::RowId => Datum::None.into(), + // Extract the offset from the meta data. + SourceColumnType::Offset => Datum::Some(self.offset.as_str().into()).into(), + // Extract custom meta data per connector. 
+ SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => { + assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name"); + kafka_meta.timestamp.map(|ts| { + risingwave_common::cast::i64_to_timestamptz(ts) + .unwrap() + .to_scalar_value() + }).into() + } + + // For other cases, return `None`. + SourceColumnType::Meta | SourceColumnType::Normal => None, + } + } +} trait OpAction { type Output; - const DEFAULT_OUTPUT: Self::Output; + fn output_for(datum: Datum) -> Self::Output; fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output); @@ -156,7 +191,10 @@ struct OpActionInsert; impl OpAction for OpActionInsert { type Output = Datum; - const DEFAULT_OUTPUT: Self::Output = None; + #[inline(always)] + fn output_for(datum: Datum) -> Self::Output { + datum + } #[inline(always)] fn apply(builder: &mut ArrayBuilderImpl, output: Datum) { @@ -179,7 +217,10 @@ struct OpActionDelete; impl OpAction for OpActionDelete { type Output = Datum; - const DEFAULT_OUTPUT: Self::Output = None; + #[inline(always)] + fn output_for(datum: Datum) -> Self::Output { + datum + } #[inline(always)] fn apply(builder: &mut ArrayBuilderImpl, output: Datum) { @@ -202,7 +243,10 @@ struct OpActionUpdate; impl OpAction for OpActionUpdate { type Output = (Datum, Datum); - const DEFAULT_OUTPUT: Self::Output = (None, None); + #[inline(always)] + fn output_for(datum: Datum) -> Self::Output { + (datum.clone(), datum) + } #[inline(always)] fn apply(builder: &mut ArrayBuilderImpl, output: (Datum, Datum)) { @@ -224,91 +268,76 @@ impl OpAction for OpActionUpdate { } impl SourceStreamChunkRowWriter<'_> { - #[expect( - clippy::disallowed_methods, - reason = "FIXME: why zip_eq_fast leads to compile error?" - )] + /// Set the meta data of the original message for this row writer. + /// + /// This should always be called except for tests. + fn with_meta(self, row_meta: MessageMeta) -> Self { + Self { + row_meta: Some(row_meta), + ..self + } + } +} + +impl SourceStreamChunkRowWriter<'_> { fn do_action( &mut self, mut f: impl FnMut(&SourceColumnDesc) -> Result, - ) -> Result { - let mut modify_col = Vec::with_capacity(self.descs.len()); - self.descs - .iter() - .zip_eq(self.builders.iter_mut()) - .enumerate() - .try_for_each(|(idx, (desc, builder))| -> Result<()> { - if desc.is_meta() || desc.is_offset() { - return Ok(()); - } - let output = if desc.is_row_id() { - A::DEFAULT_OUTPUT - } else { - f(desc)? - }; - A::apply(builder, output); - modify_col.push(idx); + ) -> Result<()> { + let mut f_with_meta = |desc: &SourceColumnDesc| { + if let Some(meta_value) = + (self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc)) + { + Ok(A::output_for(meta_value)) + } else { + f(desc) + } + }; - Ok(()) - }) - .inspect_err(|e| { - tracing::warn!("failed to parse source data: {}", e); - modify_col.iter().for_each(|idx| { - A::rollback(&mut self.builders[*idx]); - }); - })?; + // Columns that changes have been applied to. Used to rollback when an error occurs. 
+ let mut applied_columns = Vec::with_capacity(self.descs.len()); - A::finish(self); + let result = (self.descs.iter()) + .zip_eq_fast(self.builders.iter_mut()) + .try_for_each(|(desc, builder)| { + f_with_meta(desc).map(|output| { + A::apply(builder, output); + applied_columns.push(builder); + }) + }); - Ok(WriteGuard(())) + match result { + Ok(_) => { + A::finish(self); + Ok(()) + } + Err(e) => { + tracing::warn!("failed to parse source data: {}", e); + for builder in applied_columns { + A::rollback(builder); + } + Err(e) + } + } } /// Write an `Insert` record to the [`StreamChunk`]. /// /// # Arguments /// - /// * `self`: Ownership is consumed so only one record can be written. /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. - pub fn insert( - &mut self, - f: impl FnMut(&SourceColumnDesc) -> Result, - ) -> Result { + /// Callers only need to handle columns with the type [`SourceColumnType::Normal`]. + pub fn insert(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result) -> Result<()> { self.do_action::(f) } - /// For other op like 'insert', 'update', 'delete', we will leave the hollow for the meta column - /// builder. e.g after insert - /// `data_builder = [1], meta_column_builder = [], op = [insert]` - /// - /// This function is used to fulfill this hollow in `meta_column_builder`. - /// e.g after fulfill - /// `data_builder = [1], meta_column_builder = [1], op = [insert]` - pub fn fulfill_meta_column( - &mut self, - mut f: impl FnMut(&SourceColumnDesc) -> Option, - ) -> Result { - self.descs - .iter() - .zip_eq_fast(self.builders.iter_mut()) - .for_each(|(desc, builder)| { - if let Some(output) = f(desc) { - builder.append(output); - } - }); - - Ok(WriteGuard(())) - } - /// Write a `Delete` record to the [`StreamChunk`]. /// /// # Arguments /// - /// * `self`: Ownership is consumed so only one record can be written. /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. - pub fn delete( - &mut self, - f: impl FnMut(&SourceColumnDesc) -> Result, - ) -> Result { + /// Callers only need to handle columns with the type [`SourceColumnType::Normal`]. + pub fn delete(&mut self, f: impl FnMut(&SourceColumnDesc) -> Result) -> Result<()> { self.do_action::(f) } @@ -316,13 +345,12 @@ impl SourceStreamChunkRowWriter<'_> { /// /// # Arguments /// - /// * `self`: Ownership is consumed so only one record can be written. /// * `f`: A failable closure that produced two [`Datum`]s as old and new value by corresponding - /// [`SourceColumnDesc`]. + /// [`SourceColumnDesc`]. Callers only need to handle columns with the type [`SourceColumnType::Normal`]. pub fn update( &mut self, f: impl FnMut(&SourceColumnDesc) -> Result<(Datum, Datum)>, - ) -> Result { + ) -> Result<()> { self.do_action::(f) } } @@ -333,6 +361,14 @@ pub enum TransactionControl { Commit { id: Box }, } +/// The result of parsing a message. +pub enum ParseResult { + /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`]. + Rows, + /// A transaction control message is parsed. + TransactionControl(TransactionControl), +} + /// `ByteStreamSourceParser` is a new message parser, the parser should consume /// the input data stream and return a stream of parsed msgs. 
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { @@ -348,7 +384,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> impl Future> + Send + 'a; + ) -> impl Future> + Send + 'a; /// Parse one record from the given `payload`, either write it to the `writer` or interpret it /// as a transaction control message. @@ -360,8 +396,9 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> impl Future>> + Send + 'a { - self.parse_one(key, payload, writer).map_ok(Either::Left) + ) -> impl Future> + Send + 'a { + self.parse_one(key, payload, writer) + .map_ok(|_| ParseResult::Rows) } /// Parse a data stream of one source split into a stream of [`StreamChunk`]. @@ -436,57 +473,31 @@ async fn into_chunk_stream(mut parser: P, data_stream continue; } - let msg_offset = msg.offset; - split_offset_mapping.insert(msg.split_id, msg_offset.clone()); + split_offset_mapping.insert(msg.split_id, msg.offset.clone()); let old_op_num = builder.op_num(); match parser - .parse_one_with_txn(msg.key, msg.payload, builder.row_writer()) + .parse_one_with_txn( + msg.key, + msg.payload, + builder.row_writer().with_meta(MessageMeta { + meta: msg.meta, + offset: msg.offset, + }), + ) .await { - Ok(Either::Left(WriteGuard(_))) => { - // new_op_num - old_op_num is the number of rows added to the builder - let new_op_num = builder.op_num(); + Ok(ParseResult::Rows) => { + // The number of rows added to the builder. + let num = builder.op_num() - old_op_num; // Aggregate the number of rows in the current transaction. if let Some(Transaction { len, .. }) = &mut current_transaction { - *len += new_op_num - old_op_num; - } - - // fill in meta column for specific source and offset column if needed - for _ in old_op_num..new_op_num { - let f = - |desc: &SourceColumnDesc| -> Option { - if desc.is_meta() && let SourceMeta::Kafka(kafka_meta) = &msg.meta { - match desc.name.as_str() { - KAFKA_TIMESTAMP_COLUMN_NAME => { - Some(kafka_meta.timestamp.map(|ts| { - risingwave_common::cast::i64_to_timestamptz(ts) - .unwrap() - .into() - })) - } - _ => { - unreachable!( - "kafka will not have this meta column: {}", - desc.name - ) - } - } - } else if desc.is_offset() { - Some(Some(msg_offset.as_str().into())) - } else { - // None will be ignored by `fulfill_meta_column` - None - } - }; - - // fill in meta or offset column if any - builder.row_writer().fulfill_meta_column(f)?; + *len += num; } } - Ok(Either::Right(txn_ctl)) => { + Ok(ParseResult::TransactionControl(txn_ctl)) => { match txn_ctl { TransactionControl::Begin { id } => { if let Some(Transaction { id: current_id, .. 
}) = &current_transaction { diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index b50ab0e322ba3..53f87660de031 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -18,7 +18,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError}; use super::unified::util::apply_row_accessor_on_stream_chunk_writer; use super::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, - SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard, + SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::only_parse_payload; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -59,7 +59,7 @@ impl PlainParser { &mut self, payload: Vec<u8>, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result<WriteGuard> { + ) -> Result<()> { let accessor = self.payload_builder.generate_accessor(payload).await?; apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer) @@ -80,7 +80,7 @@ impl ByteStreamSourceParser for PlainParser { _key: Option<Vec<u8>>, payload: Option<Vec<u8>>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result<WriteGuard> { + ) -> Result<()> { only_parse_payload!(self, payload, writer) } } diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 4795fb70e001f..433b7e5f74306 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -26,7 +26,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use simd_json::{BorrowedValue, TryTypeError, ValueAccess, ValueType}; use super::{Access, AccessError, AccessResult}; -use crate::parser::common::json_object_smart_get_value; +use crate::parser::common::json_object_get_case_insensitive; use crate::parser::unified::avro::extract_decimal; #[derive(Clone, Debug)] @@ -446,7 +446,7 @@ impl JsonParseOptions { .zip_eq_fast(struct_type_info.types()) .map(|(field_name, field_type)| { self.parse( - json_object_smart_get_value(value, field_name.into()) + json_object_get_case_insensitive(value, field_name) .unwrap_or(&BorrowedValue::Static(simd_json::StaticNode::Null)), Some(field_type), ) @@ -561,11 +561,11 @@ where { fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> AccessResult { let mut value = &self.value; - for (idx, key) in path.iter().enumerate() { + for (idx, &key) in path.iter().enumerate() { if let Some(sub_value) = if self.options.ignoring_keycase { - json_object_smart_get_value(value, (*key).into()) + json_object_get_case_insensitive(value, key) } else { - value.get(*key) + value.get(key) } { value = sub_value; } else { diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs index 2e63ee71a5a07..38b83a3fc00d2 100644 --- a/src/connector/src/parser/unified/util.rs +++ b/src/connector/src/parser/unified/util.rs @@ -17,18 +17,18 @@ use risingwave_common::types::Datum; use super::{Access, AccessError, ChangeEvent}; use crate::parser::unified::ChangeEventOperation; -use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; +use crate::parser::SourceStreamChunkRowWriter; pub fn apply_delete_on_stream_chunk_writer( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> std::result::Result<WriteGuard, RwError> { +) -> std::result::Result<(), RwError> { writer.delete(|column| { let res = row_op.access_field(&column.name, &column.data_type); match res { Ok(datum) => Ok(datum), Err(e) => { - tracing::error!(name=?column.name, data_type=?&column.data_type, err=?e, "delete column error"); +
tracing::error!(name=column.name, data_type=%column.data_type, err=%e, "delete column error"); if column.is_pk { // It should be an error when pk column is missing in the message Err(e)? @@ -43,7 +43,7 @@ pub fn apply_delete_on_stream_chunk_writer( pub fn apply_upsert_on_stream_chunk_writer( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> std::result::Result { +) -> std::result::Result<(), RwError> { writer.insert(|column| { let res = match row_op.access_field(&column.name, &column.data_type) { Ok(o) => Ok(o), @@ -69,7 +69,7 @@ pub fn apply_row_operation_on_stream_chunk_writer_with_op( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, op: ChangeEventOperation, -) -> std::result::Result { +) -> std::result::Result<(), RwError> { match op { ChangeEventOperation::Upsert => apply_upsert_on_stream_chunk_writer(row_op, writer), ChangeEventOperation::Delete => apply_delete_on_stream_chunk_writer(row_op, writer), @@ -79,7 +79,7 @@ pub fn apply_row_operation_on_stream_chunk_writer_with_op( pub fn apply_row_operation_on_stream_chunk_writer( row_op: impl ChangeEvent, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> std::result::Result { +) -> std::result::Result<(), RwError> { let op = row_op.op()?; apply_row_operation_on_stream_chunk_writer_with_op(row_op, writer, op) } @@ -87,7 +87,7 @@ pub fn apply_row_operation_on_stream_chunk_writer( pub fn apply_row_accessor_on_stream_chunk_writer( accessor: impl Access, writer: &mut SourceStreamChunkRowWriter<'_>, -) -> Result { +) -> Result<(), RwError> { writer.insert(|column| { let res: Result = match accessor .access(&[&column.name], Some(&column.data_type)) diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs index a9fce2f991475..381e429565d11 100644 --- a/src/connector/src/parser/upsert_parser.rs +++ b/src/connector/src/parser/upsert_parser.rs @@ -22,7 +22,7 @@ use super::unified::util::apply_row_operation_on_stream_chunk_writer_with_op; use super::unified::{AccessImpl, ChangeEventOperation}; use super::{ AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType, - SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard, + SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::extract_key_config; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -100,7 +100,7 @@ impl UpsertParser { key: Option>, payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result { + ) -> Result<()> { let mut row_op: UpsertChangeEvent, AccessImpl<'_, '_>> = UpsertChangeEvent::default(); let mut change_event_op = ChangeEventOperation::Delete; @@ -134,7 +134,7 @@ impl ByteStreamSourceParser for UpsertParser { key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result { + ) -> Result<()> { self.parse_inner(key, payload, writer).await } } diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 7444fe202de46..b4b36ed4b7cc1 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -24,7 +24,6 @@ use risingwave_common::error::{Result, RwError}; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::{default_conn_config, s3_client}; -use crate::parser::WriteGuard; const AVRO_SCHEMA_LOCATION_S3_REGION: &str = "region"; @@ -71,7 +70,7 @@ pub(super) async fn download_from_http(location: &Url) -> Result { // if all ok, return ok // if part of them are errors, log err and return ok #[inline] -pub(super) fn 
at_least_one_ok(mut results: Vec<Result<WriteGuard>>) -> Result<WriteGuard> { +pub(super) fn at_least_one_ok(mut results: Vec<Result<()>>) -> Result<()> { let errors = results .iter() .filter_map(|r| r.as_ref().err()) diff --git a/src/source/benches/json_parser.rs b/src/source/benches/json_parser.rs index 70df93b902f57..e54a51befa9f1 100644 --- a/src/source/benches/json_parser.rs +++ b/src/source/benches/json_parser.rs @@ -85,11 +85,11 @@ fn generate_json_row(rng: &mut impl Rng) -> String { ) } -fn generate_json_rows() -> Vec<Option<Vec<u8>>> { +fn generate_json_rows() -> Vec<Vec<u8>> { let mut rng = rand::thread_rng(); let mut records = Vec::with_capacity(NUM_RECORDS); for _ in 0..NUM_RECORDS { - records.push(Some(generate_json_row(&mut rng).into_bytes())); + records.push(generate_json_row(&mut rng).into_bytes()); } records }
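
To make the shape of the reworked parser API easier to see than it is in the raw hunks above, here is a minimal, self-contained Rust sketch of the same pattern. It is illustrative only and not part of this patch: `MiniWriter`, `ColumnDesc`, and the other names below are hypothetical stand-ins for `SourceStreamChunkRowWriter` and friends. It models the two behavioural changes the diff makes: row-writer methods now return a plain `Result<()>` and roll back partially applied columns on failure (as the new `do_action` does), and a parse call can surface either written rows or a transaction-control message via a `ParseResult`-like enum.

// Hypothetical stand-in types; none of these names exist in the RisingWave codebase.
#[derive(Debug, Clone)]
enum Datum {
    Null,
    Int(i64),
}

#[derive(Debug)]
#[allow(dead_code)]
enum TransactionControl {
    Begin { id: String },
    Commit { id: String },
}

// Mirrors the new `ParseResult`: either rows were written, or a
// transaction-control message was seen instead.
#[derive(Debug)]
#[allow(dead_code)]
enum ParseResult {
    Rows,
    TransactionControl(TransactionControl),
}

struct ColumnDesc {
    name: &'static str,
}

// A drastically simplified model of `SourceStreamChunkRowWriter`.
struct MiniWriter {
    descs: Vec<ColumnDesc>,
    columns: Vec<Vec<Datum>>, // one "builder" per column
}

impl MiniWriter {
    // Like the reworked `insert`: apply `f` per column, roll back already-applied
    // columns on error, and return a plain `Result<()>` instead of a `WriteGuard`.
    fn insert(
        &mut self,
        mut f: impl FnMut(&ColumnDesc) -> Result<Datum, String>,
    ) -> Result<(), String> {
        let mut applied = 0;
        for i in 0..self.descs.len() {
            match f(&self.descs[i]) {
                Ok(datum) => {
                    self.columns[i].push(datum);
                    applied += 1;
                }
                Err(e) => {
                    for col in &mut self.columns[..applied] {
                        col.pop(); // roll back what was already written for this record
                    }
                    return Err(e);
                }
            }
        }
        Ok(())
    }
}

fn main() {
    let mut writer = MiniWriter {
        descs: vec![ColumnDesc { name: "v1" }, ColumnDesc { name: "v2" }],
        columns: vec![Vec::new(), Vec::new()],
    };

    // A good record: both columns are appended and the call simply returns Ok(()).
    writer
        .insert(|desc| Ok(if desc.name == "v1" { Datum::Int(1) } else { Datum::Null }))
        .unwrap();

    // A bad record: the error propagates and the first column is rolled back,
    // so nothing is left half-written.
    let res = writer.insert(|desc| {
        if desc.name == "v2" {
            Err("parse error".to_owned())
        } else {
            Ok(Datum::Int(2))
        }
    });
    assert!(res.is_err());
    assert_eq!(writer.columns[0].len(), 1);

    // A parse call can now also surface transaction control instead of rows.
    let ctl = ParseResult::TransactionControl(TransactionControl::Begin { id: "txn-1".into() });
    println!("{ctl:?}");
}

Under these assumptions, the rollback-on-error behaviour is what lets the writer drop the `WriteGuard` token altogether: success is simply `Ok(())`, and a failed record leaves the chunk builders untouched.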