Skip to content

Commit

Permalink
upgrade SubscribeTask and PravegaClient in pravega connector
Browse files Browse the repository at this point in the history
  • Loading branch information
LIU-WEI-git committed Sep 21, 2022
1 parent 49a32d7 commit 93426af
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
public class PravegaConsumerImpl implements Consumer {
private static final AtomicBoolean started = new AtomicBoolean(false);

private String instanceName;
private String consumerGroup;
private PravegaClient client;
private EventListener eventListener;

@Override
public void init(Properties keyValue) throws Exception {
consumerGroup = keyValue.getProperty("consumerGroup");
instanceName = keyValue.getProperty("instanceName", "");
consumerGroup = keyValue.getProperty("consumerGroup", "");
client = PravegaClient.getInstance();
}

Expand Down Expand Up @@ -72,7 +74,7 @@ public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context)

@Override
public void subscribe(String topic) throws Exception {
if (!client.subscribe(topic, consumerGroup, eventListener)) {
if (!client.subscribe(topic, consumerGroup, instanceName, eventListener)) {
throw new PravegaConnectorException(String.format("subscribe topic[%s] fail.", topic));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,14 @@ public SendResult publish(String topic, CloudEvent cloudEvent) {

}

public boolean subscribe(String topic, String consumerGroup, EventListener listener) {
public boolean subscribe(String topic, String consumerGroup, String instanceName, EventListener listener) {
if (subscribeTaskMap.containsKey(topic)) {
return true;
}
String readerGroup = buildReaderGroup(topic, consumerGroup);
// clear the readerGroup first to ensure a readerGroup only has one reader
try {
deleteReaderGroup(readerGroup);
} catch (Exception e) {
log.debug("clear readerGroup[{}] fail since it doesn't exist.", readerGroup);
}
createReaderGroup(topic, readerGroup);
String readerId = buildReaderId(readerGroup);
EventStreamReader<byte[]> reader = createReader(readerId, readerGroup);
String readerGroupName = buildReaderGroupName(consumerGroup, topic);
createReaderGroup(topic, readerGroupName);
String readerId = buildReaderId(instanceName);
EventStreamReader<byte[]> reader = createReader(readerId, readerGroupName);
SubscribeTask subscribeTask = new SubscribeTask(topic, reader, listener);
subscribeTask.start();
subscribeTaskMap.put(topic, subscribeTask);
Expand All @@ -153,7 +147,7 @@ public boolean unsubscribe(String topic, String consumerGroup) {
if (!subscribeTaskMap.containsKey(topic)) {
return true;
}
deleteReaderGroup(buildReaderGroup(topic, consumerGroup));
deleteReaderGroup(buildReaderGroupName(consumerGroup, topic));
subscribeTaskMap.remove(topic).stopRead();
writerMap.remove(topic).close();
return true;
Expand All @@ -176,21 +170,24 @@ private EventStreamWriter<byte[]> createWrite(String topic) {
return clientFactory.createEventWriter(topic, new ByteArraySerializer(), EventWriterConfig.builder().build());
}

private String buildReaderGroup(String topic, String consumerGroup) {
return String.format("%s-%s", topic, consumerGroup);
private String buildReaderGroupName(String consumerGroup, String topic) {
return String.format("%s-%s", consumerGroup, topic);
}

private String buildReaderId(String readerGroup) {
return String.format("%s-reader", readerGroup);
private String buildReaderId(String instanceName) {
return String.format("%s-reader", instanceName).replaceAll("\\(", "-").replaceAll("\\)", "-");
}

private void createReaderGroup(String topic, String readerGroup) {
private void createReaderGroup(String topic, String readerGroupName) {
if (!checkTopicExist(topic)) {
createStream(topic);
}
ReaderGroupConfig readerGroupConfig =
ReaderGroupConfig.builder().stream(NameUtils.getScopedStreamName(config.getScope(), topic)).build();
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
ReaderGroupConfig.builder()
.stream(NameUtils.getScopedStreamName(config.getScope(), topic))
.retentionType(ReaderGroupConfig.StreamDataRetention.AUTOMATIC_RELEASE_AT_LAST_CHECKPOINT)
.build();
readerGroupManager.createReaderGroup(readerGroupName, readerGroupConfig);
}

private void deleteReaderGroup(String readerGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SubscribeTask extends Thread {
private final EventStreamReader<byte[]> reader;
private final EventListener listener;
private final AtomicBoolean running = new AtomicBoolean(true);
private final AtomicBoolean continueRead = new AtomicBoolean(true);

public SubscribeTask(String name, EventStreamReader<byte[]> reader, EventListener listener) {
super(name);
Expand All @@ -43,27 +44,44 @@ public SubscribeTask(String name, EventStreamReader<byte[]> reader, EventListene

@Override
public void run() {
CloudEvent cloudEvent = null;
while (running.get()) {
EventRead<byte[]> event;
while ((event = reader.readNextEvent(2000)) != null) {
if (continueRead.get()) {
EventRead<byte[]> event = reader.readNextEvent(2000);
if (event == null) {
continue;
}
byte[] eventByteArray = event.getEvent();
if (eventByteArray == null) {
continue;
}
PravegaEvent pravegaEvent = PravegaEvent.getFromByteArray(eventByteArray);
CloudEvent cloudEvent = pravegaEvent.convertToCloudEvent();
EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(EventMeshAction action) {
// nothing to do
}
};
listener.consume(cloudEvent, consumeContext);
cloudEvent = pravegaEvent.convertToCloudEvent();

listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext());
} else {
listener.consume(cloudEvent, new PravegaEventMeshAsyncConsumeContext());
}
}
}

public void stopRead() {
running.compareAndSet(true, false);
}

private class PravegaEventMeshAsyncConsumeContext extends EventMeshAsyncConsumeContext {
@Override
public void commit(EventMeshAction action) {
switch (action) {
case CommitMessage:
case ReconsumeLater:
continueRead.set(false);
break;
case ManualAck:
continueRead.set(true);
break;
default:
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void publishTest() {
public void subscribeTest() {
PravegaClient pravegaClient = getNewPravegaClient();
pravegaClient.start();
pravegaClient.subscribe("test1", "consumerGroup", new EventListener() {
pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() {
@Override
public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
// do nothing
Expand All @@ -116,7 +116,7 @@ public void unsubscribeTest() {
pravegaClient.unsubscribe("test1", "consumerGroup");

pravegaClient.start();
pravegaClient.subscribe("test1", "consumerGroup", new EventListener() {
pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() {
@Override
public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
// do nothing
Expand Down

0 comments on commit 93426af

Please sign in to comment.