Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

After #3228, removed usages of deprecated client API #3272

Merged
merged 12 commits into from
Jan 5, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
Expand Down Expand Up @@ -99,7 +98,6 @@ public static void main(String[] args) throws Exception {
serviceUrl,
outputTopic,
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
new ProducerConfiguration(),
wordWithCount -> wordWithCount.word
)).setParallelism(parallelism);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
Expand Down Expand Up @@ -108,7 +107,7 @@ public static void main(String[] args) throws Exception {
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
sink = new PulsarAvroTableSink(serviceUrl, outputTopic, ROUTING_KEY, WordWithCount.class);
} else {
// print the results with a csv file
sink = new CsvTableSink("./examples/file", "|");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
Expand Down Expand Up @@ -109,7 +108,7 @@ public static void main(String[] args) throws Exception {
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
sink = new PulsarJsonTableSink(serviceUrl, outputTopic, ROUTING_KEY);
} else {
// print the results with a csv file
sink = new CsvTableSink("./examples/file", "|");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ public void testDeleteNamespace() throws Exception {
final String topicName = "persistent://" + namespace + "/my-topic";
TopicName topic = TopicName.get(topicName);

Producer producer = pulsarClient.createProducer(topicName);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(topic);
// (2) Delete topic
Expand All @@ -1171,7 +1171,7 @@ public void testSubscribeRate() throws Exception {

admin.topics().createPartitionedTopic(topicName, 2);
pulsar.getConfiguration().setAuthorizationEnabled(false);
Consumer consumer = pulsarClient.newConsumer()
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("subscribe-rate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,20 @@
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -1168,19 +1169,22 @@ public void testPayloadCorruptionDetection() throws Exception {
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();

Message<byte[]> msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build();
CompletableFuture<MessageId> future1 = producer.sendAsync(msg1);
CompletableFuture<MessageId> future1 = producer.newMessage().value("message-1".getBytes()).sendAsync();

// Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're checksums
// would have already been computed. If we change the message content at that point, it should result in a
// checksum validation error
stopBroker();

Message<byte[]> msg2 = MessageBuilder.create().setContent("message-2".getBytes()).build();
CompletableFuture<MessageId> future2 = producer.sendAsync(msg2);

// Taint msg2
msg2.getData()[msg2.getData().length - 1] = '3'; // new content would be 'message-3'
byte[] a2 = "message-2".getBytes();
TypedMessageBuilder<byte[]> msg2 = producer.newMessage().value(a2);


CompletableFuture<MessageId> future2 = msg2.sendAsync();

// corrupt the message, new content would be 'message-3'
((TypedMessageBuilderImpl<byte[]>) msg2).getContent().put(a2.length - 1, (byte) '3');

// Restart the broker to have the messages published
startBroker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -418,11 +417,10 @@ public Void call() throws Exception {
log.info("--- Starting Consumer --- " + url3);

// Produce a message that isn't replicated
producer1.produce(1, MessageBuilder.create().disableReplication());
producer1.produce(1, producer1.newMessage().disableReplication());

// Produce a message not replicated to r2
producer1.produce(1,
MessageBuilder.create().setReplicationClusters(Lists.newArrayList("r1", "r3")));
producer1.produce(1, producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));

// Produce a default replicated message
producer1.produce(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertEquals;

import com.google.common.collect.Sets;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -35,12 +37,12 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand All @@ -50,9 +52,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class ReplicatorTestBase {
URL url1;
URL urlTls1;
Expand Down Expand Up @@ -309,12 +308,15 @@ void produce(int messages) throws Exception {

}

void produce(int messages, MessageBuilder<byte[]> messageBuilder) throws Exception {
TypedMessageBuilder<byte[]> newMessage() {
return producer.newMessage();
}

void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
log.info("Start sending messages");
for (int i = 0; i < messages; i++) {
final String m = new String("test-builder-" + i);
messageBuilder.setContent(m.getBytes());
producer.send(messageBuilder.build());
messageBuilder.value(m.getBytes()).send();
log.info("Sent message {}", m);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
Expand Down Expand Up @@ -418,11 +417,11 @@ public Void call() throws Exception {
log.info("--- Starting Consumer --- " + url3);

// Produce a message that isn't replicated
producer1.produce(1, MessageBuilder.create().disableReplication());
producer1.produce(1, producer1.newMessage().disableReplication());

// Produce a message not replicated to r2
producer1.produce(1,
MessageBuilder.create().setReplicationClusters(Lists.newArrayList("r1", "r3")));
producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));

// Produce a default replicated message
producer1.produce(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -305,16 +305,20 @@ void produce(int messages) throws Exception {

}

void produce(int messages, MessageBuilder<byte[]> messageBuilder) throws Exception {
void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
log.info("Start sending messages");
for (int i = 0; i < messages; i++) {
final String m = new String("test-builder-" + i);
messageBuilder.setContent(m.getBytes());
producer.send(messageBuilder.build());
messageBuilder.value(m.getBytes());
messageBuilder.send();
log.info("Sent message {}", m);
}
}

TypedMessageBuilder<byte[]> newMessage() {
return producer.newMessage();
}

void close() throws Exception {
client.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -334,9 +335,15 @@ public void testInvalidSequence() throws Exception {
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString())
.subscriptionName("my-subscriber-name").subscribe();

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

try {
Message<byte[]> msg = MessageBuilder.create().setContent("InvalidMessage".getBytes()).build();
consumer.acknowledge(msg);
TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) producer.newMessage()
.value("InvalidMessage".getBytes());
consumer.acknowledge(mb.getMessage());
} catch (PulsarClientException.InvalidMessageException e) {
// ok
}
Expand All @@ -357,10 +364,7 @@ public void testInvalidSequence() throws Exception {
// ok
}

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

producer.close();

try {
Expand Down
Loading