Skip to content

Commit

Permalink
PIP-12 Introduce builder for creating Producer Consumer Reader (#1089)
Browse files Browse the repository at this point in the history
* PIP-12: Introduce builders for creating Producer Consumer Reader

* Fixed Javadocs

* Addressed comments
  • Loading branch information
merlimat authored Feb 17, 2018
1 parent d5a4bb0 commit cb5eb5b
Show file tree
Hide file tree
Showing 23 changed files with 1,815 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.AbstractReplicator.State;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -43,7 +43,7 @@ public abstract class AbstractReplicator {
protected volatile ProducerImpl producer;

protected final int producerQueueSize;
protected final ProducerConfiguration producerConfiguration;
protected final ProducerBuilder producerBuilder;

protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);

Expand All @@ -68,10 +68,11 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerConfiguration = new ProducerConfiguration();
this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, localCluster));
this.producerBuilder = client.newProducer() //
.topic(topicName)
.sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(producerQueueSize) //
.producerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped);
}

Expand All @@ -83,10 +84,6 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca

protected abstract void disableReplicatorRead();

public ProducerConfiguration getProducerConfiguration() {
return producerConfiguration;
}

public String getRemoteCluster() {
return remoteCluster;
}
Expand Down Expand Up @@ -121,7 +118,7 @@ public synchronized void startProducer() {
}

log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
client.createProducerAsync(topicName, producerConfiguration).thenAccept(producer -> {
producerBuilder.createAsync().thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St
BrokerService brokerService) {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);

producerConfiguration.setBlockIfQueueFull(false);
producerBuilder.blockIfQueueFull(false);

startProducer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,17 +1149,17 @@ public void testClosingReplicationProducerTwice() throws Exception {
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);

doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName), any());

