From 8f7d7fe49bfc1e3272c66123a867e2ccad6ae055 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:01:30 +0800 Subject: [PATCH 1/3] sink-las: fix getField and column types mapping - EMQX-12951 - EMQX-12952 --- .../app/src/main/java/sink/LasSinkTask.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/sink-las/app/src/main/java/sink/LasSinkTask.java b/sink-las/app/src/main/java/sink/LasSinkTask.java index a593a0c..1970162 100644 --- a/sink-las/app/src/main/java/sink/LasSinkTask.java +++ b/sink-las/app/src/main/java/sink/LasSinkTask.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.stream.Stream; import static com.bytedance.las.tunnel.ActionType.*; import static com.bytedance.las.tunnel.TunnelConfig.SERVICE_REGION; @@ -106,7 +107,10 @@ void handle(SinkRecordBatch batch) { } void putValue(GenericData.Record record, Schema schema, String field, Object value) { - Schema valueSchema = schema.getField(field).schema(); + String lowerFieldName = field.toLowerCase(); + Schema.Field valueField = schema.getField(lowerFieldName); + Preconditions.checkNotNull(valueField, "Can not find the column named %s in the target LAS table", lowerFieldName); + Schema valueSchema = valueField.schema(); switch (valueSchema.getType()) { // for number type, it must be a double type. case INT: @@ -121,24 +125,22 @@ void putValue(GenericData.Record record, Schema schema, String field, Object val float floatValue = ((Double) value).floatValue(); record.put(field, floatValue); break; + case DOUBLE: + record.put(field, value); + break; case UNION: - // by default, a field in LAS table is nullable, ant its schema is a union like [type, null]. + // by default, a field in a LAS table is nullable, ant its schema is a union like [type, null]. List unionTypes = valueSchema.getTypes(); - if(unionTypes.stream().anyMatch(it -> it.getType().equals(Schema.Type.INT))) { - int unionIntValue = ((Double) value).intValue(); - record.put(field, unionIntValue); - break; - } else if (unionTypes.stream().anyMatch(it -> it.getType().equals(Schema.Type.LONG))) { - long unionLongValue = ((Double) value).longValue(); - record.put(field, unionLongValue); - break; - } else if (unionTypes.stream().anyMatch(it -> it.getType().equals(Schema.Type.FLOAT))) { - float unionFloatValue = ((Double) value).floatValue(); - record.put(field, unionFloatValue); - break; + Preconditions.checkArgument(unionTypes.size() == 2, "Unsupported column schema %s in a LAS table", valueSchema.toString()); + Preconditions.checkArgument(unionTypes.get(1).getType().equals(Schema.Type.NULL) || unionTypes.get(0).getType().equals(Schema.Type.NULL), "Unsupported column schema %s in a LAS table", valueSchema.toString()); + + if(unionTypes.get(1).getType().equals(Schema.Type.NULL)) { + putValue(record, unionTypes.get(0), field, value); + } else { + putValue(record, unionTypes.get(1), field, value); } default: - record.put(field, value); + throw new RuntimeException(String.format("Unsupported column type %s in a LAS table, we only allow string type and number types in a LAS table.", valueSchema.getType().toString())); } } @@ -153,7 +155,7 @@ GenericData.Record convertRecord(SinkRecord sinkRecord) { var r = new GenericData.Record(schema); if(valueRecord == null) { // This represents a deleting row event from CDC source, for LAS sink, we set 'is_deleted' column to true. - putValue(r, schema, "is_deleted", true); + putValue(r, schema, "is_deleted", "true"); } else { for (var entry : valueRecord.entrySet()) { var value = entry.getValue(); From 3ac02a1d86db10a6d877e90337a21b7ce7615cb1 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:13:25 +0800 Subject: [PATCH 2/3] fix --- sink-las/app/src/main/java/sink/LasSinkTask.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sink-las/app/src/main/java/sink/LasSinkTask.java b/sink-las/app/src/main/java/sink/LasSinkTask.java index 1970162..c90c22b 100644 --- a/sink-las/app/src/main/java/sink/LasSinkTask.java +++ b/sink-las/app/src/main/java/sink/LasSinkTask.java @@ -126,7 +126,11 @@ void putValue(GenericData.Record record, Schema schema, String field, Object val record.put(field, floatValue); break; case DOUBLE: - record.put(field, value); + double doubleValue = (Double) value; + record.put(field, doubleValue); + break; + case STRING: + record.put(field, value.toString()); break; case UNION: // by default, a field in a LAS table is nullable, ant its schema is a union like [type, null]. @@ -139,6 +143,7 @@ void putValue(GenericData.Record record, Schema schema, String field, Object val } else { putValue(record, unionTypes.get(1), field, value); } + break; default: throw new RuntimeException(String.format("Unsupported column type %s in a LAS table, we only allow string type and number types in a LAS table.", valueSchema.getType().toString())); From 3f884ddd27ed21ce539259db0fb5b7e1e581c994 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:17:20 +0800 Subject: [PATCH 3/3] clean --- sink-las/app/src/main/java/sink/LasSinkTask.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/sink-las/app/src/main/java/sink/LasSinkTask.java b/sink-las/app/src/main/java/sink/LasSinkTask.java index c90c22b..bd1a6a6 100644 --- a/sink-las/app/src/main/java/sink/LasSinkTask.java +++ b/sink-las/app/src/main/java/sink/LasSinkTask.java @@ -16,11 +16,8 @@ import org.apache.avro.generic.GenericData; import com.google.common.base.Preconditions; - - import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static com.bytedance.las.tunnel.ActionType.*; import static com.bytedance.las.tunnel.TunnelConfig.SERVICE_REGION;