diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java index 61e4d9ac61..50be034c8c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java @@ -124,6 +124,6 @@ public void reload() { this.eventMeshProvideServerProtocols = Collections.singletonList(ConfigurationContextUtil.HTTP); } - meshGroup = String.join("-", this.eventMeshEnv, this.eventMeshCluster, this.sysID); + meshGroup = String.join("-", this.eventMeshEnv, this.eventMeshIDC, this.eventMeshCluster, this.sysID); } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java index 76cda7a70f..fa2057ae4c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java @@ -176,6 +176,20 @@ public DefaultFullHttpResponse httpResponse() throws Exception { return response; } + public DefaultFullHttpResponse httpResponse(HttpResponseStatus httpResponseStatus) throws Exception { + if (cmdType == CmdType.REQ) { + return null; + } + DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, + Unpooled.wrappedBuffer(Objects.requireNonNull(JsonUtils.toJSONString(this.getBody())).getBytes(Constants.DEFAULT_CHARSET))); + HttpHeaders headers = response.headers(); + headers.add(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=" + Constants.DEFAULT_CHARSET); + headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + headers.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + Optional.of(this.getHeader().toMap()).ifPresent(customerHeader -> customerHeader.forEach(headers::add)); + return response; + } + public enum CmdType { REQ, RES diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java index 2779bb73d8..3b40d8c163 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java @@ -66,6 +66,8 @@ public class HttpEventWrapper implements ProtocolTransportObject { //Command response time public long resTime; + private HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK; + public HttpEventWrapper() { this(null, null, null); } @@ -180,7 +182,7 @@ public void setBody(byte[] newBody) { } public DefaultFullHttpResponse httpResponse() throws Exception { - DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, + DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, Unpooled.wrappedBuffer(this.body)); HttpHeaders headers = response.headers(); headers.add(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=" + Constants.DEFAULT_CHARSET); @@ -225,4 +227,8 @@ public void buildSysHeaderForCE() { sysHeaderMap.put(ProtocolKey.CloudEventsKey.SUBJECT, topic); } + public void setHttpResponseStatus(HttpResponseStatus httpResponseStatus) { + this.httpResponseStatus = httpResponseStatus; + } + } \ No newline at end of file diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index 3e86ba7524..7ce5ecbf8a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -194,6 +194,18 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro case RESPONSE_TO_CLIENT_ACK: case ASYNC_MESSAGE_TO_CLIENT_ACK: case BROADCAST_MESSAGE_TO_CLIENT_ACK: + case HELLO_RESPONSE: + case RECOMMEND_RESPONSE: + case SUBSCRIBE_RESPONSE: + case LISTEN_RESPONSE: + case UNSUBSCRIBE_RESPONSE: + case HEARTBEAT_RESPONSE: + case ASYNC_MESSAGE_TO_SERVER_ACK: + case BROADCAST_MESSAGE_TO_SERVER_ACK: + case CLIENT_GOODBYE_REQUEST: + case CLIENT_GOODBYE_RESPONSE: + case SERVER_GOODBYE_REQUEST: + case SERVER_GOODBYE_RESPONSE: // The message string will be deserialized by protocol plugin, if the event is cloudevents, the body is // just a string. return bodyJsonString; diff --git a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java index 34de770820..5a9a092f7f 100644 --- a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java +++ b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java @@ -22,6 +22,8 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; @@ -106,6 +108,7 @@ public class HttpSummaryMetrics implements Metric { private final DelayQueue httpFailedQueue; + private Lock lock = new ReentrantLock(); public HttpSummaryMetrics(final ThreadPoolExecutor batchMsgExecutor, final ThreadPoolExecutor sendMsgExecutor, @@ -139,20 +142,37 @@ public void recordHTTPDiscard() { } public void snapshotHTTPTPS() { - Integer tps = httpRequestPerSecond.intValue(); - httpRequestTPSSnapshots.add(tps); - httpRequestPerSecond.set(0); - if (httpRequestTPSSnapshots.size() > STATIC_PERIOD / 1000) { - httpRequestTPSSnapshots.removeFirst(); + try { + lock.lock(); + Integer tps = httpRequestPerSecond.intValue(); + httpRequestTPSSnapshots.add(tps); + httpRequestPerSecond.set(0); + if (httpRequestTPSSnapshots.size() > STATIC_PERIOD / 1000) { + httpRequestTPSSnapshots.removeFirst(); + } + } finally { + lock.unlock(); } } public float maxHTTPTPS() { - return Collections.max(httpRequestTPSSnapshots); + try { + lock.lock(); + float tps = Collections.max(httpRequestTPSSnapshots); + return tps; + } finally { + lock.unlock(); + } } public float avgHTTPTPS() { - return avg(httpRequestTPSSnapshots); + try { + lock.lock(); + float tps = avg(httpRequestTPSSnapshots); + return tps; + } finally { + lock.unlock(); + } } public void recordHTTPReqResTimeCost(long cost) { diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java index 0e70bd7d5a..f45204594e 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java @@ -21,11 +21,9 @@ public enum HttpProtocolConstant { ; public static final String PROTOCOL_NAME = "http"; - public static final String DATA_CONTENT_TYPE = "application/json"; - public static final String CONSTANTS_DEFAULT_SOURCE = "/"; public static final String CONSTANTS_DEFAULT_TYPE = "http_request"; - public static final String CONSTANTS_DEFAULT_SUBJECT = "TOPIC-HTTP-REQUEST"; + public static final String CONSTANTS_DEFAULT_SUBJECT = ""; public static final String CONSTANTS_KEY_ID = "id"; public static final String CONSTANTS_KEY_SOURCE = "source"; @@ -35,4 +33,9 @@ public enum HttpProtocolConstant { public static final String CONSTANTS_KEY_BODY = "body"; public static final String CONSTANTS_KEY_PATH = "path"; public static final String CONSTANTS_KEY_METHOD = "method"; + + public static final String DATA_CONTENT_TYPE = "Content-Type"; + + public static final String APPLICATION_JSON = "application/json"; + public static final String PROTOBUF = "application/x-protobuf"; } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java index 186a74e5bc..ac723cf858 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.protocol.http.resolver; -import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; @@ -30,7 +29,6 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.UUID; import io.cloudevents.CloudEvent; @@ -60,12 +58,14 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr String subject = sysHeaderMap.getOrDefault(HttpProtocolConstant.CONSTANTS_KEY_SUBJECT, HttpProtocolConstant.CONSTANTS_DEFAULT_SUBJECT).toString(); + String dataContentType = requestHeaderMap.getOrDefault(HttpProtocolConstant.DATA_CONTENT_TYPE, + HttpProtocolConstant.APPLICATION_JSON).toString(); // with attributes builder.withId(id) .withType(type) .withSource(URI.create(HttpProtocolConstant.CONSTANTS_KEY_SOURCE + ":" + source)) .withSubject(subject) - .withDataContentType(HttpProtocolConstant.DATA_CONTENT_TYPE); + .withDataContentType(dataContentType); // with extensions for (Map.Entry extension : sysHeaderMap.entrySet()) { @@ -81,19 +81,24 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr byte[] requestBody = httpEventWrapper.getBody(); - Map requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody, Constants.DEFAULT_CHARSET), - new TypeReference>() { - }); - - String requestURI = httpEventWrapper.getRequestURI(); - - Map data = new HashMap<>(); - data.put(HttpProtocolConstant.CONSTANTS_KEY_HEADERS, requestHeaderMap); - data.put(HttpProtocolConstant.CONSTANTS_KEY_BODY, requestBodyMap); - data.put(HttpProtocolConstant.CONSTANTS_KEY_PATH, requestURI); - data.put(HttpProtocolConstant.CONSTANTS_KEY_METHOD, httpEventWrapper.getHttpMethod()); - // with data - return builder.withData(Objects.requireNonNull(JsonUtils.toJSONString(data)).getBytes(StandardCharsets.UTF_8)).build(); + if (StringUtils.equals(dataContentType, HttpProtocolConstant.APPLICATION_JSON)) { + Map requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody), + new TypeReference>() {}); + + String requestURI = httpEventWrapper.getRequestURI(); + + Map data = new HashMap<>(); + data.put(HttpProtocolConstant.CONSTANTS_KEY_HEADERS, requestHeaderMap); + data.put(HttpProtocolConstant.CONSTANTS_KEY_BODY, requestBodyMap); + data.put(HttpProtocolConstant.CONSTANTS_KEY_PATH, requestURI); + data.put(HttpProtocolConstant.CONSTANTS_KEY_METHOD, httpEventWrapper.getHttpMethod()); + // with data + builder = builder.withData(JsonUtils.toJSONString(data).getBytes(StandardCharsets.UTF_8)); + } else if (StringUtils.equals(dataContentType, HttpProtocolConstant.PROTOBUF)) { + // with data + builder = builder.withData(requestBody); + } + return builder.build(); } catch (Exception e) { throw new ProtocolHandleException(e.getMessage(), e); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java index c2adcce873..13ed186c93 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java @@ -196,6 +196,7 @@ private static AclProperties buildTcpAclProperties(String remoteAddr, UserAgent aclProperties.setPwd(userAgent.getPassword()); aclProperties.setSubsystem(userAgent.getSubsystem()); aclProperties.setRequestCode(requestCode); + aclProperties.setVersion(userAgent.getVersion()); if (StringUtils.isNotBlank(topic)) { aclProperties.setTopic(topic); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index d74b69a3ac..5e67282225 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -227,6 +227,7 @@ public void start() throws Exception { } catch (Exception ex) { log.error("HTTPServer shutdown error!", ex); } + System.exit(-1); } }; @@ -413,7 +414,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception || !RequestCode.contains(Integer.valueOf(requestCode))) { responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID); - sendResponse(ctx, responseCommand.httpResponse()); + sendResponse(ctx, responseCommand.httpResponse(HttpResponseStatus.BAD_REQUEST)); span = TraceUtils.prepareServerSpan(headerMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); @@ -427,7 +428,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception requestCommand.setBody(Body.buildBody(requestCode, bodyMap)); } catch (Exception e) { responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR); - sendResponse(ctx, responseCommand.httpResponse()); + sendResponse(ctx, responseCommand.httpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR)); span = TraceUtils.prepareServerSpan(headerMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); @@ -447,6 +448,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } catch (Exception ex) { log.error("AbstractHTTPServer.HTTPHandler.channelRead error", ex); + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } finally { ReferenceCountUtil.release(httpRequest); } @@ -466,6 +468,17 @@ public void processHttpRequest(final ChannelHandlerContext ctx, final Pair choosed = eventProcessorTable.get(processorKey); try { + if (choosed == null) { + log.warn("eventProcessorTable not contains EventProcessor, processorKey:{}", processorKey); + sendError(ctx, HttpResponseStatus.BAD_REQUEST); + return; + } + if (choosed.getObject2() == null) { + log.warn("eventProcessorTable not contains ThreadPoolExecutor, processorKey:{}", processorKey); + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); + return; + } + choosed.getObject2().submit(() -> { try { final EventProcessor processor = choosed.getObject1(); @@ -473,6 +486,7 @@ public void processHttpRequest(final ChannelHandlerContext ctx, final HttpEventWrapper responseWrapper = requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); + responseWrapper.setHttpResponseStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); asyncContext.onComplete(responseWrapper); if (asyncContext.isComplete()) { @@ -499,10 +513,12 @@ public void processHttpRequest(final ChannelHandlerContext ctx, sendResponse(ctx, asyncContext.getResponse().httpResponse()); } catch (Exception e) { log.error("process error", e); + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } }); } catch (RejectedExecutionException re) { final HttpEventWrapper responseWrapper = requestWrapper.createHttpResponse(EventMeshRetCode.OVERLOAD); + responseWrapper.setHttpResponseStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); asyncContext.onComplete(responseWrapper); metrics.getSummaryMetrics().recordHTTPDiscard(); metrics.getSummaryMetrics().recordHTTPReqResTimeCost( @@ -619,6 +635,7 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws } final long bodyDecodeStart = System.currentTimeMillis(); + byte[] requestBody = null; final FullHttpRequest fullHttpRequest = (FullHttpRequest) httpRequest; final Map bodyMap = new HashMap<>(); @@ -635,6 +652,14 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws bodyMap.putAll(Objects.requireNonNull(JsonUtils.parseTypeReferenceObject(new String(body, Constants.DEFAULT_CHARSET), new TypeReference>() { }))); + requestBody = JsonUtils.toJSONString(bodyMap).getBytes(StandardCharsets.UTF_8); + } + } else if (StringUtils.equals(httpRequest.headers().get("Content-Type"), "application/x-protobuf")) { + int length = fullHttpRequest.content().readableBytes(); + if (length > 0) { + byte[] body = new byte[length]; + fullHttpRequest.content().readBytes(body); + requestBody = body; } } else { final HttpPostRequestDecoder decoder = @@ -646,13 +671,14 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws } } decoder.destroy(); + requestBody = JsonUtils.toJSONString(bodyMap).getBytes(StandardCharsets.UTF_8); } } else { throw new MethodNotSupportedException("UnSupported Method " + fullHttpRequest.method()); } - httpEventWrapper.setBody(Objects.requireNonNull(JsonUtils.toJSONString(bodyMap)).getBytes(StandardCharsets.UTF_8)); + httpEventWrapper.setBody(requestBody); metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java index 67b56ed6df..4b5c744296 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java @@ -195,6 +195,7 @@ public void initChannel(final Channel ch) throws Exception { } catch (Exception ex) { log.error("EventMeshTCPServer RemotingServer shutdown Err!", ex); } + System.exit(-1); } }; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 31fd31b598..6a43e88ec3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -130,15 +130,16 @@ public synchronized void init() throws Exception { EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; if (currentTopicConfig == null) { - log.error("no topicConfig found, consumerGroup:{} topic:{}", - consumerGroupConf.getConsumerGroup(), topic); try { sendMessageBack(event, uniqueId, bizSeqNo); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; + log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", + consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId); } catch (Exception ex) { - //ignore + log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", + consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex); } + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; } SubscriptionItem subscriptionItem = @@ -157,6 +158,7 @@ public synchronized void init() throws Exception { sendMessageBack(event, uniqueId, bizSeqNo); } catch (Exception e) { //ignore + log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", topic, bizSeqNo, uniqueId, e); } eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java index e2da611851..e6af880a90 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java @@ -33,14 +33,18 @@ import java.util.Map; import java.util.Objects; -import io.cloudevents.CloudEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; @Slf4j public class HandleMsgContext { + public static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE); + private String msgRandomNo; private String consumerGroup; @@ -205,9 +209,9 @@ public EventMeshHTTPServer getEventMeshHTTPServer() { public void finish() { if (Objects.nonNull(eventMeshConsumer) && Objects.nonNull(context) && Objects.nonNull(event)) { - if (log.isDebugEnabled()) { - log.debug("messageAcked|topic={}|event={}", topic, event); - } + MESSAGE_LOGGER.info("messageAcked|group={}|topic={}|bizSeq={}|uniqId={}|msgRandomNo={}|queueId={}|queueOffset={}", + consumerGroup, topic, bizSeqNo, uniqueId, msgRandomNo, event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_ID), + event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)); eventMeshConsumer.updateOffset(topic, subscriptionItem.getMode(), Collections.singletonList(event), context); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java index dc2a9bed8a..7181038326 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java @@ -33,7 +33,6 @@ import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor; import org.apache.eventmesh.runtime.util.RemotingHelper; -import org.apache.eventmesh.runtime.util.WebhookUtil; import org.apache.commons.lang3.StringUtils; import org.apache.http.impl.client.CloseableHttpClient; @@ -117,8 +116,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest return; } - String url = requestBodyMap.get(EventMeshConstants.URL).toString(); - String consumerGroup = requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP).toString(); + //String url = requestBodyMap.get(EventMeshConstants.URL).toString(); String topic = JsonUtils.toJSONString(requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC)); // SubscriptionItem @@ -146,32 +144,32 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest } // validate URL - try { - if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(), - eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) { - httpLogger.error("subscriber url {} is not valid", url); - handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, - responseBodyMap, null); - return; - } - } catch (Exception e) { - httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage()); - handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, - responseBodyMap, null); - return; - } - - CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); - // obtain webhook delivery agreement for Abuse Protection - boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(closeableHttpClient, - url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin()); - - if (!isWebhookAllowed) { - httpLogger.error("subscriber url {} is not allowed by the target system", url); - handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, - responseBodyMap, null); - return; - } + // try { + // if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(), + // eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) { + // httpLogger.error("subscriber url {} is not valid", url); + // handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, + // responseBodyMap, null); + // return; + // } + // } catch (Exception e) { + // httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage()); + // handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, + // responseBodyMap, null); + // return; + // } + // + // CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); + // // obtain webhook delivery agreement for Abuse Protection + // boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(closeableHttpClient, + // url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin()); + // + // if (!isWebhookAllowed) { + // httpLogger.error("subscriber url {} is not allowed by the target system", url); + // handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, + // responseBodyMap, null); + // return; + // } long startTime = System.currentTimeMillis(); try { @@ -187,11 +185,12 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest String targetMesh = requestBodyMap.get("remoteMesh") == null ? "" : requestBodyMap.get("remoteMesh").toString(); // Get mesh address from registry - String meshAddress = getTargetMesh(consumerGroup, subscriptionList); + String meshAddress = getTargetMesh(eventMeshHttpConfiguration.getMeshGroup(), subscriptionList); if (StringUtils.isNotBlank(meshAddress)) { targetMesh = meshAddress; } + CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); String remoteResult = post(closeableHttpClient, targetMesh, builderRemoteHeaderMap(localAddress), remoteBodyMap, response -> EntityUtils.toString(response.getEntity(), Constants.DEFAULT_CHARSET)); @@ -213,10 +212,8 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest } catch (Exception e) { long endTime = System.currentTimeMillis(); - httpLogger.error( - "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}" - + "|bizSeqNo={}|uniqueId={}", endTime - startTime, - JsonUtils.toJSONString(subscriptionList), url, e); + httpLogger.error("subscribe Remote|cost={}ms|topic={}", endTime - startTime, + JsonUtils.toJSONString(subscriptionList), e); handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java index 31dbb19121..f601e302dd 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java @@ -115,9 +115,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest return; } - String unSubscribeUrl = requestBodyMap.get(EventMeshConstants.URL).toString(); - String consumerGroup = requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP).toString(); - String topic = requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC).toString(); + String topic = JsonUtils.toJSONString(requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC).toString()); long startTime = System.currentTimeMillis(); try { @@ -156,7 +154,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest return subscriptionItem; }).collect(Collectors.toList()); // Get mesh address from registry - String meshAddress = getTargetMesh(consumerGroup, subscriptionList); + String meshAddress = getTargetMesh(meshGroup, subscriptionList); if (StringUtils.isNotBlank(meshAddress)) { targetMesh = meshAddress; } @@ -182,10 +180,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest } } catch (Exception e) { long endTime = System.currentTimeMillis(); - httpLogger.error( - "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}" - + "|bizSeqNo={}|uniqueId={}", endTime - startTime, - topic, unSubscribeUrl, e); + httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}", endTime - startTime, topic, e); handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java index 13d0c8db48..cee9aea37a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java @@ -101,11 +101,11 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final requestWrapper.buildSysHeaderForCE(); final String bizNo = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.BIZSEQNO, - RandomStringUtils.generateNum(30)).toString(); + RandomStringUtils.generateNum(32)).toString(); final String uniqueId = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.UNIQUEID, - RandomStringUtils.generateNum(30)).toString(); + RandomStringUtils.generateNum(32)).toString(); final String ttl = requestHeaderMap.getOrDefault(Constants.EVENTMESH_MESSAGE_CONST_TTL, - 4 * 1000).toString(); + 14400000).toString(); requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.BIZSEQNO, bizNo); requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.UNIQUEID, uniqueId); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java index 10925a29fd..aeb053d583 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java @@ -206,9 +206,7 @@ protected boolean validateSysHeader(Map sysHeaderMap) { * @return any null then true */ protected boolean validatedRequestBodyMap(Map requestBodyMap) { - return requestBodyMap.get(EventMeshConstants.URL) == null - || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null - || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null; + return requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 6ac0461ab0..2417cea174 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -91,6 +91,8 @@ public void tryHTTPRequest() { currPushUrl = getUrl(); if (StringUtils.isBlank(currPushUrl)) { + LOGGER.warn("tryHTTPRequest fail, getUrl is null, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", this.handleMsgContext.getConsumerGroup(), + this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId()); return; } @@ -144,7 +146,9 @@ public void tryHTTPRequest() { } } catch (Exception ex) { - LOGGER.error("Failed to convert EventMeshMessage from CloudEvent", ex); + LOGGER.warn("cloudevent to HttpEventWrapper occur except, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", + this.handleMsgContext.getConsumerGroup(), + this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), ex); return; } @@ -207,17 +211,14 @@ public void tryHTTPRequest() { try { res = EntityUtils.toString(response.getEntity(), Charset.forName(EventMeshConstants.DEFAULT_CHARSET)); } catch (IOException e) { + LOGGER.warn("handleResponse exception", e); handleMsgContext.finish(); return new Object(); } ClientRetCode result = processResponseContent(res); - if (MESSAGE_LOGGER.isInfoEnabled()) { - MESSAGE_LOGGER.info( - "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", + MESSAGE_LOGGER.info("message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}", result, currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost); - } switch (result) { case OK: case REMOTE_OK: diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java index fde09ae800..df5ff036a4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.push; import org.apache.commons.collections4.CollectionUtils; +import org.apache.http.client.config.RequestConfig; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.socket.ConnectionSocketFactory; @@ -60,6 +61,9 @@ public class HTTPClientPool { private static final int DEFAULT_MAX_TOTAL = 200; private static final int DEFAULT_IDLETIME_SECONDS = 30; + private static final int CONNECT_TIMEOUT = 5000; + private static final int SOCKET_TIMEOUT = 5000; + private transient PoolingHttpClientConnectionManager connectionManager; public HTTPClientPool(final int core) { @@ -115,7 +119,13 @@ public CloseableHttpClient getHttpClient(final int maxTotal, final int idleTimeI connectionManager.setMaxTotal(maxTotal); } + RequestConfig config = RequestConfig.custom() + .setConnectTimeout(CONNECT_TIMEOUT) + .setConnectionRequestTimeout(CONNECT_TIMEOUT) + .setSocketTimeout(SOCKET_TIMEOUT).build(); + return HttpClients.custom() + .setDefaultRequestConfig(config) .setConnectionManager(connectionManager) .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy()) .evictIdleConnections(idleTimeInSeconds, TimeUnit.SECONDS) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java index fa77f1b8ad..8308fdad58 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java @@ -60,7 +60,11 @@ public static boolean obtainDeliveryAgreement(final CloseableHttpClient httpClie builder.addHeader(REQUEST_ORIGIN_HEADER, requestOrigin); try (CloseableHttpResponse response = httpClient.execute(builder)) { - final String allowedOrigin = response.getLastHeader(ALLOWED_ORIGIN_HEADER).getValue(); + String allowedOrigin = null; + + if (response.getLastHeader(ALLOWED_ORIGIN_HEADER) != null) { + allowedOrigin = response.getLastHeader(ALLOWED_ORIGIN_HEADER).getValue(); + } return StringUtils.isEmpty(allowedOrigin) || "*".equals(allowedOrigin) || allowedOrigin.equalsIgnoreCase(requestOrigin); } catch (Exception e) { diff --git a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java index 2338f05860..d340f12888 100644 --- a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java +++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java @@ -30,6 +30,7 @@ public class AclProperties { private Integer requestCode; private String requestURI; private String token; + private String version; private Map extendedFields = new ConcurrentHashMap<>(); public String getClientIp() { @@ -104,5 +105,11 @@ public Object getExtendedField(String key) { return extendedFields.get(key); } + public String getVersion() { + return version; + } + public void setVersion(String version) { + this.version = version; + } } diff --git a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java index 7bf56f0c3c..606a91b949 100644 --- a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java +++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java @@ -26,4 +26,5 @@ public class AclPropertyKeys { public static final String TOPIC = "topic"; public static final String REQUEST_CODE = "requestCode"; public static final String REQUEST_URI = "requestURI"; + public static final String CLIENT_VERSION = "clientVersion"; } diff --git a/tools/third-party-licenses/LICENSE b/tools/third-party-licenses/LICENSE index 9456ae5c1a..7cb8a149d1 100644 --- a/tools/third-party-licenses/LICENSE +++ b/tools/third-party-licenses/LICENSE @@ -240,7 +240,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. error_prone_annotations 2.7.1: https://github.com/google/error-prone, Apache 2.0 failureaccess 1.0.1: https://github.com/google/guava, Apache 2.0 listenablefuture 9999.0-empty-to-avoid-conflict-with-guava: https://github.com/google/guava, Apache 2.0 - fastjson 1.2.76: https://github.com/alibaba/fastjson, Apache 2.0 + fastjson 1.2.83: https://github.com/alibaba/fastjson, Apache 2.0 guava 31.0.1-jre: https://github.com/google/guava, Apache 2.0 guice 4.2.2: https://github.com/google/guice, Apache 2.0 grpc-api 1.43.2: https://github.com/grpc/grpc-java, Apache-2.0 @@ -367,7 +367,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. simpleclient 0.8.1: https://github.com/prometheus/client_java, Apache 2.0 simpleclient_common 0.8.1: https://github.com/prometheus/client_java, Apache 2.0 simpleclient_httpserver 0.8.1: https://github.com/prometheus/client_java, Apache 2.0 - snakeyaml 1.30: https://bitbucket.org/asomov/snakeyaml, Apache 2.0 + snakeyaml 2.0: https://bitbucket.org/asomov/snakeyaml, Apache 2.0 truth 0.30: https://github.com/google/truth, Apache 2.0 validation-api 1.1.0.Final: https://github.com/jakartaee/validation, Apache 2.0 zipkin 2.23.2: https://github.com/openzipkin/zipkin, Apache 2.0