Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Return failed future in async method instead of throw exception directly #9

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3107,10 +3107,14 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
Optional<Policies> policies) {
final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0,
"Default number of partitions should be more than 0");
checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions,
"Number of partitions should be less than or equal to " + maxPartitions);
if (defaultNumPartitions <= 0) {
return FutureUtil.failedFuture(
new IllegalArgumentException("Default number of partitions should be more than 0"));
}
if (maxPartitions > 0 && defaultNumPartitions > maxPartitions) {
return FutureUtil.failedFuture(new IllegalArgumentException(
"Number of partitions should be less than or equal to " + maxPartitions));
}

PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;

public class NamespacesImpl extends BaseResource implements Namespaces {

Expand Down Expand Up @@ -182,7 +183,10 @@ public void createNamespace(String namespace, Policies policies) throws PulsarAd
@Override
public CompletableFuture<Void> createNamespaceAsync(String namespace, Policies policies) {
NamespaceName ns = NamespaceName.get(namespace);
checkArgument(ns.isV2(), "Create namespace with policies is only supported on newer namespaces");
if (!ns.isV2()) {
return FutureUtil.failedFuture(new IllegalArgumentException(
"Create namespace with policies is only supported on newer namespaces"));
}
WebTarget path = namespacePath(ns);
// For V2 API we pass full Policy class instance
return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.FutureUtil;

public class NonPersistentTopicsImpl extends BaseResource implements NonPersistentTopics {

Expand All @@ -51,7 +52,9 @@ public void createPartitionedTopic(String topic, int numPartitions) throws Pulsa

@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
if (numPartitions <= 0) {
return FutureUtil.failedFuture(new IllegalArgumentException("Number of partitions should be more than 0"));
}
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "partitions");
return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -329,7 +330,9 @@ public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int num

public CompletableFuture<Void> createPartitionedTopicAsync(
String topic, int numPartitions, boolean createLocalTopicOnly, Map<String, String> properties) {
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
if (numPartitions <= 0) {
return FutureUtil.failedFuture(new IllegalArgumentException("Number of partitions must be more than 0"));
}
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions")
.queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly));
Expand Down Expand Up @@ -382,7 +385,9 @@ public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int num
@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions,
boolean updateLocalTopicOnly, boolean force) {
checkArgument(numPartitions > 0, "Number of partitions must be more than 0");
if (numPartitions <= 0) {
return FutureUtil.failedFuture(new IllegalArgumentException("Number of partitions must be more than 0"));
}
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
path = path.queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly)).queryParam("force",
Expand Down Expand Up @@ -878,7 +883,9 @@ public List<Message<byte[]>> peekMessages(String topic, String subName, int numM

@Override
public CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages) {
checkArgument(numMessages > 0);
if (numMessages <= 0) {
return FutureUtil.failedFuture(new IllegalArgumentException("numMessages must be positive"));
}
CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<List<Message<byte[]>>>();
peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), future, 1);
return future;
Expand Down Expand Up @@ -2742,8 +2749,12 @@ public void createShadowTopic(String shadowTopic, String sourceTopic, Map<String
@Override
public CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic,
Map<String, String> properties) {
checkArgument(TopicName.get(shadowTopic).isPersistent(), "Shadow topic must be persistent");
checkArgument(TopicName.get(sourceTopic).isPersistent(), "Source topic must be persistent");
if (!TopicName.get(shadowTopic).isPersistent()) {
return FutureUtil.failedFuture(new IllegalArgumentException("Shadow topic must be persistent"));
}
if (!TopicName.get(sourceTopic).isPersistent()) {
return FutureUtil.failedFuture(new IllegalArgumentException("Source topic must be persistent"));
}
return getPartitionedTopicMetadataAsync(sourceTopic).thenCompose(sourceTopicMeta -> {
HashMap<String, String> shadowProperties = new HashMap<>();
if (properties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;

public class TransactionsImpl extends BaseResource implements Transactions {
private final WebTarget adminV3Transactions;
Expand Down Expand Up @@ -234,7 +235,10 @@ public void scaleTransactionCoordinators(int replicas) throws PulsarAdminExcepti

@Override
public CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas) {
checkArgument(replicas > 0, "Number of transaction coordinators must be more than 0");
if (replicas <= 0) {
return FutureUtil.failedFuture(
new IllegalArgumentException("Number of transaction coordinators must be more than 0"));
}
WebTarget path = adminV3Transactions.path("transactionCoordinator");
path = path.path("replicas");
return asyncPostRequest(path, Entity.entity(replicas, MediaType.APPLICATION_JSON));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,16 @@ public CompletableFuture<Void> acknowledgeAsync(MessageId messageId,
Transaction txn) {
TransactionImpl txnImpl = null;
if (null != txn) {
checkArgument(txn instanceof TransactionImpl);
txnImpl = (TransactionImpl) txn;
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (!txnImpl.checkIfOpen(completableFuture)) {
return completableFuture;
}
if (txn instanceof TransactionImpl) {
txnImpl = (TransactionImpl) txn;
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (!txnImpl.checkIfOpen(completableFuture)) {
return completableFuture;
}
} else {
return FutureUtil.failedFuture(
new IllegalArgumentException("The txn must be an instance of TransactionImpl"));
}
}
return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl);
}
Expand All @@ -644,8 +648,12 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, T

TransactionImpl txnImpl = null;
if (null != txn) {
checkArgument(txn instanceof TransactionImpl);
txnImpl = (TransactionImpl) txn;
if (txn instanceof TransactionImpl) {
txnImpl = (TransactionImpl) txn;
} else {
return FutureUtil.failedFuture(
new IllegalArgumentException("The txn must be an instance of TransactionImpl"));
}
}
return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,14 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
if (message instanceof TopicMessageImpl) {
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
} else {
result.completeExceptionally(new IllegalStateException(
"Received message is not a TopicMessageImpl: " + message.getClass().getName()));
}
}
});
return result;
Expand Down Expand Up @@ -1171,7 +1175,10 @@ private void handleSubscribeOneTopicError(String topicName,

// un-subscribe a given topic
public CompletableFuture<Void> unsubscribeAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
if (!TopicName.isValid(topicName)) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException("Invalid topic name:" + topicName));
}

