diff --git a/Cargo.lock b/Cargo.lock index f38579037670..d921584b232b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6801,8 +6801,7 @@ dependencies = [ [[package]] name = "simd-json" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e3375b6c3d8c048ba09c8b4b6c3f1d3f35e06b71db07d231c323943a949e1b8" +source = "git+https://github.com/tabVersion/simd-json.git?branch=main#fe89a0d1d2bb5749acc72a515db39c36d16d267d" dependencies = [ "halfbrown", "lexical-core", diff --git a/src/source/Cargo.toml b/src/source/Cargo.toml index bc2d232437e5..ad3c08ee3aaa 100644 --- a/src/source/Cargo.toml +++ b/src/source/Cargo.toml @@ -41,7 +41,7 @@ risingwave_expr = { path = "../expr" } risingwave_pb = { path = "../prost" } risingwave_storage = { path = "../storage" } serde = { version = "1", features = ["derive"] } -simd-json = "0.7" +simd-json = { git = "https://github.com/tabVersion/simd-json.git", branch = "main", features = ["key-to-lowercase"] } smallvec = "1" static_assertions = "1" tempfile = "3" diff --git a/src/source/src/parser/canal/mod.rs b/src/source/src/parser/canal/mod.rs index 89b34a327474..5c583f4a4ad0 100644 --- a/src/source/src/parser/canal/mod.rs +++ b/src/source/src/parser/canal/mod.rs @@ -37,8 +37,8 @@ mod tests { let payload = br#"{"data":[{"id":"1","name":"mike","is_adult":"0","balance":"1500.62","reg_time":"2018-01-01 00:00:01","win_rate":"0.65"}],"database":"demo","es":1668673476000,"id":7,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":[{"balance":"1000.62"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673476732,"type":"UPDATE"}"#; let parser = CanalJsonParser; let descs = vec![ - SourceColumnDesc::simple("id", DataType::Int64, 0.into()), - SourceColumnDesc::simple("name", DataType::Varchar, 1.into()), + SourceColumnDesc::simple("ID", DataType::Int64, 0.into()), + SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()), SourceColumnDesc::simple("is_adult", DataType::Boolean, 2.into()), SourceColumnDesc::simple("balance", DataType::Decimal, 3.into()), SourceColumnDesc::simple("reg_time", DataType::Timestamp, 4.into()), diff --git a/src/source/src/parser/canal/simd_json_parser.rs b/src/source/src/parser/canal/simd_json_parser.rs index 28e1ef1ab270..8eb3a0ab266d 100644 --- a/src/source/src/parser/canal/simd_json_parser.rs +++ b/src/source/src/parser/canal/simd_json_parser.rs @@ -82,7 +82,7 @@ impl CanalJsonParser { writer.insert(|column| { cannal_simd_json_parse_value( &column.data_type, - v.get(column.name.as_str()), + v.get(column.name.to_ascii_lowercase().as_str()), ) }) }) @@ -121,14 +121,15 @@ impl CanalJsonParser { // in origin canal, old only contains the changed columns but data // contains all columns. // in ticdc, old contains all fields + let col_name_lc = column.name.to_ascii_lowercase(); let before_value = before - .get(column.name.as_str()) - .or_else(|| after.get(column.name.as_str())); + .get(col_name_lc.as_str()) + .or_else(|| after.get(col_name_lc.as_str())); let before = cannal_simd_json_parse_value(&column.data_type, before_value)?; let after = cannal_simd_json_parse_value( &column.data_type, - after.get(column.name.as_str()), + after.get(col_name_lc.as_str()), )?; Ok((before, after)) }) @@ -154,7 +155,7 @@ impl CanalJsonParser { writer.delete(|column| { cannal_simd_json_parse_value( &column.data_type, - v.get(column.name.as_str()), + v.get(column.name.to_ascii_lowercase().as_str()), ) }) }) diff --git a/src/source/src/parser/common.rs b/src/source/src/parser/common.rs index 473319b548db..5f6be8916cab 100644 --- a/src/source/src/parser/common.rs +++ b/src/source/src/parser/common.rs @@ -67,7 +67,9 @@ fn do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result>>()?; ScalarImpl::Struct(StructValue::new(fields)) } diff --git a/src/source/src/parser/debezium/simd_json_parser.rs b/src/source/src/parser/debezium/simd_json_parser.rs index b5c4aff4aca3..e41e3044f3d7 100644 --- a/src/source/src/parser/debezium/simd_json_parser.rs +++ b/src/source/src/parser/debezium/simd_json_parser.rs @@ -80,10 +80,14 @@ impl DebeziumJsonParser { })?; writer.update(|column| { - let before = - simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))?; - let after = - simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))?; + let before = simd_json_parse_value( + &column.data_type, + before.get(column.name.to_ascii_lowercase().as_str()), + )?; + let after = simd_json_parse_value( + &column.data_type, + after.get(column.name.to_ascii_lowercase().as_str()), + )?; Ok((before, after)) }) @@ -99,8 +103,11 @@ impl DebeziumJsonParser { })?; writer.insert(|column| { - simd_json_parse_value(&column.data_type, after.get(column.name.as_str())) - .map_err(Into::into) + simd_json_parse_value( + &column.data_type, + after.get(column.name.to_ascii_lowercase().as_str()), + ) + .map_err(Into::into) }) } DEBEZIUM_DELETE_OP => { @@ -114,8 +121,11 @@ impl DebeziumJsonParser { })?; writer.delete(|column| { - simd_json_parse_value(&column.data_type, before.get(column.name.as_str())) - .map_err(Into::into) + simd_json_parse_value( + &column.data_type, + before.get(column.name.to_ascii_lowercase().as_str()), + ) + .map_err(Into::into) }) } _ => Err(RwError::from(ProtocolError(format!( diff --git a/src/source/src/parser/json_parser.rs b/src/source/src/parser/json_parser.rs index 98c821f739b9..d5167f2e04b4 100644 --- a/src/source/src/parser/json_parser.rs +++ b/src/source/src/parser/json_parser.rs @@ -38,7 +38,11 @@ impl JsonParser { .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; writer.insert(|desc| { - simd_json_parse_value(&desc.data_type, value.get(desc.name.as_str())).map_err(|e| { + simd_json_parse_value( + &desc.data_type, + value.get(desc.name.to_ascii_lowercase().as_str()), + ) + .map_err(|e| { tracing::error!( "failed to process value ({}): {}", String::from_utf8_lossy(payload), diff --git a/src/source/src/parser/maxwell/mod.rs b/src/source/src/parser/maxwell/mod.rs index bf4aaa3a0fd3..b4bf565e8a02 100644 --- a/src/source/src/parser/maxwell/mod.rs +++ b/src/source/src/parser/maxwell/mod.rs @@ -30,8 +30,8 @@ mod test { async fn test_json_parser() { let parser = MaxwellParser; let descs = vec![ - SourceColumnDesc::simple("id", DataType::Int32, 0.into()), - SourceColumnDesc::simple("name", DataType::Varchar, 1.into()), + SourceColumnDesc::simple("ID", DataType::Int32, 0.into()), + SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()), SourceColumnDesc::simple("is_adult", DataType::Int16, 2.into()), SourceColumnDesc::simple("birthday", DataType::Timestamp, 3.into()), ]; diff --git a/src/source/src/parser/maxwell/simd_json_parser.rs b/src/source/src/parser/maxwell/simd_json_parser.rs index c69741c29692..74c47b5847c2 100644 --- a/src/source/src/parser/maxwell/simd_json_parser.rs +++ b/src/source/src/parser/maxwell/simd_json_parser.rs @@ -54,8 +54,11 @@ impl MaxwellParser { )) })?; writer.insert(|column| { - simd_json_parse_value(&column.data_type, after.get(column.name.as_str())) - .map_err(Into::into) + simd_json_parse_value( + &column.data_type, + after.get(column.name.to_ascii_lowercase().as_str()), + ) + .map_err(Into::into) }) } MAXWELL_UPDATE_OP => { @@ -72,12 +75,13 @@ impl MaxwellParser { writer.update(|column| { // old only contains the changed columns but data contains all columns. + let col_name_lc = column.name.to_ascii_lowercase(); let before_value = before - .get(column.name.as_str()) - .or_else(|| after.get(column.name.as_str())); + .get(col_name_lc.as_str()) + .or_else(|| after.get(col_name_lc.as_str())); let before = simd_json_parse_value(&column.data_type, before_value)?; let after = - simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))?; + simd_json_parse_value(&column.data_type, after.get(col_name_lc.as_str()))?; Ok((before, after)) }) } @@ -86,8 +90,11 @@ impl MaxwellParser { RwError::from(ProtocolError("old is missing for delete event".to_string())) })?; writer.delete(|column| { - simd_json_parse_value(&column.data_type, before.get(column.name.as_str())) - .map_err(Into::into) + simd_json_parse_value( + &column.data_type, + before.get(column.name.to_ascii_lowercase().as_str()), + ) + .map_err(Into::into) }) } other => Err(RwError::from(ProtocolError(format!(