Skip to content

Commit

Permalink
refactor(connector): remove WriteGuard and fulfill_meta_column from parser (#12542)
Browse files Browse the repository at this point in the history

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao authored Sep 27, 2023
1 parent 85248b7 commit 590faea
Show file tree
Hide file tree
Showing 16 changed files with 200 additions and 199 deletions.
2 changes: 1 addition & 1 deletion src/connector/benches/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn parse(parser: JsonParser, column_desc: Vec<SourceColumnDesc>, input: Ve
SourceStreamChunkBuilder::with_capacity(column_desc.clone(), input_inner.len());
for payload in input_inner {
let row_writer = builder.row_writer();
parser.parse_inner(Some(payload), row_writer).await.unwrap();
parser.parse_inner(payload, row_writer).await.unwrap();
}
builder.finish();
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#![feature(associated_type_defaults)]
#![feature(impl_trait_in_assoc_type)]
#![feature(iter_from_generator)]
#![feature(if_let_guard)]

use std::time::Duration;

Expand Down
8 changes: 3 additions & 5 deletions src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use crate::parser::canal::operators::*;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::unified::ChangeEventOperation;
use crate::parser::{
ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter, WriteGuard,
};
use crate::parser::{ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

const DATA: &str = "data";
Expand Down Expand Up @@ -55,7 +53,7 @@ impl CanalJsonParser {
&self,
mut payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let mut event: BorrowedValue<'_> =
simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
Expand Down Expand Up @@ -128,7 +126,7 @@ impl ByteStreamSourceParser for CanalJsonParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/connector/src/parser/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;

use simd_json::{BorrowedValue, ValueAccess};

pub(crate) fn json_object_smart_get_value<'a, 'b>(
/// Get a value from a json object by key, case insensitive.
///
/// Returns `None` if the given json value is not an object, or the key is not found.
pub(crate) fn json_object_get_case_insensitive<'a, 'b>(
v: &'b simd_json::BorrowedValue<'a>,
key: Cow<'b, str>,
key: &'b str,
) -> Option<&'b BorrowedValue<'a>> {
let obj = v.as_object()?;
let value = obj.get(key.as_ref());
let value = obj.get(key);
if value.is_some() {
return value;
return value; // fast path
}
for (k, v) in obj {
if k.eq_ignore_ascii_case(key.as_ref()) {
if k.eq_ignore_ascii_case(key) {
return Some(v);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::types::{Date, Datum, Decimal, ScalarImpl, Time, Timestamp

use super::{ByteStreamSourceParser, CsvProperties};
use crate::only_parse_payload;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::parser::SourceStreamChunkRowWriter;
use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};

macro_rules! to_rust_type {
Expand Down Expand Up @@ -107,14 +107,14 @@ impl CsvParser {
&mut self,
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let mut fields = self.read_row(&payload)?;
if let Some(headers) = &mut self.headers {
if headers.is_empty() {
*headers = fields;
// Here we want a row, but got nothing. So it's an error for the `parse_inner` but
// has no bad impact on the system.
return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string())));
return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string())));
}
writer.insert(|desc| {
if let Some(i) = headers.iter().position(|name| name == &desc.name) {
Expand Down Expand Up @@ -157,7 +157,7 @@ impl ByteStreamSourceParser for CsvParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}
Expand Down
14 changes: 6 additions & 8 deletions src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Either;
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};

Expand All @@ -23,8 +22,7 @@ use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties,
ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig, TransactionControl,
WriteGuard,
ParseResult, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -93,7 +91,7 @@ impl DebeziumParser {
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<Either<WriteGuard, TransactionControl>> {
) -> Result<ParseResult> {
// tombstone messages are handled implicitly by these accessors
let key_accessor = match key {
None => None,
Expand All @@ -106,12 +104,12 @@ impl DebeziumParser {
let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);

match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
Ok(guard) => Ok(Either::Left(guard)),
Ok(_) => Ok(ParseResult::Rows),
Err(err) => {
// Only try to access transaction control message if the row operation access failed
// to make it a fast path.
if let Ok(transaction_control) = row_op.transaction_control() {
Ok(Either::Right(transaction_control))
Ok(ParseResult::TransactionControl(transaction_control))
} else {
Err(err)
}
Expand All @@ -135,7 +133,7 @@ impl ByteStreamSourceParser for DebeziumParser {
_key: Option<Vec<u8>>,
_payload: Option<Vec<u8>>,
_writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
unreachable!("should call `parse_one_with_txn` instead")
}

Expand All @@ -144,7 +142,7 @@ impl ByteStreamSourceParser for DebeziumParser {
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<Either<WriteGuard, TransactionControl>> {
) -> Result<ParseResult> {
self.parse_inner(key, payload, writer).await
}
}
6 changes: 3 additions & 3 deletions src/connector/src/parser/debezium/mongo_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::only_parse_payload;
use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjeciton};
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -82,7 +82,7 @@ impl DebeziumMongoJsonParser {
&self,
mut payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let mut event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;

Expand Down Expand Up @@ -117,7 +117,7 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}
Expand Down
35 changes: 14 additions & 21 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ use super::avro::schema_resolver::ConfluentSchemaResolver;
use super::schema_registry::Client;
use super::util::{get_kafka_topic, read_schema_from_http, read_schema_from_local};
use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig};
use crate::only_parse_payload;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::schema_registry::handle_sr_list;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
use crate::parser::unified::AccessImpl;
use crate::parser::{
AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard,
};
use crate::parser::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -106,17 +105,11 @@ impl JsonParser {
#[allow(clippy::unused_async)]
pub async fn parse_inner(
&self,
mut payload: Option<Vec<u8>>,
mut payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
if payload.is_none() {
return Err(RwError::from(ErrorCode::InternalError(
"Empty payload with nonempty key for non-upsert".into(),
)));
}
let value =
simd_json::to_borrowed_value(&mut payload.as_mut().unwrap()[self.payload_start_idx..])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
) -> Result<()> {
let value = simd_json::to_borrowed_value(&mut payload[self.payload_start_idx..])
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
let values = if let simd_json::BorrowedValue::Array(arr) = value {
arr
} else {
Expand Down Expand Up @@ -194,8 +187,8 @@ impl ByteStreamSourceParser for JsonParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
self.parse_inner(payload, writer).await
) -> Result<()> {
only_parse_payload!(self, payload, writer)
}
}

Expand Down Expand Up @@ -255,7 +248,7 @@ mod tests {

for payload in get_payload() {
let writer = builder.row_writer();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

let chunk = builder.finish();
Expand Down Expand Up @@ -358,7 +351,7 @@ mod tests {
{
let writer = builder.row_writer();
let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

// Parse an incorrect record.
Expand All @@ -367,14 +360,14 @@ mod tests {
// `v2` overflowed.
let payload = br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec();
// ignored the error, and fill None at v2.
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

// Parse a correct record.
{
let writer = builder.row_writer();
let payload = br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}

let chunk = builder.finish();
Expand Down Expand Up @@ -444,7 +437,7 @@ mod tests {
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
{
let writer = builder.row_writer();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
Expand Down Expand Up @@ -504,7 +497,7 @@ mod tests {
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
{
let writer = builder.row_writer();
parser.parse_inner(Some(payload), writer).await.unwrap();
parser.parse_inner(payload, writer).await.unwrap();
}
let chunk = builder.finish();
let (op, row) = chunk.rows().next().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/parser/maxwell/maxwell_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::parser::unified::maxwell::MaxwellChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig, WriteGuard,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -57,7 +57,7 @@ impl MaxwellParser {
&mut self,
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
) -> Result<()> {
let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
let row_op = MaxwellChangeEvent::new(payload_accessor);

Expand All @@ -79,7 +79,7 @@ impl ByteStreamSourceParser for MaxwellParser {
_key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
) -> Result<()> {
// restrict the behaviours since there is no corresponding
// key/value test for maxwell yet.
only_parse_payload!(self, payload, writer)
Expand Down
Loading

0 comments on commit 590faea

Please sign in to comment.