if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
Expand Down Expand Up @@ -1225,7 +1232,10 @@ public CompletableFuture<Void> unsubscribeAsync(String topicName) {

// Remove a consumer for a topic
public CompletableFuture<Void> removeConsumerAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
if (!TopicName.isValid(topicName)) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException("Invalid topic name:" + topicName));
}

if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transa
return completableFuture;
}
int partition = routerPolicy.choosePartition(message, topicMetadata);
checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
"Illegal partition index chosen by the message routing policy: " + partition);
if (partition < 0 || partition >= topicMetadata.numPartitions()) {
completableFuture.completeExceptionally(new IllegalArgumentException(
"Illegal partition index chosen by the message routing policy: " + partition));
return completableFuture;
}

if (conf.isLazyStartPartitionedProducers() && !producers.containsKey(partition)) {
final ProducerImpl<T> newProducer = createProducer(partition, Optional.ofNullable(overrideProducerName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ public Producer<T> create() throws PulsarClientException {
@Override
public CompletableFuture<Producer<T>> createAsync() {
// config validation
checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()),
"Batching and chunking of messages can't be enabled together");
if (conf.isBatchingEnabled() && conf.isChunkingEnabled()) {
return FutureUtil.failedFuture(
new IllegalArgumentException("Batching and chunking of messages can't be enabled together"));
}
if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,11 @@ private ByteBuf applyCompression(ByteBuf payload) {
}

public void sendAsync(Message<?> message, SendCallback callback) {
checkArgument(message instanceof MessageImpl);
if (!(message instanceof MessageImpl)) {
callback.sendComplete(
new IllegalArgumentException("Invalid message type: " + message.getClass().getName()));
return;
}

if (!isValidProducerState(callback, message.getSequenceId())) {
return;
Expand Down