Skip to content

Commit

Permalink
[ISSUE apache#4631]Optimize the message body of the SDK's returned re…
Browse files Browse the repository at this point in the history
…ply message
  • Loading branch information
mxsm committed Dec 8, 2023
1 parent ee9ee4c commit 9c8acf3
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ProtocolKey {
public static final String GRPC_RESPONSE_TIME = "time";

public static final String SUB_MESSAGE_TYPE = "submessagetype";
public static final String SUB_REPLY_MESSAGE = "subscription_reply";

/**
* CloudEvents spec
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.protocol.grpc.common.SubscriptionReply;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.processor.ReplyMessageProcessor;
Expand Down Expand Up @@ -87,7 +86,7 @@ public StreamObserver<CloudEvent> subscribeStream(StreamObserver<CloudEvent> res
public void onNext(CloudEvent subscription) {
final String subMessageType = Optional.ofNullable(subscription.getAttributesMap().get(ProtocolKey.SUB_MESSAGE_TYPE))
.orElse(CloudEventAttributeValue.newBuilder().build()).getCeString();
if (StringUtils.equals(subMessageType, SubscriptionReply.TYPE)) {
if (StringUtils.equals(subMessageType, ProtocolKey.SUB_REPLY_MESSAGE)) {
log.info("cmd={}|{}|client2eventMesh|from={}|to={}", "reply-to-server", EventMeshConstants.PROTOCOL_GRPC,
EventMeshCloudEventUtils.getIp(subscription), eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
handleSubscribeReply(subscription, emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.eventmesh.common.protocol.grpc.cloudevents.ConsumerServiceGrpc.ConsumerServiceStub;
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.grpc.common.SubscriptionReply;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;

import java.io.Serializable;
Expand Down Expand Up @@ -118,26 +116,19 @@ public void onCompleted() {
private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final T replyMessage) {
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventMeshCloudEvent(replyMessage,
clientConfig, listener.getProtocolType());
SubscriptionReply subscriptionReply = SubscriptionReply.builder().producerGroup(clientConfig.getConsumerGroup())
.topic(EventMeshCloudEventUtils.getSubject(cloudEvent))
.content(EventMeshCloudEventUtils.getDataContent(cloudEvent))
.seqNum(EventMeshCloudEventUtils.getSeqNum(cloudEvent))
.uniqueId(EventMeshCloudEventUtils.getUniqueId(cloudEvent))
.ttl(EventMeshCloudEventUtils.getTtl(cloudEvent)).build();

Map<String, String> prop = new HashMap<>();
Map<String, CloudEventAttributeValue> reqMessageMap = reqMessage.getAttributesMap();
reqMessageMap.entrySet().forEach(entry -> prop.put(entry.getKey(), entry.getValue().getCeString()));
Map<String, CloudEventAttributeValue> cloudEventMap = cloudEvent.getAttributesMap();
cloudEventMap.entrySet().forEach(entry -> prop.put(entry.getKey(), entry.getValue().getCeString()));
subscriptionReply.putAllProperties(prop);

return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessageMap)
return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessageMap).putAllAttributes(cloudEventMap)
.putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
//Indicate that it is a subscription response
.putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(SubscriptionReply.TYPE).build())
.setTextData(JsonUtils.toJSONString(subscriptionReply)).build();
// Indicate that it is a subscription response
.putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(ProtocolKey.SUB_REPLY_MESSAGE).build())
.build();
}

@Override
Expand Down

0 comments on commit 9c8acf3

Please sign in to comment.