Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-12 Introduce builder for creating Producer Consumer Reader #1089

Merged
merged 3 commits into from
Feb 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.AbstractReplicator.State;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -43,7 +43,7 @@ public abstract class AbstractReplicator {
protected volatile ProducerImpl producer;

protected final int producerQueueSize;
protected final ProducerConfiguration producerConfiguration;
protected final ProducerBuilder producerBuilder;

protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);

Expand All @@ -68,10 +68,11 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerConfiguration = new ProducerConfiguration();
this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, localCluster));
this.producerBuilder = client.newProducer() //
.topic(topicName)
.sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(producerQueueSize) //
.producerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped);
}

Expand All @@ -83,10 +84,6 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca

protected abstract void disableReplicatorRead();

public ProducerConfiguration getProducerConfiguration() {
return producerConfiguration;
}

public String getRemoteCluster() {
return remoteCluster;
}
Expand Down Expand Up @@ -121,7 +118,7 @@ public synchronized void startProducer() {
}

log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
client.createProducerAsync(topicName, producerConfiguration).thenAccept(producer -> {
producerBuilder.createAsync().thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St
BrokerService brokerService) {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);

producerConfiguration.setBlockIfQueueFull(false);
producerBuilder.blockIfQueueFull(false);

startProducer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,17 +1149,17 @@ public void testClosingReplicationProducerTwice() throws Exception {
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);

doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName), any());

replicator.startProducer();
verify(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
verify(clientImpl).createProducerAsync(matches(globalTopicName), any());

replicator.disconnect(false);
replicator.disconnect(false);

replicator.startProducer();

verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
verify(clientImpl, Mockito.times(2)).createProducerAsync(matches(globalTopicName), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;

/**
* Builder interface that is used to construct a {@link PulsarClient} instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it might be good to add "@SInCE 2.0". so we can track when we introduce the build methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add @since

*
* @since 2.0.0
*/
public interface ClientBuilder extends Serializable, Cloneable {

/**
* @return the new {@link PulsarClient} instance
*/
PulsarClient build() throws PulsarClientException;

/**
* Create a copy of the current client builder.
* <p>
* Cloning the builder can be used to share an incomplete configuration and specialize it multiple times. For
* example:
*
* <pre>
* ClientBuilder builder = PulsarClient.builder().ioThreads(8).listenerThreads(4);
*
* PulsarClient client1 = builder.clone().serviceUrl(URL_1).build();
* PulsarClient client2 = builder.clone().serviceUrl(URL_2).build();
* </pre>
*/
ClientBuilder clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: @Override?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clone() is a bit weird in the way it's treated. It cannot be marked as @Override


/**
* Configure the service URL for the Pulsar service.
* <p>
* This parameter is required
*
* @param serviceUrl
* @return
*/
ClientBuilder serviceUrl(String serviceUrl);

/**
* Set the authentication provider to use in the Pulsar client instance.
* <p>
* Example:
* <p>
*
* <pre>
* <code>
* String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
*
* Map<String, String> conf = new TreeMap<>();
* conf.put("tlsCertFile", "/my/cert/file");
* conf.put("tlsKeyFile", "/my/key/file");
*
* Authentication auth = AuthenticationFactor.create(AUTH_CLASS, conf);
*
* PulsarClient client = PulsarClient.builder()
* .serviceUrl(SERVICE_URL)
* .authentication(auth)
* .build();
* ....
* </code>
* </pre>
*
* @param authentication
* an instance of the {@link Authentication} provider already constructed
*/
ClientBuilder authentication(Authentication authentication);

/**
* Set the authentication provider to use in the Pulsar client instance.
* <p>
* Example:
* <p>
*
* <pre>
* <code>
* String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
* String AUTH_PARAMS = "tlsCertFile:/my/cert/file,tlsKeyFile:/my/key/file";
*
* PulsarClient client = PulsarClient.builder()
* .serviceUrl(SERVICE_URL)
* .authentication(AUTH_CLASS, AUTH_PARAMS)
* .build();
* ....
* </code>
* </pre>
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParamsString
* string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
* @throws UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
*/
ClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException;

/**
* Set the authentication provider to use in the Pulsar client instance.
* <p>
* Example:
* <p>
*
* <pre>
* <code>
* String AUTH_CLASS = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
*
* Map<String, String> conf = new TreeMap<>();
* conf.put("tlsCertFile", "/my/cert/file");
* conf.put("tlsKeyFile", "/my/key/file");
*
* PulsarClient client = PulsarClient.builder()
* .serviceUrl(SERVICE_URL)
* .authentication(AUTH_CLASS, conf)
* .build();
* ....
* </code>
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParams
* map which represents parameters for the Authentication-Plugin
* @throws UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
*/
ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
throws UnsupportedAuthenticationException;

/**
* Set the operation timeout <i>(default: 30 seconds)</i>
* <p>
* Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
* operation will be maked as failed
*
* @param operationTimeout
* operation timeout
* @param unit
* time unit for {@code operationTimeout}
*/
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);

/**
* Set the number of threads to be used for handling connections to brokers <i>(default: 1 thread)</i>
*
* @param numIoThreads
*/
ClientBuilder ioThreads(int numIoThreads);

/**
* Set the number of threads to be used for message listeners <i>(default: 1 thread)</i>
*
* @param numListenerThreads
*/
ClientBuilder listenerThreads(int numListenerThreads);

/**
* Sets the max number of connection that the client library will open to a single broker.
* <p>
* By default, the connection pool will use a single connection for all the producers and consumers. Increasing this
* parameter may improve throughput when using many producers over a high latency connection.
* <p>
*
* @param connectionsPerBroker
* max number of connections per broker (needs to be greater than 0)
*/
ClientBuilder connectionsPerBroker(int connectionsPerBroker);

/**
* Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm.
* <p>
* No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve
* low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall
* throughput, so if latency is not a concern, it's advisable to set the <code>useTcpNoDelay</code> flag to false.
* <p>
* Default value is true
*
* @param enableTcpNoDelay
*/
ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);

/**
* Configure whether to use TLS encryption on the connection <i>(default: false)</i>
*
* @param enableTls
*/
ClientBuilder enableTls(boolean enableTls);

/**
* Set the path to the trusted TLS certificate file
*
* @param tlsTrustCertsFilePath
*/
ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);

