diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java index 9493f78f417..1ece6c0184d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java @@ -30,6 +30,7 @@ import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode; import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -67,7 +68,9 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) { final String primaryKey = pulsarSource.getPrimaryKey(); final String serviceUrl = pulsarSource.getServiceUrl(); final String adminUrl = pulsarSource.getAdminUrl(); - final String scanStartupSubStartOffset = null; + final String scanStartupSubStartOffset = + StringUtils.isNotBlank(pulsarSource.getSubscription()) ? PulsarScanStartupMode.EARLIEST.getValue() + : null; return new PulsarExtractNode(pulsarSource.getSourceName(), pulsarSource.getSourceName(),