From 1faec7c7d4dd688838d18fa20af9b562cede54a3 Mon Sep 17 00:00:00 2001 From: pengzirui Date: Tue, 25 Jun 2024 15:52:04 +0800 Subject: [PATCH 1/2] [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode parameter cannot keep consistent with flink 1.13 --- .../apache/inlong/sort/pulsar/PulsarTableOptionUtils.java | 8 +++++++- .../org/apache/inlong/sort/pulsar/PulsarTableOptions.java | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java index 5b2488dd40c..6fab4c6b201 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java @@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -175,7 +176,12 @@ public static Properties getPulsarPropertiesWithPrefix( public static StartCursor getStartCursor(ReadableConfig tableOptions) { if (tableOptions.getOptional(STARTUP_MODE).isPresent()) { - return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE)); + String mode = tableOptions.getOptional(STARTUP_MODE).get(); + // to keep consistent with pulsar connector in flink 1.13 + if (mode.equals(PulsarScanStartupMode.EXTERNAL_SUBSCRIPTION.getValue())) { + return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID)); + } + return parseMessageIdStartCursor(mode); } else if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) { return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID)); } else if (tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java index 59e72201f8d..6ef1450fb75 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java @@ -121,6 +121,7 @@ private PulsarTableOptions() { code("earliest"), code("latest"), code("ledgerId:entryId:partitionId"), + code("external-subscription"), code("12:2:-1")) .build()); From bf98eece726dbbde3cf2eea2f2c1b68535f7b741 Mon Sep 17 00:00:00 2001 From: pengzirui Date: Tue, 25 Jun 2024 16:08:24 +0800 Subject: [PATCH 2/2] fix format --- .../org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java index 6fab4c6b201..c438fe13948 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.pulsar; +import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode; + import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; @@ -31,7 +33,6 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; -import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl;