From 93426af22b9f6281cbd06699e00f1a027710fec7 Mon Sep 17 00:00:00 2001 From: Timcross-WEI Date: Mon, 5 Sep 2022 18:25:54 +0800 Subject: [PATCH 1/3] upgrade SubscribeTask and PravegaClient in pravega connector --- .../pravega/PravegaConsumerImpl.java | 6 ++- .../pravega/client/PravegaClient.java | 35 ++++++++--------- .../pravega/client/SubscribeTask.java | 38 ++++++++++++++----- .../pravega/client/PravegaClientTest.java | 4 +- 4 files changed, 50 insertions(+), 33 deletions(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java index 9cbd4de70f..a947c6115d 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java @@ -35,13 +35,15 @@ public class PravegaConsumerImpl implements Consumer { private static final AtomicBoolean started = new AtomicBoolean(false); + private String instanceName; private String consumerGroup; private PravegaClient client; private EventListener eventListener; @Override public void init(Properties keyValue) throws Exception { - consumerGroup = keyValue.getProperty("consumerGroup"); + instanceName = keyValue.getProperty("instanceName", ""); + consumerGroup = keyValue.getProperty("consumerGroup", ""); client = PravegaClient.getInstance(); } @@ -72,7 +74,7 @@ public void updateOffset(List cloudEvents, AbstractContext context) @Override public void subscribe(String topic) throws Exception { - if (!client.subscribe(topic, consumerGroup, eventListener)) { + if (!client.subscribe(topic, consumerGroup, instanceName, eventListener)) { throw new PravegaConnectorException(String.format("subscribe topic[%s] fail.", topic)); } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java index 25ebb53ebe..4c7389912c 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java @@ -129,20 +129,14 @@ public SendResult publish(String topic, CloudEvent cloudEvent) { } - public boolean subscribe(String topic, String consumerGroup, EventListener listener) { + public boolean subscribe(String topic, String consumerGroup, String instanceName, EventListener listener) { if (subscribeTaskMap.containsKey(topic)) { return true; } - String readerGroup = buildReaderGroup(topic, consumerGroup); - // clear the readerGroup first to ensure a readerGroup only has one reader - try { - deleteReaderGroup(readerGroup); - } catch (Exception e) { - log.debug("clear readerGroup[{}] fail since it doesn't exist.", readerGroup); - } - createReaderGroup(topic, readerGroup); - String readerId = buildReaderId(readerGroup); - EventStreamReader reader = createReader(readerId, readerGroup); + String readerGroupName = buildReaderGroupName(consumerGroup, topic); + createReaderGroup(topic, readerGroupName); + String readerId = buildReaderId(instanceName); + EventStreamReader reader = createReader(readerId, readerGroupName); SubscribeTask subscribeTask = new SubscribeTask(topic, reader, listener); subscribeTask.start(); subscribeTaskMap.put(topic, subscribeTask); @@ -153,7 +147,7 @@ public boolean unsubscribe(String topic, String consumerGroup) { if (!subscribeTaskMap.containsKey(topic)) { return true; } - deleteReaderGroup(buildReaderGroup(topic, consumerGroup)); + deleteReaderGroup(buildReaderGroupName(consumerGroup, topic)); subscribeTaskMap.remove(topic).stopRead(); writerMap.remove(topic).close(); return true; @@ -176,21 +170,24 @@ private EventStreamWriter createWrite(String topic) { return clientFactory.createEventWriter(topic, new ByteArraySerializer(), EventWriterConfig.builder().build()); } - private String buildReaderGroup(String topic, String consumerGroup) { - return String.format("%s-%s", topic, consumerGroup); + private String buildReaderGroupName(String consumerGroup, String topic) { + return String.format("%s-%s", consumerGroup, topic); } - private String buildReaderId(String readerGroup) { - return String.format("%s-reader", readerGroup); + private String buildReaderId(String instanceName) { + return String.format("%s-reader", instanceName).replaceAll("\\(", "-").replaceAll("\\)", "-"); } - private void createReaderGroup(String topic, String readerGroup) { + private void createReaderGroup(String topic, String readerGroupName) { if (!checkTopicExist(topic)) { createStream(topic); } ReaderGroupConfig readerGroupConfig = - ReaderGroupConfig.builder().stream(NameUtils.getScopedStreamName(config.getScope(), topic)).build(); - readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); + ReaderGroupConfig.builder() + .stream(NameUtils.getScopedStreamName(config.getScope(), topic)) + .retentionType(ReaderGroupConfig.StreamDataRetention.AUTOMATIC_RELEASE_AT_LAST_CHECKPOINT) + .build(); + readerGroupManager.createReaderGroup(readerGroupName, readerGroupConfig); } private void deleteReaderGroup(String readerGroup) { diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java index a38e1159f8..cb654985d0 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java @@ -34,6 +34,7 @@ public class SubscribeTask extends Thread { private final EventStreamReader reader; private final EventListener listener; private final AtomicBoolean running = new AtomicBoolean(true); + private final AtomicBoolean continueRead = new AtomicBoolean(true); public SubscribeTask(String name, EventStreamReader reader, EventListener listener) { super(name); @@ -43,22 +44,23 @@ public SubscribeTask(String name, EventStreamReader reader, EventListene @Override public void run() { + CloudEvent cloudEvent = null; while (running.get()) { - EventRead event; - while ((event = reader.readNextEvent(2000)) != null) { + if (continueRead.get()) { + EventRead event = reader.readNextEvent(2000); + if (event == null) { + continue; + } byte[] eventByteArray = event.getEvent(); if (eventByteArray == null) { continue; } PravegaEvent pravegaEvent = PravegaEvent.getFromByteArray(eventByteArray); - CloudEvent cloudEvent = pravegaEvent.convertToCloudEvent(); - EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() { - @Override - public void commit(EventMeshAction action) { - // nothing to do - } - }; - listener.consume(cloudEvent, consumeContext); + cloudEvent = pravegaEvent.convertToCloudEvent(); + + listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext()); + } else { + listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext()); } } } @@ -66,4 +68,20 @@ public void commit(EventMeshAction action) { public void stopRead() { running.compareAndSet(true, false); } + + private class PravegaEventMeshAsyncConsumeContext extends EventMeshAsyncConsumeContext { + @Override + public void commit(EventMeshAction action) { + switch (action) { + case CommitMessage: + case ReconsumeLater: + continueRead.set(false); + break; + case ManualAck: + continueRead.set(true); + break; + default: + } + } + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java index 34b1685542..0f65559cfe 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java @@ -100,7 +100,7 @@ public void publishTest() { public void subscribeTest() { PravegaClient pravegaClient = getNewPravegaClient(); pravegaClient.start(); - pravegaClient.subscribe("test1", "consumerGroup", new EventListener() { + pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() { @Override public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) { // do nothing @@ -116,7 +116,7 @@ public void unsubscribeTest() { pravegaClient.unsubscribe("test1", "consumerGroup"); pravegaClient.start(); - pravegaClient.subscribe("test1", "consumerGroup", new EventListener() { + pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() { @Override public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) { // do nothing From 0c1fcb70c699106673a51b6d1349569c0f1b2667 Mon Sep 17 00:00:00 2001 From: Timcross-WEI Date: Tue, 13 Sep 2022 16:20:46 +0800 Subject: [PATCH 2/3] support sendOneway method --- .../apache/eventmesh/connector/pravega/PravegaProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaProducerImpl.java index 3104f5562c..a5aa1d2909 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaProducerImpl.java @@ -81,7 +81,7 @@ public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exc @Override public void sendOneway(CloudEvent cloudEvent) { - throw new UnsupportedOperationException(); + client.publish(cloudEvent.getSubject(), cloudEvent); } @Override From cb09085db933b6480f06b134de2409b7640bb3e8 Mon Sep 17 00:00:00 2001 From: Timcross-WEI Date: Tue, 13 Sep 2022 17:11:56 +0800 Subject: [PATCH 3/3] support Broadcast and Clustering message model --- .../pravega/PravegaConsumerImpl.java | 6 ++++-- .../pravega/client/PravegaClient.java | 19 +++++++++++++------ .../pravega/client/PravegaClientTest.java | 8 ++++---- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java index a947c6115d..5c06f6d4ad 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java @@ -35,6 +35,7 @@ public class PravegaConsumerImpl implements Consumer { private static final AtomicBoolean started = new AtomicBoolean(false); + private boolean isBroadcast; private String instanceName; private String consumerGroup; private PravegaClient client; @@ -42,6 +43,7 @@ public class PravegaConsumerImpl implements Consumer { @Override public void init(Properties keyValue) throws Exception { + isBroadcast = Boolean.parseBoolean(keyValue.getProperty("isBroadcast", "false")); instanceName = keyValue.getProperty("instanceName", ""); consumerGroup = keyValue.getProperty("consumerGroup", ""); client = PravegaClient.getInstance(); @@ -74,14 +76,14 @@ public void updateOffset(List cloudEvents, AbstractContext context) @Override public void subscribe(String topic) throws Exception { - if (!client.subscribe(topic, consumerGroup, instanceName, eventListener)) { + if (!client.subscribe(topic, isBroadcast, consumerGroup, instanceName, eventListener)) { throw new PravegaConnectorException(String.format("subscribe topic[%s] fail.", topic)); } } @Override public void unsubscribe(String topic) { - if (!client.unsubscribe(topic, consumerGroup)) { + if (!client.unsubscribe(topic, isBroadcast, consumerGroup)) { throw new PravegaConnectorException(String.format("unsubscribe topic[%s] fail.", topic)); } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java index 4c7389912c..7c54cc1190 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java @@ -23,6 +23,7 @@ import org.apache.eventmesh.connector.pravega.exception.PravegaConnectorException; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -129,11 +130,11 @@ public SendResult publish(String topic, CloudEvent cloudEvent) { } - public boolean subscribe(String topic, String consumerGroup, String instanceName, EventListener listener) { + public boolean subscribe(String topic, boolean isBroadcast, String consumerGroup, String instanceName, EventListener listener) { if (subscribeTaskMap.containsKey(topic)) { return true; } - String readerGroupName = buildReaderGroupName(consumerGroup, topic); + String readerGroupName = buildReaderGroupName(isBroadcast, consumerGroup, topic); createReaderGroup(topic, readerGroupName); String readerId = buildReaderId(instanceName); EventStreamReader reader = createReader(readerId, readerGroupName); @@ -143,11 +144,13 @@ public boolean subscribe(String topic, String consumerGroup, String instanceName return true; } - public boolean unsubscribe(String topic, String consumerGroup) { + public boolean unsubscribe(String topic, boolean isBroadcast, String consumerGroup) { if (!subscribeTaskMap.containsKey(topic)) { return true; } - deleteReaderGroup(buildReaderGroupName(consumerGroup, topic)); + if (!isBroadcast) { + deleteReaderGroup(buildReaderGroupName(false, consumerGroup, topic)); + } subscribeTaskMap.remove(topic).stopRead(); writerMap.remove(topic).close(); return true; @@ -170,8 +173,12 @@ private EventStreamWriter createWrite(String topic) { return clientFactory.createEventWriter(topic, new ByteArraySerializer(), EventWriterConfig.builder().build()); } - private String buildReaderGroupName(String consumerGroup, String topic) { - return String.format("%s-%s", consumerGroup, topic); + private String buildReaderGroupName(boolean isBroadcast, String consumerGroup, String topic) { + if (isBroadcast) { + return UUID.randomUUID().toString(); + } else { + return String.format("%s-%s", consumerGroup, topic); + } } private String buildReaderId(String instanceName) { diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java index 0f65559cfe..c3d29271ed 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java @@ -100,7 +100,7 @@ public void publishTest() { public void subscribeTest() { PravegaClient pravegaClient = getNewPravegaClient(); pravegaClient.start(); - pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() { + pravegaClient.subscribe("test1", false, "consumerGroup", "instanceName", new EventListener() { @Override public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) { // do nothing @@ -113,16 +113,16 @@ public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) { @Test public void unsubscribeTest() { PravegaClient pravegaClient = getNewPravegaClient(); - pravegaClient.unsubscribe("test1", "consumerGroup"); + pravegaClient.unsubscribe("test1", false, "consumerGroup"); pravegaClient.start(); - pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() { + pravegaClient.subscribe("test1", false, "consumerGroup", "instanceName", new EventListener() { @Override public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) { // do nothing } }); - pravegaClient.unsubscribe("test1", "consumerGroup"); + pravegaClient.unsubscribe("test1", false, "consumerGroup"); assertFalse(pravegaClient.getSubscribeTaskMap().containsKey("test1")); try {