diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d952f7d65743a..eae16907db135 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3107,10 +3107,14 @@ private CompletableFuture createDefaultPartitionedTopi Optional policies) { final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies); final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); - checkArgument(defaultNumPartitions > 0, - "Default number of partitions should be more than 0"); - checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions, - "Number of partitions should be less than or equal to " + maxPartitions); + if (defaultNumPartitions <= 0) { + return FutureUtil.failedFuture( + new IllegalArgumentException("Default number of partitions should be more than 0")); + } + if (maxPartitions > 0 && defaultNumPartitions > maxPartitions) { + return FutureUtil.failedFuture(new IllegalArgumentException( + "Number of partitions should be less than or equal to " + maxPartitions)); + } PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 59f0ef3b34763..739b1beabe91e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -59,6 +59,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TopicHashPositions; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; public class NamespacesImpl extends BaseResource implements Namespaces { @@ -182,7 +183,10 @@ public void createNamespace(String namespace, Policies policies) throws PulsarAd @Override public CompletableFuture createNamespaceAsync(String namespace, Policies policies) { NamespaceName ns = NamespaceName.get(namespace); - checkArgument(ns.isV2(), "Create namespace with policies is only supported on newer namespaces"); + if (!ns.isV2()) { + return FutureUtil.failedFuture(new IllegalArgumentException( + "Create namespace with policies is only supported on newer namespaces")); + } WebTarget path = namespacePath(ns); // For V2 API we pass full Policy class instance return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON)); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java index 76727cd1e0fc4..e4e8e551e3b79 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.util.FutureUtil; public class NonPersistentTopicsImpl extends BaseResource implements NonPersistentTopics { @@ -51,7 +52,9 @@ public void createPartitionedTopic(String topic, int numPartitions) throws Pulsa @Override public CompletableFuture createPartitionedTopicAsync(String topic, int numPartitions) { - checkArgument(numPartitions > 0, "Number of partitions should be more than 0"); + if (numPartitions <= 0) { + return FutureUtil.failedFuture(new IllegalArgumentException("Number of partitions should be more than 0")); + } TopicName topicName = validateTopic(topic); WebTarget path = topicPath(topicName, "partitions"); return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON)); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 33d1cd1785827..15a66e0a0a926 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -92,6 +92,7 @@ import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,7 +330,9 @@ public CompletableFuture createPartitionedTopicAsync(String topic, int num public CompletableFuture createPartitionedTopicAsync( String topic, int numPartitions, boolean createLocalTopicOnly, Map properties) { - checkArgument(numPartitions > 0, "Number of partitions should be more than 0"); + if (numPartitions <= 0) { + return FutureUtil.failedFuture(new IllegalArgumentException("Number of partitions must be more than 0")); + } TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "partitions") .queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly)); @@ -382,7 +385,9 @@ public CompletableFuture updatePartitionedTopicAsync(String topic, int num @Override public CompletableFuture updatePartitionedTopicAsync(String topic, int numPartitions, boolean updateLocalTopicOnly, boolean force) { - checkArgument(numPartitions > 0, "Number of partitions must be more than 0"); + if (numPartitions <= 0) { + return FutureUtil.failedFuture(new IllegalArgumentException("Number of partitions must be more than 0")); + } TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "partitions"); path = path.queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly)).queryParam("force", @@ -878,7 +883,9 @@ public List> peekMessages(String topic, String subName, int numM @Override public CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages) { - checkArgument(numMessages > 0); + if (numMessages <= 0) { + return FutureUtil.failedFuture(new IllegalArgumentException("numMessages must be positive")); + } CompletableFuture>> future = new CompletableFuture>>(); peekMessagesAsync(topic, subName, numMessages, new ArrayList<>(), future, 1); return future; @@ -2742,8 +2749,12 @@ public void createShadowTopic(String shadowTopic, String sourceTopic, Map createShadowTopicAsync(String shadowTopic, String sourceTopic, Map properties) { - checkArgument(TopicName.get(shadowTopic).isPersistent(), "Shadow topic must be persistent"); - checkArgument(TopicName.get(sourceTopic).isPersistent(), "Source topic must be persistent"); + if (!TopicName.get(shadowTopic).isPersistent()) { + return FutureUtil.failedFuture(new IllegalArgumentException("Shadow topic must be persistent")); + } + if (!TopicName.get(sourceTopic).isPersistent()) { + return FutureUtil.failedFuture(new IllegalArgumentException("Source topic must be persistent")); + } return getPartitionedTopicMetadataAsync(sourceTopic).thenCompose(sourceTopicMeta -> { HashMap shadowProperties = new HashMap<>(); if (properties != null) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index 5693ebc8f60aa..5f69007141b17 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; import org.apache.pulsar.common.stats.PositionInPendingAckStats; +import org.apache.pulsar.common.util.FutureUtil; public class TransactionsImpl extends BaseResource implements Transactions { private final WebTarget adminV3Transactions; @@ -234,7 +235,10 @@ public void scaleTransactionCoordinators(int replicas) throws PulsarAdminExcepti @Override public CompletableFuture scaleTransactionCoordinatorsAsync(int replicas) { - checkArgument(replicas > 0, "Number of transaction coordinators must be more than 0"); + if (replicas <= 0) { + return FutureUtil.failedFuture( + new IllegalArgumentException("Number of transaction coordinators must be more than 0")); + } WebTarget path = adminV3Transactions.path("transactionCoordinator"); path = path.path("replicas"); return asyncPostRequest(path, Entity.entity(replicas, MediaType.APPLICATION_JSON)); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index e933005f2d6ea..c3d10db4dd488 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -620,12 +620,16 @@ public CompletableFuture acknowledgeAsync(MessageId messageId, Transaction txn) { TransactionImpl txnImpl = null; if (null != txn) { - checkArgument(txn instanceof TransactionImpl); - txnImpl = (TransactionImpl) txn; - CompletableFuture completableFuture = new CompletableFuture<>(); - if (!txnImpl.checkIfOpen(completableFuture)) { - return completableFuture; - } + if (txn instanceof TransactionImpl) { + txnImpl = (TransactionImpl) txn; + CompletableFuture completableFuture = new CompletableFuture<>(); + if (!txnImpl.checkIfOpen(completableFuture)) { + return completableFuture; + } + } else { + return FutureUtil.failedFuture( + new IllegalArgumentException("The txn must be an instance of TransactionImpl")); + } } return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl); } @@ -644,8 +648,12 @@ public CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, T TransactionImpl txnImpl = null; if (null != txn) { - checkArgument(txn instanceof TransactionImpl); - txnImpl = (TransactionImpl) txn; + if (txn instanceof TransactionImpl) { + txnImpl = (TransactionImpl) txn; + } else { + return FutureUtil.failedFuture( + new IllegalArgumentException("The txn must be an instance of TransactionImpl")); + } } return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index d0607b97c1893..937c4b85dfe21 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -430,10 +430,14 @@ protected CompletableFuture> internalReceiveAsync() { cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { decreaseIncomingMessageSize(message); - checkState(message instanceof TopicMessageImpl); - unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); - resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); + if (message instanceof TopicMessageImpl) { + unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); + resumeReceivingFromPausedConsumersIfNeeded(); + result.complete(message); + } else { + result.completeExceptionally(new IllegalStateException( + "Received message is not a TopicMessageImpl: " + message.getClass().getName())); + } } }); return result; @@ -1171,7 +1175,10 @@ private void handleSubscribeOneTopicError(String topicName, // un-subscribe a given topic public CompletableFuture unsubscribeAsync(String topicName) { - checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName); + if (!TopicName.isValid(topicName)) { + return FutureUtil.failedFuture( + new PulsarClientException.InvalidTopicNameException("Invalid topic name:" + topicName)); + } if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil.failedFuture( @@ -1225,7 +1232,10 @@ public CompletableFuture unsubscribeAsync(String topicName) { // Remove a consumer for a topic public CompletableFuture removeConsumerAsync(String topicName) { - checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName); + if (!TopicName.isValid(topicName)) { + return FutureUtil.failedFuture( + new PulsarClientException.InvalidTopicNameException("Invalid topic name:" + topicName)); + } if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil.failedFuture( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index f780edc95c136..582018051ee95 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -229,8 +229,11 @@ CompletableFuture internalSendWithTxnAsync(Message message, Transa return completableFuture; } int partition = routerPolicy.choosePartition(message, topicMetadata); - checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(), - "Illegal partition index chosen by the message routing policy: " + partition); + if (partition < 0 || partition >= topicMetadata.numPartitions()) { + completableFuture.completeExceptionally(new IllegalArgumentException( + "Illegal partition index chosen by the message routing policy: " + partition)); + return completableFuture; + } if (conf.isLazyStartPartitionedProducers() && !producers.containsKey(partition)) { final ProducerImpl newProducer = createProducer(partition, Optional.ofNullable(overrideProducerName)); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index ecbdfa76b64ae..3111082650f1d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -93,8 +93,10 @@ public Producer create() throws PulsarClientException { @Override public CompletableFuture> createAsync() { // config validation - checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()), - "Batching and chunking of messages can't be enabled together"); + if (conf.isBatchingEnabled() && conf.isChunkingEnabled()) { + return FutureUtil.failedFuture( + new IllegalArgumentException("Batching and chunking of messages can't be enabled together")); + } if (conf.getTopicName() == null) { return FutureUtil .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 2192ebfb64e75..edfaef2fb8934 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -428,7 +428,11 @@ private ByteBuf applyCompression(ByteBuf payload) { } public void sendAsync(Message message, SendCallback callback) { - checkArgument(message instanceof MessageImpl); + if (!(message instanceof MessageImpl)) { + callback.sendComplete( + new IllegalArgumentException("Invalid message type: " + message.getClass().getName())); + return; + } if (!isValidProducerState(callback, message.getSequenceId())) { return;