Skip to content

Commit

Permalink
[ISSUE apache#4619]Fix Grpc request reply can not revice reply message
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 6, 2023
1 parent 34c803b commit a2c5764
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class ProtocolKey {
public static final String GRPC_RESPONSE_MESSAGE = "response_message";
public static final String GRPC_RESPONSE_TIME = "time";

public static final String SUB_MESSAGE_TYPE = "submessagetype";

/**
* CloudEvents spec
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
@NoArgsConstructor
public class SubscriptionReply {

public static final String TYPE = "subscription_reply";

private String producerGroup;

private String topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent.CloudEventAttributeValue;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.ConsumerServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.protocol.grpc.common.SubscriptionReply;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand All @@ -31,7 +34,9 @@
import org.apache.eventmesh.runtime.core.protocol.grpc.processor.UnsubscribeProcessor;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;

Expand Down Expand Up @@ -86,19 +91,18 @@ public StreamObserver<CloudEvent> subscribeStream(StreamObserver<CloudEvent> res

@Override
public void onNext(CloudEvent subscription) {
Set<SubscriptionItem> subscriptionItems = JsonUtils.parseTypeReferenceObject(subscription.getTextData(),
new TypeReference<Set<SubscriptionItem>>() {
});
if (CollectionUtils.isNotEmpty(subscriptionItems)) {
final String subMessageType = Optional.ofNullable(subscription.getAttributesMap().get(ProtocolKey.SUB_MESSAGE_TYPE))
.orElse(CloudEventAttributeValue.newBuilder().build()).getCeString();
if(StringUtils.equals(subMessageType, SubscriptionReply.TYPE)){
log.info("cmd={}|{}|client2eventMesh|from={}|to={}", "reply-to-server", EventMeshConstants.PROTOCOL_GRPC,
EventMeshCloudEventUtils.getIp(subscription), eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
handleSubscribeReply(subscription, emitter);
}else {
log.info("cmd={}|{}|client2eventMesh|from={}|to={}", "subscribeStream", EventMeshConstants.PROTOCOL_GRPC,
EventMeshCloudEventUtils.getIp(subscription), eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());

eventMeshGrpcServer.getMetricsMonitor().recordReceiveMsgFromClient();
handleSubscriptionStream(subscription, emitter);
} else {
log.info("cmd={}|{}|client2eventMesh|from={}|to={}", "reply-to-server", EventMeshConstants.PROTOCOL_GRPC,
EventMeshCloudEventUtils.getIp(subscription), eventMeshGrpcServer.getEventMeshGrpcConfiguration().getEventMeshIp());
handleSubscribeReply(subscription, emitter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ private CloudEvent buildReplyMessage(final CloudEvent reqMessage, final T replyM
Map<String, String> prop = new HashMap<>();
Map<String, CloudEventAttributeValue> reqMessageMap = reqMessage.getAttributesMap();
reqMessageMap.entrySet().forEach(entry -> prop.put(entry.getKey(), entry.getValue().getCeString()));
Map<String, CloudEventAttributeValue> cloudEventMap = reqMessage.getAttributesMap();
Map<String, CloudEventAttributeValue> cloudEventMap = cloudEvent.getAttributesMap();
cloudEventMap.entrySet().forEach(entry -> prop.put(entry.getKey(), entry.getValue().getCeString()));
subscriptionReply.putAllProperties(prop);

return CloudEvent.newBuilder().putAllAttributes(cloudEvent.getAttributesMap())
return CloudEvent.newBuilder(cloudEvent).putAllAttributes(reqMessageMap)
.putAttributes(ProtocolKey.DATA_CONTENT_TYPE,
CloudEventAttributeValue.newBuilder().setCeString(EventMeshDataContentType.JSON.getCode()).build())
//Indicate that it is a subscription response
.putAttributes(ProtocolKey.SUB_MESSAGE_TYPE, CloudEventAttributeValue.newBuilder().setCeString(SubscriptionReply.TYPE).build())
.setTextData(JsonUtils.toJSONString(subscriptionReply)).build();
}

Expand Down

0 comments on commit a2c5764

Please sign in to comment.