diff --git a/pom.xml b/pom.xml
index 2aaca3ad35aee..ee2780caeb590 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@ flexible messaging model and an intuitive client API.
pulsar-client-api
pulsar-client
pulsar-client-shaded
+ pulsar-client-1x-base
pulsar-client-admin
pulsar-client-admin-shaded
pulsar-client-tools
diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml
new file mode 100644
index 0000000000000..22a5b1b8e619e
--- /dev/null
+++ b/pulsar-client-1x-base/pom.xml
@@ -0,0 +1,40 @@
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar
+ 2.3.0-SNAPSHOT
+ ..
+
+
+ pulsar-client-1x-base
+ Pulsar Client 1.x Compatibility Base
+ pom
+
+
+ pulsar-client-2x-shaded
+ pulsar-client-1x
+
+
diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
new file mode 100644
index 0000000000000..b6fd0b84e32bf
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
@@ -0,0 +1,52 @@
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar-client-1x-base
+ 2.3.0-SNAPSHOT
+ ..
+
+
+ pulsar-client-1x
+ Pulsar Client 1.x Compatibility API
+
+
+
+ org.apache.pulsar
+ pulsar-client-2x-shaded
+ ${project.parent.version}
+
+
+
+ com.google.guava
+ guava
+
+
+
+ org.apache.commons
+ commons-lang3
+
+
+
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
new file mode 100644
index 0000000000000..e5bac914b7865
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+/**
+ * Class used to specify client side configuration like authentication, etc..
+ *
+ * @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance
+ */
+@Deprecated
+public class ClientConfiguration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ClientConfigurationData confData = new ClientConfigurationData();
+
+ /**
+ * @return the authentication provider to be used
+ */
+ public Authentication getAuthentication() {
+ return confData.getAuthentication();
+ }
+
+ /**
+ * Set the authentication provider to use in the Pulsar client instance.
+ *
authParams)
+ throws UnsupportedAuthenticationException {
+ confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
+ }
+
+ /**
+ * @return the operation timeout in ms
+ */
+ public long getOperationTimeoutMs() {
+ return confData.getOperationTimeoutMs();
+ }
+
+ /**
+ * Set the operation timeout (default: 30 seconds)
+ *
+ * Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+ * operation will be marked as failed
+ *
+ * @param operationTimeout
+ * operation timeout
+ * @param unit
+ * time unit for {@code operationTimeout}
+ */
+ public void setOperationTimeout(int operationTimeout, TimeUnit unit) {
+ checkArgument(operationTimeout >= 0);
+ confData.setOperationTimeoutMs(unit.toMillis(operationTimeout));
+ }
+
+ /**
+ * @return the number of threads to use for handling connections
+ */
+ public int getIoThreads() {
+ return confData.getNumIoThreads();
+ }
+
+ /**
+ * Set the number of threads to be used for handling connections to brokers (default: 1 thread)
+ *
+ * @param numIoThreads
+ */
+ public void setIoThreads(int numIoThreads) {
+ checkArgument(numIoThreads > 0);
+ confData.setNumIoThreads(numIoThreads);
+ }
+
+ /**
+ * @return the number of threads to use for message listeners
+ */
+ public int getListenerThreads() {
+ return confData.getNumListenerThreads();
+ }
+
+ /**
+ * Set the number of threads to be used for message listeners (default: 1 thread)
+ *
+ * @param numListenerThreads
+ */
+ public void setListenerThreads(int numListenerThreads) {
+ checkArgument(numListenerThreads > 0);
+ confData.setNumListenerThreads(numListenerThreads);
+ }
+
+ /**
+ * @return the max number of connections per single broker
+ */
+ public int getConnectionsPerBroker() {
+ return confData.getConnectionsPerBroker();
+ }
+
+ /**
+ * Sets the max number of connection that the client library will open to a single broker.
+ *
+ * By default, the connection pool will use a single connection for all the producers and consumers. Increasing this
+ * parameter may improve throughput when using many producers over a high latency connection.
+ *
+ *
+ * @param connectionsPerBroker
+ * max number of connections per broker (needs to be greater than 0)
+ */
+ public void setConnectionsPerBroker(int connectionsPerBroker) {
+ checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
+ confData.setConnectionsPerBroker(connectionsPerBroker);
+ }
+
+ /**
+ * @return whether TCP no-delay should be set on the connections
+ */
+ public boolean isUseTcpNoDelay() {
+ return confData.isUseTcpNoDelay();
+ }
+
+ /**
+ * Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.
+ *
+ * No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve
+ * low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall
+ * throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay
flag to false.
+ *
+ * Default value is true
+ *
+ * @param useTcpNoDelay
+ */
+ public void setUseTcpNoDelay(boolean useTcpNoDelay) {
+ confData.setUseTcpNoDelay(useTcpNoDelay);
+ }
+
+ /**
+ * @return whether TLS encryption is used on the connection
+ */
+ public boolean isUseTls() {
+ return confData.isUseTls();
+ }
+
+ /**
+ * Configure whether to use TLS encryption on the connection (default: false)
+ *
+ * @param useTls
+ */
+ public void setUseTls(boolean useTls) {
+ confData.setUseTls(useTls);
+ }
+
+ /**
+ * @return path to the trusted TLS certificate file
+ */
+ public String getTlsTrustCertsFilePath() {
+ return confData.getTlsTrustCertsFilePath();
+ }
+
+ /**
+ * Set the path to the trusted TLS certificate file
+ *
+ * @param tlsTrustCertsFilePath
+ */
+ public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
+ confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+ }
+
+ /**
+ * @return whether the Pulsar client accept untrusted TLS certificate from broker
+ */
+ public boolean isTlsAllowInsecureConnection() {
+ return confData.isTlsAllowInsecureConnection();
+ }
+
+ /**
+ * Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
+ *
+ * @param tlsAllowInsecureConnection
+ */
+ public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
+ confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
+ }
+
+ /**
+ * Stats will be activated with positive statsIntervalSeconds
+ *
+ * @return the interval between each stat info (default: 60 seconds)
+ */
+ public long getStatsIntervalSeconds() {
+ return confData.getStatsIntervalSeconds();
+ }
+
+ /**
+ * Set the interval between each stat info (default: 60 seconds) Stats will be activated with positive
+ * statsIntervalSeconds It should be set to at least 1 second
+ *
+ * @param statsIntervalSeconds
+ * the interval between each stat info
+ * @param unit
+ * time unit for {@code statsInterval}
+ */
+ public void setStatsInterval(long statsInterval, TimeUnit unit) {
+ confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
+ }
+
+ /**
+ * Get configured total allowed concurrent lookup-request.
+ *
+ * @return
+ */
+ public int getConcurrentLookupRequest() {
+ return confData.getConcurrentLookupRequest();
+ }
+
+ /**
+ * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * (default: 50000) It should be configured with higher value only in case of it requires to
+ * produce/subscribe on thousands of topic using created {@link PulsarClient}
+ *
+ * @param concurrentLookupRequest
+ */
+ public void setConcurrentLookupRequest(int concurrentLookupRequest) {
+ confData.setConcurrentLookupRequest(concurrentLookupRequest);
+ }
+
+ /**
+ * Get configured max number of reject-request in a time-frame (30 seconds) after which connection will be closed
+ *
+ * @return
+ */
+ public int getMaxNumberOfRejectedRequestPerConnection() {
+ return confData.getMaxNumberOfRejectedRequestPerConnection();
+ }
+
+ /**
+ * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
+ * will be closed and client creates a new connection that give chance to connect a different broker (default:
+ * 50)
+ *
+ * @param maxNumberOfRejectedRequestPerConnection
+ */
+ public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
+ confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
+ }
+
+ public boolean isTlsHostnameVerificationEnable() {
+ return confData.isTlsHostnameVerificationEnable();
+ }
+
+ /**
+ * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
+ * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1.
+ * Server Identity hostname verification.
+ *
+ * @see rfc2818
+ *
+ * @param tlsHostnameVerificationEnable
+ */
+ public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
+ confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable);
+ }
+
+ public ClientConfiguration setServiceUrl(String serviceUrl) {
+ confData.setServiceUrl(serviceUrl);
+ return this;
+ }
+
+ /**
+ * Set the duration of time to wait for a connection to a broker to be established. If the duration
+ * passes without a response from the broker, the connection attempt is dropped.
+ *
+ * @param duration the duration to wait
+ * @param unit the time unit in which the duration is defined
+ */
+ public void setConnectionTimeout(int duration, TimeUnit unit) {
+ confData.setConnectionTimeoutMs((int)unit.toMillis(duration));
+ }
+
+ /**
+ * Get the duration of time for which the client will wait for a connection to a broker to be
+ * established before giving up.
+ *
+ * @return the duration, in milliseconds
+ */
+ public long getConnectionTimeoutMs() {
+ return confData.getConnectionTimeoutMs();
+ }
+
+ public ClientConfigurationData getConfigurationData() {
+ return confData;
+ }
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
new file mode 100644
index 0000000000000..00da0a4b7513d
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An interface that abstracts behavior of Pulsar's consumer.
+ */
+public interface Consumer extends Closeable {
+
+ /**
+ * Get a topic for the consumer
+ *
+ * @return topic for the consumer
+ */
+ String getTopic();
+
+ /**
+ * Get a subscription for the consumer
+ *
+ * @return subscription for the consumer
+ */
+ String getSubscription();
+
+ /**
+ * Unsubscribe the consumer
+ *
+ * This call blocks until the consumer is unsubscribed.
+ *
+ * @throws PulsarClientException
+ */
+ void unsubscribe() throws PulsarClientException;
+
+ /**
+ * Asynchronously unsubscribe the consumer
+ *
+ * @return {@link CompletableFuture} for this operation
+ */
+ CompletableFuture unsubscribeAsync();
+
+ /**
+ * Receives a single message.
+ *
+ * This calls blocks until a message is available.
+ *
+ * @return the received message
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ * @throws PulsarClientException.InvalidConfigurationException
+ * if a message listener was defined in the configuration
+ */
+ Message receive() throws PulsarClientException;
+
+ /**
+ * Receive a single message
+ *
+ * Retrieves a message when it will be available and completes {@link CompletableFuture} with received message.
+ *
+ *
+ * {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with
+ * received message. Else it creates backlog of receive requests in the application.
+ *
+ *
+ * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
+ */
+ CompletableFuture> receiveAsync();
+
+ /**
+ * Receive a single message
+ *
+ * Retrieves a message, waiting up to the specified wait time if necessary.
+ *
+ * @param timeout
+ * 0 or less means immediate rather than infinite
+ * @param unit
+ * @return the received {@link Message} or null if no message available before timeout
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ * @throws PulsarClientException.InvalidConfigurationException
+ * if a message listener was defined in the configuration
+ */
+ Message receive(int timeout, TimeUnit unit) throws PulsarClientException;
+
+ /**
+ * Acknowledge the consumption of a single message
+ *
+ * @param message
+ * The {@code Message} to be acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledge(Message> message) throws PulsarClientException;
+
+ /**
+ * Acknowledge the consumption of a single message, identified by its MessageId
+ *
+ * @param messageId
+ * The {@code MessageId} to be acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledge(MessageId messageId) throws PulsarClientException;
+
+ /**
+ * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+ *
+ * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+ * re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
+ *
+ * @param message
+ * The {@code Message} to be cumulatively acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledgeCumulative(Message> message) throws PulsarClientException;
+
+ /**
+ * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
+ *
+ * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
+ * re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered.
+ *
+ * @param messageId
+ * The {@code MessageId} to be cumulatively acknowledged
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the consumer was already closed
+ */
+ void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
+
+ /**
+ * Asynchronously acknowledge the consumption of a single message
+ *
+ * @param message
+ * The {@code Message} to be acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture acknowledgeAsync(Message> message);
+
+ /**
+ * Asynchronously acknowledge the consumption of a single message
+ *
+ * @param messageId
+ * The {@code MessageId} to be acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture acknowledgeAsync(MessageId messageId);
+
+ /**
+ * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+ * message.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * @param message
+ * The {@code Message} to be cumulatively acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture acknowledgeCumulativeAsync(Message> message);
+
+ /**
+ * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
+ * message.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * @param messageId
+ * The {@code MessageId} to be cumulatively acknowledged
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture acknowledgeCumulativeAsync(MessageId messageId);
+
+ /**
+ * Get statistics for the consumer.
+ *
+ *
+ * - numMsgsReceived : Number of messages received in the current interval
+ *
- numBytesReceived : Number of bytes received in the current interval
+ *
- numReceiveFailed : Number of messages failed to receive in the current interval
+ *
- numAcksSent : Number of acks sent in the current interval
+ *
- numAcksFailed : Number of acks failed to send in the current interval
+ *
- totalMsgsReceived : Total number of messages received
+ *
- totalBytesReceived : Total number of bytes received
+ *
- totalReceiveFailed : Total number of messages failed to receive
+ *
- totalAcksSent : Total number of acks sent
+ *
- totalAcksFailed : Total number of acks failed to sent
+ *
+ *
+ * @return statistic for the consumer
+ */
+ ConsumerStats getStats();
+
+ /**
+ * Close the consumer and stop the broker to push more messages.
+ */
+ @Override
+ void close() throws PulsarClientException;
+
+ /**
+ * Asynchronously close the consumer and stop the broker to push more messages
+ *
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture closeAsync();
+
+ /**
+ * Return true if the topic was terminated and this consumer has already consumed all the messages in the topic.
+ *
+ * Please note that this does not simply mean that the consumer is caught up with the last message published by
+ * producers, rather the topic needs to be explicitly "terminated".
+ */
+ boolean hasReachedEndOfTopic();
+
+ /**
+ * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+ * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
+ * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
+ * breaks, the messages are redelivered after reconnect.
+ */
+ void redeliverUnacknowledgedMessages();
+
+ /**
+ * Reset the subscription associated with this consumer to a specific message id.
+ *
+ *
+ * The message id can either be a specific message or represent the first or last messages in the topic.
+ *
+ *
+ * MessageId.earliest
: Reset the subscription on the earliest message available in the topic
+ * MessageId.latest
: Reset the subscription on the latest message in the topic
+ *
+ *
+ * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param messageId
+ * the message id where to reposition the subscription
+ */
+ void seek(MessageId messageId) throws PulsarClientException;
+
+ /**
+ * Reset the subscription associated with this consumer to a specific message id.
+ *
+ *
+ * The message id can either be a specific message or represent the first or last messages in the topic.
+ *
+ *
+ * MessageId.earliest
: Reset the subscription on the earliest message available in the topic
+ * MessageId.latest
: Reset the subscription on the latest message in the topic
+ *
+ *
+ * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param messageId
+ * the message id where to reposition the subscription
+ * @return a future to track the completion of the seek operation
+ */
+ CompletableFuture seekAsync(MessageId messageId);
+
+ /**
+ * @return Whether the consumer is connected to the broker
+ */
+ boolean isConnected();
+
+ /**
+ * Get the name of consumer.
+ * @return consumer name.
+ */
+ String getConsumerName();
+
+ /**
+ * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
+ * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
+ */
+ void pause();
+
+ /**
+ * Resume requesting messages from the broker.
+ */
+ void resume();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
new file mode 100644
index 0000000000000..5ffba38ffceeb
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+/**
+ * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
+ * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
+ * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion.
+ *
+ * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance
+ */
+@Deprecated
+public class ConsumerConfiguration implements Serializable {
+
+ /**
+ * Resend shouldn't be requested before minAckTimeoutMillis.
+ */
+ static long minAckTimeoutMillis = 1000;
+
+ private static final long serialVersionUID = 1L;
+
+ private final ConsumerConfigurationData conf = new ConsumerConfigurationData<>();
+
+ private boolean initializeSubscriptionOnLatest = true;
+
+ public ConsumerConfiguration() {
+ // Disable acknowledgment grouping when using v1 API
+ conf.setAcknowledgementsGroupTimeMicros(0);
+ }
+
+ /**
+ * @return the configured timeout in milliseconds for unacked messages.
+ */
+ public long getAckTimeoutMillis() {
+ return conf.getAckTimeoutMillis();
+ }
+
+ /**
+ * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
+ * 10 seconds.
+ *
+ * @param ackTimeout
+ * for unacked messages.
+ * @param timeUnit
+ * unit in which the timeout is provided.
+ * @return {@link ConsumerConfiguration}
+ */
+ public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
+ long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
+ checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
+ "Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
+ conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
+ return this;
+ }
+
+ /**
+ * @return the configured subscription type
+ */
+ public SubscriptionType getSubscriptionType() {
+ return conf.getSubscriptionType();
+ }
+
+ /**
+ * Select the subscription type to be used when subscribing to the topic.
+ *
+ * Default is {@link SubscriptionType#Exclusive}
+ *
+ * @param subscriptionType
+ * the subscription type value
+ */
+ public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
+ checkNotNull(subscriptionType);
+ conf.setSubscriptionType(subscriptionType);
+ return this;
+ }
+
+ /**
+ * @return the configured {@link MessageListener} for the consumer
+ */
+ public MessageListener getMessageListener() {
+ return conf.getMessageListener();
+ }
+
+ /**
+ * Sets a {@link MessageListener} for the consumer
+ *
+ * When a {@link MessageListener} is set, application will receive messages through it. Calls to
+ * {@link Consumer#receive()} will not be allowed.
+ *
+ * @param messageListener
+ * the listener object
+ */
+ public ConsumerConfiguration setMessageListener(MessageListener messageListener) {
+ checkNotNull(messageListener);
+ conf.setMessageListener(messageListener);
+ return this;
+ }
+
+ /**
+ * @return this configured {@link ConsumerEventListener} for the consumer.
+ * @see #setConsumerEventListener(ConsumerEventListener)
+ * @since 2.0
+ */
+ public ConsumerEventListener getConsumerEventListener() {
+ return conf.getConsumerEventListener();
+ }
+
+ /**
+ * Sets a {@link ConsumerEventListener} for the consumer.
+ *
+ *
+ * The consumer group listener is used for receiving consumer state change in a consumer group for failover
+ * subscription. Application can then react to the consumer state changes.
+ *
+ *
+ * This change is experimental. It is subject to changes coming in release 2.0.
+ *
+ * @param listener
+ * the consumer group listener object
+ * @return consumer configuration
+ * @since 2.0
+ */
+ public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
+ checkNotNull(listener);
+ conf.setConsumerEventListener(listener);
+ return this;
+ }
+
+ /**
+ * @return the configure receiver queue size value
+ */
+ public int getReceiverQueueSize() {
+ return conf.getReceiverQueueSize();
+ }
+
+ /**
+ * @return the configured max total receiver queue size across partitions
+ */
+ public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
+ return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
+ }
+
+ /**
+ * Set the max total receiver queue size across partitons.
+ *
+ * This setting will be used to reduce the receiver queue size for individual partitions
+ * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+ *
+ * @param maxTotalReceiverQueueSizeAcrossPartitions
+ */
+ public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
+ checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize());
+ conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
+ }
+
+ /**
+ * @return the CryptoKeyReader
+ */
+ public CryptoKeyReader getCryptoKeyReader() {
+ return conf.getCryptoKeyReader();
+ }
+
+ /**
+ * Sets a {@link CryptoKeyReader}
+ *
+ * @param cryptoKeyReader
+ * CryptoKeyReader object
+ */
+ public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ checkNotNull(cryptoKeyReader);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return this;
+ }
+
+ /**
+ * Sets the ConsumerCryptoFailureAction to the value specified
+ *
+ * @param action
+ * consumer action
+ */
+ public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+ conf.setCryptoFailureAction(action);
+ }
+
+ /**
+ * @return The ConsumerCryptoFailureAction
+ */
+ public ConsumerCryptoFailureAction getCryptoFailureAction() {
+ return conf.getCryptoFailureAction();
+ }
+
+ /**
+ * Sets the size of the consumer receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+ * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+ * throughput at the expense of bigger memory utilization.
+ *
+ *
+ * Setting the consumer queue size as zero
+ *
+ * - Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the
+ * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process
+ * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue
+ * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
+ * zero.
+ * - Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with
+ * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives
+ * exception in callback. consumer will not be able receive any further message unless batch-message in pipeline
+ * is removed
+ *
+ *
+ * Default value is {@code 1000} messages and should be good for most use cases.
+ *
+ * @param receiverQueueSize
+ * the new receiver queue size value
+ */
+ public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
+ checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+ conf.setReceiverQueueSize(receiverQueueSize);
+ return this;
+ }
+
+ /**
+ * @return the consumer name
+ */
+ public String getConsumerName() {
+ return conf.getConsumerName();
+ }
+
+ /**
+ * Set the consumer name.
+ *
+ * @param consumerName
+ */
+ public ConsumerConfiguration setConsumerName(String consumerName) {
+ checkArgument(consumerName != null && !consumerName.equals(""));
+ conf.setConsumerName(consumerName);
+ return this;
+ }
+
+ public int getPriorityLevel() {
+ return conf.getPriorityLevel();
+ }
+
+ /**
+ * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
+ * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
+ * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
+ * permits, else broker will consider next priority level consumers.
+ * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
+ * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
+ *
+ *
+ * Consumer PriorityLevel Permits
+ * C1 0 2
+ * C2 0 1
+ * C3 0 1
+ * C4 1 2
+ * C5 1 1
+ * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
+ *
+ *
+ * @param priorityLevel
+ */
+ public void setPriorityLevel(int priorityLevel) {
+ conf.setPriorityLevel(priorityLevel);
+ }
+
+ public boolean getReadCompacted() {
+ return conf.isReadCompacted();
+ }
+
+ /**
+ * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+ * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+ * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+ * point, the messages will be sent as normal.
+ *
+ * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
+ * failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
+ * shared subscription, will lead to the subscription call throwing a PulsarClientException.
+ *
+ * @param readCompacted
+ * whether to read from the compacted topic
+ */
+ public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
+ conf.setReadCompacted(readCompacted);
+ return this;
+ }
+
+ /**
+ * Set a name/value property with this consumer.
+ *
+ * @param key
+ * @param value
+ * @return
+ */
+ public ConsumerConfiguration setProperty(String key, String value) {
+ checkArgument(key != null);
+ checkArgument(value != null);
+ conf.getProperties().put(key, value);
+ return this;
+ }
+
+ /**
+ * Add all the properties in the provided map
+ *
+ * @param properties
+ * @return
+ */
+ public ConsumerConfiguration setProperties(Map properties) {
+ conf.getProperties().putAll(properties);
+ return this;
+ }
+
+ public Map getProperties() {
+ return conf.getProperties();
+ }
+
+ public ConsumerConfigurationData getConfigurationData() {
+ return conf;
+ }
+
+ /**
+ * @param subscriptionInitialPosition the initial position at which to set
+ * set cursor when subscribing to the topic first time
+ * Default is {@value InitialPosition.Latest}
+ */
+ public ConsumerConfiguration setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
+ conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
+ return this;
+ }
+
+ /**
+ * @return the configured {@link subscriptionInitailPosition} for the consumer
+ */
+ public SubscriptionInitialPosition getSubscriptionInitialPosition(){
+ return conf.getSubscriptionInitialPosition();
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
new file mode 100644
index 0000000000000..4a9b99f87a0b4
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.impl.MessageBuilderImpl;
+
+/**
+ * Message builder factory. Use this class to create messages to be send to the Pulsar producer
+ *
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a new
+ * message builder.
+ */
+@Deprecated
+public interface MessageBuilder {
+
+ static MessageBuilder create() {
+ return new MessageBuilderImpl();
+ }
+
+ /**
+ * Finalize the immutable message
+ *
+ * @return a {@link Message} ready to be sent through a {@link Producer}
+ */
+ Message build();
+
+ /**
+ * Set the content of the message
+ *
+ * @param data
+ * array containing the payload
+ */
+ MessageBuilder setContent(byte[] data);
+
+ /**
+ * Set the content of the message
+ *
+ * @param data
+ * array containing the payload
+ * @param offset
+ * offset into the data array
+ * @param length
+ * length of the payload starting from the above offset
+ */
+ MessageBuilder setContent(byte[] data, int offset, int length);
+
+ /**
+ * Set the content of the message
+ *
+ * @param buf
+ * a {@link ByteBuffer} with the payload of the message
+ */
+ MessageBuilder setContent(ByteBuffer buf);
+
+ /**
+ * Sets a new property on a message.
+ *
+ * @param name
+ * the name of the property
+ * @param value
+ * the associated value
+ */
+ MessageBuilder setProperty(String name, String value);
+
+ /**
+ * Add all the properties in the provided map
+ */
+ MessageBuilder setProperties(Map properties);
+
+ /**
+ * Sets the key of the message for routing policy
+ *
+ * @param key
+ */
+ MessageBuilder setKey(String key);
+
+ /**
+ * Set the event time for a given message.
+ *
+ *
+ * Applications can retrieve the event time by calling {@link Message#getEventTime()}.
+ *
+ *
+ * Note: currently pulsar doesn't support event-time based index. so the subscribers can't seek the messages by
+ * event time.
+ *
+ * @since 1.20.0
+ */
+ MessageBuilder setEventTime(long timestamp);
+
+ /**
+ * Specify a custom sequence id for the message being published.
+ *
+ * The sequence id can be used for deduplication purposes and it needs to follow these rules:
+ *
+ * sequenceId >= 0
+ * - Sequence id for a message needs to be greater than sequence id for earlier messages:
+ *
sequenceId(N+1) > sequenceId(N)
+ * - It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the
+ *
sequenceId
could represent an offset or a cumulative size.
+ *
+ *
+ * @param sequenceId
+ * the sequence id to assign to the current message
+ * @since 1.20.0
+ */
+ MessageBuilder setSequenceId(long sequenceId);
+
+ /**
+ * Override the replication clusters for this message.
+ *
+ * @param clusters
+ */
+ MessageBuilder setReplicationClusters(List clusters);
+
+ /**
+ * Disable replication for this message.
+ */
+ MessageBuilder disableReplication();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
new file mode 100644
index 0000000000000..4d69668850765
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Producer object.
+ *
+ * The producer is used to publish messages on a topic
+ *
+ *
+ */
+public interface Producer extends Closeable {
+
+ /**
+ * @return the topic which producer is publishing to
+ */
+ String getTopic();
+
+ /**
+ * @return the producer name which could have been assigned by the system or specified by the client
+ */
+ String getProducerName();
+
+ /**
+ * Sends a message.
+ *
+ * This call will be blocking until is successfully acknowledged by the Pulsar broker.
+ *
+ * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+ *
+ * @param message
+ * a message
+ * @return the message id assigned to the published message
+ * @throws PulsarClientException.TimeoutException
+ * if the message was not correctly received by the system within the timeout period
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the producer was already closed
+ */
+ MessageId send(byte[] message) throws PulsarClientException;
+
+ /**
+ * Send a message asynchronously
+ *
+ * When the producer queue is full, by default this method will complete the future with an exception
+ * {@link PulsarClientException.ProducerQueueIsFullError}
+ *
+ * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+ * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+ *
+ * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent.
+ *
+ * @param message
+ * a byte array with the payload of the message
+ * @return a future that can be used to track when the message will have been safely persisted
+ */
+ CompletableFuture sendAsync(byte[] message);
+
+ /**
+ * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+ *
+ * @throws PulsarClientException
+ * @since 2.1.0
+ * @see #flushAsync()
+ */
+ void flush() throws PulsarClientException;
+
+ /**
+ * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+ *
+ * @return a future that can be used to track when all the messages have been safely persisted.
+ * @since 2.1.0
+ * @see #flush()
+ */
+ CompletableFuture flushAsync();
+
+ /**
+ * Send a message
+ *
+ * @param message
+ * a message
+ * @return the message id assigned to the published message
+ * @throws PulsarClientException.TimeoutException
+ * if the message was not correctly received by the system within the timeout period
+ *
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+ * new message builder.
+ */
+ @Deprecated
+ MessageId send(Message message) throws PulsarClientException;
+
+ /**
+ * Send a message asynchronously
+ *
+ * When the returned {@link CompletableFuture} is marked as completed successfully, the provided message will
+ * contain the {@link MessageId} assigned by the broker to the published message.
+ *
+ * Example:
+ *
+ *
+ * Message msg = MessageBuilder.create().setContent(myContent).build();
+ * producer.sendAsync(msg).thenRun(v -> {
+ * System.out.println("Published message: " + msg.getMessageId());
+ * }).exceptionally(e -> {
+ * // Failed to publish
+ * });
+ *
+ *
+ * When the producer queue is full, by default this method will complete the future with an exception
+ * {@link PulsarClientException.ProducerQueueIsFullError}
+ *
+ * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and
+ * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior.
+ *
+ * @param message
+ * a message
+ * @return a future that can be used to track when the message will have been safely persisted
+ * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a
+ * new message builder.
+ */
+ @Deprecated
+ CompletableFuture sendAsync(Message message);
+
+ /**
+ * Get the last sequence id that was published by this producer.
+ *
+ * This represent either the automatically assigned or custom sequence id (set on the {@link MessageBuilder}) that
+ * was published and acknowledged by the broker.
+ *
+ * After recreating a producer with the same producer name, this will return the last message that was published in
+ * the previous producer session, or -1 if there no message was ever published.
+ *
+ * @return the last sequence id published by this producer
+ */
+ long getLastSequenceId();
+
+ /**
+ * Get statistics for the producer
+ *
+ *
+ * - numMsgsSent : Number of messages sent in the current interval
+ *
- numBytesSent : Number of bytes sent in the current interval
+ *
- numSendFailed : Number of messages failed to send in the current interval
+ *
- numAcksReceived : Number of acks received in the current interval
+ *
- totalMsgsSent : Total number of messages sent
+ *
- totalBytesSent : Total number of bytes sent
+ *
- totalSendFailed : Total number of messages failed to send
+ *
- totalAcksReceived: Total number of acks received
+ *
+ *
+ * @return statistic for the producer or null if ProducerStatsRecorderImpl is disabled.
+ */
+ ProducerStats getStats();
+
+ /**
+ * Close the producer and releases resources allocated.
+ *
+ * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
+ * of errors, pending writes will not be retried.
+ *
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the producer was already closed
+ */
+ @Override
+ void close() throws PulsarClientException;
+
+ /**
+ * Close the producer and releases resources allocated.
+ *
+ * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
+ * of errors, pending writes will not be retried.
+ *
+ * @return a future that can used to track when the producer has been closed
+ */
+ CompletableFuture closeAsync();
+
+ /**
+ * @return Whether the producer is connected to the broker
+ */
+ boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
new file mode 100644
index 0000000000000..1af077be9cade
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+
+import lombok.EqualsAndHashCode;
+
+/**
+ * Producer's configuration
+ *
+ * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance
+ */
+@Deprecated
+@EqualsAndHashCode
+public class ProducerConfiguration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ProducerConfigurationData conf = new ProducerConfigurationData();
+
+ @Deprecated
+ public enum MessageRoutingMode {
+ SinglePartition, RoundRobinPartition, CustomPartition
+ }
+
+ @Deprecated
+ public enum HashingScheme {
+ JavaStringHash, Murmur3_32Hash
+ }
+
+ /**
+ * @return the configured custom producer name or null if no custom name was specified
+ * @since 1.20.0
+ */
+ public String getProducerName() {
+ return conf.getProducerName();
+ }
+
+ /**
+ * Specify a name for the producer
+ *
+ * If not assigned, the system will generate a globally unique name which can be access with
+ * {@link Producer#getProducerName()}.
+ *
+ * When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique
+ * across all Pulsar's clusters.
+ *
+ * If a producer with the same name is already connected to a particular topic, the
+ * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}.
+ *
+ * @param producerName
+ * the custom name to use for the producer
+ * @since 1.20.0
+ */
+ public void setProducerName(String producerName) {
+ conf.setProducerName(producerName);
+ }
+
+ /**
+ * @return the message send timeout in ms
+ */
+ public long getSendTimeoutMs() {
+ return conf.getSendTimeoutMs();
+ }
+
+ /**
+ * Set the send timeout (default: 30 seconds)
+ *
+ * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
+ *
+ * @param sendTimeout
+ * the send timeout
+ * @param unit
+ * the time unit of the {@code sendTimeout}
+ */
+ public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
+ checkArgument(sendTimeout >= 0);
+ conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
+ return this;
+ }
+
+ /**
+ * @return the maximum number of messages allowed in the outstanding messages queue for the producer
+ */
+ public int getMaxPendingMessages() {
+ return conf.getMaxPendingMessages();
+ }
+
+ /**
+ * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+ *
+ * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail
+ * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior.
+ *
+ * @param maxPendingMessages
+ * @return
+ */
+ public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
+ checkArgument(maxPendingMessages > 0);
+ conf.setMaxPendingMessages(maxPendingMessages);
+ return this;
+ }
+
+ public HashingScheme getHashingScheme() {
+ return HashingScheme.valueOf(conf.getHashingScheme().toString());
+ }
+
+ public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
+ conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
+ return this;
+ }
+
+ /**
+ *
+ * @return the maximum number of pending messages allowed across all the partitions
+ */
+ public int getMaxPendingMessagesAcrossPartitions() {
+ return conf.getMaxPendingMessagesAcrossPartitions();
+ }
+
+ /**
+ * Set the number of max pending messages across all the partitions
+ *
+ * This setting will be used to lower the max pending messages for each partition
+ * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+ *
+ * @param maxPendingMessagesAcrossPartitions
+ */
+ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+ checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
+ conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+ }
+
+ /**
+ *
+ * @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the
+ * pending queue is full
+ */
+ public boolean getBlockIfQueueFull() {
+ return conf.isBlockIfQueueFull();
+ }
+
+ /**
+ * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing
+ * message queue is full.
+ *
+ * Default is false
. If set to false
, send operations will immediately fail with
+ * {@link PulsarClientException.ProducerQueueIsFullError} when there is no space left in pending queue.
+ *
+ * @param blockIfQueueFull
+ * whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full
+ * @return
+ */
+ public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
+ conf.setBlockIfQueueFull(blockIfQueueFull);
+ return this;
+ }
+
+ /**
+ * Set the message routing mode for the partitioned producer.
+ *
+ * @param messageRouteMode message routing mode.
+ * @return producer configuration
+ * @see MessageRoutingMode
+ */
+ public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) {
+ checkNotNull(messageRouteMode);
+ conf.setMessageRoutingMode(
+ org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
+ return this;
+ }
+
+ /**
+ * Get the message routing mode for the partitioned producer.
+ *
+ * @return message routing mode, default is round-robin routing.
+ * @see MessageRoutingMode#RoundRobinPartition
+ */
+ public MessageRoutingMode getMessageRoutingMode() {
+ return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
+ }
+
+ /**
+ * Set the compression type for the producer.
+ *
+ * By default, message payloads are not compressed. Supported compression types are:
+ *
+ * CompressionType.LZ4
+ * CompressionType.ZLIB
+ *
+ *
+ * @param compressionType
+ * @return
+ *
+ * @since 1.0.28
+ * Make sure all the consumer applications have been updated to use this client version, before starting to
+ * compress messages.
+ */
+ public ProducerConfiguration setCompressionType(CompressionType compressionType) {
+ conf.setCompressionType(compressionType);
+ return this;
+ }
+
+ /**
+ * @return the configured compression type for this producer
+ */
+ public CompressionType getCompressionType() {
+ return conf.getCompressionType();
+ }
+
+ /**
+ * Set a custom message routing policy by passing an implementation of MessageRouter
+ *
+ *
+ * @param messageRouter
+ */
+ public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
+ checkNotNull(messageRouter);
+ setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+ conf.setCustomMessageRouter(messageRouter);
+ return this;
+ }
+
+ /**
+ * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+ *
+ * @return message router.
+ * @deprecated since 1.22.0-incubating. numPartitions is already passed as parameter in
+ * {@link MessageRouter#choosePartition(Message, TopicMetadata)}.
+ * @see MessageRouter
+ */
+ @Deprecated
+ public MessageRouter getMessageRouter(int numPartitions) {
+ return conf.getCustomMessageRouter();
+ }
+
+ /**
+ * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
+ *
+ * @return message router set by {@link #setMessageRouter(MessageRouter)}.
+ */
+ public MessageRouter getMessageRouter() {
+ return conf.getCustomMessageRouter();
+ }
+
+ /**
+ * Return the flag whether automatic message batching is enabled or not.
+ *
+ * @return true if batch messages are enabled. otherwise false.
+ * @since 2.0.0
+ * It is enabled by default.
+ */
+ public boolean getBatchingEnabled() {
+ return conf.isBatchingEnabled();
+ }
+
+ /**
+ * Control whether automatic batching of messages is enabled for the producer. default: false [No batching]
+ *
+ * When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
+ * broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
+ * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
+ * contents.
+ *
+ * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages
+ *
+ * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+ * @since 1.0.36
+ * Make sure all the consumer applications have been updated to use this client version, before starting to
+ * batch messages.
+ *
+ */
+ public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
+ conf.setBatchingEnabled(batchMessagesEnabled);
+ return this;
+ }
+
+ /**
+ * @return the CryptoKeyReader
+ */
+ public CryptoKeyReader getCryptoKeyReader() {
+ return conf.getCryptoKeyReader();
+ }
+
+ /**
+ * Sets a {@link CryptoKeyReader}
+ *
+ * @param cryptoKeyReader
+ * CryptoKeyReader object
+ */
+ public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ checkNotNull(cryptoKeyReader);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return this;
+ }
+
+ /**
+ *
+ * @return encryptionKeys
+ *
+ */
+ public Set getEncryptionKeys() {
+ return conf.getEncryptionKeys();
+ }
+
+ /**
+ *
+ * Returns true if encryption keys are added
+ *
+ */
+ public boolean isEncryptionEnabled() {
+ return conf.isEncryptionEnabled();
+ }
+
+ /**
+ * Add public encryption key, used by producer to encrypt the data key.
+ *
+ * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are
+ * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application
+ * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted
+ * after compression. If batch messaging is enabled, the batched message is encrypted.
+ *
+ */
+ public void addEncryptionKey(String key) {
+ conf.getEncryptionKeys().add(key);
+ }
+
+ public void removeEncryptionKey(String key) {
+ conf.getEncryptionKeys().remove(key);
+ }
+
+ /**
+ * Sets the ProducerCryptoFailureAction to the value specified
+ *
+ * @param action
+ * The producer action
+ */
+ public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
+ conf.setCryptoFailureAction(action);
+ }
+
+ /**
+ * @return The ProducerCryptoFailureAction
+ */
+ public ProducerCryptoFailureAction getCryptoFailureAction() {
+ return conf.getCryptoFailureAction();
+ }
+
+ /**
+ *
+ * @return the batch time period in ms.
+ * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
+ */
+ public long getBatchingMaxPublishDelayMs() {
+ return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
+ }
+
+ /**
+ * Set the time period within which the messages sent will be batched default: 10ms if batch messages are
+ * enabled. If set to a non zero value, messages will be queued until this time interval or until
+ *
+ * @see ProducerConfiguration#batchingMaxMessages threshold is reached; all messages will be published as a single
+ * batch message. The consumer will be delivered individual messages in the batch in the same order they were
+ * enqueued
+ * @since 1.0.36
+ * Make sure all the consumer applications have been updated to use this client version, before starting to
+ * batch messages.
+ * @param batchDelay
+ * the batch delay
+ * @param timeUnit
+ * the time unit of the {@code batchDelay}
+ * @return
+ */
+ public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
+ long delayInMs = timeUnit.toMillis(batchDelay);
+ checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
+ conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
+ return this;
+ }
+
+ /**
+ *
+ * @return the maximum number of messages permitted in a batch.
+ */
+ public int getBatchingMaxMessages() {
+ return conf.getBatchingMaxMessages();
+ }
+
+ /**
+ * Set the maximum number of messages permitted in a batch. default: 1000 If set to a value greater than 1,
+ * messages will be queued until this threshold is reached or batch interval has elapsed
+ *
+ * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) All messages in batch will be published as
+ * a single batch message. The consumer will be delivered individual messages in the batch in the same order
+ * they were enqueued
+ * @param batchMessagesMaxMessagesPerBatch
+ * maximum number of messages in a batch
+ * @return
+ */
+ public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
+ checkArgument(batchMessagesMaxMessagesPerBatch > 0);
+ conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
+ return this;
+ }
+
+ public Optional getInitialSequenceId() {
+ return Optional.ofNullable(conf.getInitialSequenceId());
+ }
+
+ /**
+ * Set the baseline for the sequence ids for messages published by the producer.
+ *
+ * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+ * incremental sequence ids, if not otherwise specified.
+ *
+ * @param initialSequenceId
+ * @return
+ */
+ public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
+ conf.setInitialSequenceId(initialSequenceId);
+ return this;
+ }
+
+ /**
+ * Set a name/value property with this producer.
+ *
+ * @param key
+ * @param value
+ * @return
+ */
+ public ProducerConfiguration setProperty(String key, String value) {
+ checkArgument(key != null);
+ checkArgument(value != null);
+ conf.getProperties().put(key, value);
+ return this;
+ }
+
+ /**
+ * Add all the properties in the provided map
+ *
+ * @param properties
+ * @return
+ */
+ public ProducerConfiguration setProperties(Map properties) {
+ conf.getProperties().putAll(properties);
+ return this;
+ }
+
+ public Map getProperties() {
+ return conf.getProperties();
+ }
+
+ public ProducerConfigurationData getProducerConfigurationData() {
+ return conf;
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
new file mode 100644
index 0000000000000..a57a6846c8b4d
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.impl.v1.PulsarClientV1Impl;
+
+/**
+ * Class that provides a client interface to Pulsar.
+ *
+ * Client instances are thread-safe and can be reused for managing multiple {@link Producer}, {@link Consumer} and
+ * {@link Reader} instances.
+ */
+public interface PulsarClient extends Closeable {
+
+ /**
+ * Create a new PulsarClient object using default client configuration
+ *
+ * @param serviceUrl
+ * the url of the Pulsar endpoint to be used
+ * @return a new pulsar client object
+ * @throws PulsarClientException.InvalidServiceURL
+ * if the serviceUrl is invalid
+ * @deprecated use {@link #builder()} to construct a client instance
+ */
+ @Deprecated
+ public static PulsarClient create(String serviceUrl) throws PulsarClientException {
+ return create(serviceUrl, new ClientConfiguration());
+ }
+
+ /**
+ * Create a new PulsarClient object
+ *
+ * @param serviceUrl
+ * the url of the Pulsar endpoint to be used
+ * @param conf
+ * the client configuration
+ * @return a new pulsar client object
+ * @throws PulsarClientException.InvalidServiceURL
+ * if the serviceUrl is invalid
+ * @deprecated use {@link #builder()} to construct a client instance
+ */
+ @Deprecated
+ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+ return new PulsarClientV1Impl(serviceUrl, conf);
+ }
+
+ /**
+ * Create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @return The producer object
+ * @throws PulsarClientException.AlreadyClosedException
+ * if the client was already closed
+ * @throws PulsarClientException.InvalidTopicNameException
+ * if the topic name is not valid
+ * @throws PulsarClientException.AuthenticationException
+ * if there was an error with the supplied credentials
+ * @throws PulsarClientException.AuthorizationException
+ * if the authorization to publish on topic was denied
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ Producer createProducer(String topic) throws PulsarClientException;
+
+ /**
+ * Asynchronously create a producer with default {@link ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @return Future of the asynchronously created producer object
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ CompletableFuture createProducerAsync(String topic);
+
+ /**
+ * Create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @param conf
+ * The {@code ProducerConfiguration} object
+ * @return The producer object
+ * @throws PulsarClientException
+ * if it was not possible to create the producer
+ * @throws InterruptedException
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ Producer createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException;
+
+ /**
+ * Asynchronously create a producer with given {@code ProducerConfiguration} for publishing on a specific topic
+ *
+ * @param topic
+ * The name of the topic where to produce
+ * @param conf
+ * The {@code ProducerConfiguration} object
+ * @return Future of the asynchronously created producer object
+ * @deprecated use {@link #newProducer()} to build a new producer
+ */
+ @Deprecated
+ CompletableFuture createProducerAsync(String topic, ProducerConfiguration conf);
+
+ /**
+ * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The name of the topic
+ * @param subscription
+ * The name of the subscription
+ * @return The {@code Consumer} object
+ * @throws PulsarClientException
+ * @throws InterruptedException
+ *
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ Consumer subscribe(String topic, String subscription) throws PulsarClientException;
+
+ /**
+ * Asynchronously subscribe to the given topic and subscription combination using default
+ * {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The topic name
+ * @param subscription
+ * The subscription name
+ * @return Future of the {@code Consumer} object
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ CompletableFuture subscribeAsync(String topic, String subscription);
+
+ /**
+ * Subscribe to the given topic and subscription combination with given {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The name of the topic
+ * @param subscription
+ * The name of the subscription
+ * @param conf
+ * The {@code ConsumerConfiguration} object
+ * @return The {@code Consumer} object
+ * @throws PulsarClientException
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException;
+
+ /**
+ * Asynchronously subscribe to the given topic and subscription combination using given
+ * {@code ConsumerConfiguration}
+ *
+ * @param topic
+ * The name of the topic
+ * @param subscription
+ * The name of the subscription
+ * @param conf
+ * The {@code ConsumerConfiguration} object
+ * @return Future of the {@code Consumer} object
+ * @deprecated Use {@link #newConsumer()} to build a new consumer
+ */
+ @Deprecated
+ CompletableFuture subscribeAsync(String topic, String subscription, ConsumerConfiguration conf);
+
+ /**
+ * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic.
+ *
+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+ * subscription. Reader can only work on non-partitioned topics.
+ *
+ * The initial reader positioning is done by specifying a message id. The options are:
+ *
+ * MessageId.earliest
: Start reading from the earliest message available in the topic
+ * MessageId.latest
: Start reading from the end topic, only getting messages published after the
+ * reader was created
+ * MessageId
: When passing a particular message id, the reader will position itself on that
+ * specific position. The first message to be read will be the message next to the specified messageId.
+ *
+ *
+ * @param topic
+ * The name of the topic where to read
+ * @param startMessageId
+ * The message id where the reader will position itself. The first message returned will be the one after
+ * the specified startMessageId
+ * @param conf
+ * The {@code ReaderConfiguration} object
+ * @return The {@code Reader} object
+ * @deprecated Use {@link #newReader()} to build a new reader
+ */
+ @Deprecated
+ Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException;
+
+ /**
+ * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the
+ * specified topic.
+ *
+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+ * subscription. Reader can only work on non-partitioned topics.
+ *
+ * The initial reader positioning is done by specifying a message id. The options are:
+ *
+ * MessageId.earliest
: Start reading from the earliest message available in the topic
+ * MessageId.latest
: Start reading from the end topic, only getting messages published after the
+ * reader was created
+ * MessageId
: When passing a particular message id, the reader will position itself on that
+ * specific position. The first message to be read will be the message next to the specified messageId.
+ *
+ *
+ * @param topic
+ * The name of the topic where to read
+ * @param startMessageId
+ * The message id where the reader will position itself. The first message returned will be the one after
+ * the specified startMessageId
+ * @param conf
+ * The {@code ReaderConfiguration} object
+ * @return Future of the asynchronously created producer object
+ * @deprecated Use {@link #newReader()} to build a new reader
+ */
+ @Deprecated
+ CompletableFuture createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf);
+
+ /**
+ * Close the PulsarClient and release all the resources.
+ *
+ * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted.
+ *
+ * @throws PulsarClientException
+ * if the close operation fails
+ */
+ @Override
+ void close() throws PulsarClientException;
+
+ /**
+ * Asynchronously close the PulsarClient and release all the resources.
+ *
+ * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted.
+ *
+ * @throws PulsarClientException
+ * if the close operation fails
+ */
+ CompletableFuture closeAsync();
+
+ /**
+ * Perform immediate shutdown of PulsarClient.
+ *
+ * Release all the resources and close all the producers without waiting for ongoing operations to complete.
+ *
+ * @throws PulsarClientException
+ * if the forceful shutdown fails
+ */
+ void shutdown() throws PulsarClientException;
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
new file mode 100644
index 0000000000000..4a435c3cbda75
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Reader can be used to scan through all the messages currently available in a topic.
+ *
+ */
+public interface Reader extends Closeable {
+
+ /**
+ * @return the topic from which this reader is reading from
+ */
+ String getTopic();
+
+ /**
+ * Read the next message in the topic
+ *
+ * @return the next messasge
+ * @throws PulsarClientException
+ */
+ Message readNext() throws PulsarClientException;
+
+ /**
+ * Read the next message in the topic waiting for a maximum of timeout
+ * time units. Returns null if no message is recieved in that time.
+ *
+ * @return the next message(Could be null if none received in time)
+ * @throws PulsarClientException
+ */
+ Message readNext(int timeout, TimeUnit unit) throws PulsarClientException;
+
+ CompletableFuture> readNextAsync();
+
+ /**
+ * Asynchronously close the reader and stop the broker to push more messages
+ *
+ * @return a future that can be used to track the completion of the operation
+ */
+ CompletableFuture closeAsync();
+
+ /**
+ * Return true if the topic was terminated and this reader has reached the end of the topic
+ */
+ boolean hasReachedEndOfTopic();
+
+ /**
+ * Check if there is any message available to read from the current position.
+ */
+ boolean hasMessageAvailable() throws PulsarClientException;
+
+ /**
+ * Asynchronously Check if there is message that has been published successfully to the broker in the topic.
+ */
+ CompletableFuture hasMessageAvailableAsync();
+
+ /**
+ * @return Whether the reader is connected to the broker
+ */
+ boolean isConnected();
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
new file mode 100644
index 0000000000000..243166cbf8071
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+
+/**
+ *
+ * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance
+ */
+@Deprecated
+public class ReaderConfiguration implements Serializable {
+
+ private final ReaderConfigurationData conf = new ReaderConfigurationData<>();
+
+ /**
+ * @return the configured {@link ReaderListener} for the reader
+ */
+ public ReaderListener getReaderListener() {
+ return conf.getReaderListener();
+ }
+
+ /**
+ * Sets a {@link ReaderListener} for the reader
+ *
+ * When a {@link ReaderListener} is set, application will receive messages through it. Calls to
+ * {@link Reader#readNext()} will not be allowed.
+ *
+ * @param readerListener
+ * the listener object
+ */
+ public ReaderConfiguration setReaderListener(ReaderListener readerListener) {
+ checkNotNull(readerListener);
+ conf.setReaderListener(readerListener);
+ return this;
+ }
+
+ /**
+ * @return the configure receiver queue size value
+ */
+ public int getReceiverQueueSize() {
+ return conf.getReceiverQueueSize();
+ }
+
+ /**
+ * @return the CryptoKeyReader
+ */
+ public CryptoKeyReader getCryptoKeyReader() {
+ return conf.getCryptoKeyReader();
+ }
+
+ /**
+ * Sets a {@link CryptoKeyReader}
+ *
+ * @param cryptoKeyReader
+ * CryptoKeyReader object
+ */
+ public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ checkNotNull(cryptoKeyReader);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return this;
+ }
+
+ /**
+ * Sets the ConsumerCryptoFailureAction to the value specified
+ *
+ * @param action
+ * The action to take when the decoding fails
+ */
+ public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+ conf.setCryptoFailureAction(action);
+ }
+
+ /**
+ * @return The ConsumerCryptoFailureAction
+ */
+ public ConsumerCryptoFailureAction getCryptoFailureAction() {
+ return conf.getCryptoFailureAction();
+ }
+
+ /**
+ * Sets the size of the consumer receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
+ * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
+ * throughput at the expense of bigger memory utilization.
+ *
+ * Default value is {@code 1000} messages and should be good for most use cases.
+ *
+ * @param receiverQueueSize
+ * the new receiver queue size value
+ */
+ public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
+ checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
+ conf.setReceiverQueueSize(receiverQueueSize);
+ return this;
+ }
+
+ /**
+ * @return the consumer name
+ */
+ public String getReaderName() {
+ return conf.getReaderName();
+ }
+
+ /**
+ * Set the consumer name.
+ *
+ * @param readerName
+ */
+ public ReaderConfiguration setReaderName(String readerName) {
+ checkArgument(StringUtils.isNotBlank(readerName));
+ conf.setReaderName(readerName);
+ return this;
+ }
+
+ /**
+ * @return the subscription role prefix for subscription auth
+ */
+ public String getSubscriptionRolePrefix() {
+ return conf.getSubscriptionRolePrefix();
+ }
+
+ /**
+ * Set the subscription role prefix for subscription auth. The default prefix is "reader".
+ *
+ * @param subscriptionRolePrefix
+ */
+ public ReaderConfiguration setSubscriptionRolePrefix(String subscriptionRolePrefix) {
+ checkArgument(StringUtils.isNotBlank(subscriptionRolePrefix));
+ conf.setSubscriptionRolePrefix(subscriptionRolePrefix);
+ return this;
+ }
+
+ public ReaderConfigurationData getReaderConfigurationData() {
+ return conf;
+ }
+
+ private static final long serialVersionUID = 1L;
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
new file mode 100644
index 0000000000000..76b1dabd07a7d
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+
+@SuppressWarnings("deprecation")
+public class MessageBuilderImpl implements MessageBuilder {
+ private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
+ private final MessageMetadata.Builder msgMetadataBuilder = MessageMetadata.newBuilder();
+ private ByteBuffer content = EMPTY_CONTENT;
+
+ @Override
+ public Message build() {
+ return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES);
+ }
+
+ @Override
+ public MessageBuilder setContent(byte[] data) {
+ setContent(data, 0, data.length);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setContent(byte[] data, int offet, int length) {
+ this.content = ByteBuffer.wrap(data, offet, length);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setContent(ByteBuffer buf) {
+ this.content = buf.duplicate();
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setProperties(Map properties) {
+ for (Map.Entry entry : properties.entrySet()) {
+ msgMetadataBuilder
+ .addProperties(KeyValue.newBuilder().setKey(entry.getKey()).setValue(entry.getValue()).build());
+ }
+
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setProperty(String name, String value) {
+ msgMetadataBuilder.addProperties(KeyValue.newBuilder().setKey(name).setValue(value).build());
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setKey(String key) {
+ msgMetadataBuilder.setPartitionKey(key);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setEventTime(long timestamp) {
+ checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
+ msgMetadataBuilder.setEventTime(timestamp);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setSequenceId(long sequenceId) {
+ checkArgument(sequenceId >= 0);
+ msgMetadataBuilder.setSequenceId(sequenceId);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder setReplicationClusters(List clusters) {
+ Preconditions.checkNotNull(clusters);
+ msgMetadataBuilder.clearReplicateTo();
+ msgMetadataBuilder.addAllReplicateTo(clusters);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder disableReplication() {
+ msgMetadataBuilder.clearReplicateTo();
+ msgMetadataBuilder.addReplicateTo("__local__");
+ return this;
+ }
+
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
new file mode 100644
index 0000000000000..e21e5720ea94e
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class ConsumerV1Impl implements Consumer {
+ private final org.apache.pulsar.shade.client.api.v2.Consumer consumer;
+
+ public ConsumerV1Impl(org.apache.pulsar.shade.client.api.v2.Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void acknowledge(Message> arg0) throws PulsarClientException {
+ consumer.acknowledge(arg0);
+ }
+
+ @Override
+ public void acknowledge(MessageId arg0) throws PulsarClientException {
+ consumer.acknowledge(arg0);
+ }
+
+ @Override
+ public CompletableFuture acknowledgeAsync(Message> arg0) {
+ return consumer.acknowledgeAsync(arg0);
+ }
+
+ @Override
+ public CompletableFuture acknowledgeAsync(MessageId arg0) {
+ return consumer.acknowledgeAsync(arg0);
+ }
+
+ @Override
+ public void acknowledgeCumulative(Message> arg0) throws PulsarClientException {
+ consumer.acknowledgeCumulative(arg0);
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId arg0) throws PulsarClientException {
+ consumer.acknowledgeCumulative(arg0);
+ }
+
+ @Override
+ public CompletableFuture acknowledgeCumulativeAsync(Message> arg0) {
+ return consumer.acknowledgeCumulativeAsync(arg0);
+ }
+
+ @Override
+ public CompletableFuture acknowledgeCumulativeAsync(MessageId arg0) {
+ return consumer.acknowledgeCumulativeAsync(arg0);
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ consumer.close();
+ }
+
+ @Override
+ public CompletableFuture closeAsync() {
+ return consumer.closeAsync();
+ }
+
+ @Override
+ public String getConsumerName() {
+ return consumer.getConsumerName();
+ }
+
+ @Override
+ public ConsumerStats getStats() {
+ return consumer.getStats();
+ }
+
+ public String getSubscription() {
+ return consumer.getSubscription();
+ }
+
+ public String getTopic() {
+ return consumer.getTopic();
+ }
+
+ public boolean hasReachedEndOfTopic() {
+ return consumer.hasReachedEndOfTopic();
+ }
+
+ public boolean isConnected() {
+ return consumer.isConnected();
+ }
+
+ public void pause() {
+ consumer.pause();
+ }
+
+ public Message receive() throws PulsarClientException {
+ return consumer.receive();
+ }
+
+ public Message receive(int arg0, TimeUnit arg1) throws PulsarClientException {
+ return consumer.receive(arg0, arg1);
+ }
+
+ public CompletableFuture> receiveAsync() {
+ return consumer.receiveAsync();
+ }
+
+ public void redeliverUnacknowledgedMessages() {
+ consumer.redeliverUnacknowledgedMessages();
+ }
+
+ public void resume() {
+ consumer.resume();
+ }
+
+ public void seek(MessageId arg0) throws PulsarClientException {
+ consumer.seek(arg0);
+ }
+
+ public CompletableFuture seekAsync(MessageId arg0) {
+ return consumer.seekAsync(arg0);
+ }
+
+ public void unsubscribe() throws PulsarClientException {
+ consumer.unsubscribe();
+ }
+
+ public CompletableFuture unsubscribeAsync() {
+ return consumer.unsubscribeAsync();
+ }
+
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
new file mode 100644
index 0000000000000..3f913772288f7
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
+
+public class ProducerV1Impl implements Producer {
+
+ private final ProducerImpl producer;
+
+ public ProducerV1Impl(ProducerImpl producer) {
+ this.producer = producer;
+ }
+
+ public void close() throws PulsarClientException {
+ producer.close();
+ }
+
+ public CompletableFuture closeAsync() {
+ return producer.closeAsync();
+ }
+
+ public void flush() throws PulsarClientException {
+ producer.flush();
+ }
+
+ public CompletableFuture flushAsync() {
+ return producer.flushAsync();
+ }
+
+ public long getLastSequenceId() {
+ return producer.getLastSequenceId();
+ }
+
+ public ProducerStats getStats() {
+ return producer.getStats();
+ }
+
+ public boolean isConnected() {
+ return producer.isConnected();
+ }
+
+ public MessageId send(byte[] value) throws PulsarClientException {
+ return producer.send(value);
+ }
+
+ public MessageId send(Message value) throws PulsarClientException {
+ return producer.send(value);
+ }
+
+ public CompletableFuture sendAsync(byte[] arg0) {
+ return producer.sendAsync(arg0);
+ }
+
+ public CompletableFuture sendAsync(Message arg0) {
+ return producer.sendAsync(arg0);
+ }
+
+ @Override
+ public String getTopic() {
+ return producer.getTopic();
+ }
+
+ @Override
+ public String getProducerName() {
+ return producer.getProducerName();
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
new file mode 100644
index 0000000000000..a10f9f12e3639
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@SuppressWarnings("deprecation")
+public class PulsarClientV1Impl implements PulsarClient {
+
+ private final PulsarClientImpl client;
+
+ public PulsarClientV1Impl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
+ this.client = new PulsarClientImpl(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ client.close();
+ }
+
+ @Override
+ public CompletableFuture closeAsync() {
+ return client.closeAsync();
+ }
+
+ @Override
+ public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException {
+ if (conf == null) {
+ throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object");
+ }
+
+ try {
+ return createProducerAsync(topic, conf).get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public Producer createProducer(String topic)
+ throws PulsarClientException {
+ return createProducer(topic, new ProducerConfiguration());
+ }
+
+ @Override
+ public CompletableFuture createProducerAsync(final String topic, final ProducerConfiguration conf) {
+ ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+ confData.setTopicName(topic);
+ return client.createProducerAsync(confData).thenApply(p -> new ProducerV1Impl((ProducerImpl) p));
+ }
+
+ @Override
+ public CompletableFuture createProducerAsync(String topic) {
+ return createProducerAsync(topic, new ProducerConfiguration());
+ }
+
+ @Override
+ public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
+ throws PulsarClientException {
+ try {
+ return createReaderAsync(topic, startMessageId, conf).get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture createReaderAsync(String topic, MessageId startMessageId,
+ ReaderConfiguration conf) {
+ ReaderConfigurationData confData = conf.getReaderConfigurationData().clone();
+ confData.setTopicName(topic);
+ confData.setStartMessageId(startMessageId);
+ return client.createReaderAsync(confData).thenApply(r -> new ReaderV1Impl(r));
+ }
+
+ @Override
+ public void shutdown() throws PulsarClientException {
+ client.shutdown();
+ }
+
+ @Override
+ public Consumer subscribe(String topic, String subscriptionName) throws PulsarClientException {
+ return subscribe(topic, subscriptionName, new ConsumerConfiguration());
+ }
+
+ @Override
+ public CompletableFuture subscribeAsync(final String topic, final String subscription,
+ final ConsumerConfiguration conf) {
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
+ }
+
+ ConsumerConfigurationData confData = conf.getConfigurationData().clone();
+ confData.getTopicNames().add(topic);
+ confData.setSubscriptionName(subscription);
+ return client.subscribeAsync(confData).thenApply(c -> new ConsumerV1Impl(c));
+ }
+
+ @Override
+ public CompletableFuture subscribeAsync(String topic,
+ String subscriptionName) {
+ return subscribeAsync(topic, subscriptionName, new ConsumerConfiguration());
+ }
+
+ @Override
+ public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf)
+ throws PulsarClientException {
+ try {
+ return subscribeAsync(topic, subscription, conf).get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+}
diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
new file mode 100644
index 0000000000000..7ae86b5502d9e
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v1;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+
+public class ReaderV1Impl implements Reader {
+
+ private final org.apache.pulsar.shade.client.api.v2.Reader reader;
+
+ public ReaderV1Impl(org.apache.pulsar.shade.client.api.v2.Reader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public CompletableFuture closeAsync() {
+ return reader.closeAsync();
+ }
+
+ @Override
+ public String getTopic() {
+ return reader.getTopic();
+ }
+
+ @Override
+ public boolean hasMessageAvailable() throws PulsarClientException {
+ return reader.hasMessageAvailable();
+ }
+
+ @Override
+ public CompletableFuture hasMessageAvailableAsync() {
+ return reader.hasMessageAvailableAsync();
+ }
+
+ @Override
+ public boolean hasReachedEndOfTopic() {
+ return reader.hasReachedEndOfTopic();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return reader.isConnected();
+ }
+
+ @Override
+ public Message readNext() throws PulsarClientException {
+ return reader.readNext();
+ }
+
+ @Override
+ public Message readNext(int arg0, TimeUnit arg1) throws PulsarClientException {
+ return reader.readNext(arg0, arg1);
+ }
+
+ @Override
+ public CompletableFuture> readNextAsync() {
+ return reader.readNextAsync();
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
similarity index 88%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
rename to pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
index 56630a49fab69..5e14b7bafe56d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
+++ b/pulsar-client-1x-base/pulsar-client-1x/src/test/java/org/apache/pulsar/client/impl/MessageBuilderTest.java
@@ -36,20 +36,20 @@ public class MessageBuilderTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testSetEventTimeNegative() {
- MessageBuilder> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setEventTime(-1L);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testSetEventTimeZero() {
- MessageBuilder> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setEventTime(0L);
}
@Test
public void testSetEventTimePositive() {
long eventTime = System.currentTimeMillis();
- MessageBuilder> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setContent(new byte[0]);
builder.setEventTime(eventTime);
Message> msg = builder.build();
@@ -58,7 +58,7 @@ public void testSetEventTimePositive() {
@Test
public void testBuildMessageWithoutEventTime() {
- MessageBuilder> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setContent(new byte[0]);
Message> msg = builder.build();
assertEquals(0L, msg.getEventTime());
@@ -66,7 +66,7 @@ public void testBuildMessageWithoutEventTime() {
@Test
public void testSetMessageProperties() {
- MessageBuilder> builder = MessageBuilder.create();
+ MessageBuilder builder = MessageBuilder.create();
builder.setContent(new byte[0]);
Map map = Maps.newHashMap();
map.put("key1", "value1");
diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
new file mode 100644
index 0000000000000..6b6b2bc35aaad
--- /dev/null
+++ b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
@@ -0,0 +1,91 @@
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar-client-1x-base
+ 2.3.0-SNAPSHOT
+ ..
+
+
+ pulsar-client-2x-shaded
+ Pulsar Client 2.x Shaded API
+
+
+
+ org.apache.pulsar
+ pulsar-client
+ ${project.parent.version}
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+ true
+ true
+ false
+
+
+
+ org.apache.pulsar:pulsar-client
+
+
+
+
+ org.apache.pulsar:pulsar-client
+
+ **
+
+
+
+
+
+ org.apache.pulsar.client.api
+ org.apache.pulsar.shade.client.api.v2
+
+ org.apache.pulsar.client.api.PulsarClient
+ org.apache.pulsar.client.api.Producer
+ org.apache.pulsar.client.api.Consumer
+ org.apache.pulsar.client.api.Reader
+
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
index 70416f3495dd7..46516f0cdbb5e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/DefaultSchemasTest.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.client.impl.schema;
+import static org.testng.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+
import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,10 +32,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-
-import static org.testng.Assert.assertEquals;
-
public class DefaultSchemasTest {
private PulsarClient client;
@@ -75,17 +73,7 @@ public void testStringSchema() throws Exception {
Assert.assertTrue(stringSchema.decode(testBytes).equals(testString));
assertEquals(stringSchema.encode(testString), testBytes);
- Message msg1 = MessageBuilder.create(stringSchema)
- .setContent(testBytes)
- .build();
- assertEquals(stringSchema.decode(msg1.getData()), testString);
-
- Message msg2 = MessageBuilder.create(stringSchema)
- .setValue(testString)
- .build();
- assertEquals(stringSchema.encode(testString), msg2.getData());
-
- byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
+ byte[] bytes2 = testString.getBytes(StandardCharsets.UTF_16);
StringSchema stringSchemaUtf16 = new StringSchema(StandardCharsets.UTF_16);
Assert.assertTrue(stringSchemaUtf16.decode(bytes2).equals(testString));
assertEquals(stringSchemaUtf16.encode(testString), bytes2);