From 2ce4d74f5511c826da31049e37c7b1cd4859e716 Mon Sep 17 00:00:00 2001 From: haormj Date: Wed, 4 Aug 2021 10:54:53 +0800 Subject: [PATCH] fix: pulsar-flink-381 debezium-format update message will produce two RowData, but pulsar connector only deliver one RowData to downstream Updates https://github.com/streamnative/pulsar-flink/issues/381 --- .../pulsar/internal/ReaderThread.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java index fefac9c0..30d79aef 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java @@ -14,6 +14,7 @@ package org.apache.flink.streaming.connectors.pulsar.internal; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.streaming.connectors.pulsar.util.MessageIdUtils; import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema; @@ -28,6 +29,8 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -168,12 +171,16 @@ protected void handleTooLargeCursor() { protected void emitRecord(Message message) throws IOException { MessageId messageId = message.getMessageId(); - final T record = deserializer.deserialize(message); - if (deserializer.isEndOfStream(record)) { - running = false; - return; + List list = new ArrayList<>(); + ListCollector listCollector = new ListCollector<>(list); + deserializer.deserialize(message, listCollector); + for (T t : list) { + if (deserializer.isEndOfStream(t)) { + running = false; + return; + } + owner.emitRecordsWithTimestamps(t, state, messageId, message.getEventTime()); } - owner.emitRecordsWithTimestamps(record, state, messageId, message.getEventTime()); } public void cancel() throws IOException {