Skip to content

Commit

Permalink
[ISSUE apache#4652]Fix EventMeshGrpcConsumer subscribe webhook send h…
Browse files Browse the repository at this point in the history
…eartBeat throw NPE
  • Loading branch information
mxsm committed Dec 14, 2023
1 parent b85a58c commit ef70153
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup;
package org.apache.eventmesh.common.protocol.grpc.common;

public enum GrpcType {
WEBHOOK, STREAM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.WebhookTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;

import java.util.Date;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup;

import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;

public abstract class ConsumerGroupTopicConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup;

import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;

import java.util.ArrayList;
import java.util.HashSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.runtime.util.EventMeshUtil;

import io.cloudevents.CloudEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.eventmesh.runtime.core.protocol.grpc.push;

import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;

import org.apache.commons.collections4.MapUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.eventmesh.common.protocol.grpc.cloudevents.HeartbeatServiceGrpc.HeartbeatServiceBlockingStub;
import org.apache.eventmesh.common.protocol.grpc.common.ClientType;
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.common.protocol.grpc.common.GrpcType;
import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.grpc.common.Response;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
Expand All @@ -45,13 +46,15 @@

import org.apache.commons.collections4.MapUtils;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import io.grpc.ManagedChannel;
Expand All @@ -68,7 +71,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable {
private ManagedChannel channel;
private final EventMeshGrpcClientConfig clientConfig;

private final Map<String, SubscriptionInfo> subscriptionMap = new ConcurrentHashMap<>();
private final Map<String /*topic*/, SubscriptionInfo> subscriptionMap = new ConcurrentHashMap<>();

private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
new EventMeshThreadFactory("GRPCClientScheduler", true));
Expand All @@ -93,11 +96,22 @@ public void init() {
heartBeat();
}

/**
* Subscribes to an event at a specified URL(Webhook).
*
* @param subscriptionItems The list of subscription items.
* @param url The URL to subscribe to.
* @return A response containing information about the subscription result.
*/
public Response subscribe(final List<SubscriptionItem> subscriptionItems, final String url) {
LogUtils.info(log, "Create subscription: {} , url: {}", subscriptionItems, url);

addSubscription(subscriptionItems, url);
addSubscription(subscriptionItems, url, GrpcType.WEBHOOK);

return subscribeWebhook(subscriptionItems, url);
}

private Response subscribeWebhook(List<SubscriptionItem> subscriptionItems, String url) {
final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
url, subscriptionItems);
try {
Expand All @@ -114,6 +128,11 @@ public Response subscribe(final List<SubscriptionItem> subscriptionItems, final
return null;
}

/**
* Subscribes to a streaming.
*
* @param subscriptionItems The list of subscription items for streaming.
*/
public void subscribe(final List<SubscriptionItem> subscriptionItems) {
LogUtils.info(log, "Create streaming subscription: {}", subscriptionItems);

Expand All @@ -122,7 +141,7 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {
return;
}

addSubscription(subscriptionItems, SDK_STREAM_URL);
addSubscription(subscriptionItems, SDK_STREAM_URL, GrpcType.STREAM);

CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null,
subscriptionItems);
Expand All @@ -135,9 +154,9 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {
subStreamHandler.sendSubscription(subscription);
}

private void addSubscription(final List<SubscriptionItem> subscriptionItems, final String url) {
private void addSubscription(final List<SubscriptionItem> subscriptionItems, final String url, final GrpcType grpcType) {
for (SubscriptionItem item : subscriptionItems) {
subscriptionMap.putIfAbsent(item.getTopic(), new SubscriptionInfo(item, url));
subscriptionMap.putIfAbsent(item.getTopic(), new SubscriptionInfo(item, url, grpcType));
}
}

Expand Down Expand Up @@ -246,14 +265,27 @@ private void resubscribe() {
if (subscriptionMap.isEmpty()) {
return;
}

final Map<String, List<SubscriptionItem>> subscriptionGroup = subscriptionMap.values().stream()
final Collection<SubscriptionInfo> values = subscriptionMap.values();
final AtomicBoolean isStreamSub = new AtomicBoolean(false);
for (SubscriptionInfo info : values) {
if (info.grpcType == GrpcType.STREAM) {
isStreamSub.compareAndSet(false, true);
break;
}
}
final Map<String, List<SubscriptionItem>> subscriptionGroup = values.stream()
.collect(Collectors.groupingBy(SubscriptionInfo::getUrl, mapping(SubscriptionInfo::getSubscriptionItem, toList())));

subscriptionGroup.forEach((url, items) -> {
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url,
items);
subStreamHandler.sendSubscription(subscription);
if (isStreamSub.get()) {
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
url,
items);
subStreamHandler.sendSubscription(subscription);
} else {
subscribeWebhook(items, url);
}

});
}

Expand All @@ -276,10 +308,16 @@ private static class SubscriptionInfo {

private transient SubscriptionItem subscriptionItem;
private transient String url;
private GrpcType grpcType;

SubscriptionInfo(final SubscriptionItem subscriptionItem, final String url) {
SubscriptionInfo(final SubscriptionItem subscriptionItem, final String url, final GrpcType grpcType) {
this.subscriptionItem = subscriptionItem;
this.url = url;
this.grpcType = grpcType;
}

public GrpcType getGrpcType() {
return grpcType;
}

public SubscriptionItem getSubscriptionItem() {
Expand Down

0 comments on commit ef70153

Please sign in to comment.