Skip to content

Commit

Permalink
Merge 81690ec into 42db065
Browse files Browse the repository at this point in the history
  • Loading branch information
lrhkobe authored Jul 13, 2023
2 parents 42db065 + 81690ec commit ac9519f
Show file tree
Hide file tree
Showing 22 changed files with 204 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,6 @@ public void reload() {
this.eventMeshProvideServerProtocols = Collections.singletonList(ConfigurationContextUtil.HTTP);
}

meshGroup = String.join("-", this.eventMeshEnv, this.eventMeshCluster, this.sysID);
meshGroup = String.join("-", this.eventMeshEnv, this.eventMeshIDC, this.eventMeshCluster, this.sysID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ public DefaultFullHttpResponse httpResponse() throws Exception {
return response;
}

public DefaultFullHttpResponse httpResponse(HttpResponseStatus httpResponseStatus) throws Exception {
if (cmdType == CmdType.REQ) {
return null;
}
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus,
Unpooled.wrappedBuffer(Objects.requireNonNull(JsonUtils.toJSONString(this.getBody())).getBytes(Constants.DEFAULT_CHARSET)));
HttpHeaders headers = response.headers();
headers.add(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=" + Constants.DEFAULT_CHARSET);
headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
headers.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
Optional.of(this.getHeader().toMap()).ifPresent(customerHeader -> customerHeader.forEach(headers::add));
return response;
}

public enum CmdType {
REQ,
RES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class HttpEventWrapper implements ProtocolTransportObject {
//Command response time
public long resTime;

private HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK;

public HttpEventWrapper() {
this(null, null, null);
}
Expand Down Expand Up @@ -180,7 +182,7 @@ public void setBody(byte[] newBody) {
}

public DefaultFullHttpResponse httpResponse() throws Exception {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus,
Unpooled.wrappedBuffer(this.body));
HttpHeaders headers = response.headers();
headers.add(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=" + Constants.DEFAULT_CHARSET);
Expand Down Expand Up @@ -225,4 +227,8 @@ public void buildSysHeaderForCE() {
sysHeaderMap.put(ProtocolKey.CloudEventsKey.SUBJECT, topic);
}

public void setHttpResponseStatus(HttpResponseStatus httpResponseStatus) {
this.httpResponseStatus = httpResponseStatus;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro
case RESPONSE_TO_CLIENT_ACK:
case ASYNC_MESSAGE_TO_CLIENT_ACK:
case BROADCAST_MESSAGE_TO_CLIENT_ACK:
case HELLO_RESPONSE:
case RECOMMEND_RESPONSE:
case SUBSCRIBE_RESPONSE:
case LISTEN_RESPONSE:
case UNSUBSCRIBE_RESPONSE:
case HEARTBEAT_RESPONSE:
case ASYNC_MESSAGE_TO_SERVER_ACK:
case BROADCAST_MESSAGE_TO_SERVER_ACK:
case CLIENT_GOODBYE_REQUEST:
case CLIENT_GOODBYE_RESPONSE:
case SERVER_GOODBYE_REQUEST:
case SERVER_GOODBYE_RESPONSE:
// The message string will be deserialized by protocol plugin, if the event is cloudevents, the body is
// just a string.
return bodyJsonString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -106,6 +108,7 @@ public class HttpSummaryMetrics implements Metric {

private final DelayQueue<?> httpFailedQueue;

private Lock lock = new ReentrantLock();

public HttpSummaryMetrics(final ThreadPoolExecutor batchMsgExecutor,
final ThreadPoolExecutor sendMsgExecutor,
Expand Down Expand Up @@ -139,20 +142,37 @@ public void recordHTTPDiscard() {
}

public void snapshotHTTPTPS() {
Integer tps = httpRequestPerSecond.intValue();
httpRequestTPSSnapshots.add(tps);
httpRequestPerSecond.set(0);
if (httpRequestTPSSnapshots.size() > STATIC_PERIOD / 1000) {
httpRequestTPSSnapshots.removeFirst();
try {
lock.lock();
Integer tps = httpRequestPerSecond.intValue();
httpRequestTPSSnapshots.add(tps);
httpRequestPerSecond.set(0);
if (httpRequestTPSSnapshots.size() > STATIC_PERIOD / 1000) {
httpRequestTPSSnapshots.removeFirst();
}
} finally {
lock.unlock();
}
}

public float maxHTTPTPS() {
return Collections.max(httpRequestTPSSnapshots);
try {
lock.lock();
float tps = Collections.max(httpRequestTPSSnapshots);
return tps;
} finally {
lock.unlock();
}
}

public float avgHTTPTPS() {
return avg(httpRequestTPSSnapshots);
try {
lock.lock();
float tps = avg(httpRequestTPSSnapshots);
return tps;
} finally {
lock.unlock();
}
}

public void recordHTTPReqResTimeCost(long cost) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ public enum HttpProtocolConstant {
;
public static final String PROTOCOL_NAME = "http";

public static final String DATA_CONTENT_TYPE = "application/json";

public static final String CONSTANTS_DEFAULT_SOURCE = "/";
public static final String CONSTANTS_DEFAULT_TYPE = "http_request";
public static final String CONSTANTS_DEFAULT_SUBJECT = "TOPIC-HTTP-REQUEST";
public static final String CONSTANTS_DEFAULT_SUBJECT = "";

public static final String CONSTANTS_KEY_ID = "id";
public static final String CONSTANTS_KEY_SOURCE = "source";
Expand All @@ -35,4 +33,9 @@ public enum HttpProtocolConstant {
public static final String CONSTANTS_KEY_BODY = "body";
public static final String CONSTANTS_KEY_PATH = "path";
public static final String CONSTANTS_KEY_METHOD = "method";

public static final String DATA_CONTENT_TYPE = "Content-Type";

public static final String APPLICATION_JSON = "application/json";
public static final String PROTOBUF = "application/x-protobuf";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.eventmesh.protocol.http.resolver;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
Expand All @@ -30,7 +29,6 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import io.cloudevents.CloudEvent;
Expand Down Expand Up @@ -60,12 +58,14 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr
String subject = sysHeaderMap.getOrDefault(HttpProtocolConstant.CONSTANTS_KEY_SUBJECT,
HttpProtocolConstant.CONSTANTS_DEFAULT_SUBJECT).toString();

String dataContentType = requestHeaderMap.getOrDefault(HttpProtocolConstant.DATA_CONTENT_TYPE,
HttpProtocolConstant.APPLICATION_JSON).toString();
// with attributes
builder.withId(id)
.withType(type)
.withSource(URI.create(HttpProtocolConstant.CONSTANTS_KEY_SOURCE + ":" + source))
.withSubject(subject)
.withDataContentType(HttpProtocolConstant.DATA_CONTENT_TYPE);
.withDataContentType(dataContentType);

// with extensions
for (Map.Entry<String, Object> extension : sysHeaderMap.entrySet()) {
Expand All @@ -81,19 +81,24 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr

byte[] requestBody = httpEventWrapper.getBody();

Map<String, Object> requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody, Constants.DEFAULT_CHARSET),
new TypeReference<HashMap<String, Object>>() {
});

String requestURI = httpEventWrapper.getRequestURI();

Map<String, Object> data = new HashMap<>();
data.put(HttpProtocolConstant.CONSTANTS_KEY_HEADERS, requestHeaderMap);
data.put(HttpProtocolConstant.CONSTANTS_KEY_BODY, requestBodyMap);
data.put(HttpProtocolConstant.CONSTANTS_KEY_PATH, requestURI);
data.put(HttpProtocolConstant.CONSTANTS_KEY_METHOD, httpEventWrapper.getHttpMethod());
// with data
return builder.withData(Objects.requireNonNull(JsonUtils.toJSONString(data)).getBytes(StandardCharsets.UTF_8)).build();
if (StringUtils.equals(dataContentType, HttpProtocolConstant.APPLICATION_JSON)) {
Map<String, Object> requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody),
new TypeReference<HashMap<String, Object>>() {});

String requestURI = httpEventWrapper.getRequestURI();

Map<String, Object> data = new HashMap<>();
data.put(HttpProtocolConstant.CONSTANTS_KEY_HEADERS, requestHeaderMap);
data.put(HttpProtocolConstant.CONSTANTS_KEY_BODY, requestBodyMap);
data.put(HttpProtocolConstant.CONSTANTS_KEY_PATH, requestURI);
data.put(HttpProtocolConstant.CONSTANTS_KEY_METHOD, httpEventWrapper.getHttpMethod());
// with data
builder = builder.withData(JsonUtils.toJSONString(data).getBytes(StandardCharsets.UTF_8));
} else if (StringUtils.equals(dataContentType, HttpProtocolConstant.PROTOBUF)) {
// with data
builder = builder.withData(requestBody);
}
return builder.build();
} catch (Exception e) {
throw new ProtocolHandleException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ private static AclProperties buildTcpAclProperties(String remoteAddr, UserAgent
aclProperties.setPwd(userAgent.getPassword());
aclProperties.setSubsystem(userAgent.getSubsystem());
aclProperties.setRequestCode(requestCode);
aclProperties.setVersion(userAgent.getVersion());
if (StringUtils.isNotBlank(topic)) {
aclProperties.setTopic(topic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ public void start() throws Exception {
} catch (Exception ex) {
log.error("HTTPServer shutdown error!", ex);
}
System.exit(-1);
}
};

Expand Down Expand Up @@ -413,7 +414,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
|| !RequestCode.contains(Integer.valueOf(requestCode))) {
responseCommand =
requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID);
sendResponse(ctx, responseCommand.httpResponse());
sendResponse(ctx, responseCommand.httpResponse(HttpResponseStatus.BAD_REQUEST));

span = TraceUtils.prepareServerSpan(headerMap,
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
Expand All @@ -427,7 +428,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
requestCommand.setBody(Body.buildBody(requestCode, bodyMap));
} catch (Exception e) {
responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR);
sendResponse(ctx, responseCommand.httpResponse());
sendResponse(ctx, responseCommand.httpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR));

span = TraceUtils.prepareServerSpan(headerMap,
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
Expand All @@ -447,6 +448,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

} catch (Exception ex) {
log.error("AbstractHTTPServer.HTTPHandler.channelRead error", ex);
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
} finally {
ReferenceCountUtil.release(httpRequest);
}
Expand All @@ -466,13 +468,25 @@ public void processHttpRequest(final ChannelHandlerContext ctx,

final Pair<EventProcessor, ThreadPoolExecutor> choosed = eventProcessorTable.get(processorKey);
try {
if (choosed == null) {
log.warn("eventProcessorTable not contains EventProcessor, processorKey:{}", processorKey);
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
if (choosed.getObject2() == null) {
log.warn("eventProcessorTable not contains ThreadPoolExecutor, processorKey:{}", processorKey);
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
return;
}

choosed.getObject2().submit(() -> {
try {
final EventProcessor processor = choosed.getObject1();
if (processor.rejectRequest()) {
final HttpEventWrapper responseWrapper =
requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);

responseWrapper.setHttpResponseStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
asyncContext.onComplete(responseWrapper);

if (asyncContext.isComplete()) {
Expand All @@ -499,10 +513,12 @@ public void processHttpRequest(final ChannelHandlerContext ctx,
sendResponse(ctx, asyncContext.getResponse().httpResponse());
} catch (Exception e) {
log.error("process error", e);
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
});
} catch (RejectedExecutionException re) {
final HttpEventWrapper responseWrapper = requestWrapper.createHttpResponse(EventMeshRetCode.OVERLOAD);
responseWrapper.setHttpResponseStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
asyncContext.onComplete(responseWrapper);
metrics.getSummaryMetrics().recordHTTPDiscard();
metrics.getSummaryMetrics().recordHTTPReqResTimeCost(
Expand Down Expand Up @@ -619,6 +635,7 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws
}

final long bodyDecodeStart = System.currentTimeMillis();
byte[] requestBody = null;

final FullHttpRequest fullHttpRequest = (FullHttpRequest) httpRequest;
final Map<String, Object> bodyMap = new HashMap<>();
Expand All @@ -635,6 +652,14 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws
bodyMap.putAll(Objects.requireNonNull(JsonUtils.parseTypeReferenceObject(new String(body, Constants.DEFAULT_CHARSET),
new TypeReference<Map<String, Object>>() {
})));
requestBody = JsonUtils.toJSONString(bodyMap).getBytes(StandardCharsets.UTF_8);
}
} else if (StringUtils.equals(httpRequest.headers().get("Content-Type"), "application/x-protobuf")) {
int length = fullHttpRequest.content().readableBytes();
if (length > 0) {
byte[] body = new byte[length];
fullHttpRequest.content().readBytes(body);
requestBody = body;
}
} else {
final HttpPostRequestDecoder decoder =
Expand All @@ -646,13 +671,14 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws
}
}
decoder.destroy();
requestBody = JsonUtils.toJSONString(bodyMap).getBytes(StandardCharsets.UTF_8);
}

} else {
throw new MethodNotSupportedException("UnSupported Method " + fullHttpRequest.method());
}

httpEventWrapper.setBody(Objects.requireNonNull(JsonUtils.toJSONString(bodyMap)).getBytes(StandardCharsets.UTF_8));
httpEventWrapper.setBody(requestBody);

metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void initChannel(final Channel ch) throws Exception {
} catch (Exception ex) {
log.error("EventMeshTCPServer RemotingServer shutdown Err!", ex);
}
System.exit(-1);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,16 @@ public synchronized void init() throws Exception {
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;

if (currentTopicConfig == null) {
log.error("no topicConfig found, consumerGroup:{} topic:{}",
consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId);
} catch (Exception ex) {
//ignore
log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}",
consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex);
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
}

SubscriptionItem subscriptionItem =
Expand All @@ -157,6 +158,7 @@ public synchronized void init() throws Exception {
sendMessageBack(event, uniqueId, bizSeqNo);
} catch (Exception e) {
//ignore
log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", topic, bizSeqNo, uniqueId, e);
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
Expand Down
Loading

0 comments on commit ac9519f

Please sign in to comment.