Skip to content

Commit

Permalink
[fix][client] Fix ReaderBuilder doest not give illegalArgument on con…
Browse files Browse the repository at this point in the history
…nection failure retry (#22639)

(cherry picked from commit b56f238)
  • Loading branch information
rdhabalia authored and lhotari committed May 14, 2024
1 parent 89b545e commit 8b2d5e9
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -48,6 +50,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
Expand Down Expand Up @@ -851,4 +854,28 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess
assertTrue(reader.hasMessageAvailable());
}
}

@Test
public void testReaderBuilderStateOnRetryFailure() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testRetryReader";
RetentionPolicies retention = new RetentionPolicies(-1, -1);
admin.namespaces().setRetention(ns, retention);
String badUrl = "pulsar://bad-host:8080";

PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build();

ReaderBuilder<byte[]> readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100,
TimeUnit.SECONDS);

for (int i = 0; i < 3; i++) {
try {
readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.info("It should time out due to invalid url");
} catch (IllegalArgumentException e) {
fail("It should not fail with corrupt reader state");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ public CompletableFuture<Reader<T>> createAsync() {
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}

if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0
|| conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) {
boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest;
if ((isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0)
|| (conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0)) {
return FutureUtil
.failedFuture(new IllegalArgumentException(
"Start message id or start message from roll back must be specified but they cannot be"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void readerBuilderLoadConfTest() throws Exception {
@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest)
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.latest)
.startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) {
// no-op
} finally {
Expand Down

0 comments on commit 8b2d5e9

Please sign in to comment.