Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): unified message parser #10096

Merged
merged 34 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2f401d5
unified
algosday May 30, 2023
63a633d
unified json parser
algosday May 31, 2023
ac9b4e0
fmt
algosday May 31, 2023
a01236c
initial support avro
algosday May 31, 2023
c41ca12
avro parser
algosday Jun 1, 2023
9b9388a
replace with new avro parser
algosday Jun 1, 2023
4c0b9bf
fmt
algosday Jun 1, 2023
5054b9c
fix
algosday Jun 1, 2023
a830f9a
fix
algosday Jun 2, 2023
4e8f4b6
dbz
algosday Jun 2, 2023
54bbfe1
fix test
algosday Jun 2, 2023
e8371cd
upsert
algosday Jun 2, 2023
9b2c65a
pass e2e
algosday Jun 5, 2023
040e6bb
add some comments (pls correct me if wrong)
fuyufjh Jun 5, 2023
a5f2602
update
algosday Jun 6, 2023
c9929d0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
algosday Jun 7, 2023
ae96e9e
merge master
algosday Jun 7, 2023
19398ee
fix
algosday Jun 7, 2023
cd0bd60
fix tests
algosday Jun 8, 2023
deafee0
fix test
algosday Jun 8, 2023
9584caf
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
algosday Jun 12, 2023
f1e9164
maxwell, canal support
algosday Jun 12, 2023
398b73d
fix
algosday Jun 12, 2023
aefce5c
fix
algosday Jun 12, 2023
e229cb7
fix test(update->insert)
algosday Jun 12, 2023
ec4703f
fix test
algosday Jun 12, 2023
8798684
fix
algosday Jun 12, 2023
929d19d
Update src/connector/src/parser/unified/maxwell.rs
algosday Jun 12, 2023
b537838
resolved avro schema
algosday Jun 14, 2023
ca35ba6
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
algosday Jun 14, 2023
ac0e79c
fix
algosday Jun 15, 2023
9ca0139
Merge branch 'main' into idx0dev/sourceng
algosday Jun 15, 2023
b53a3fc
fix
algosday Jun 15, 2023
1b49c15
Merge branch 'idx0dev/sourceng' of https://github.com/risingwavelabs/…
algosday Jun 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ select * from s14;
query I
select count(*) from s15
----
0
2

