diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java index 9cbd4de70f..a947c6115d 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/PravegaConsumerImpl.java @@ -35,13 +35,15 @@ public class PravegaConsumerImpl implements Consumer { private static final AtomicBoolean started = new AtomicBoolean(false); + private String instanceName; private String consumerGroup; private PravegaClient client; private EventListener eventListener; @Override public void init(Properties keyValue) throws Exception { - consumerGroup = keyValue.getProperty("consumerGroup"); + instanceName = keyValue.getProperty("instanceName", ""); + consumerGroup = keyValue.getProperty("consumerGroup", ""); client = PravegaClient.getInstance(); } @@ -72,7 +74,7 @@ public void updateOffset(List cloudEvents, AbstractContext context) @Override public void subscribe(String topic) throws Exception { - if (!client.subscribe(topic, consumerGroup, eventListener)) { + if (!client.subscribe(topic, consumerGroup, instanceName, eventListener)) { throw new PravegaConnectorException(String.format("subscribe topic[%s] fail.", topic)); } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java index 25ebb53ebe..4c7389912c 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/PravegaClient.java @@ -129,20 +129,14 @@ public SendResult publish(String topic, CloudEvent cloudEvent) { } - public boolean subscribe(String topic, String consumerGroup, EventListener listener) { + public boolean subscribe(String topic, String consumerGroup, String instanceName, EventListener listener) { if (subscribeTaskMap.containsKey(topic)) { return true; } - String readerGroup = buildReaderGroup(topic, consumerGroup); - // clear the readerGroup first to ensure a readerGroup only has one reader - try { - deleteReaderGroup(readerGroup); - } catch (Exception e) { - log.debug("clear readerGroup[{}] fail since it doesn't exist.", readerGroup); - } - createReaderGroup(topic, readerGroup); - String readerId = buildReaderId(readerGroup); - EventStreamReader reader = createReader(readerId, readerGroup); + String readerGroupName = buildReaderGroupName(consumerGroup, topic); + createReaderGroup(topic, readerGroupName); + String readerId = buildReaderId(instanceName); + EventStreamReader reader = createReader(readerId, readerGroupName); SubscribeTask subscribeTask = new SubscribeTask(topic, reader, listener); subscribeTask.start(); subscribeTaskMap.put(topic, subscribeTask); @@ -153,7 +147,7 @@ public boolean unsubscribe(String topic, String consumerGroup) { if (!subscribeTaskMap.containsKey(topic)) { return true; } - deleteReaderGroup(buildReaderGroup(topic, consumerGroup)); + deleteReaderGroup(buildReaderGroupName(consumerGroup, topic)); subscribeTaskMap.remove(topic).stopRead(); writerMap.remove(topic).close(); return true; @@ -176,21 +170,24 @@ private EventStreamWriter createWrite(String topic) { return clientFactory.createEventWriter(topic, new ByteArraySerializer(), EventWriterConfig.builder().build()); } - private String buildReaderGroup(String topic, String consumerGroup) { - return String.format("%s-%s", topic, consumerGroup); + private String buildReaderGroupName(String consumerGroup, String topic) { + return String.format("%s-%s", consumerGroup, topic); } - private String buildReaderId(String readerGroup) { - return String.format("%s-reader", readerGroup); + private String buildReaderId(String instanceName) { + return String.format("%s-reader", instanceName).replaceAll("\\(", "-").replaceAll("\\)", "-"); } - private void createReaderGroup(String topic, String readerGroup) { + private void createReaderGroup(String topic, String readerGroupName) { if (!checkTopicExist(topic)) { createStream(topic); } ReaderGroupConfig readerGroupConfig = - ReaderGroupConfig.builder().stream(NameUtils.getScopedStreamName(config.getScope(), topic)).build(); - readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); + ReaderGroupConfig.builder() + .stream(NameUtils.getScopedStreamName(config.getScope(), topic)) + .retentionType(ReaderGroupConfig.StreamDataRetention.AUTOMATIC_RELEASE_AT_LAST_CHECKPOINT) + .build(); + readerGroupManager.createReaderGroup(readerGroupName, readerGroupConfig); } private void deleteReaderGroup(String readerGroup) { diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java index a38e1159f8..cb654985d0 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/client/SubscribeTask.java @@ -34,6 +34,7 @@ public class SubscribeTask extends Thread { private final EventStreamReader reader; private final EventListener listener; private final AtomicBoolean running = new AtomicBoolean(true); + private final AtomicBoolean continueRead = new AtomicBoolean(true); public SubscribeTask(String name, EventStreamReader reader, EventListener listener) { super(name); @@ -43,22 +44,23 @@ public SubscribeTask(String name, EventStreamReader reader, EventListene @Override public void run() { + CloudEvent cloudEvent = null; while (running.get()) { - EventRead event; - while ((event = reader.readNextEvent(2000)) != null) { + if (continueRead.get()) { + EventRead event = reader.readNextEvent(2000); + if (event == null) { + continue; + } byte[] eventByteArray = event.getEvent(); if (eventByteArray == null) { continue; } PravegaEvent pravegaEvent = PravegaEvent.getFromByteArray(eventByteArray); - CloudEvent cloudEvent = pravegaEvent.convertToCloudEvent(); - EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() { - @Override - public void commit(EventMeshAction action) { - // nothing to do - } - }; - listener.consume(cloudEvent, consumeContext); + cloudEvent = pravegaEvent.convertToCloudEvent(); + + listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext()); + } else { + listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext()); } } } @@ -66,4 +68,20 @@ public void commit(EventMeshAction action) { public void stopRead() { running.compareAndSet(true, false); } + + private class PravegaEventMeshAsyncConsumeContext extends EventMeshAsyncConsumeContext { + @Override + public void commit(EventMeshAction action) { + switch (action) { + case CommitMessage: + case ReconsumeLater: + continueRead.set(false); + break; + case ManualAck: + continueRead.set(true); + break; + default: + } + } + } } diff --git a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java index 34b1685542..0f65559cfe 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java +++ b/eventmesh-connector-plugin/eventmesh-connector-pravega/src/test/java/org/apache/eventmesh/connector/pravega/client/PravegaClientTest.java @@ -100,7 +100,7 @@ public void publishTest() { public void subscribeTest() { PravegaClient pravegaClient = getNewPravegaClient(); pravegaClient.start(); - pravegaClient.subscribe("test1", "consumerGroup", new EventListener() { + pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() { @Override public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) { // do nothing @@ -116,7 +116,7 @@ public void unsubscribeTest() { pravegaClient.unsubscribe("test1", "consumerGroup"); pravegaClient.start(); - pravegaClient.subscribe("test1", "consumerGroup", new EventListener() { + pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() { @Override public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) { // do nothing