replicator.startProducer();
verify(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
verify(clientImpl).createProducerAsync(matches(globalTopicName), any());

replicator.disconnect(false);
replicator.disconnect(false);

replicator.startProducer();

verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
verify(clientImpl, Mockito.times(2)).createProducerAsync(matches(globalTopicName), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;

/**
* Builder interface that is used to construct a {@link PulsarClient} instance.
*
* @since 2.0.0
*/
public interface ClientBuilder extends Serializable, Cloneable {

/**
* @return the new {@link PulsarClient} instance
*/
PulsarClient build() throws PulsarClientException;

/**
* Create a copy of the current client builder.
* <p>
* Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For
* example:
*
* <pre>
* ClientBuilder builder = PulsarClient.builder().ioThreads(8).listenerThreads(4);
*
* PulsarClient client1 = builder.clone().serviceUrl(URL_1).build();
* PulsarClient client2 = builder.clone().serviceUrl(URL_2).build();
* </pre>
*/
ClientBuilder clone();

/**
* Configure the service URL for the Pulsar service.
* <p>
* This parameter is required
*
* @param serviceUrl
* @return
*/
ClientBuilder serviceUrl(String serviceUrl);

/**
* Set the authentication provider to use in the Pulsar client instance.
* <p>
* Example:
* <p>
*
* <pre>
* <code>
* String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
*
* Map<String, String> conf = new TreeMap<>();
* conf.put("tlsCertFile", "/my/cert/file");
* conf.put("tlsKeyFile", "/my/key/file");
*
* Authentication auth = AuthenticationFactor.create(AUTH_CLASS, conf);
*
* PulsarClient client = PulsarClient.builder()
* .serviceUrl(SERVICE_URL)
* .authentication(auth)
* .build();
* ....
* </code>
* </pre>
*
* @param authentication
* an instance of the {@link Authentication} provider already constructed
*/
ClientBuilder authentication(Authentication authentication);

/**
* Set the authentication provider to use in the Pulsar client instance.
* <p>
* Example:
* <p>
*
* <pre>
* <code>
* String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
* String AUTH_PARAMS = "tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file";
*
* PulsarClient client = PulsarClient.builder()
* .serviceUrl(SERVICE_URL)
* .authentication(AUTH_CLASS, AUTH_PARAMS)
* .build();
* ....
* </code>
* </pre>
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParamsString
* string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
* @throws UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
*/
ClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException;

/**
* Set the authentication provider to use in the Pulsar client instance.
* <p>
* Example:
* <p>
*
* <pre>
* <code>
* String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
*
* Map<String, String> conf = new TreeMap<>();
* conf.put("tlsCertFile", "/my/cert/file");
* conf.put("tlsKeyFile", "/my/key/file");
*
* PulsarClient client = PulsarClient.builder()
* .serviceUrl(SERVICE_URL)
* .authentication(AUTH_CLASS, conf)
* .build();
* ....
* </code>
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParams
* map which represents parameters for the Authentication-Plugin
* @throws UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
*/
ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
throws UnsupportedAuthenticationException;

/**
* Set the operation timeout <i>(default: 30 seconds)</i>
* <p>
* Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
* operation will be maked as failed
*
* @param operationTimeout
* operation timeout
* @param unit
* time unit for {@code operationTimeout}
*/
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);

/**
* Set the number of threads to be used for handling connections to brokers <i>(default: 1 thread)</i>
*
* @param numIoThreads
*/
ClientBuilder ioThreads(int numIoThreads);

/**
* Set the number of threads to be used for message listeners <i>(default: 1 thread)</i>
*
* @param numListenerThreads
*/
ClientBuilder listenerThreads(int numListenerThreads);

/**
* Sets the max number of connection that the client library will open to a single broker.
* <p>
* By default, the connection pool will use a single connection for all the producers and consumers. Increasing this
* parameter may improve throughput when using many producers over a high latency connection.
* <p>
*
* @param connectionsPerBroker
* max number of connections per broker (needs to be greater than 0)
*/
ClientBuilder connectionsPerBroker(int connectionsPerBroker);

/**
* Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.
* <p>
* No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve
* low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall
* throughput, so if latency is not a concern, it's advisable to set the <code>useTcpNoDelay</code> flag to false.
* <p>
* Default value is true
*
* @param enableTcpNoDelay
*/
ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);

/**
* Configure whether to use TLS encryption on the connection <i>(default: false)</i>
*
* @param enableTls
*/
ClientBuilder enableTls(boolean enableTls);

/**
* Set the path to the trusted TLS certificate file
*
* @param tlsTrustCertsFilePath
*/
ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);

/**
* Configure whether the Pulsar client accept untrusted TLS certificate from broker <i>(default: false)</i>
*
* @param allowTlsInsecureConnection
*/
ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection);

/**
* It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
* certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1.
* Server Identity hostname verification.
*
* @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
*
* @param enableTlsHostnameVerification
*/
ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);

/**
* Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
* statsIntervalSeconds It should be set to at least 1 second
*
* @param statsIntervalSeconds
* the interval between each stat info
* @param unit
* time unit for {@code statsInterval}
*/
ClientBuilder statsInterval(long statsInterval, TimeUnit unit);

/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe
* on thousands of topic using created {@link PulsarClient}
*
* @param maxConcurrentLookupRequests
*/
ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);

/**
* Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
* will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
* 50)</i>
*
* @param maxNumberOfRejectedRequestPerConnection
*/
ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@
/**
* Class used to specify client side configuration like authentication, etc..
*
*
* @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance
*/
@Deprecated
public class ClientConfiguration implements Serializable {

/**
*
*/
private static final long serialVersionUID = 1L;

private Authentication authentication = new AuthenticationDisabled();
private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;
Expand Down Expand Up @@ -221,8 +220,7 @@ public int getConnectionsPerBroker() {
* max number of connections per broker (needs to be greater than 0)
*/
public void setConnectionsPerBroker(int connectionsPerBroker) {
checkArgument(connectionsPerBroker > 0,
"Connections per broker need to be greater than 0");
checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
this.connectionsPerBroker = connectionsPerBroker;
}

Expand Down
Loading

0 comments on commit cb5eb5b

Please sign in to comment.