/**
* Configure whether the Pulsar client accept untrusted TLS certificate from broker <i>(default: false)</i>
*
* @param allowTlsInsecureConnection
*/
ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection);

/**
* It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
* certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1.
* Server Identity hostname verification.
*
* @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
*
* @param enableTlsHostnameVerification
*/
ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);

/**
* Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
* statsIntervalSeconds It should be set to at least 1 second
*
* @param statsIntervalSeconds
* the interval between each stat info
* @param unit
* time unit for {@code statsInterval}
*/
ClientBuilder statsInterval(long statsInterval, TimeUnit unit);

/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe
* on thousands of topic using created {@link PulsarClient}
*
* @param maxConcurrentLookupRequests
*/
ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);

/**
* Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
* will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
* 50)</i>
*
* @param maxNumberOfRejectedRequestPerConnection
*/
ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@
/**
* Class used to specify client side configuration like authentication, etc..
*
*
* @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance
*/
@Deprecated
public class ClientConfiguration implements Serializable {

/**
*
*/
private static final long serialVersionUID = 1L;

private Authentication authentication = new AuthenticationDisabled();
private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;
Expand Down Expand Up @@ -221,8 +220,7 @@ public int getConnectionsPerBroker() {
* max number of connections per broker (needs to be greater than 0)
*/
public void setConnectionsPerBroker(int connectionsPerBroker) {
checkArgument(connectionsPerBroker > 0,
"Connections per broker need to be greater than 0");
checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
this.connectionsPerBroker = connectionsPerBroker;
}

Expand Down
Loading