diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 2d3e8d4c6e978e..883d5c4549e70a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -902,4 +905,28 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess assertTrue(reader.hasMessageAvailable()); } } + + @Test + public void testReaderBuilderStateOnRetryFailure() throws Exception { + String ns = "my-property/my-ns"; + String topic = "persistent://" + ns + "/testRetryReader"; + RetentionPolicies retention = new RetentionPolicies(-1, -1); + admin.namespaces().setRetention(ns, retention); + String badUrl = "pulsar://bad-host:8080"; + + PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build(); + + ReaderBuilder readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100, + TimeUnit.SECONDS); + + for (int i = 0; i <3; i++) { + try { + readerBuilder.createAsync().get(1, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.info("It should time out due to invalid url"); + } catch (IllegalStateException e) { + fail("It should not fail with corrupted reader state"); + } + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 2860cda0ceef1c..b18b3b12e8e5e9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -86,7 +86,8 @@ public CompletableFuture> createAsync() { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } - if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 + boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest; + if (isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0 || conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { return FutureUtil .failedFuture(new IllegalArgumentException(