diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java index fefac9c0..30d79aef 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java @@ -14,6 +14,7 @@ package org.apache.flink.streaming.connectors.pulsar.internal; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.streaming.connectors.pulsar.util.MessageIdUtils; import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema; @@ -28,6 +29,8 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -168,12 +171,16 @@ protected void handleTooLargeCursor() { protected void emitRecord(Message message) throws IOException { MessageId messageId = message.getMessageId(); - final T record = deserializer.deserialize(message); - if (deserializer.isEndOfStream(record)) { - running = false; - return; + List list = new ArrayList<>(); + ListCollector listCollector = new ListCollector<>(list); + deserializer.deserialize(message, listCollector); + for (T t : list) { + if (deserializer.isEndOfStream(t)) { + running = false; + return; + } + owner.emitRecordsWithTimestamps(t, state, messageId, message.getEventTime()); } - owner.emitRecordsWithTimestamps(record, state, messageId, message.getEventTime()); } public void cancel() throws IOException {