Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Converted main part of code to use builder APIs with typed interface #1311

Merged
merged 1 commit into from
Mar 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -499,27 +500,29 @@ public PulsarClient getReplicationClient(String cluster) {
String path = PulsarWebResource.path("clusters", cluster);
ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path)
.orElseThrow(() -> new KeeperException.NoNodeException(path));
ClientConfiguration configuration = new ClientConfiguration();
configuration.setUseTcpNoDelay(false);
configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker());
configuration.setStatsInterval(0, TimeUnit.SECONDS);
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);
if (pulsar.getConfiguration().isAuthenticationEnabled()) {
configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
String clusterUrl = null;
if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
clusterUrl = isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls();
configuration.setUseTls(true);
configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
configuration
.setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
clientBuilder
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls())
.enableTls(true)
.tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
} else {
clusterUrl = isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl()
: data.getServiceUrl();
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}
return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,20 @@ protected void resetConfig() {

protected final void internalSetup() throws Exception {
init();
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
lookupUrl = new URI(brokerUrl.toString());
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
}
pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();
}

protected final void internalSetupForStatsTest() throws Exception {
init();
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(1, TimeUnit.SECONDS);
String lookupUrl = brokerUrl.toString();
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
}
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build();
}

protected final void init() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testClosedConsumer() throws PulsarClientException {
public void testListener() throws PulsarClientException {
Consumer consumer = null;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setMessageListener((Consumer c, Message msg) -> {
conf.setMessageListener((Consumer<byte[]> c, Message<byte[]> msg) -> {
});
consumer = pulsarClient.subscribe("persistent://prop/cluster/ns/topicName", "my-subscription", conf);
Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
Expand All @@ -62,7 +62,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {

private static final long serialVersionUID = 1L;

Expand All @@ -74,7 +74,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final String groupId;
private final boolean isAutoCommit;

private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();

private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
Expand All @@ -84,10 +84,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Properties properties;

private static class QueueItem {
final org.apache.pulsar.client.api.Consumer consumer;
final Message message;
final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
final Message<byte[]> message;

QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) {
QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
this.consumer = consumer;
this.message = message;
}
Expand Down Expand Up @@ -146,19 +146,19 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ

this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
clientConf.setUseTcpNoDelay(false);
clientBuilder.enableTcpNoDelay(false);
try {
client = PulsarClient.create(serviceUrl, clientConf);
client = clientBuilder.serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}

@Override
public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) {
public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
// Block listener thread if the application is slowing down
try {
receivedMessages.put(new QueueItem(consumer, msg));
Expand Down Expand Up @@ -204,16 +204,17 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
conf.setSubscriptionType(SubscriptionType.Failover);
conf.setMessageListener(this);
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
if (numberOfPartitions > 1) {
// Subscribe to each partition
conf.setConsumerName(ConsumerName.generateRandomName());
consumerBuilder.consumerName(ConsumerName.generateRandomName());
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client
.subscribeAsync(partitionName, groupId, conf);
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(topic, partitionIndex);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
Expand All @@ -222,8 +223,8 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
}
} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client.subscribeAsync(topic,
groupId, conf);
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = new TopicPartition(topic, 0);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
Expand Down Expand Up @@ -293,7 +294,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message msg = item.message;
Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
long offset = MessageIdUtils.getOffset(msgId);

Expand Down Expand Up @@ -335,7 +336,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
}

@SuppressWarnings("unchecked")
private K getKey(String topic, Message msg) {
private K getKey(String topic, Message<byte[]> msg) {
if (!msg.hasKey()) {
return null;
}
Expand Down Expand Up @@ -393,7 +394,7 @@ private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMet
List<CompletableFuture<Void>> futures = new ArrayList<>();

offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition);
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);

lastCommittedOffset.put(topicPartition, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
Expand All @@ -415,7 +416,7 @@ private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}
Expand All @@ -436,7 +437,7 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
Expand All @@ -457,7 +458,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
Expand Down
Loading