From 3f12269d8d99e514cf48ef6d57fc3928d37b3646 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 5 Nov 2024 11:40:39 +0800 Subject: [PATCH] [improve][io] Support update subscription position for sink connector (#23538) --- .../pulsar/functions/utils/SinkConfigUtils.java | 4 +++- .../functions/utils/SinkConfigUtilsTest.java | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 6631c053fac49..65b6b97fc6ee9 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -724,7 +724,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne if (newConfig.getTransformFunctionConfig() != null) { mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig()); } - + if (newConfig.getSourceSubscriptionPosition() != null) { + mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition()); + } return mergedConfig; } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 5c2b6d92b9366..c4c79a635eac0 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -38,6 +38,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; @@ -224,6 +225,18 @@ public void testCleanSubscriptionField() throws IOException { } } + @Test + public void testUpdateSubscriptionPosition() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createSinkConfig(); + newSinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Earliest); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + new Gson().toJson(newSinkConfig), + new Gson().toJson(mergedConfig) + ); + } + @Test public void testMergeEqual() { SinkConfig sinkConfig = createSinkConfig(); @@ -565,6 +578,7 @@ private SinkConfig createSinkConfig() { inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build()); sinkConfig.setInputSpecs(inputSpecs); sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + sinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Latest); sinkConfig.setRetainOrdering(false); sinkConfig.setRetainKeyOrdering(false); sinkConfig.setConfigs(new HashMap<>());