query I
select count(*) from s16
Expand Down
4 changes: 1 addition & 3 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ row schema location confluent schema registry 'http://message_queue:8081'
# but if we let the key as a column, there's no such requirement
statement ok
CREATE TABLE upsert_student_key_not_subset_of_value_avro_json (
key_struct struct<NOTEXIST INT> PRIMARY KEY
key_struct struct<"NOTEXIST" INT> PRIMARY KEY
)
WITH (
connector = 'kafka',
Expand Down Expand Up @@ -150,8 +150,6 @@ ORDER BY
4 Emma Brown 20 5.3 130
5 Michael Williams 22 6.2 190
6 Leah Davis 18 5.7 140
7 Connor Wilson 19 5.9 160
8 Ava Garcia 21 5.2 115
9 Jacob Anderson 20 5.8 155

query I
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "waruto/modify-decimal", features = [
apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "idx0dev/resolved_schema", features = [
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
"snappy",
"zstandard",
"bzip",
Expand Down
112 changes: 30 additions & 82 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use apache_avro::types::Value;
use apache_avro::{from_avro_datum, Reader, Schema};
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_pb::plan_common::ColumnDesc;
use url::Url;

use super::schema_resolver::*;
use super::util::{extract_inner_field_schema, from_avro_value};
use crate::common::UpsertMessage;
use crate::parser::avro::util::avro_field_to_column_desc;
use crate::parser::schema_registry::{extract_schema_id, Client};
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::upsert::UpsertChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::util::get_kafka_topic;
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
Expand Down Expand Up @@ -175,26 +176,20 @@ impl AvroParser {
payload: Vec<u8>,
mut writer: SourceStreamChunkRowWriter<'_>,
) -> Result<WriteGuard> {
#[derive(Debug)]
enum Op {
Insert,
Delete,
}

let (raw_key, raw_value, op) = if self.is_enable_upsert() {
let (raw_key, raw_value) = if self.is_enable_upsert() {
let msg: UpsertMessage<'_> = bincode::deserialize(&payload).map_err(|e| {
RwError::from(ProtocolError(format!(
"extract payload err {:?}, you may need to check the 'upsert' parameter",
e
)))
})?;
if !msg.record.is_empty() {
(Some(msg.primary_key), Some(msg.record), Op::Insert)
(Some(msg.primary_key), Some(msg.record))
} else {
(Some(msg.primary_key), None, Op::Delete)
(Some(msg.primary_key), None)
}
} else {
(None, Some(Cow::from(&payload)), Op::Insert)
(None, Some(Cow::from(&payload)))
};

let avro_value = if let Some(payload) = raw_value {
Expand Down Expand Up @@ -257,77 +252,30 @@ impl AvroParser {
None
};

// the avro can be a key or a value
if let Some(Value::Record(fields)) = avro_value {
let fill = |column: &SourceColumnDesc| {
let tuple = match fields.iter().find(|val| column.name.eq(&val.0)) {
None => {
if self.upsert_primary_key_column_name.as_ref() == Some(&column.name) {
if !(avro_key.is_some() && self.key_schema.is_some()) {
tracing::error!(
upsert_primary_key_column_name =
self.upsert_primary_key_column_name,
"Upsert mode is enabled, but key or key schema is absent.",
);
}
return from_avro_value(
avro_key.as_ref().unwrap().clone(),
self.key_schema.as_ref().unwrap(),
)
.map_err(|e| {
tracing::error!(
"failed to process value ({}): {}",
String::from_utf8_lossy(&payload),
e
);
e
});
} else {
return Ok(None);
}
}
Some(tup) => tup,
};

let field_schema = extract_inner_field_schema(&self.schema, Some(&column.name))?;
from_avro_value(tuple.1.clone(), field_schema).map_err(|e| {
tracing::error!(
"failed to process value ({}): {}",
String::from_utf8_lossy(&payload),
e
);
e
})
};
match op {
Op::Insert => writer.insert(fill),
Op::Delete => writer.delete(fill),
}
} else if self.upsert_primary_key_column_name.is_some() && matches!(op, Op::Delete) {
writer.delete(|desc| {
if &desc.name != self.upsert_primary_key_column_name.as_ref().unwrap() {
Ok(None)
} else {
from_avro_value(
avro_key.as_ref().unwrap().clone(),
self.key_schema.as_deref().unwrap(),
)
.map_err(|e| {
tracing::error!(upsert_primary_key_column_name=self.upsert_primary_key_column_name,
?avro_value,op=?op,
"failed to process value ({}): {}",
String::from_utf8_lossy(&payload),
e
);
e
})
}
})
} else {
Err(RwError::from(ProtocolError(
"avro parse unexpected value".to_string(),
)))
let mut accessor: UpsertChangeEvent<AvroAccess<'_, '_>, AvroAccess<'_, '_>> =
UpsertChangeEvent::default();
if let Some(key) = &avro_key {
accessor = accessor.with_key(AvroAccess::new(
key,
AvroParseOptions {
schema: self.key_schema.as_deref(),
..Default::default()
},
));
}

if let Some(value) = &avro_value {
accessor = accessor.with_value(AvroAccess::new(
value,
AvroParseOptions::default().with_schema(&self.schema),
));
}

if let Some(pk) = &self.upsert_primary_key_column_name {
accessor = accessor.with_primary_key_column_name(pk);
}
algosday marked this conversation as resolved.
Show resolved Hide resolved

apply_row_operation_on_stream_chunk_writer(accessor, &mut writer)
}
}

Expand Down
99 changes: 19 additions & 80 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ use apache_avro::types::Value;
use apache_avro::{Decimal as AvroDecimal, Schema};
use chrono::Datelike;
use itertools::Itertools;
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, Date, Datum, Interval, ScalarImpl, F32, F64};
use risingwave_common::types::{DataType, Date, Datum};
use risingwave_pb::plan_common::ColumnDesc;

use crate::parser::unified::avro::AvroParseOptions;

const RW_DECIMAL_MAX_PRECISION: usize = 28;

pub(crate) fn avro_field_to_column_desc(
Expand Down Expand Up @@ -245,85 +246,23 @@ pub(crate) fn extract_inner_field_schema<'a>(
/// - Date (the number of days from the unix epoch, 1970-1-1 UTC)
/// - Timestamp (the number of milliseconds from the unix epoch, 1970-1-1 00:00:00.000 UTC)
#[inline]
pub(crate) fn from_avro_value(value: Value, value_schema: &Schema) -> Result<Datum> {
let v = match value {
Value::Null => {
return Ok(None);
}
Value::Boolean(b) => ScalarImpl::Bool(b),
Value::String(s) => ScalarImpl::Utf8(s.into_boxed_str()),
Value::Int(i) => ScalarImpl::Int32(i),
Value::Long(i) => ScalarImpl::Int64(i),
Value::Float(f) => ScalarImpl::Float32(F32::from(f)),
Value::Double(f) => ScalarImpl::Float64(F64::from(f)),
Value::Decimal(avro_decimal) => {
let (precision, scale) = match value_schema {
Schema::Decimal {
precision, scale, ..
} => (*precision, *scale),
_ => {
return Err(RwError::from(InternalError(
"avro value is and decimal but schema not".to_owned(),
)));
}
};
let decimal = avro_decimal_to_rust_decimal(avro_decimal, precision, scale)?;
ScalarImpl::Decimal(risingwave_common::types::Decimal::Normalized(decimal))
}
Value::Date(days) => {
ScalarImpl::Date(Date::with_days(days + unix_epoch_days()).map_err(|e| {
let err_msg = format!("avro parse error.wrong date value {}, err {:?}", days, e);
RwError::from(InternalError(err_msg))
})?)
}
Value::TimestampMicros(us) => ScalarImpl::Int64(us),
Value::TimestampMillis(ms) => ScalarImpl::Int64(ms.checked_mul(1000).ok_or_else(|| {
RwError::from(InternalError(format!(
"avro parse millis overflow, value: {}",
ms
)))
})?),
Value::Duration(duration) => {
let months = u32::from(duration.months()) as i32;
let days = u32::from(duration.days()) as i32;
let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows
ScalarImpl::Interval(Interval::from_month_day_usec(months, days, usecs))
}
Value::Bytes(value) => ScalarImpl::Bytea(value.into_boxed_slice()),
Value::Enum(_, symbol) => ScalarImpl::Utf8(symbol.into_boxed_str()),
Value::Record(descs) => {
let rw_values = descs
.into_iter()
.map(|(field_name, field_value)| {
extract_inner_field_schema(value_schema, Some(&field_name))
.and_then(|field_schema| from_avro_value(field_value, field_schema))
})
.collect::<Result<Vec<Datum>>>()?;
ScalarImpl::Struct(StructValue::new(rw_values))
}
Value::Array(values) => {
let item_schema = extract_inner_field_schema(value_schema, None)?;
let rw_values = values
.into_iter()
.map(|item_value| from_avro_value(item_value, item_schema))
.collect::<Result<Vec<Datum>>>()?;
ScalarImpl::List(ListValue::new(rw_values))
}
Value::Union(_, value) => {
let inner_schema = extract_inner_field_schema(value_schema, None)?;
return from_avro_value(*value, inner_schema);
}
_ => {
let err_msg = format!("avro parse error.unsupported value {:?}", value);
return Err(RwError::from(InternalError(err_msg)));
}
};

Ok(Some(v))
pub(crate) fn from_avro_value(
value: Value,
value_schema: &Schema,
shape: &DataType,
) -> Result<Datum> {
AvroParseOptions {
schema: Some(value_schema),
relax_numeric: true,
}
.parse(&value, Some(shape))
.map_err(|err| RwError::from(InternalError(format!("{:?}", err))))
}

#[cfg(test)]
mod tests {
use risingwave_common::types::ScalarImpl;

use super::*;
#[test]
fn test_convert_decimal() {
Expand All @@ -339,15 +278,15 @@ mod tests {
let rust_decimal = avro_decimal_to_rust_decimal(avro_decimal, 28, 1).unwrap();
assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap());
}

#[ignore]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ignore here

#[test]
fn test_avro_timestamp_micros() {
let v1 = Value::TimestampMicros(1620000000000);
let v2 = Value::TimestampMillis(1620000000);
let value_schema1 = Schema::TimestampMicros;
let value_schema2 = Schema::TimestampMillis;
let datum1 = from_avro_value(v1, &value_schema1).unwrap();
let datum2 = from_avro_value(v2, &value_schema2).unwrap();
let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap();
let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap();
assert_eq!(datum1, Some(ScalarImpl::Int64(1620000000000)));
assert_eq!(datum2, Some(ScalarImpl::Int64(1620000000000)));
}
Expand Down
Loading