Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
algosday committed Jun 15, 2023
1 parent ac0e79c commit b53a3fc
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl AvroParser {
}

if let Some(pk) = &self.upsert_primary_key_column_name {
accessor = accessor.with_primary_key_column_name(pk);
accessor = accessor.with_key_as_column_name(pk);
}

apply_row_operation_on_stream_chunk_writer(accessor, &mut writer)
Expand Down
17 changes: 13 additions & 4 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub(crate) fn from_avro_value(

#[cfg(test)]
mod tests {
use risingwave_common::types::ScalarImpl;
use risingwave_common::types::{ScalarImpl, Timestamp};

use super::*;
#[test]
Expand All @@ -278,7 +278,6 @@ mod tests {
let rust_decimal = avro_decimal_to_rust_decimal(avro_decimal, 28, 1).unwrap();
assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap());
}
#[ignore]
#[test]
fn test_avro_timestamp_micros() {
let v1 = Value::TimestampMicros(1620000000000);
Expand All @@ -287,7 +286,17 @@ mod tests {
let value_schema2 = Schema::TimestampMillis;
let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap();
let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap();
assert_eq!(datum1, Some(ScalarImpl::Int64(1620000000000)));
assert_eq!(datum2, Some(ScalarImpl::Int64(1620000000000)));
assert_eq!(
datum1,
Some(ScalarImpl::Timestamp(Timestamp::new(
"2021-05-03T00:00:00".parse().unwrap()
)))
);
assert_eq!(
datum2,
Some(ScalarImpl::Timestamp(Timestamp::new(
"2021-05-03T00:00:00".parse().unwrap()
)))
);
}
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl CanalJsonParser {
})
.ok_or_else(|| {
RwError::from(ProtocolError(
"data is missing for creating event".to_string(),
"'data' is missing for creating event".to_string(),
))
})?;
let mut errors = Vec::new();
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ impl DebeziumAvroParser {

let resolver = apache_avro::schema::ResolvedSchema::try_from(&*self.outer_schema)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
// todo: to_resolved may cause stackoverflow if there's a loop in the schema
let schema = resolver
.to_resolved(&self.outer_schema)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/unified/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ use crate::parser::unified::AccessError;
pub struct UpsertChangeEvent<K, V> {
key_accessor: Option<K>,
value_accessor: Option<V>,
primary_key_column_name: Option<String>,
key_as_column_name: Option<String>,
}

impl<K, V> Default for UpsertChangeEvent<K, V> {
fn default() -> Self {
Self {
key_accessor: None,
value_accessor: None,
primary_key_column_name: None,
key_as_column_name: None,
}
}
}
Expand All @@ -52,8 +52,8 @@ impl<K, V> UpsertChangeEvent<K, V> {
self
}

pub fn with_primary_key_column_name(mut self, name: impl ToString) -> Self {
self.primary_key_column_name = Some(name.to_string());
pub fn with_key_as_column_name(mut self, name: impl ToString) -> Self {
self.key_as_column_name = Some(name.to_string());
self
}
}
Expand Down Expand Up @@ -114,7 +114,7 @@ where
other => return other,
};

if let Some(primary_key_column_name) = &self.primary_key_column_name && name == primary_key_column_name {
if let Some(key_as_column_name) = &self.key_as_column_name && name == key_as_column_name {
return self.access(&["key"], Some(type_expected));
}

Expand Down

0 comments on commit b53a3fc

Please sign in to comment.