Skip to content

Commit

Permalink
feat(parser): make json parser case insensitive (#7256)
Browse files Browse the repository at this point in the history
**This section will be used as the commit message. Please do not leave this empty!**

Please explain **IN DETAIL** what the changes are in this PR and why they are needed:

- Summarize your change (**mandatory**)
- How does this PR work? Need a brief introduction for the changed logic (optional)
- Describe clearly one logical change and avoid lazy messages (optional)
- Describe any limitations of the current code (optional)

Approved-By: waruto210
  • Loading branch information
tabVersion authored Jan 9, 2023
1 parent 60a3bdb commit b0c36c1
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 29 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ risingwave_expr = { path = "../expr" }
risingwave_pb = { path = "../prost" }
risingwave_storage = { path = "../storage" }
serde = { version = "1", features = ["derive"] }
simd-json = "0.7"
simd-json = { git = "https://github.com/tabVersion/simd-json.git", branch = "main", features = ["key-to-lowercase"] }
smallvec = "1"
static_assertions = "1"
tempfile = "3"
Expand Down
4 changes: 2 additions & 2 deletions src/source/src/parser/canal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ mod tests {
let payload = br#"{"data":[{"id":"1","name":"mike","is_adult":"0","balance":"1500.62","reg_time":"2018-01-01 00:00:01","win_rate":"0.65"}],"database":"demo","es":1668673476000,"id":7,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":[{"balance":"1000.62"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673476732,"type":"UPDATE"}"#;
let parser = CanalJsonParser;
let descs = vec![
SourceColumnDesc::simple("id", DataType::Int64, 0.into()),
SourceColumnDesc::simple("name", DataType::Varchar, 1.into()),
SourceColumnDesc::simple("ID", DataType::Int64, 0.into()),
SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()),
SourceColumnDesc::simple("is_adult", DataType::Boolean, 2.into()),
SourceColumnDesc::simple("balance", DataType::Decimal, 3.into()),
SourceColumnDesc::simple("reg_time", DataType::Timestamp, 4.into()),
Expand Down
11 changes: 6 additions & 5 deletions src/source/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl CanalJsonParser {
writer.insert(|column| {
cannal_simd_json_parse_value(
&column.data_type,
v.get(column.name.as_str()),
v.get(column.name.to_ascii_lowercase().as_str()),
)
})
})
Expand Down Expand Up @@ -121,14 +121,15 @@ impl CanalJsonParser {
// In the original Canal format, `old` contains only the changed columns,
// while `data` contains all columns.
// In TiCDC, `old` contains all fields.
let col_name_lc = column.name.to_ascii_lowercase();
let before_value = before
.get(column.name.as_str())
.or_else(|| after.get(column.name.as_str()));
.get(col_name_lc.as_str())
.or_else(|| after.get(col_name_lc.as_str()));
let before =
cannal_simd_json_parse_value(&column.data_type, before_value)?;
let after = cannal_simd_json_parse_value(
&column.data_type,
after.get(column.name.as_str()),
after.get(col_name_lc.as_str()),
)?;
Ok((before, after))
})
Expand All @@ -154,7 +155,7 @@ impl CanalJsonParser {
writer.delete(|column| {
cannal_simd_json_parse_value(
&column.data_type,
v.get(column.name.as_str()),
v.get(column.name.to_ascii_lowercase().as_str()),
)
})
})
Expand Down
4 changes: 3 additions & 1 deletion src/source/src/parser/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ fn do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result<S
.field_names
.iter()
.zip_eq(struct_type_info.fields.iter())
.map(|field| simd_json_parse_value(field.1, v.get(field.0.as_str())))
.map(|field| {
simd_json_parse_value(field.1, v.get(field.0.to_ascii_lowercase().as_str()))
})
.collect::<Result<Vec<Datum>>>()?;
ScalarImpl::Struct(StructValue::new(fields))
}
Expand Down
26 changes: 18 additions & 8 deletions src/source/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ impl DebeziumJsonParser {
})?;

writer.update(|column| {
let before =
simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))?;
let after =
simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))?;
let before = simd_json_parse_value(
&column.data_type,
before.get(column.name.to_ascii_lowercase().as_str()),
)?;
let after = simd_json_parse_value(
&column.data_type,
after.get(column.name.to_ascii_lowercase().as_str()),
)?;

Ok((before, after))
})
Expand All @@ -99,8 +103,11 @@ impl DebeziumJsonParser {
})?;

writer.insert(|column| {
simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))
.map_err(Into::into)
simd_json_parse_value(
&column.data_type,
after.get(column.name.to_ascii_lowercase().as_str()),
)
.map_err(Into::into)
})
}
DEBEZIUM_DELETE_OP => {
Expand All @@ -114,8 +121,11 @@ impl DebeziumJsonParser {
})?;

writer.delete(|column| {
simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))
.map_err(Into::into)
simd_json_parse_value(
&column.data_type,
before.get(column.name.to_ascii_lowercase().as_str()),
)
.map_err(Into::into)
})
}
_ => Err(RwError::from(ProtocolError(format!(
Expand Down
6 changes: 5 additions & 1 deletion src/source/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ impl JsonParser {
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;

writer.insert(|desc| {
simd_json_parse_value(&desc.data_type, value.get(desc.name.as_str())).map_err(|e| {
simd_json_parse_value(
&desc.data_type,
value.get(desc.name.to_ascii_lowercase().as_str()),
)
.map_err(|e| {
tracing::error!(
"failed to process value ({}): {}",
String::from_utf8_lossy(payload),
Expand Down
4 changes: 2 additions & 2 deletions src/source/src/parser/maxwell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ mod test {
async fn test_json_parser() {
let parser = MaxwellParser;
let descs = vec![
SourceColumnDesc::simple("id", DataType::Int32, 0.into()),
SourceColumnDesc::simple("name", DataType::Varchar, 1.into()),
SourceColumnDesc::simple("ID", DataType::Int32, 0.into()),
SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()),
SourceColumnDesc::simple("is_adult", DataType::Int16, 2.into()),
SourceColumnDesc::simple("birthday", DataType::Timestamp, 3.into()),
];
Expand Down
21 changes: 14 additions & 7 deletions src/source/src/parser/maxwell/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ impl MaxwellParser {
))
})?;
writer.insert(|column| {
simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))
.map_err(Into::into)
simd_json_parse_value(
&column.data_type,
after.get(column.name.to_ascii_lowercase().as_str()),
)
.map_err(Into::into)
})
}
MAXWELL_UPDATE_OP => {
Expand All @@ -72,12 +75,13 @@ impl MaxwellParser {

writer.update(|column| {
// `old` contains only the changed columns, whereas `data` contains all columns.
let col_name_lc = column.name.to_ascii_lowercase();
let before_value = before
.get(column.name.as_str())
.or_else(|| after.get(column.name.as_str()));
.get(col_name_lc.as_str())
.or_else(|| after.get(col_name_lc.as_str()));
let before = simd_json_parse_value(&column.data_type, before_value)?;
let after =
simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))?;
simd_json_parse_value(&column.data_type, after.get(col_name_lc.as_str()))?;
Ok((before, after))
})
}
Expand All @@ -86,8 +90,11 @@ impl MaxwellParser {
RwError::from(ProtocolError("old is missing for delete event".to_string()))
})?;
writer.delete(|column| {
simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))
.map_err(Into::into)
simd_json_parse_value(
&column.data_type,
before.get(column.name.to_ascii_lowercase().as_str()),
)
.map_err(Into::into)
})
}
other => Err(RwError::from(ProtocolError(format!(
Expand Down

0 comments on commit b0c36c1

Please sign in to comment.