diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java index 0aede56444..ec47fda44b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java @@ -48,6 +48,7 @@ public class ProtocolKey { public static final String GRPC_RESPONSE_TIME = "time"; public static final String SUB_MESSAGE_TYPE = "submessagetype"; + public static final String SUB_REPLY_MESSAGE = "subscription_reply"; /** * CloudEvents spec diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java deleted file mode 100644 index da40a03631..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/SubscriptionReply.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.common.protocol.grpc.common; - -import java.util.HashMap; -import java.util.Map; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@Builder -@AllArgsConstructor -@NoArgsConstructor -public class SubscriptionReply { - - public static final String TYPE = "subscription_reply"; - - private String producerGroup; - - private String topic; - - private String content; - - private String ttl; - - private String uniqueId; - - private String seqNum; - - private String tag; - - @Builder.Default - private Map properties = new HashMap<>(); - - public void putAllProperties(Map properties) { - if (null == properties || properties.isEmpty()) { - return; - } - properties.putAll(properties); - } -} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java index 8852c1b40c..8963cb95bc 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/service/ConsumerService.java @@ -23,7 +23,6 @@ import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils; import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; import org.apache.eventmesh.common.protocol.grpc.common.StatusCode; -import org.apache.eventmesh.common.protocol.grpc.common.SubscriptionReply; import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.grpc.processor.ReplyMessageProcessor; @@ -87,7 +86,7 @@ public StreamObserver subscribeStream(StreamObserver res public void onNext(CloudEvent subscription) { final String subMessageType = Optional.ofNullable(subscription.getAttributesMap().get(ProtocolKey.SUB_MESSAGE_TYPE)) .orElse(CloudEventAttributeValue.newBuilder().build()).getCeString(); - if (StringUtils.equals(subMessageType, SubscriptionReply.TYPE)) { + if (StringUtils.equals(subMessageType, ProtocolKey.SUB_REPLY_MESSAGE)) { log.info("cmd={}|{}|client2eventMesh|from={}|to={}", "reply-to-server", EventMeshConstants.PROTOCOL_GRPC, EventMeshCloudEventUtils.getIp(subscription), eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp()); handleSubscribeReply(subscription, emitter); diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index 5f1ee67caa..72e04ae513 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -25,8 +25,6 @@ import org.apache.eventmesh.common.protocol.grpc.cloudevents.ConsumerServiceGrpc.ConsumerServiceStub; import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils; import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; -import org.apache.eventmesh.common.protocol.grpc.common.SubscriptionReply; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.LogUtils; import java.io.Serializable; @@ -118,26 +116,19 @@ public void onCompleted() { private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final T replyMessage) { final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventMeshCloudEvent(replyMessage, clientConfig, listener.getProtocolType()); - SubscriptionReply subscriptionReply = SubscriptionReply.builder().producerGroup(clientConfig.getConsumerGroup()) - .topic(EventMeshCloudEventUtils.getSubject(cloudEvent)) - .content(EventMeshCloudEventUtils.getDataContent(cloudEvent)) - .seqNum(EventMeshCloudEventUtils.getSeqNum(cloudEvent)) - .uniqueId(EventMeshCloudEventUtils.getUniqueId(cloudEvent)) - .ttl(EventMeshCloudEventUtils.getTtl(cloudEvent)).build(); Map prop = new HashMap<>(); Map reqMessageMap = reqMessage.getAttributesMap(); reqMessageMap.entrySet().forEach(entry -> prop.put(entry.getKey(), entry.getValue().getCeString())); Map cloudEventMap = cloudEvent.getAttributesMap(); cloudEventMap.entrySet().forEach(entry -> prop.put(entry.getKey(), entry.getValue().getCeString())); - subscriptionReply.putAllProperties(prop); - return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessageMap) + return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessageMap).putAllAttributes(cloudEventMap) .putAttributes(ProtocolKey.DATA_CONTENT_TYPE, CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build()) - //Indicate that it is a subscription response - .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(SubscriptionReply.TYPE).build()) - .setTextData(JsonUtils.toJSONString(subscriptionReply)).build(); + // Indicate that it is a subscription response + .putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(ProtocolKey.SUB_REPLY_MESSAGE).build()) + .build(); } @Override