From cb5eb5b671316bf200202b5b410b1438f2cde98b Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sat, 17 Feb 2018 12:22:47 -0800 Subject: [PATCH] PIP-12 Introduce builder for creating Producer Consumer Reader (#1089) * PIP-12: Introduce builders for creating Producer Consumer Reader * Fixed Javadocs * Addressed comments --- .../broker/service/AbstractReplicator.java | 21 +- .../NonPersistentReplicator.java | 2 +- .../broker/service/PersistentTopicTest.java | 6 +- .../pulsar/client/api/ClientBuilder.java | 265 +++++++++++++++++ .../client/api/ClientConfiguration.java | 10 +- .../pulsar/client/api/ConsumerBuilder.java | 245 ++++++++++++++++ .../client/api/ConsumerConfiguration.java | 3 +- .../pulsar/client/api/CryptoKeyReader.java | 39 ++- .../pulsar/client/api/MessageRouter.java | 3 +- .../pulsar/client/api/MessageRoutingMode.java | 23 ++ .../pulsar/client/api/ProducerBuilder.java | 272 ++++++++++++++++++ .../client/api/ProducerConfiguration.java | 23 +- .../pulsar/client/api/PulsarClient.java | 81 +++++- .../pulsar/client/api/ReaderBuilder.java | 140 +++++++++ .../client/api/ReaderConfiguration.java | 10 +- .../pulsar/client/impl/ClientBuilderImpl.java | 154 ++++++++++ .../client/impl/ConsumerBuilderImpl.java | 176 ++++++++++++ .../client/impl/ProducerBuilderImpl.java | 194 +++++++++++++ .../pulsar/client/impl/PulsarClientImpl.java | 23 +- .../pulsar/client/impl/ReaderBuilderImpl.java | 132 +++++++++ .../pulsar/client/api/MessageIdTest.java | 16 +- .../pulsar/client/impl/BuildersTest.java | 47 +++ .../client/tutorial/SampleProducer.java | 6 +- 23 files changed, 1815 insertions(+), 76 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 1913dd5ff3dad..49213c90db74e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -22,10 +22,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.AbstractReplicator.State; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; -import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -43,7 +43,7 @@ public abstract class AbstractReplicator { protected volatile ProducerImpl producer; protected final int producerQueueSize; - protected final ProducerConfiguration producerConfiguration; + protected final ProducerBuilder producerBuilder; protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS); @@ -68,10 +68,11 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); - this.producerConfiguration = new ProducerConfiguration(); - this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS); - this.producerConfiguration.setMaxPendingMessages(producerQueueSize); - this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, localCluster)); + this.producerBuilder = client.newProducer() // + .topic(topicName) + .sendTimeout(0, TimeUnit.SECONDS) // + .maxPendingMessages(producerQueueSize) // + .producerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); } @@ -83,10 +84,6 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca protected abstract void disableReplicatorRead(); - public ProducerConfiguration getProducerConfiguration() { - return producerConfiguration; - } - public String getRemoteCluster() { return remoteCluster; } @@ -121,7 +118,7 @@ public synchronized void startProducer() { } log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster); - client.createProducerAsync(topicName, producerConfiguration).thenAccept(producer -> { + producerBuilder.createAsync().thenAccept(producer -> { readEntries(producer); }).exceptionally(ex -> { if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index d914aa7ececd2..e9219a614d28e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -52,7 +52,7 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St BrokerService brokerService) { super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService); - producerConfiguration.setBlockIfQueueFull(false); + producerBuilder.blockIfQueueFull(false); startProducer(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 55019a0d65c5b..a8ec9400cc559 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1149,17 +1149,17 @@ public void testClosingReplicationProducerTwice() throws Exception { brokerService.getReplicationClients().put(remoteCluster, client); PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService); - doReturn(new CompletableFuture()).when(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); + doReturn(new CompletableFuture()).when(clientImpl).createProducerAsync(matches(globalTopicName), any()); replicator.startProducer(); - verify(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); + verify(clientImpl).createProducerAsync(matches(globalTopicName), any()); replicator.disconnect(false); replicator.disconnect(false); replicator.startProducer(); - verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); + verify(clientImpl, Mockito.times(2)).createProducerAsync(matches(globalTopicName), any()); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java new file mode 100644 index 0000000000000..b832e59c9bf9d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; + +/** + * Builder interface that is used to construct a {@link PulsarClient} instance. + * + * @since 2.0.0 + */ +public interface ClientBuilder extends Serializable, Cloneable { + + /** + * @return the new {@link PulsarClient} instance + */ + PulsarClient build() throws PulsarClientException; + + /** + * Create a copy of the current client builder. + *

+ * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + *

+     * ClientBuilder builder = PulsarClient.builder().ioThreads(8).listenerThreads(4);
+     *
+     * PulsarClient client1 = builder.clone().serviceUrl(URL_1).build();
+     * PulsarClient client2 = builder.clone().serviceUrl(URL_2).build();
+     * 
+ */ + ClientBuilder clone(); + + /** + * Configure the service URL for the Pulsar service. + *

+ * This parameter is required + * + * @param serviceUrl + * @return + */ + ClientBuilder serviceUrl(String serviceUrl); + + /** + * Set the authentication provider to use in the Pulsar client instance. + *

+ * Example: + *

+ * + *

+     * 
+     * String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     *
+     * Map conf = new TreeMap<>();
+     * conf.put("tlsCertFile", "/my/cert/file");
+     * conf.put("tlsKeyFile", "/my/key/file");
+     *
+     * Authentication auth = AuthenticationFactor.create(AUTH_CLASS, conf);
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(auth)
+     *          .build();
+     * ....
+     * 
+     * 
+ * + * @param authentication + * an instance of the {@link Authentication} provider already constructed + */ + ClientBuilder authentication(Authentication authentication); + + /** + * Set the authentication provider to use in the Pulsar client instance. + *

+ * Example: + *

+ * + *

+     * 
+     * String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     * String AUTH_PARAMS = "tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file";
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(AUTH_CLASS, AUTH_PARAMS)
+     *          .build();
+     * ....
+     * 
+     * 
+ * + * @param authPluginClassName + * name of the Authentication-Plugin you want to use + * @param authParamsString + * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2" + * @throws UnsupportedAuthenticationException + * failed to instantiate specified Authentication-Plugin + */ + ClientBuilder authentication(String authPluginClassName, String authParamsString) + throws UnsupportedAuthenticationException; + + /** + * Set the authentication provider to use in the Pulsar client instance. + *

+ * Example: + *

+ * + *

+     * 
+     * String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
+     *
+     * Map conf = new TreeMap<>();
+     * conf.put("tlsCertFile", "/my/cert/file");
+     * conf.put("tlsKeyFile", "/my/key/file");
+     *
+     * PulsarClient client = PulsarClient.builder()
+     *          .serviceUrl(SERVICE_URL)
+     *          .authentication(AUTH_CLASS, conf)
+     *          .build();
+     * ....
+     * 
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParams
+     *            map which represents parameters for the Authentication-Plugin
+     * @throws UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    ClientBuilder authentication(String authPluginClassName, Map authParams)
+            throws UnsupportedAuthenticationException;
+
+    /**
+     * Set the operation timeout (default: 30 seconds)
+     * 

+ * Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the + * operation will be maked as failed + * + * @param operationTimeout + * operation timeout + * @param unit + * time unit for {@code operationTimeout} + */ + ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit); + + /** + * Set the number of threads to be used for handling connections to brokers (default: 1 thread) + * + * @param numIoThreads + */ + ClientBuilder ioThreads(int numIoThreads); + + /** + * Set the number of threads to be used for message listeners (default: 1 thread) + * + * @param numListenerThreads + */ + ClientBuilder listenerThreads(int numListenerThreads); + + /** + * Sets the max number of connection that the client library will open to a single broker. + *

+ * By default, the connection pool will use a single connection for all the producers and consumers. Increasing this + * parameter may improve throughput when using many producers over a high latency connection. + *

+ * + * @param connectionsPerBroker + * max number of connections per broker (needs to be greater than 0) + */ + ClientBuilder connectionsPerBroker(int connectionsPerBroker); + + /** + * Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm. + *

+ * No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve + * low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall + * throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay flag to false. + *

+ * Default value is true + * + * @param enableTcpNoDelay + */ + ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay); + + /** + * Configure whether to use TLS encryption on the connection (default: false) + * + * @param enableTls + */ + ClientBuilder enableTls(boolean enableTls); + + /** + * Set the path to the trusted TLS certificate file + * + * @param tlsTrustCertsFilePath + */ + ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath); + + /** + * Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false) + * + * @param allowTlsInsecureConnection + */ + ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection); + + /** + * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509 + * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. + * Server Identity hostname verification. + * + * @see rfc2818 + * + * @param enableTlsHostnameVerification + */ + ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification); + + /** + * Set the interval between each stat info (default: 60 seconds) Stats will be activated with positive + * statsIntervalSeconds It should be set to at least 1 second + * + * @param statsIntervalSeconds + * the interval between each stat info + * @param unit + * time unit for {@code statsInterval} + */ + ClientBuilder statsInterval(long statsInterval, TimeUnit unit); + + /** + * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker. + * (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe + * on thousands of topic using created {@link PulsarClient} + * + * @param maxConcurrentLookupRequests + */ + ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests); + + /** + * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection + * will be closed and client creates a new connection that give chance to connect a different broker (default: + * 50) + * + * @param maxNumberOfRejectedRequestPerConnection + */ + ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java index 9ab7b32b16e8f..14f94da5aba2e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java @@ -30,14 +30,13 @@ /** * Class used to specify client side configuration like authentication, etc.. * - * + * @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance */ +@Deprecated public class ClientConfiguration implements Serializable { - /** - * - */ private static final long serialVersionUID = 1L; + private Authentication authentication = new AuthenticationDisabled(); private long operationTimeoutMs = 30000; private long statsIntervalSeconds = 60; @@ -221,8 +220,7 @@ public int getConnectionsPerBroker() { * max number of connections per broker (needs to be greater than 0) */ public void setConnectionsPerBroker(int connectionsPerBroker) { - checkArgument(connectionsPerBroker > 0, - "Connections per broker need to be greater than 0"); + checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0"); this.connectionsPerBroker = connectionsPerBroker; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java new file mode 100644 index 0000000000000..a2c3c81331855 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * {@link ConsumerBuilder} is used to configure and create instances of {@link Consumer}. + * + * @see PulsarClient#newConsumer() + * + * @since 2.0.0 + */ +public interface ConsumerBuilder extends Serializable, Cloneable { + + /** + * Create a copy of the current consumer builder. + *

+ * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + *

+     * ConsumerBuilder builder = client.newConsumer() //
+     *         .subscriptionName("my-subscription-name") //
+     *         .subscriptionType(SubscriptionType.Shared) //
+     *         .receiverQueueSize(10);
+     *
+     * Consumer consumer1 = builder.clone().topic(TOPIC_1).subscribe();
+     * Consumer consumer2 = builder.clone().topic(TOPIC_2).subscribe();
+     * 
+ */ + ConsumerBuilder clone(); + + /** + * Finalize the {@link Consumer} creation by subscribing to the topic. + * + *

+ * If the subscription does not exist, a new subscription will be created and all messages published after the + * creation will be retained until acknowledged, even if the consumer is not connected. + * + * @return the {@link Consumer} instance + * @throws PulsarClientException + * if the the subscribe operation fails + */ + Consumer subscribe() throws PulsarClientException; + + /** + * Finalize the {@link Consumer} creation by subscribing to the topic in asynchronous mode. + * + *

+ * If the subscription does not exist, a new subscription will be created and all messages published after the + * creation will be retained until acknowledged, even if the consumer is not connected. + * + * @return a future that will yield a {@link Consumer} instance + * @throws PulsarClientException + * if the the subscribe operation fails + */ + CompletableFuture subscribeAsync(); + + /** + * Specify the topic this consumer will subscribe on. + *

+ * This argument is required when constructing the consumer. + * + * @param topicName + */ + ConsumerBuilder topic(String topicName); + + /** + * Specify the subscription name for this consumer. + *

+ * This argument is required when constructing the consumer. + * + * @param subscriptionName + */ + ConsumerBuilder subscriptionName(String subscriptionName); + + /** + * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than + * 10 seconds. + * + * @param ackTimeout + * for unacked messages. + * @param timeUnit + * unit in which the timeout is provided. + * @return {@link ConsumerConfiguration} + */ + ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit); + + /** + * Select the subscription type to be used when subscribing to the topic. + *

+ * Default is {@link SubscriptionType#Exclusive} + * + * @param subscriptionType + * the subscription type value + */ + ConsumerBuilder subscriptionType(SubscriptionType subscriptionType); + + /** + * Sets a {@link MessageListener} for the consumer + *

+ * When a {@link MessageListener} is set, application will receive messages through it. Calls to + * {@link Consumer#receive()} will not be allowed. + * + * @param messageListener + * the listener object + */ + ConsumerBuilder messageListener(MessageListener messageListener); + + /** + * Sets a {@link CryptoKeyReader} + * + * @param cryptoKeyReader + * CryptoKeyReader object + */ + ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + + /** + * Sets the ConsumerCryptoFailureAction to the value specified + * + * @param action + * The consumer action + */ + ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction action); + + /** + * Sets the size of the consumer receive queue. + *

+ * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the + * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer + * throughput at the expense of bigger memory utilization. + *

+ *

+ * Setting the consumer queue size as zero + *

    + *
  • Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the + * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process + * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue + * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is + * zero.
  • + *
  • Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with + * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives + * exception in callback. consumer will not be able receive any further message unless batch-message in pipeline + * is removed
  • + *
+ *

+ * Default value is {@code 1000} messages and should be good for most use cases. + * + * @param receiverQueueSize + * the new receiver queue size value + */ + ConsumerBuilder receiverQueueSize(int receiverQueueSize); + + /** + * Set the max total receiver queue size across partitons. + *

+ * This setting will be used to reduce the receiver queue size for individual partitions + * {@link #receiverQueueSize(int)} if the total exceeds this value (default: 50000). + * + * @param maxTotalReceiverQueueSizeAcrossPartitions + */ + ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions); + + /** + * Set the consumer name. + * + * @param consumerName + */ + ConsumerBuilder consumerName(String consumerName); + + /** + * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog + * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for + * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that + * point, the messages will be sent as normal. + * + * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e. + * failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a + * shared subscription, will lead to the subscription call throwing a PulsarClientException. + * + * @param readCompacted + * whether to read from the compacted topic + */ + ConsumerBuilder readCompacted(boolean readCompacted); + + /** + * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching + * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
+ * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have + * permits, else broker will consider next priority level consumers.
+ * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch + * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B. + * + *

+     * Consumer PriorityLevel Permits
+     * C1       0             2
+     * C2       0             1
+     * C3       0             1
+     * C4       1             2
+     * C5       1             1
+     * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
+     * 
+ * + * @param priorityLevel + */ + ConsumerBuilder priorityLevel(int priorityLevel); + + /** + * Set a name/value property with this consumer. + * + * @param key + * @param value + * @return + */ + ConsumerBuilder property(String key, String value); + + /** + * Add all the properties in the provided map + * + * @param properties + * @return + */ + ConsumerBuilder properties(Map properties); + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 00e4537ad46ac..5f25fa83bd74d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -31,8 +31,9 @@ * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion. * - * + * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance */ +@Deprecated public class ConsumerConfiguration implements Serializable { /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java index 46496b7b74a97..4acb6e9cf8bbb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/CryptoKeyReader.java @@ -18,34 +18,33 @@ */ package org.apache.pulsar.client.api; -import java.util.List; import java.util.Map; public interface CryptoKeyReader { - /* + /** * Return the encryption key corresponding to the key name in the argument *

- * This method should be implemented to return the EncryptionKeyInfo. This method will be - * called at the time of producer creation as well as consumer receiving messages. - * Hence, application should not make any blocking calls within the implementation. + * This method should be implemented to return the EncryptionKeyInfo. This method will be called at the time of + * producer creation as well as consumer receiving messages. Hence, application should not make any blocking calls + * within the implementation. *

- * - * @param keyName - * Unique name to identify the key - * @param metadata - * Additional information needed to identify the key - * @return EncryptionKeyInfo with details about the public key - * */ + * + * @param keyName + * Unique name to identify the key + * @param metadata + * Additional information needed to identify the key + * @return EncryptionKeyInfo with details about the public key + */ EncryptionKeyInfo getPublicKey(String keyName, Map metadata); - /* - * @param keyName - * Unique name to identify the key - * @param metadata - * Additional information needed to identify the key - * @return byte array of the private key value - */ + /** + * @param keyName + * Unique name to identify the key + * @param metadata + * Additional information needed to identify the key + * @return byte array of the private key value + */ EncryptionKeyInfo getPrivateKey(String keyName, Map metadata); - + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java index 25c89755b6d61..a9cef6f733d50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java @@ -23,7 +23,7 @@ public interface MessageRouter extends Serializable { /** - * + * * @param msg * Message object * @return The index of the partition to use for the message @@ -42,7 +42,6 @@ default int choosePartition(Message msg) { * @return the partition to route the message. * @since 1.22.0 */ - @SuppressWarnings("deprecation") default int choosePartition(Message msg, TopicMetadata metadata) { return choosePartition(msg); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java new file mode 100644 index 0000000000000..1d45489b91e6b --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +public enum MessageRoutingMode { + SinglePartition, RoundRobinPartition, CustomPartition +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java new file mode 100644 index 0000000000000..5ab1215dc123f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError; + +/** + * {@link ProducerBuilder} is used to configure and create instances of {@link Producer}. + * + * @see PulsarClient#newProducer() + */ +public interface ProducerBuilder extends Serializable, Cloneable { + + /** + * Finalize the creation of the {@link Producer} instance. + *

+ * This method will block until the producer is created successfully. + * + * @return the producer instance + * @throws PulsarClientException.ProducerBusyException + * if a producer with the same "producer name" is already connected to the topic + * @throws PulsarClientException + * if the producer creation fails + */ + Producer create() throws PulsarClientException; + + /** + * Finalize the creation of the {@link Producer} instance in asynchronous mode. + *

+ * This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready. + * + * @return a future that will yield the created producer instance + * @throws PulsarClientException.ProducerBusyException + * if a producer with the same "producer name" is already connected to the topic + * @throws PulsarClientException + * if the producer creation fails + */ + CompletableFuture createAsync(); + + /** + * Create a copy of the current {@link ProducerBuilder}. + *

+ * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + *

+     * ProducerBuilder builder = client.newProducer().sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(true);
+     *
+     * Producer producer1 = builder.clone().topic(TOPIC_1).create();
+     * Producer producer2 = builder.clone().topic(TOPIC_2).create();
+     * 
+ */ + ProducerBuilder clone(); + + /** + * Specify the topic this producer will be publishing on. + *

+ * This argument is required when constructing the produce. + * + * @param topicName + */ + ProducerBuilder topic(String topicName); + + /** + * Specify a name for the producer + *

+ * If not assigned, the system will generate a globally unique name which can be access with + * {@link Producer#getProducerName()}. + *

+ * When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique + * across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on + * a topic. + * + * @param producerName + * the custom name to use for the producer + */ + ProducerBuilder producerName(String producerName); + + /** + * Set the send timeout (default: 30 seconds) + *

+ * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported. + * + * @param sendTimeout + * the send timeout + * @param unit + * the time unit of the {@code sendTimeout} + */ + ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit); + + /** + * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. + *

+ * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail + * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. + * + * @param maxPendingMessages + * @return + */ + ProducerBuilder maxPendingMessages(int maxPendingMessages); + + /** + * Set the number of max pending messages across all the partitions + *

+ * This setting will be used to lower the max pending messages for each partition + * ({@link #maxPendingMessages(int)}), if the total exceeds the configured value. + * + * @param maxPendingMessagesAcrossPartitions + */ + ProducerBuilder maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions); + + /** + * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing + * message queue is full. + *

+ * Default is false. If set to false, send operations will immediately fail with + * {@link ProducerQueueIsFullError} when there is no space left in pending queue. + * + * @param blockIfQueueFull + * whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full + * @return + */ + ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull); + + /** + * Set the message routing mode for the partitioned producer + * + * @param mode + * @return + */ + ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode); + + /** + * Set the compression type for the producer. + *

+ * By default, message payloads are not compressed. Supported compression types are: + *

    + *
  • CompressionType.LZ4
  • + *
  • CompressionType.ZLIB
  • + *
+ * + * @param compressionType + * @return + */ + ProducerBuilder compressionType(CompressionType compressionType); + + /** + * Set a custom message routing policy by passing an implementation of MessageRouter + * + * + * @param messageRouter + */ + ProducerBuilder messageRouter(MessageRouter messageRouter); + + /** + * Control whether automatic batching of messages is enabled for the producer. default: false [No batching] + * + * When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the + * broker, leading to better throughput, especially when publishing small messages. If compression is enabled, + * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or + * contents. + * + * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages + * + * @see #batchingMaxPublishDelay(long, TimeUnit) + */ + ProducerBuilder enableBatching(boolean enableBatching); + + /** + * Sets a {@link CryptoKeyReader} + * + * @param cryptoKeyReader + * CryptoKeyReader object + */ + ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + + /** + * Add public encryption key, used by producer to encrypt the data key. + * + * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are + * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application + * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted + * after compression. If batch messaging is enabled, the batched message is encrypted. + * + */ + ProducerBuilder addEncryptionKey(String key); + + /** + * Sets the ProducerCryptoFailureAction to the value specified + * + * @param The + * producer action + */ + ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action); + + /** + * Set the time period within which the messages sent will be batched default: 10ms if batch messages are + * enabled. If set to a non zero value, messages will be queued until this time interval or until + * + * @see ProducerConfiguration#batchingMaxMessages threshold is reached; all messages will be published as a single + * batch message. The consumer will be delivered individual messages in the batch in the same order they were + * enqueued + * @param batchDelay + * the batch delay + * @param timeUnit + * the time unit of the {@code batchDelay} + * @return + */ + ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit); + + /** + * Set the maximum number of messages permitted in a batch. default: 1000 If set to a value greater than 1, + * messages will be queued until this threshold is reached or batch interval has elapsed + * + * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) All messages in batch will be published as + * a single batch message. The consumer will be delivered individual messages in the batch in the same order + * they were enqueued + * @param batchMessagesMaxMessagesPerBatch + * maximum number of messages in a batch + * @return + */ + ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch); + + /** + * Set the baseline for the sequence ids for messages published by the producer. + *

+ * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned + * incremental sequence ids, if not otherwise specified. + * + * @param initialSequenceId + * @return + */ + ProducerBuilder initialSequenceId(long initialSequenceId); + + /** + * Set a name/value property with this producer. + * + * @param key + * @param value + * @return + */ + ProducerBuilder property(String key, String value); + + /** + * Add all the properties in the provided map + * + * @param properties + * @return + */ + ProducerBuilder properties(Map properties); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index a6d2a558309a2..be72fc7e021bf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; -import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import com.google.common.base.Objects; @@ -36,7 +35,9 @@ /** * Producer's configuration * + * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance */ +@Deprecated public class ProducerConfiguration implements Serializable { private static final long serialVersionUID = 1L; @@ -268,7 +269,7 @@ public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) { * * @return message router. * @deprecated since 1.22.0-incubating. numPartitions is already passed as parameter in - * {@link MessageRouter#choosePartition(Message, TopicMetadata)}. + * {@link MessageRouter#choosePartition(Message, TopicMetadata)}. * @see MessageRouter */ @Deprecated @@ -338,7 +339,7 @@ public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) * @return encryptionKeys * */ - public ConcurrentOpenHashSet getEncryptionKeys() { + public ConcurrentOpenHashSet getEncryptionKeys() { return this.encryptionKeys; } @@ -354,16 +355,15 @@ public boolean isEncryptionEnabled() { /** * Add public encryption key, used by producer to encrypt the data key. * - * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. - * If keys are found, a callback getKey(String keyName) is invoked against each key to load - * the values of the key. Application should implement this callback to return the key in pkcs8 format. - * If compression is enabled, message is encrypted after compression. - * If batch messaging is enabled, the batched message is encrypted. + * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are + * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application + * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted + * after compression. If batch messaging is enabled, the batched message is encrypted. * */ public void addEncryptionKey(String key) { if (this.encryptionKeys == null) { - this.encryptionKeys = new ConcurrentOpenHashSet(16,1); + this.encryptionKeys = new ConcurrentOpenHashSet(16, 1); } this.encryptionKeys.add(key); } @@ -377,7 +377,8 @@ public void removeEncryptionKey(String key) { /** * Sets the ProducerCryptoFailureAction to the value specified * - * @param The producer action + * @param action + * The producer action */ public void setCryptoFailureAction(ProducerCryptoFailureAction action) { cryptoFailureAction = action; @@ -467,6 +468,7 @@ public ProducerConfiguration setInitialSequenceId(long initialSequenceId) { /** * Set a name/value property with this producer. + * * @param key * @param value * @return @@ -480,6 +482,7 @@ public ProducerConfiguration setProperty(String key, String value) { /** * Add all the properties in the provided map + * * @param properties * @return */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index e15db40a605cd..6ba2518d088b5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -21,15 +21,28 @@ import java.io.Closeable; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; /** - * Class that provides a client interface to Pulsar - * - * + * Class that provides a client interface to Pulsar. + *

+ * Client instances are thread-safe and can be reused for managing multiple {@link Producer}, {@link Consumer} and + * {@link Reader} instances. */ public interface PulsarClient extends Closeable { + /** + * Get a new builder instance that can used to configure and build a {@link PulsarClient} instance. + * + * @return the {@link ClientBuilder} + * + * @since 2.0.0 + */ + public static ClientBuilder builder() { + return new ClientBuilderImpl(); + } + /** * Create a new PulsarClient object using default client configuration * @@ -38,7 +51,9 @@ public interface PulsarClient extends Closeable { * @return a new pulsar client object * @throws PulsarClientException.InvalidServiceURL * if the serviceUrl is invalid + * @deprecated use {@link #builder()} to construct a client instance */ + @Deprecated public static PulsarClient create(String serviceUrl) throws PulsarClientException { return create(serviceUrl, new ClientConfiguration()); } @@ -53,11 +68,50 @@ public static PulsarClient create(String serviceUrl) throws PulsarClientExceptio * @return a new pulsar client object * @throws PulsarClientException.InvalidServiceURL * if the serviceUrl is invalid + * @deprecated use {@link #builder()} to construct a client instance */ + @Deprecated public static PulsarClient create(String serviceUrl, ClientConfiguration conf) throws PulsarClientException { return new PulsarClientImpl(serviceUrl, conf); } + /** + * Create a producer with default for publishing on a specific topic + *

+ * Example: + * + * + * Producer producer = client.newProducer().topic(myTopic).create(); + * + * + * + * @return a {@link ProducerBuilder} object to configure and construct the {@link Producer} instance + * + * @since 2.0.0 + */ + ProducerBuilder newProducer(); + + /** + * Create a producer with default for publishing on a specific topic + * + * @return a {@link ProducerBuilder} object to configure and construct the {@link Producer} instance + * + * @since 2.0.0 + */ + ConsumerBuilder newConsumer(); + + /** + * Create a topic reader for reading messages from the specified topic. + *

+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a + * subscription. Reader can only work on non-partitioned topics. + * + * @return a {@link ReaderBuilder} that can be used to configure and construct a {@link Reader} instance + * + * @since 2.0.0 + */ + ReaderBuilder newReader(); + /** * Create a producer with default {@link ProducerConfiguration} for publishing on a specific topic * @@ -72,7 +126,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * if there was an error with the supplied credentials * @throws PulsarClientException.AuthorizationException * if the authorization to publish on topic was denied + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated Producer createProducer(String topic) throws PulsarClientException; /** @@ -81,7 +137,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param topic * The name of the topic where to produce * @return Future of the asynchronously created producer object + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated CompletableFuture createProducerAsync(String topic); /** @@ -95,7 +153,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @throws PulsarClientException * if it was not possible to create the producer * @throws InterruptedException + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated Producer createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException; /** @@ -106,7 +166,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ProducerConfiguration} object * @return Future of the asynchronously created producer object + * @deprecated use {@link #newProducer()} to build a new producer */ + @Deprecated CompletableFuture createProducerAsync(String topic, ProducerConfiguration conf); /** @@ -119,7 +181,10 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @return The {@code Consumer} object * @throws PulsarClientException * @throws InterruptedException + * + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated Consumer subscribe(String topic, String subscription) throws PulsarClientException; /** @@ -131,7 +196,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param subscription * The subscription name * @return Future of the {@code Consumer} object + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated CompletableFuture subscribeAsync(String topic, String subscription); /** @@ -145,7 +212,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * The {@code ConsumerConfiguration} object * @return The {@code Consumer} object * @throws PulsarClientException + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException; /** @@ -159,7 +228,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ConsumerConfiguration} object * @return Future of the {@code Consumer} object + * @deprecated Use {@link #newConsumer()} to build a new consumer */ + @Deprecated CompletableFuture subscribeAsync(String topic, String subscription, ConsumerConfiguration conf); /** @@ -185,7 +256,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ReaderConfiguration} object * @return The {@code Reader} object + * @deprecated Use {@link #newReader()} to build a new reader */ + @Deprecated Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException; /** @@ -212,7 +285,9 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * @param conf * The {@code ReaderConfiguration} object * @return Future of the asynchronously created producer object + * @deprecated Use {@link #newReader()} to build a new reader */ + @Deprecated CompletableFuture createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf); /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java new file mode 100644 index 0000000000000..196af20571241 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; + +/** + * {@link ReaderBuilder} is used to configure and create instances of {@link Reader}. + * + * @see PulsarClient#newReader() + * + * @since 2.0.0 + */ +public interface ReaderBuilder extends Serializable, Cloneable { + + /** + * Finalize the creation of the {@link Reader} instance. + * + *

+ * This method will block until the reader is created successfully. + * + * @return the reader instance + * @throws PulsarClientException + * if the reader creation fails + */ + Reader create() throws PulsarClientException; + + /** + * Finalize the creation of the {@link Reader} instance in asynchronous mode. + * + *

+ * This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready. + * + * @return the reader instance + * @throws PulsarClientException + * if the reader creation fails + */ + CompletableFuture createAsync(); + + /** + * Create a copy of the current {@link ReaderBuilder}. + *

+ * Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For + * example: + * + *

+     * ReaderBuilder builder = client.newReader().readerName("my-reader").receiverQueueSize(10);
+     *
+     * Reader reader1 = builder.clone().topic(TOPIC_1).create();
+     * Reader reader2 = builder.clone().topic(TOPIC_2).create();
+     * 
+ */ + ReaderBuilder clone(); + + /** + * Specify the topic this consumer will subscribe on. + *

+ * This argument is required when constructing the consumer. + * + * @param topicName + */ + ReaderBuilder topic(String topicName); + + /** + * The initial reader positioning is done by specifying a message id. The options are: + *

    + *
  • MessageId.earliest : Start reading from the earliest message available in the topic + *
  • MessageId.latest : Start reading from the end topic, only getting messages published after the + * reader was created + *
  • MessageId : When passing a particular message id, the reader will position itself on that + * specific position. The first message to be read will be the message next to the specified messageId. + *
+ */ + ReaderBuilder startMessageId(MessageId startMessageId); + + /** + * Sets a {@link ReaderListener} for the reader + *

+ * When a {@link ReaderListener} is set, application will receive messages through it. Calls to + * {@link Reader#readNext()} will not be allowed. + * + * @param readerListener + * the listener object + */ + ReaderBuilder readerListener(ReaderListener readerListener); + + /** + * Sets a {@link CryptoKeyReader} + * + * @param cryptoKeyReader + * CryptoKeyReader object + */ + ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader); + + /** + * Sets the ConsumerCryptoFailureAction to the value specified + * + * @param action + * The action to take when the decoding fails + */ + ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action); + + /** + * Sets the size of the consumer receive queue. + *

+ * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the + * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer + * throughput at the expense of bigger memory utilization. + *

+ * Default value is {@code 1000} messages and should be good for most use cases. + * + * @param receiverQueueSize + * the new receiver queue size value + */ + ReaderBuilder receiverQueueSize(int receiverQueueSize); + + /** + * Set the reader name. + * + * @param readerName + */ + ReaderBuilder readerName(String readerName); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java index 999e2e60a77b3..6f3816c45597b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java @@ -23,6 +23,11 @@ import java.io.Serializable; +/** + * + * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance + */ +@Deprecated public class ReaderConfiguration implements Serializable { private int receiverQueueSize = 1000; @@ -84,8 +89,9 @@ public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) { /** * Sets the ConsumerCryptoFailureAction to the value specified - * - * @param The consumer action + * + * @param action + * The action to take when the decoding fails */ public void setCryptoFailureAction(ConsumerCryptoFailureAction action) { cryptoFailureAction = action; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java new file mode 100644 index 0000000000000..2be5318e08a4b --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; + +@SuppressWarnings("deprecation") +public class ClientBuilderImpl implements ClientBuilder { + + private static final long serialVersionUID = 1L; + + String serviceUrl; + final ClientConfiguration conf = new ClientConfiguration(); + + @Override + public PulsarClient build() throws PulsarClientException { + if (serviceUrl == null) { + throw new IllegalArgumentException("service URL needs to be specified on the ClientBuilder object"); + } + + return new PulsarClientImpl(serviceUrl, conf); + } + + @Override + public ClientBuilder clone() { + try { + return (ClientBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ClientBuilderImpl"); + } + } + + @Override + public ClientBuilder serviceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + return this; + } + + @Override + public ClientBuilder authentication(Authentication authentication) { + conf.setAuthentication(authentication); + return this; + } + + @Override + public ClientBuilder authentication(String authPluginClassName, String authParamsString) + throws UnsupportedAuthenticationException { + conf.setAuthentication(authPluginClassName, authParamsString); + return this; + } + + @Override + public ClientBuilder authentication(String authPluginClassName, Map authParams) + throws UnsupportedAuthenticationException { + conf.setAuthentication(authPluginClassName, authParams); + return this; + } + + @Override + public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) { + conf.setOperationTimeout(operationTimeout, unit); + return this; + } + + @Override + public ClientBuilder ioThreads(int numIoThreads) { + conf.setIoThreads(numIoThreads); + return this; + } + + @Override + public ClientBuilder listenerThreads(int numListenerThreads) { + conf.setListenerThreads(numListenerThreads); + return this; + } + + @Override + public ClientBuilder connectionsPerBroker(int connectionsPerBroker) { + conf.setConnectionsPerBroker(connectionsPerBroker); + return this; + } + + @Override + public ClientBuilder enableTcpNoDelay(boolean useTcpNoDelay) { + conf.setUseTcpNoDelay(useTcpNoDelay); + return this; + } + + @Override + public ClientBuilder enableTls(boolean useTls) { + conf.setUseTls(useTls); + return this; + } + + @Override + public ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification) { + conf.setTlsHostnameVerificationEnable(enableTlsHostnameVerification); + return this; + } + + @Override + public ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath) { + conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath); + return this; + } + + @Override + public ClientBuilder allowTlsInsecureConnection(boolean tlsAllowInsecureConnection) { + conf.setTlsAllowInsecureConnection(tlsAllowInsecureConnection); + return this; + } + + @Override + public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) { + conf.setStatsInterval(statsInterval, unit); + return this; + } + + @Override + public ClientBuilder maxConcurrentLookupRequests(int concurrentLookupRequests) { + conf.setConcurrentLookupRequest(concurrentLookupRequests); + return this; + } + + @Override + public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) { + conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection); + return this; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java new file mode 100644 index 0000000000000..ab91326154029 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.FutureUtil; + + +@SuppressWarnings("deprecation") +public class ConsumerBuilderImpl implements ConsumerBuilder { + + private static final long serialVersionUID = 1L; + + private final PulsarClientImpl client; + private String topicName; + private String subscriptionName; + private final ConsumerConfiguration conf; + + ConsumerBuilderImpl(PulsarClientImpl client) { + this.client = client; + this.conf = new ConsumerConfiguration(); + } + + @Override + public ConsumerBuilder clone() { + try { + return (ConsumerBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ConsumerBuilderImpl"); + } + } + + @Override + public Consumer subscribe() throws PulsarClientException { + try { + return subscribeAsync().get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture subscribeAsync() { + if (topicName == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); + } + + if (subscriptionName == null) { + return FutureUtil.failedFuture( + new IllegalArgumentException("Subscription name must be set on the producer builder")); + } + + return client.subscribeAsync(topicName, subscriptionName, conf); + } + + @Override + public ConsumerBuilder topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public ConsumerBuilder subscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + return this; + } + + @Override + public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) { + conf.setAckTimeout(ackTimeout, timeUnit); + return this; + } + + @Override + public ConsumerBuilder subscriptionType(SubscriptionType subscriptionType) { + conf.setSubscriptionType(subscriptionType); + return this; + } + + @Override + public ConsumerBuilder messageListener(MessageListener messageListener) { + conf.setMessageListener(messageListener); + return this; + } + + @Override + public ConsumerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) { + conf.setCryptoKeyReader(cryptoKeyReader); + return this; + } + + @Override + public ConsumerBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) { + conf.setCryptoFailureAction(action); + return this; + } + + @Override + public ConsumerBuilder receiverQueueSize(int receiverQueueSize) { + conf.setReceiverQueueSize(receiverQueueSize); + return this; + } + + @Override + public ConsumerBuilder consumerName(String consumerName) { + conf.setConsumerName(consumerName); + return this; + } + + @Override + public ConsumerBuilder priorityLevel(int priorityLevel) { + conf.setPriorityLevel(priorityLevel); + return this; + } + + @Override + public ConsumerBuilder property(String key, String value) { + conf.setProperty(key, value); + return this; + } + + @Override + public ConsumerBuilder properties(Map properties) { + conf.setProperties(properties); + return this; + } + + @Override + public ConsumerBuilder maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) { + conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions); + return this; + } + + @Override + public ConsumerBuilder readCompacted(boolean readCompacted) { + conf.setReadCompacted(readCompacted); + return this; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java new file mode 100644 index 0000000000000..6bb9a9b595c87 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.FutureUtil; + +@SuppressWarnings("deprecation") +public class ProducerBuilderImpl implements ProducerBuilder { + + private static final long serialVersionUID = 1L; + + private final PulsarClientImpl client; + private String topicName; + private final ProducerConfiguration conf; + + ProducerBuilderImpl(PulsarClientImpl client) { + this.client = client; + this.conf = new ProducerConfiguration(); + } + + @Override + public ProducerBuilder clone() { + try { + return (ProducerBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ProducerBuilderImpl"); + } + } + + @Override + public Producer create() throws PulsarClientException { + try { + return createAsync().get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture createAsync() { + if (topicName == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); + } + + return client.createProducerAsync(topicName, conf); + } + + @Override + public ProducerBuilder topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public ProducerBuilder producerName(String producerName) { + conf.setProducerName(producerName); + return this; + } + + @Override + public ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit) { + conf.setSendTimeout(sendTimeout, unit); + return this; + } + + @Override + public ProducerBuilder maxPendingMessages(int maxPendingMessages) { + conf.setMaxPendingMessages(maxPendingMessages); + return this; + } + + @Override + public ProducerBuilder maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { + conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions); + return this; + } + + @Override + public ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull) { + conf.setBlockIfQueueFull(blockIfQueueFull); + return this; + } + + @Override + public ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode) { + conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf(messageRouteMode.toString())); + return this; + } + + @Override + public ProducerBuilder compressionType(CompressionType compressionType) { + conf.setCompressionType(compressionType); + return this; + } + + @Override + public ProducerBuilder messageRouter(MessageRouter messageRouter) { + conf.setMessageRouter(messageRouter); + return this; + } + + @Override + public ProducerBuilder enableBatching(boolean batchMessagesEnabled) { + conf.setBatchingEnabled(batchMessagesEnabled); + return this; + } + + @Override + public ProducerBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) { + conf.setCryptoKeyReader(cryptoKeyReader); + return this; + } + + @Override + public ProducerBuilder addEncryptionKey(String key) { + conf.addEncryptionKey(key); + return this; + } + + @Override + public ProducerBuilder cryptoFailureAction(ProducerCryptoFailureAction action) { + conf.setCryptoFailureAction(action); + return this; + } + + @Override + public ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) { + conf.setBatchingMaxPublishDelay(batchDelay, timeUnit); + return this; + } + + @Override + public ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) { + conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch); + return this; + } + + @Override + public ProducerBuilder initialSequenceId(long initialSequenceId) { + conf.setInitialSequenceId(initialSequenceId); + return this; + } + + @Override + public ProducerBuilder property(String key, String value) { + conf.setProperty(key, value); + return this; + } + + @Override + public ProducerBuilder properties(Map properties) { + conf.setProperties(properties); + return this; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 029f712f3f365..6e1c5afde64ca 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -32,13 +32,16 @@ import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderConfiguration; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.util.ExecutorProvider; @@ -58,6 +61,7 @@ import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +@SuppressWarnings("deprecation") public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); @@ -92,8 +96,7 @@ public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGr } public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup, - ConnectionPool cnxPool) - throws PulsarClientException { + ConnectionPool cnxPool) throws PulsarClientException { if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) { throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration"); } @@ -117,6 +120,21 @@ public ClientConfiguration getConfiguration() { return conf; } + @Override + public ProducerBuilder newProducer() { + return new ProducerBuilderImpl(this); + } + + @Override + public ConsumerBuilder newConsumer() { + return new ConsumerBuilderImpl(this); + } + + @Override + public ReaderBuilder newReader() { + return new ReaderBuilderImpl(this); + } + @Override public Producer createProducer(String destination) throws PulsarClientException { try { @@ -157,6 +175,7 @@ public CompletableFuture createProducerAsync(String topic) { return createProducerAsync(topic, new ProducerConfiguration()); } + @Override public CompletableFuture createProducerAsync(final String topic, final ProducerConfiguration conf) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java new file mode 100644 index 0000000000000..f3751340f6b2c --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.ReaderConfiguration; +import org.apache.pulsar.client.api.ReaderListener; +import org.apache.pulsar.common.util.FutureUtil; + +@SuppressWarnings("deprecation") +public class ReaderBuilderImpl implements ReaderBuilder { + + private static final long serialVersionUID = 1L; + + private final PulsarClientImpl client; + + private final ReaderConfiguration conf; + private String topicName; + private MessageId startMessageId; + + ReaderBuilderImpl(PulsarClientImpl client) { + this.client = client; + this.conf = new ReaderConfiguration(); + } + + @Override + public ReaderBuilder clone() { + try { + return (ReaderBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Failed to clone ReaderBuilderImpl"); + } + } + + @Override + public Reader create() throws PulsarClientException { + try { + return createAsync().get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture createAsync() { + if (topicName == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); + } + + if (startMessageId == null) { + return FutureUtil + .failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder")); + } + + return client.createReaderAsync(topicName, startMessageId, conf); + } + + @Override + public ReaderBuilder topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public ReaderBuilder startMessageId(MessageId startMessageId) { + this.startMessageId = startMessageId; + return this; + } + + @Override + public ReaderBuilder readerListener(ReaderListener readerListener) { + conf.setReaderListener(readerListener); + return this; + } + + @Override + public ReaderBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) { + conf.setCryptoKeyReader(cryptoKeyReader); + return this; + } + + @Override + public ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) { + conf.setCryptoFailureAction(action); + return this; + } + + @Override + public ReaderBuilder receiverQueueSize(int receiverQueueSize) { + conf.setReceiverQueueSize(receiverQueueSize); + return this; + } + + @Override + public ReaderBuilder readerName(String readerName) { + conf.setReaderName(readerName); + return this; + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java index 5068a2c06fe13..f96d2885cac85 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessageIdTest.java @@ -18,31 +18,25 @@ */ package org.apache.pulsar.client.api; -import org.testng.annotations.Test; - -import com.google.common.base.Objects; - import static org.testng.Assert.assertEquals; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.ConsumerId; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.testng.Assert; +import org.testng.annotations.Test; public class MessageIdTest { - + @Test public void messageIdTest() { MessageId mId = new MessageIdImpl(1, 2, 3); assertEquals(mId.toString(), "1:2:3"); - + mId = new BatchMessageIdImpl(0, 2, 3, 4); assertEquals(mId.toString(), "0:2:3:4"); - + mId = new BatchMessageIdImpl(-1, 2, -3, 4); assertEquals(mId.toString(), "-1:2:-3:4"); - + mId = new MessageIdImpl(0, -23, 3); assertEquals(mId.toString(), "0:-23:3"); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java new file mode 100644 index 0000000000000..f4b89d5d4e70d --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import org.apache.pulsar.client.api.PulsarClient; +import org.testng.annotations.Test; + +@SuppressWarnings("deprecation") +public class BuildersTest { + + @Test + public void clientBuilderTest() { + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().enableTls(true).ioThreads(10) + .maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650"); + + assertEquals(clientBuilder.conf.isUseTls(), true); + assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650"); + + ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone(); + assertTrue(b2 != clientBuilder); + + b2.serviceUrl("pulsar://other-broker:6650"); + + assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650"); + assertEquals(b2.serviceUrl, "pulsar://other-broker:6650"); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java index 43b7b2699da5a..97f25b1491896 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java @@ -26,14 +26,14 @@ public class SampleProducer { public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException { - PulsarClient pulsarClient = PulsarClient.create("http://127.0.0.1:8080"); + PulsarClient client = PulsarClient.builder().serviceUrl("http://localhost:6650").build(); - Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic"); + Producer producer = client.newProducer().topic("persistent://my-property/use/my-ns/my-topic").create(); for (int i = 0; i < 10; i++) { producer.send("my-message".getBytes()); } - pulsarClient.close(); + client.close(); } }