diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 2d3e8d4c6e978..12228220b18bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -902,4 +905,28 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess assertTrue(reader.hasMessageAvailable()); } } + + @Test + public void testReaderBuilderStateOnRetryFailure() throws Exception { + String ns = "my-property/my-ns"; + String topic = "persistent://" + ns + "/testRetryReader"; + RetentionPolicies retention = new RetentionPolicies(-1, -1); + admin.namespaces().setRetention(ns, retention); + String badUrl = "pulsar://bad-host:8080"; + + PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build(); + + ReaderBuilder readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100, + TimeUnit.SECONDS); + + for (int i = 0; i < 3; i++) { + try { + readerBuilder.createAsync().get(1, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.info("It should time out due to invalid url"); + } catch (IllegalArgumentException e) { + fail("It should not fail with corrupt reader state"); + } + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 2860cda0ceef1..ef230475be53b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -86,8 +86,9 @@ public CompletableFuture> createAsync() { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } - if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 - || conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { + boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest; + if ((isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0) + || (conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0)) { return FutureUtil .failedFuture(new IllegalArgumentException( "Start message id or start message from roll back must be specified but they cannot be" diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 607689e0e2b3b..5f52f86d8b014 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -106,7 +106,7 @@ public void readerBuilderLoadConfTest() throws Exception { @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") public void shouldNotSetTwoOptAtTheSameTime() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); - try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest) + try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.latest) .startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) { // no-op } finally {