Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change tcp sdk #603

Merged
merged 1 commit into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.eventmesh.protocol.meshmessage.resolver.http;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
Expand All @@ -30,8 +26,14 @@
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;

import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;

public class SendMessageRequestProtocolResolver {

public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHandleException {
Expand Down Expand Up @@ -62,49 +64,51 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
cloudEventBuilder = CloudEventBuilder.v1();

event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
.withSubject(sendMessageRequestBody.getTopic())
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP,
sendMessageRequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl())
.withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag())
.build();
}
return event;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private CloseableHttpClient setHttpClient() throws EventMeshException {
}

protected String selectEventMesh() {
// todo: target endpoint maybe destroy, should remove the bad endpoint
if (eventMeshHttpClientConfig.isUseTls()) {
return Constants.HTTPS_PROTOCOL_PREFIX + eventMeshServerSelector.select();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,32 @@ public class EventMeshHttpClientConfig {
@Builder.Default
private int consumeThreadMax = 5;

private String env;
@Builder.Default
private String env = "";

@Builder.Default
private String consumerGroup = "DefaultConsumerGroup";

@Builder.Default
private String producerGroup = "DefaultProducerGroup";

private String idc;
@Builder.Default
private String idc = "";

@Builder.Default
private String ip = "127.0.0.1";

private String pid;
@Builder.Default
private String pid = "";

private String sys;
@Builder.Default
private String sys = "";

private String userName;
@Builder.Default
private String userName = "";

private String password;
@Builder.Default
private String password = "";

@Builder.Default
private boolean useTls = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -45,7 +47,6 @@
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.netty.handler.codec.http.HttpMethod;
Expand All @@ -56,7 +57,7 @@ public class EventMeshHttpConsumer extends AbstractHttpClient implements AutoClo

private final ThreadPoolExecutor consumeExecutor;

private static final List<SubscriptionItem> subscription = Lists.newArrayList();
private static final List<SubscriptionItem> SUBSCRIPTIONS = Collections.synchronizedList(new ArrayList<>());

private final ScheduledThreadPoolExecutor scheduler;

Expand Down Expand Up @@ -100,7 +101,7 @@ public void subscribe(List<SubscriptionItem> topicList, String subscribeUrl) thr
if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) {
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
}
subscription.addAll(topicList);
SUBSCRIPTIONS.addAll(topicList);
} catch (Exception ex) {
throw new EventMeshException(String.format("Subscribe topic error, target:%s", target), ex);
}
Expand Down Expand Up @@ -156,7 +157,7 @@ public void unsubscribe(List<String> topicList, String unSubscribeUrl) throws Ev
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
}
// todo: avoid concurrentModifiedException
subscription.removeIf(item -> topicList.contains(item.getTopic()));
SUBSCRIPTIONS.removeIf(item -> topicList.contains(item.getTopic()));
} catch (Exception ex) {
throw new EventMeshException(String.format("Unsubscribe topic error, target:%s", target), ex);
}
Expand All @@ -173,15 +174,6 @@ public void close() throws EventMeshException {
log.info("LiteConsumer shutdown");
}

private String selectEventMesh() {
// todo: target endpoint maybe destroy, should remove the bad endpoint
if (eventMeshHttpClientConfig.isUseTls()) {
return Constants.HTTPS_PROTOCOL_PREFIX + eventMeshServerSelector.select();
} else {
return Constants.HTTP_PROTOCOL_PREFIX + eventMeshServerSelector.select();
}
}

private RequestParam buildCommonRequestParam() {
return new RequestParam(HttpMethod.POST)
.addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
Expand All @@ -191,6 +183,7 @@ private RequestParam buildCommonRequestParam() {
.addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
.addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
.addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
// add protocol version?
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here version is http version just http 1.0, 1.1.

.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@
import com.google.common.base.Preconditions;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;

@Slf4j
class CloudEventProducer extends AbstractHttpClient implements EventMeshProtocolProducer<CloudEvent> {

private static final String PROTOCOL_TYPE = "cloudevents";

public CloudEventProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException {
super(eventMeshHttpClientConfig);
}

@Override
public void publish(CloudEvent cloudEvent) throws EventMeshException {
validateCloudEvent(cloudEvent);
CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent);
// todo: Can put to abstract class, all protocol use the same send method? This can be a template method
RequestParam requestParam = buildCommonPostParam(cloudEvent)
RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent)
.addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_ASYNC.getRequestCode());
String target = selectEventMesh();
try {
Expand All @@ -53,7 +57,8 @@ public void publish(CloudEvent cloudEvent) throws EventMeshException {
@Override
public CloudEvent request(CloudEvent cloudEvent, long timeout) throws EventMeshException {
validateCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(cloudEvent)
CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent)
.addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode())
.setTimeout(timeout);
String target = selectEventMesh();
Expand All @@ -71,15 +76,16 @@ public CloudEvent request(CloudEvent cloudEvent, long timeout) throws EventMeshE
}

@Override
public void request(CloudEvent cloudEvent, RRCallback<CloudEvent> rrCallback, long timeout)
public void request(final CloudEvent cloudEvent, final RRCallback<CloudEvent> rrCallback, long timeout)
throws EventMeshException {
validateCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(cloudEvent)
CloudEvent enhanceCloudEvent = enhanceCloudEvent(cloudEvent);
RequestParam requestParam = buildCommonPostParam(enhanceCloudEvent)
.addHeader(ProtocolKey.REQUEST_CODE, RequestCode.MSG_SEND_SYNC.getRequestCode())
.setTimeout(timeout);
String target = selectEventMesh();
RRCallbackResponseHandlerAdapter<CloudEvent> adapter =
new RRCallbackResponseHandlerAdapter<>(cloudEvent, rrCallback, timeout);
RRCallbackResponseHandlerAdapter<CloudEvent> adapter = new RRCallbackResponseHandlerAdapter<>(
enhanceCloudEvent, rrCallback, timeout);
try {
HttpUtils.post(httpClient, null, target, requestParam, adapter);
} catch (IOException e) {
Expand All @@ -97,14 +103,27 @@ private RequestParam buildCommonPostParam(CloudEvent cloudEvent) {
requestParam
.addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
.addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
// todo: add producerGroup to header, set protocol type, protocol version
.addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE)
// todo: move producerGroup tp header
.addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup())
.addBody(SendMessageRequestBody.CONTENT, JsonUtils.serialize(cloudEvent));
return requestParam;
}

private CloudEvent enhanceCloudEvent(final CloudEvent cloudEvent) {
return CloudEventBuilder.from(cloudEvent)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, eventMeshHttpClientConfig.getEnv())
.withExtension(ProtocolKey.ClientInstanceKey.IDC, eventMeshHttpClientConfig.getIdc())
.withExtension(ProtocolKey.ClientInstanceKey.IP, eventMeshHttpClientConfig.getIp())
.withExtension(ProtocolKey.ClientInstanceKey.PID, eventMeshHttpClientConfig.getPid())
.withExtension(ProtocolKey.ClientInstanceKey.SYS, eventMeshHttpClientConfig.getSys())
.withExtension(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.withExtension(ProtocolKey.PROTOCOL_DESC, cloudEvent.getSpecVersion().name())
.withExtension(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString())
.build();
}

private CloudEvent transformMessage(EventMeshRetObj retObj) {
SendMessageResponseBody.ReplyMessage replyMessage = JsonUtils.deserialize(retObj.getRetMsg(),
SendMessageResponseBody.ReplyMessage.class);
Expand Down
Loading