Skip to content

Commit

Permalink
Merge pull request #1237 from LIU-WEI-git/pravega_connector_upgrade
Browse files Browse the repository at this point in the history
[ISSUE #1243] Upgrade pravega connector
  • Loading branch information
qqeasonchen authored Sep 23, 2022
2 parents e379be6 + cb09085 commit 2232648
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@
public class PravegaConsumerImpl implements Consumer {
private static final AtomicBoolean started = new AtomicBoolean(false);

private boolean isBroadcast;
private String instanceName;
private String consumerGroup;
private PravegaClient client;
private EventListener eventListener;

@Override
public void init(Properties keyValue) throws Exception {
consumerGroup = keyValue.getProperty("consumerGroup");
isBroadcast = Boolean.parseBoolean(keyValue.getProperty("isBroadcast", "false"));
instanceName = keyValue.getProperty("instanceName", "");
consumerGroup = keyValue.getProperty("consumerGroup", "");
client = PravegaClient.getInstance();
}

Expand Down Expand Up @@ -72,14 +76,14 @@ public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context)

@Override
public void subscribe(String topic) throws Exception {
if (!client.subscribe(topic, consumerGroup, eventListener)) {
if (!client.subscribe(topic, isBroadcast, consumerGroup, instanceName, eventListener)) {
throw new PravegaConnectorException(String.format("subscribe topic[%s] fail.", topic));
}
}

@Override
public void unsubscribe(String topic) {
if (!client.unsubscribe(topic, consumerGroup)) {
if (!client.unsubscribe(topic, isBroadcast, consumerGroup)) {
throw new PravegaConnectorException(String.format("unsubscribe topic[%s] fail.", topic));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exc

@Override
public void sendOneway(CloudEvent cloudEvent) {
throw new UnsupportedOperationException();
client.publish(cloudEvent.getSubject(), cloudEvent);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.eventmesh.connector.pravega.exception.PravegaConnectorException;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -129,31 +130,27 @@ public SendResult publish(String topic, CloudEvent cloudEvent) {

}

public boolean subscribe(String topic, String consumerGroup, EventListener listener) {
public boolean subscribe(String topic, boolean isBroadcast, String consumerGroup, String instanceName, EventListener listener) {
if (subscribeTaskMap.containsKey(topic)) {
return true;
}
String readerGroup = buildReaderGroup(topic, consumerGroup);
// clear the readerGroup first to ensure a readerGroup only has one reader
try {
deleteReaderGroup(readerGroup);
} catch (Exception e) {
log.debug("clear readerGroup[{}] fail since it doesn't exist.", readerGroup);
}
createReaderGroup(topic, readerGroup);
String readerId = buildReaderId(readerGroup);
EventStreamReader<byte[]> reader = createReader(readerId, readerGroup);
String readerGroupName = buildReaderGroupName(isBroadcast, consumerGroup, topic);
createReaderGroup(topic, readerGroupName);
String readerId = buildReaderId(instanceName);
EventStreamReader<byte[]> reader = createReader(readerId, readerGroupName);
SubscribeTask subscribeTask = new SubscribeTask(topic, reader, listener);
subscribeTask.start();
subscribeTaskMap.put(topic, subscribeTask);
return true;
}

public boolean unsubscribe(String topic, String consumerGroup) {
public boolean unsubscribe(String topic, boolean isBroadcast, String consumerGroup) {
if (!subscribeTaskMap.containsKey(topic)) {
return true;
}
deleteReaderGroup(buildReaderGroup(topic, consumerGroup));
if (!isBroadcast) {
deleteReaderGroup(buildReaderGroupName(false, consumerGroup, topic));
}
subscribeTaskMap.remove(topic).stopRead();
writerMap.remove(topic).close();
return true;
Expand All @@ -176,21 +173,28 @@ private EventStreamWriter<byte[]> createWrite(String topic) {
return clientFactory.createEventWriter(topic, new ByteArraySerializer(), EventWriterConfig.builder().build());
}

private String buildReaderGroup(String topic, String consumerGroup) {
return String.format("%s-%s", topic, consumerGroup);
private String buildReaderGroupName(boolean isBroadcast, String consumerGroup, String topic) {
if (isBroadcast) {
return UUID.randomUUID().toString();
} else {
return String.format("%s-%s", consumerGroup, topic);
}
}

private String buildReaderId(String readerGroup) {
return String.format("%s-reader", readerGroup);
private String buildReaderId(String instanceName) {
return String.format("%s-reader", instanceName).replaceAll("\\(", "-").replaceAll("\\)", "-");
}

private void createReaderGroup(String topic, String readerGroup) {
private void createReaderGroup(String topic, String readerGroupName) {
if (!checkTopicExist(topic)) {
createStream(topic);
}
ReaderGroupConfig readerGroupConfig =
ReaderGroupConfig.builder().stream(NameUtils.getScopedStreamName(config.getScope(), topic)).build();
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
ReaderGroupConfig.builder()
.stream(NameUtils.getScopedStreamName(config.getScope(), topic))
.retentionType(ReaderGroupConfig.StreamDataRetention.AUTOMATIC_RELEASE_AT_LAST_CHECKPOINT)
.build();
readerGroupManager.createReaderGroup(readerGroupName, readerGroupConfig);
}

private void deleteReaderGroup(String readerGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SubscribeTask extends Thread {
private final EventStreamReader<byte[]> reader;
private final EventListener listener;
private final AtomicBoolean running = new AtomicBoolean(true);
private final AtomicBoolean continueRead = new AtomicBoolean(true);

public SubscribeTask(String name, EventStreamReader<byte[]> reader, EventListener listener) {
super(name);
Expand All @@ -43,27 +44,44 @@ public SubscribeTask(String name, EventStreamReader<byte[]> reader, EventListene

@Override
public void run() {
CloudEvent cloudEvent = null;
while (running.get()) {
EventRead<byte[]> event;
while ((event = reader.readNextEvent(2000)) != null) {
if (continueRead.get()) {
EventRead<byte[]> event = reader.readNextEvent(2000);
if (event == null) {
continue;
}
byte[] eventByteArray = event.getEvent();
if (eventByteArray == null) {
continue;
}
PravegaEvent pravegaEvent = PravegaEvent.getFromByteArray(eventByteArray);
CloudEvent cloudEvent = pravegaEvent.convertToCloudEvent();
EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(EventMeshAction action) {
// nothing to do
}
};
listener.consume(cloudEvent, consumeContext);
cloudEvent = pravegaEvent.convertToCloudEvent();

listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext());
} else {
listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext());
}
}
}

public void stopRead() {
running.compareAndSet(true, false);
}

private class PravegaEventMeshAsyncConsumeContext extends EventMeshAsyncConsumeContext {
@Override
public void commit(EventMeshAction action) {
switch (action) {
case CommitMessage:
case ReconsumeLater:
continueRead.set(false);
break;
case ManualAck:
continueRead.set(true);
break;
default:
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void publishTest() {
public void subscribeTest() {
PravegaClient pravegaClient = getNewPravegaClient();
pravegaClient.start();
pravegaClient.subscribe("test1", "consumerGroup", new EventListener() {
pravegaClient.subscribe("test1", false, "consumerGroup", "instanceName", new EventListener() {
@Override
public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
// do nothing
Expand All @@ -113,16 +113,16 @@ public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
@Test
public void unsubscribeTest() {
PravegaClient pravegaClient = getNewPravegaClient();
pravegaClient.unsubscribe("test1", "consumerGroup");
pravegaClient.unsubscribe("test1", false, "consumerGroup");

pravegaClient.start();
pravegaClient.subscribe("test1", "consumerGroup", new EventListener() {
pravegaClient.subscribe("test1", false, "consumerGroup", "instanceName", new EventListener() {
@Override
public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
// do nothing
}
});
pravegaClient.unsubscribe("test1", "consumerGroup");
pravegaClient.unsubscribe("test1", false, "consumerGroup");

assertFalse(pravegaClient.getSubscribeTaskMap().containsKey("test1"));
try {
Expand Down

0 comments on commit 2232648

Please sign in to comment.