diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/GrpcType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/GrpcType.java similarity index 91% rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/GrpcType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/GrpcType.java index 507942a5f5..81317310c6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/GrpcType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/GrpcType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; +package org.apache.eventmesh.common.protocol.grpc.common; public enum GrpcType { WEBHOOK, STREAM diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java index e4ec6a7a63..a72d4ad8d4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/ConsumerManager.java @@ -23,7 +23,7 @@ import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; import org.apache.eventmesh.runtime.common.ServiceState; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java index 3a4fde027d..604dcb1349 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java @@ -45,7 +45,7 @@ import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.WebhookTopicConfig; import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java index 1d3b3c1cdc..6d185803e0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupClient.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; import java.util.Date; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupTopicConfig.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupTopicConfig.java index c7044e4ed1..a166dab290 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupTopicConfig.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/ConsumerGroupTopicConfig.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; public abstract class ConsumerGroupTopicConfig { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java index 6094e22105..1c461db46f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/StreamTopicConfig.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.eventmesh.common.utils.LogUtils; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java index 358cfef58e..818efd8dc6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/consumergroup/WebhookTopicConfig.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup; import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import java.util.ArrayList; import java.util.HashSet; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeProcessor.java index bd0b0f6c42..47450fbf2d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeProcessor.java @@ -30,7 +30,7 @@ import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java index aa21f56236..51995671dd 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.java @@ -29,7 +29,7 @@ import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter; import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/HandleMsgContext.java index 63d40ff858..51d6ad2fe6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/HandleMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/HandleMsgContext.java @@ -22,7 +22,7 @@ import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer; import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.eventmesh.runtime.util.EventMeshUtil; import io.cloudevents.CloudEvent; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/MessageHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/MessageHandler.java index bb3737c35a..15e3ae93df 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/MessageHandler.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/MessageHandler.java @@ -18,7 +18,7 @@ package org.apache.eventmesh.runtime.core.protocol.grpc.push; import org.apache.eventmesh.common.ThreadPoolFactory; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.commons.collections4.MapUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java index bb4a561939..cc1823a3ff 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ServiceUtils.java @@ -31,7 +31,7 @@ import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; import org.apache.eventmesh.runtime.constants.EventMeshConstants; -import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java index 868191d995..4cede04955 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java @@ -37,6 +37,7 @@ import org.apache.eventmesh.common.protocol.grpc.cloudevents.HeartbeatServiceGrpc.HeartbeatServiceBlockingStub; import org.apache.eventmesh.common.protocol.grpc.common.ClientType; import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils; +import org.apache.eventmesh.common.protocol.grpc.common.GrpcType; import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; import org.apache.eventmesh.common.protocol.grpc.common.Response; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; @@ -45,6 +46,7 @@ import org.apache.commons.collections4.MapUtils; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import io.grpc.ManagedChannel; @@ -68,7 +71,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable { private ManagedChannel channel; private final EventMeshGrpcClientConfig clientConfig; - private final Map subscriptionMap = new ConcurrentHashMap<>(); + private final Map subscriptionMap = new ConcurrentHashMap<>(); private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new EventMeshThreadFactory("GRPCClientScheduler", true)); @@ -93,11 +96,22 @@ public void init() { heartBeat(); } + /** + * Subscribes to an event at a specified URL(Webhook). + * + * @param subscriptionItems The list of subscription items. + * @param url The URL to subscribe to. + * @return A response containing information about the subscription result. + */ public Response subscribe(final List subscriptionItems, final String url) { LogUtils.info(log, "Create subscription: {} , url: {}", subscriptionItems, url); - addSubscription(subscriptionItems, url); + addSubscription(subscriptionItems, url, GrpcType.WEBHOOK); + return subscribeWebhook(subscriptionItems, url); + } + + private Response subscribeWebhook(List subscriptionItems, String url) { final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems); try { @@ -114,6 +128,11 @@ public Response subscribe(final List subscriptionItems, final return null; } + /** + * Subscribes to a streaming. + * + * @param subscriptionItems The list of subscription items for streaming. + */ public void subscribe(final List subscriptionItems) { LogUtils.info(log, "Create streaming subscription: {}", subscriptionItems); @@ -122,7 +141,7 @@ public void subscribe(final List subscriptionItems) { return; } - addSubscription(subscriptionItems, SDK_STREAM_URL); + addSubscription(subscriptionItems, SDK_STREAM_URL, GrpcType.STREAM); CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems); @@ -135,9 +154,9 @@ public void subscribe(final List subscriptionItems) { subStreamHandler.sendSubscription(subscription); } - private void addSubscription(final List subscriptionItems, final String url) { + private void addSubscription(final List subscriptionItems, final String url, final GrpcType grpcType) { for (SubscriptionItem item : subscriptionItems) { - subscriptionMap.putIfAbsent(item.getTopic(), new SubscriptionInfo(item, url)); + subscriptionMap.putIfAbsent(item.getTopic(), new SubscriptionInfo(item, url, grpcType)); } } @@ -246,14 +265,27 @@ private void resubscribe() { if (subscriptionMap.isEmpty()) { return; } - - final Map> subscriptionGroup = subscriptionMap.values().stream() + final Collection values = subscriptionMap.values(); + final AtomicBoolean isStreamSub = new AtomicBoolean(false); + for (SubscriptionInfo info : values) { + if (info.grpcType == GrpcType.STREAM) { + isStreamSub.compareAndSet(false, true); + break; + } + } + final Map> subscriptionGroup = values.stream() .collect(Collectors.groupingBy(SubscriptionInfo::getUrl, mapping(SubscriptionInfo::getSubscriptionItem, toList()))); subscriptionGroup.forEach((url, items) -> { - CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, - items); - subStreamHandler.sendSubscription(subscription); + if (isStreamSub.get()) { + CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, + url, + items); + subStreamHandler.sendSubscription(subscription); + } else { + subscribeWebhook(items, url); + } + }); } @@ -276,10 +308,16 @@ private static class SubscriptionInfo { private transient SubscriptionItem subscriptionItem; private transient String url; + private GrpcType grpcType; - SubscriptionInfo(final SubscriptionItem subscriptionItem, final String url) { + SubscriptionInfo(final SubscriptionItem subscriptionItem, final String url, final GrpcType grpcType) { this.subscriptionItem = subscriptionItem; this.url = url; + this.grpcType = grpcType; + } + + public GrpcType getGrpcType() { + return grpcType; } public SubscriptionItem getSubscriptionItem() {