Skip to content

Commit

Permalink
support Broadcast and Clustering message model
Browse files Browse the repository at this point in the history
  • Loading branch information
LIU-WEI-git committed Sep 21, 2022
1 parent 0c1fcb7 commit cb09085
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
public class PravegaConsumerImpl implements Consumer {
private static final AtomicBoolean started = new AtomicBoolean(false);

private boolean isBroadcast;
private String instanceName;
private String consumerGroup;
private PravegaClient client;
private EventListener eventListener;

@Override
public void init(Properties keyValue) throws Exception {
isBroadcast = Boolean.parseBoolean(keyValue.getProperty("isBroadcast", "false"));
instanceName = keyValue.getProperty("instanceName", "");
consumerGroup = keyValue.getProperty("consumerGroup", "");
client = PravegaClient.getInstance();
Expand Down Expand Up @@ -74,14 +76,14 @@ public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context)

@Override
public void subscribe(String topic) throws Exception {
if (!client.subscribe(topic, consumerGroup, instanceName, eventListener)) {
if (!client.subscribe(topic, isBroadcast, consumerGroup, instanceName, eventListener)) {
throw new PravegaConnectorException(String.format("subscribe topic[%s] fail.", topic));
}
}

@Override
public void unsubscribe(String topic) {
if (!client.unsubscribe(topic, consumerGroup)) {
if (!client.unsubscribe(topic, isBroadcast, consumerGroup)) {
throw new PravegaConnectorException(String.format("unsubscribe topic[%s] fail.", topic));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.eventmesh.connector.pravega.exception.PravegaConnectorException;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -129,11 +130,11 @@ public SendResult publish(String topic, CloudEvent cloudEvent) {

}

public boolean subscribe(String topic, String consumerGroup, String instanceName, EventListener listener) {
public boolean subscribe(String topic, boolean isBroadcast, String consumerGroup, String instanceName, EventListener listener) {
if (subscribeTaskMap.containsKey(topic)) {
return true;
}
String readerGroupName = buildReaderGroupName(consumerGroup, topic);
String readerGroupName = buildReaderGroupName(isBroadcast, consumerGroup, topic);
createReaderGroup(topic, readerGroupName);
String readerId = buildReaderId(instanceName);
EventStreamReader<byte[]> reader = createReader(readerId, readerGroupName);
Expand All @@ -143,11 +144,13 @@ public boolean subscribe(String topic, String consumerGroup, String instanceName
return true;
}

public boolean unsubscribe(String topic, String consumerGroup) {
public boolean unsubscribe(String topic, boolean isBroadcast, String consumerGroup) {
if (!subscribeTaskMap.containsKey(topic)) {
return true;
}
deleteReaderGroup(buildReaderGroupName(consumerGroup, topic));
if (!isBroadcast) {
deleteReaderGroup(buildReaderGroupName(false, consumerGroup, topic));
}
subscribeTaskMap.remove(topic).stopRead();
writerMap.remove(topic).close();
return true;
Expand All @@ -170,8 +173,12 @@ private EventStreamWriter<byte[]> createWrite(String topic) {
return clientFactory.createEventWriter(topic, new ByteArraySerializer(), EventWriterConfig.builder().build());
}

private String buildReaderGroupName(String consumerGroup, String topic) {
return String.format("%s-%s", consumerGroup, topic);
private String buildReaderGroupName(boolean isBroadcast, String consumerGroup, String topic) {
if (isBroadcast) {
return UUID.randomUUID().toString();
} else {
return String.format("%s-%s", consumerGroup, topic);
}
}

private String buildReaderId(String instanceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void publishTest() {
public void subscribeTest() {
PravegaClient pravegaClient = getNewPravegaClient();
pravegaClient.start();
pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() {
pravegaClient.subscribe("test1", false, "consumerGroup", "instanceName", new EventListener() {
@Override
public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
// do nothing
Expand All @@ -113,16 +113,16 @@ public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
@Test
public void unsubscribeTest() {
PravegaClient pravegaClient = getNewPravegaClient();
pravegaClient.unsubscribe("test1", "consumerGroup");
pravegaClient.unsubscribe("test1", false, "consumerGroup");

pravegaClient.start();
pravegaClient.subscribe("test1", "consumerGroup", "instanceName", new EventListener() {
pravegaClient.subscribe("test1", false, "consumerGroup", "instanceName", new EventListener() {
@Override
public void consume(CloudEvent cloudEvent, AsyncConsumeContext context) {
// do nothing
}
});
pravegaClient.unsubscribe("test1", "consumerGroup");
pravegaClient.unsubscribe("test1", false, "consumerGroup");

assertFalse(pravegaClient.getSubscribeTaskMap().containsKey("test1"));
try {
Expand Down

0 comments on commit cb09085

Please sign in to comment.