Skip to content

Commit

Permalink
[fix][client] Fix ReaderBuilder doest not give illegalArgument on con…
Browse files Browse the repository at this point in the history
…nection failure retry
  • Loading branch information
rdhabalia committed May 2, 2024
1 parent bc44280 commit 8bb5ab8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -48,6 +50,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
Expand Down Expand Up @@ -902,4 +905,28 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess
assertTrue(reader.hasMessageAvailable());
}
}

@Test
public void testReaderBuilderStateOnRetryFailure() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testRetryReader";
RetentionPolicies retention = new RetentionPolicies(-1, -1);
admin.namespaces().setRetention(ns, retention);
String badUrl = "pulsar://bad-host:8080";

PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build();

ReaderBuilder<byte[]> readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100,
TimeUnit.SECONDS);

for (int i = 0; i <3; i++) {
try {
readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.info("It should time out due to invalid url");
} catch (IllegalStateException e) {
fail("It should not fail with corrupted reader state");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public CompletableFuture<Reader<T>> createAsync() {
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}

if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0
boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest;
if (isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0
|| conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) {
return FutureUtil
.failedFuture(new IllegalArgumentException(
Expand Down

0 comments on commit 8bb5ab8

Please sign in to comment.