From abb227183925c25e3bbd8ed3678ccf00fa70707b Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 16:30:24 +0800 Subject: [PATCH 01/13] Fix the port occupation that caused the startup fail, and the process did not exit --- .../org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java | 1 + .../org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java | 1 + 2 files changed, 2 insertions(+) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index d74b69a3ac..42b1f6c4d9 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -227,6 +227,7 @@ public void start() throws Exception { } catch (Exception ex) { log.error("HTTPServer shutdown error!", ex); } + System.exit(-1); } }; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java index 67b56ed6df..4b5c744296 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java @@ -195,6 +195,7 @@ public void initChannel(final Channel ch) throws Exception { } catch (Exception ex) { log.error("EventMeshTCPServer RemotingServer shutdown Err!", ex); } + System.exit(-1); } }; From 8fcde9c386180c2b8af747ee4af8234c4edb10bb Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 16:31:24 +0800 Subject: [PATCH 02/13] Fix concurrent modification exception --- .../metrics/api/model/HttpSummaryMetrics.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java index 34de770820..adaf9ff5e2 100644 --- a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java +++ b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java @@ -22,6 +22,8 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; @@ -106,6 +108,7 @@ public class HttpSummaryMetrics implements Metric { private final DelayQueue httpFailedQueue; + private Lock lock = new ReentrantLock(); public HttpSummaryMetrics(final ThreadPoolExecutor batchMsgExecutor, final ThreadPoolExecutor sendMsgExecutor, @@ -139,20 +142,37 @@ public void recordHTTPDiscard() { } public void snapshotHTTPTPS() { - Integer tps = httpRequestPerSecond.intValue(); - httpRequestTPSSnapshots.add(tps); - httpRequestPerSecond.set(0); - if (httpRequestTPSSnapshots.size() > STATIC_PERIOD / 1000) { - httpRequestTPSSnapshots.removeFirst(); + try{ + lock.lock(); + Integer tps = httpRequestPerSecond.intValue(); + httpRequestTPSSnapshots.add(tps); + httpRequestPerSecond.set(0); + if (httpRequestTPSSnapshots.size() > STATIC_PERIOD / 1000) { + httpRequestTPSSnapshots.removeFirst(); + } + }finally { + lock.unlock(); } } public float maxHTTPTPS() { - return Collections.max(httpRequestTPSSnapshots); + try{ + lock.lock(); + float tps = Collections.max(httpRequestTPSSnapshots); + return tps; + }finally { + lock.unlock(); + } } public float avgHTTPTPS() { - return avg(httpRequestTPSSnapshots); + try{ + lock.lock(); + float tps = avg(httpRequestTPSSnapshots); + return tps; + }finally { + lock.unlock(); + } } public void recordHTTPReqResTimeCost(long cost) { From 2a0fb98571f99912bcca17bc3af5c83d8cc45124 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 16:33:28 +0800 Subject: [PATCH 03/13] Set http request timeout --- .../core/protocol/http/push/HTTPClientPool.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java index fde09ae800..df5ff036a4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.runtime.core.protocol.http.push; import org.apache.commons.collections4.CollectionUtils; +import org.apache.http.client.config.RequestConfig; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.socket.ConnectionSocketFactory; @@ -60,6 +61,9 @@ public class HTTPClientPool { private static final int DEFAULT_MAX_TOTAL = 200; private static final int DEFAULT_IDLETIME_SECONDS = 30; + private static final int CONNECT_TIMEOUT = 5000; + private static final int SOCKET_TIMEOUT = 5000; + private transient PoolingHttpClientConnectionManager connectionManager; public HTTPClientPool(final int core) { @@ -115,7 +119,13 @@ public CloseableHttpClient getHttpClient(final int maxTotal, final int idleTimeI connectionManager.setMaxTotal(maxTotal); } + RequestConfig config = RequestConfig.custom() + .setConnectTimeout(CONNECT_TIMEOUT) + .setConnectionRequestTimeout(CONNECT_TIMEOUT) + .setSocketTimeout(SOCKET_TIMEOUT).build(); + return HttpClients.custom() + .setDefaultRequestConfig(config) .setConnectionManager(connectionManager) .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy()) .evictIdleConnections(idleTimeInSeconds, TimeUnit.SECONDS) From fc3a09a4a858b5a8ce02e0c65b5b7c7a971d7f77 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 16:42:55 +0800 Subject: [PATCH 04/13] Authentication supports version number verification --- .../main/java/org/apache/eventmesh/runtime/acl/Acl.java | 1 + .../java/org/apache/eventmesh/api/acl/AclProperties.java | 7 +++++++ .../java/org/apache/eventmesh/api/acl/AclPropertyKeys.java | 1 + 3 files changed, 9 insertions(+) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java index c2adcce873..13ed186c93 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java @@ -196,6 +196,7 @@ private static AclProperties buildTcpAclProperties(String remoteAddr, UserAgent aclProperties.setPwd(userAgent.getPassword()); aclProperties.setSubsystem(userAgent.getSubsystem()); aclProperties.setRequestCode(requestCode); + aclProperties.setVersion(userAgent.getVersion()); if (StringUtils.isNotBlank(topic)) { aclProperties.setTopic(topic); } diff --git a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java index 2338f05860..d340f12888 100644 --- a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java +++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclProperties.java @@ -30,6 +30,7 @@ public class AclProperties { private Integer requestCode; private String requestURI; private String token; + private String version; private Map extendedFields = new ConcurrentHashMap<>(); public String getClientIp() { @@ -104,5 +105,11 @@ public Object getExtendedField(String key) { return extendedFields.get(key); } + public String getVersion() { + return version; + } + public void setVersion(String version) { + this.version = version; + } } diff --git a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java index 7bf56f0c3c..606a91b949 100644 --- a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java +++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java @@ -26,4 +26,5 @@ public class AclPropertyKeys { public static final String TOPIC = "topic"; public static final String REQUEST_CODE = "requestCode"; public static final String REQUEST_URI = "requestURI"; + public static final String CLIENT_VERSION = "clientVersion"; } From 5049542a03a77ca7834c119152f8f1b2807a812e Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 19:50:27 +0800 Subject: [PATCH 05/13] log optimization --- .../protocol/http/consumer/EventMeshConsumer.java | 10 +++++----- .../protocol/http/consumer/HandleMsgContext.java | 10 +++++++--- .../protocol/http/push/AsyncHTTPPushRequest.java | 12 ++++++------ 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 31fd31b598..664650c4e7 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -130,15 +130,14 @@ public synchronized void init() throws Exception { EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context; if (currentTopicConfig == null) { - log.error("no topicConfig found, consumerGroup:{} topic:{}", - consumerGroupConf.getConsumerGroup(), topic); try { sendMessageBack(event, uniqueId, bizSeqNo); - eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); - return; + log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId); } catch (Exception ex) { - //ignore + log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex); } + eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); + return; } SubscriptionItem subscriptionItem = @@ -157,6 +156,7 @@ public synchronized void init() throws Exception { sendMessageBack(event, uniqueId, bizSeqNo); } catch (Exception e) { //ignore + log.warn("sendMessageBack fail,topic:{}, bizSeqNo={}, uniqueId={}", topic, bizSeqNo, uniqueId, e); } eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java index e2da611851..6afa0f7696 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java @@ -38,9 +38,14 @@ import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + @Slf4j public class HandleMsgContext { + public static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE); + private String msgRandomNo; private String consumerGroup; @@ -205,9 +210,8 @@ public EventMeshHTTPServer getEventMeshHTTPServer() { public void finish() { if (Objects.nonNull(eventMeshConsumer) && Objects.nonNull(context) && Objects.nonNull(event)) { - if (log.isDebugEnabled()) { - log.debug("messageAcked|topic={}|event={}", topic, event); - } + MESSAGE_LOGGER.info("messageAcked|group={}|topic={}|bizSeq={}|uniqId={}|msgRandomNo={}|queueId={}|queueOffset={}", + consumerGroup, topic, bizSeqNo, uniqueId, msgRandomNo, event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_ID), event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)); eventMeshConsumer.updateOffset(topic, subscriptionItem.getMode(), Collections.singletonList(event), context); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 6ac0461ab0..feb4fca069 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -91,6 +91,8 @@ public void tryHTTPRequest() { currPushUrl = getUrl(); if (StringUtils.isBlank(currPushUrl)) { + LOGGER.warn("tryHTTPRequest fail, getUrl is null, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", this.handleMsgContext.getConsumerGroup(), + this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId()); return; } @@ -144,7 +146,8 @@ public void tryHTTPRequest() { } } catch (Exception ex) { - LOGGER.error("Failed to convert EventMeshMessage from CloudEvent", ex); + LOGGER.warn("cloudevent to HttpEventWrapper occur except, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", this.handleMsgContext.getConsumerGroup(), + this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), ex); return; } @@ -207,17 +210,14 @@ public void tryHTTPRequest() { try { res = EntityUtils.toString(response.getEntity(), Charset.forName(EventMeshConstants.DEFAULT_CHARSET)); } catch (IOException e) { + LOGGER.warn("handleResponse exception", e); handleMsgContext.finish(); return new Object(); } ClientRetCode result = processResponseContent(res); - if (MESSAGE_LOGGER.isInfoEnabled()) { - MESSAGE_LOGGER.info( - "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", + MESSAGE_LOGGER.info("message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}", result, currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost); - } switch (result) { case OK: case REMOTE_OK: From ec70e9b256edfd7ba98645fe4e18e62f10c3bb91 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 20:00:45 +0800 Subject: [PATCH 06/13] fix NPE, security upgrade --- .../java/org/apache/eventmesh/runtime/util/WebhookUtil.java | 6 +++++- tools/dependency-check/known-dependencies.txt | 2 +- tools/third-party-licenses/LICENSE | 4 ++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java index fa77f1b8ad..68e9b8c025 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java @@ -60,7 +60,11 @@ public static boolean obtainDeliveryAgreement(final CloseableHttpClient httpClie builder.addHeader(REQUEST_ORIGIN_HEADER, requestOrigin); try (CloseableHttpResponse response = httpClient.execute(builder)) { - final String allowedOrigin = response.getLastHeader(ALLOWED_ORIGIN_HEADER).getValue(); + String allowedOrigin = null; + + if(response.getLastHeader(ALLOWED_ORIGIN_HEADER) != null){ + allowedOrigin = response.getLastHeader(ALLOWED_ORIGIN_HEADER).getValue(); + } return StringUtils.isEmpty(allowedOrigin) || "*".equals(allowedOrigin) || allowedOrigin.equalsIgnoreCase(requestOrigin); } catch (Exception e) { diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt index 2711e43c7e..612dd8dfa4 100644 --- a/tools/dependency-check/known-dependencies.txt +++ b/tools/dependency-check/known-dependencies.txt @@ -216,7 +216,7 @@ simpleclient-0.8.1.jar simpleclient_common-0.8.1.jar simpleclient_httpserver-0.8.1.jar slf4j-api-1.7.30.jar -snakeyaml-1.30.jar +snakeyaml-2.0.jar snappy-java-1.1.8.1.jar system-rules-1.16.1.jar truth-0.30.jar diff --git a/tools/third-party-licenses/LICENSE b/tools/third-party-licenses/LICENSE index 9456ae5c1a..7cb8a149d1 100644 --- a/tools/third-party-licenses/LICENSE +++ b/tools/third-party-licenses/LICENSE @@ -240,7 +240,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. error_prone_annotations 2.7.1: https://github.com/google/error-prone, Apache 2.0 failureaccess 1.0.1: https://github.com/google/guava, Apache 2.0 listenablefuture 9999.0-empty-to-avoid-conflict-with-guava: https://github.com/google/guava, Apache 2.0 - fastjson 1.2.76: https://github.com/alibaba/fastjson, Apache 2.0 + fastjson 1.2.83: https://github.com/alibaba/fastjson, Apache 2.0 guava 31.0.1-jre: https://github.com/google/guava, Apache 2.0 guice 4.2.2: https://github.com/google/guice, Apache 2.0 grpc-api 1.43.2: https://github.com/grpc/grpc-java, Apache-2.0 @@ -367,7 +367,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. simpleclient 0.8.1: https://github.com/prometheus/client_java, Apache 2.0 simpleclient_common 0.8.1: https://github.com/prometheus/client_java, Apache 2.0 simpleclient_httpserver 0.8.1: https://github.com/prometheus/client_java, Apache 2.0 - snakeyaml 1.30: https://bitbucket.org/asomov/snakeyaml, Apache 2.0 + snakeyaml 2.0: https://bitbucket.org/asomov/snakeyaml, Apache 2.0 truth 0.30: https://github.com/google/truth, Apache 2.0 validation-api 1.1.0.Final: https://github.com/jakartaee/validation, Apache 2.0 zipkin 2.23.2: https://github.com/openzipkin/zipkin, Apache 2.0 From 9ad1261dcb3c44e581774bcc145ab6f924aad233 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 22:53:57 +0800 Subject: [PATCH 07/13] Optimize remote subscribe and unsubscribe interface --- .../common/config/CommonConfiguration.java | 2 +- .../resolver/HttpRequestProtocolResolver.java | 2 +- .../RemoteSubscribeEventProcessor.java | 64 +++++++++---------- .../RemoteUnSubscribeEventProcessor.java | 11 +--- .../processor/inf/AbstractEventProcessor.java | 4 +- 5 files changed, 37 insertions(+), 46 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java index 61e4d9ac61..50be034c8c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java @@ -124,6 +124,6 @@ public void reload() { this.eventMeshProvideServerProtocols = Collections.singletonList(ConfigurationContextUtil.HTTP); } - meshGroup = String.join("-", this.eventMeshEnv, this.eventMeshCluster, this.sysID); + meshGroup = String.join("-", this.eventMeshEnv, this.eventMeshIDC, this.eventMeshCluster, this.sysID); } } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java index 186a74e5bc..a9ddaea78e 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java @@ -58,7 +58,7 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr HttpProtocolConstant.CONSTANTS_DEFAULT_TYPE).toString(); String subject = sysHeaderMap.getOrDefault(HttpProtocolConstant.CONSTANTS_KEY_SUBJECT, - HttpProtocolConstant.CONSTANTS_DEFAULT_SUBJECT).toString(); + "").toString(); // with attributes builder.withId(id) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java index dc2a9bed8a..cb797bd48b 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java @@ -117,8 +117,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest return; } - String url = requestBodyMap.get(EventMeshConstants.URL).toString(); - String consumerGroup = requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP).toString(); +// String url = requestBodyMap.get(EventMeshConstants.URL).toString(); String topic = JsonUtils.toJSONString(requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC)); // SubscriptionItem @@ -146,32 +145,32 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest } // validate URL - try { - if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(), - eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) { - httpLogger.error("subscriber url {} is not valid", url); - handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, - responseBodyMap, null); - return; - } - } catch (Exception e) { - httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage()); - handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, - responseBodyMap, null); - return; - } - - CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); - // obtain webhook delivery agreement for Abuse Protection - boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(closeableHttpClient, - url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin()); - - if (!isWebhookAllowed) { - httpLogger.error("subscriber url {} is not allowed by the target system", url); - handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, - responseBodyMap, null); - return; - } +// try { +// if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(), +// eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) { +// httpLogger.error("subscriber url {} is not valid", url); +// handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, +// responseBodyMap, null); +// return; +// } +// } catch (Exception e) { +// httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage()); +// handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, +// responseBodyMap, null); +// return; +// } +// +// CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); +// // obtain webhook delivery agreement for Abuse Protection +// boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(closeableHttpClient, +// url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin()); +// +// if (!isWebhookAllowed) { +// httpLogger.error("subscriber url {} is not allowed by the target system", url); +// handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, +// responseBodyMap, null); +// return; +// } long startTime = System.currentTimeMillis(); try { @@ -187,11 +186,12 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest String targetMesh = requestBodyMap.get("remoteMesh") == null ? "" : requestBodyMap.get("remoteMesh").toString(); // Get mesh address from registry - String meshAddress = getTargetMesh(consumerGroup, subscriptionList); + String meshAddress = getTargetMesh(eventMeshHttpConfiguration.getMeshGroup(), subscriptionList); if (StringUtils.isNotBlank(meshAddress)) { targetMesh = meshAddress; } + CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); String remoteResult = post(closeableHttpClient, targetMesh, builderRemoteHeaderMap(localAddress), remoteBodyMap, response -> EntityUtils.toString(response.getEntity(), Constants.DEFAULT_CHARSET)); @@ -213,10 +213,8 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest } catch (Exception e) { long endTime = System.currentTimeMillis(); - httpLogger.error( - "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}" - + "|bizSeqNo={}|uniqueId={}", endTime - startTime, - JsonUtils.toJSONString(subscriptionList), url, e); + httpLogger.error("subscribe Remote|cost={}ms|topic={}", endTime - startTime, + JsonUtils.toJSONString(subscriptionList), e); handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java index 31dbb19121..f601e302dd 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java @@ -115,9 +115,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest return; } - String unSubscribeUrl = requestBodyMap.get(EventMeshConstants.URL).toString(); - String consumerGroup = requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP).toString(); - String topic = requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC).toString(); + String topic = JsonUtils.toJSONString(requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC).toString()); long startTime = System.currentTimeMillis(); try { @@ -156,7 +154,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest return subscriptionItem; }).collect(Collectors.toList()); // Get mesh address from registry - String meshAddress = getTargetMesh(consumerGroup, subscriptionList); + String meshAddress = getTargetMesh(meshGroup, subscriptionList); if (StringUtils.isNotBlank(meshAddress)) { targetMesh = meshAddress; } @@ -182,10 +180,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest } } catch (Exception e) { long endTime = System.currentTimeMillis(); - httpLogger.error( - "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}" - + "|bizSeqNo={}|uniqueId={}", endTime - startTime, - topic, unSubscribeUrl, e); + httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}", endTime - startTime, topic, e); handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java index 10925a29fd..aeb053d583 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java @@ -206,9 +206,7 @@ protected boolean validateSysHeader(Map sysHeaderMap) { * @return any null then true */ protected boolean validatedRequestBodyMap(Map requestBodyMap) { - return requestBodyMap.get(EventMeshConstants.URL) == null - || requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null - || requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP) == null; + return requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC) == null; } From 16aceadc6c74bc2c7034c4d5713cdad547d8d658 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 22:56:14 +0800 Subject: [PATCH 08/13] Default parameter optimization and codec optimization --- .../eventmesh/common/protocol/tcp/codec/Codec.java | 12 ++++++++++++ .../http/processor/SendAsyncEventProcessor.java | 6 +++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java index 3e86ba7524..7ce5ecbf8a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java @@ -194,6 +194,18 @@ private static Object deserializeBody(String bodyJsonString, Header header) thro case RESPONSE_TO_CLIENT_ACK: case ASYNC_MESSAGE_TO_CLIENT_ACK: case BROADCAST_MESSAGE_TO_CLIENT_ACK: + case HELLO_RESPONSE: + case RECOMMEND_RESPONSE: + case SUBSCRIBE_RESPONSE: + case LISTEN_RESPONSE: + case UNSUBSCRIBE_RESPONSE: + case HEARTBEAT_RESPONSE: + case ASYNC_MESSAGE_TO_SERVER_ACK: + case BROADCAST_MESSAGE_TO_SERVER_ACK: + case CLIENT_GOODBYE_REQUEST: + case CLIENT_GOODBYE_RESPONSE: + case SERVER_GOODBYE_REQUEST: + case SERVER_GOODBYE_RESPONSE: // The message string will be deserialized by protocol plugin, if the event is cloudevents, the body is // just a string. return bodyJsonString; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java index 13d0c8db48..cee9aea37a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java @@ -101,11 +101,11 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final requestWrapper.buildSysHeaderForCE(); final String bizNo = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.BIZSEQNO, - RandomStringUtils.generateNum(30)).toString(); + RandomStringUtils.generateNum(32)).toString(); final String uniqueId = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.UNIQUEID, - RandomStringUtils.generateNum(30)).toString(); + RandomStringUtils.generateNum(32)).toString(); final String ttl = requestHeaderMap.getOrDefault(Constants.EVENTMESH_MESSAGE_CONST_TTL, - 4 * 1000).toString(); + 14400000).toString(); requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.BIZSEQNO, bizNo); requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.UNIQUEID, uniqueId); From c5be10b4daca6871fab46e4fbf2658d71ef9ced7 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Tue, 11 Jul 2023 23:32:26 +0800 Subject: [PATCH 09/13] support application/protobuf --- .../protocol/http/HttpProtocolConstant.java | 9 +++-- .../resolver/HttpRequestProtocolResolver.java | 34 +++++++++++-------- .../runtime/boot/AbstractHTTPServer.java | 14 ++++++-- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java index 0e70bd7d5a..f45204594e 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/HttpProtocolConstant.java @@ -21,11 +21,9 @@ public enum HttpProtocolConstant { ; public static final String PROTOCOL_NAME = "http"; - public static final String DATA_CONTENT_TYPE = "application/json"; - public static final String CONSTANTS_DEFAULT_SOURCE = "/"; public static final String CONSTANTS_DEFAULT_TYPE = "http_request"; - public static final String CONSTANTS_DEFAULT_SUBJECT = "TOPIC-HTTP-REQUEST"; + public static final String CONSTANTS_DEFAULT_SUBJECT = ""; public static final String CONSTANTS_KEY_ID = "id"; public static final String CONSTANTS_KEY_SOURCE = "source"; @@ -35,4 +33,9 @@ public enum HttpProtocolConstant { public static final String CONSTANTS_KEY_BODY = "body"; public static final String CONSTANTS_KEY_PATH = "path"; public static final String CONSTANTS_KEY_METHOD = "method"; + + public static final String DATA_CONTENT_TYPE = "Content-Type"; + + public static final String APPLICATION_JSON = "application/json"; + public static final String PROTOBUF = "application/x-protobuf"; } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java index a9ddaea78e..eaf3570f45 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.protocol.http.resolver; -import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; @@ -30,7 +29,6 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.UUID; import io.cloudevents.CloudEvent; @@ -58,14 +56,15 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr HttpProtocolConstant.CONSTANTS_DEFAULT_TYPE).toString(); String subject = sysHeaderMap.getOrDefault(HttpProtocolConstant.CONSTANTS_KEY_SUBJECT, - "").toString(); + HttpProtocolConstant.CONSTANTS_DEFAULT_SUBJECT).toString(); + String dataContentType = requestHeaderMap.getOrDefault(HttpProtocolConstant.DATA_CONTENT_TYPE, HttpProtocolConstant.APPLICATION_JSON).toString(); // with attributes builder.withId(id) .withType(type) .withSource(URI.create(HttpProtocolConstant.CONSTANTS_KEY_SOURCE + ":" + source)) .withSubject(subject) - .withDataContentType(HttpProtocolConstant.DATA_CONTENT_TYPE); + .withDataContentType(dataContentType); // with extensions for (Map.Entry extension : sysHeaderMap.entrySet()) { @@ -81,19 +80,24 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr byte[] requestBody = httpEventWrapper.getBody(); - Map requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody, Constants.DEFAULT_CHARSET), - new TypeReference>() { + if (StringUtils.equals(dataContentType, HttpProtocolConstant.APPLICATION_JSON)) { + Map requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody), new TypeReference>() { }); - String requestURI = httpEventWrapper.getRequestURI(); - - Map data = new HashMap<>(); - data.put(HttpProtocolConstant.CONSTANTS_KEY_HEADERS, requestHeaderMap); - data.put(HttpProtocolConstant.CONSTANTS_KEY_BODY, requestBodyMap); - data.put(HttpProtocolConstant.CONSTANTS_KEY_PATH, requestURI); - data.put(HttpProtocolConstant.CONSTANTS_KEY_METHOD, httpEventWrapper.getHttpMethod()); - // with data - return builder.withData(Objects.requireNonNull(JsonUtils.toJSONString(data)).getBytes(StandardCharsets.UTF_8)).build(); + String requestURI = httpEventWrapper.getRequestURI(); + + Map data = new HashMap<>(); + data.put(HttpProtocolConstant.CONSTANTS_KEY_HEADERS, requestHeaderMap); + data.put(HttpProtocolConstant.CONSTANTS_KEY_BODY, requestBodyMap); + data.put(HttpProtocolConstant.CONSTANTS_KEY_PATH, requestURI); + data.put(HttpProtocolConstant.CONSTANTS_KEY_METHOD, httpEventWrapper.getHttpMethod()); + // with data + builder = builder.withData(JsonUtils.toJSONString(data).getBytes(StandardCharsets.UTF_8)); + } else if (StringUtils.equals(dataContentType, HttpProtocolConstant.PROTOBUF)) { + // with data + builder = builder.withData(requestBody); + } + return builder.build(); } catch (Exception e) { throw new ProtocolHandleException(e.getMessage(), e); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index 42b1f6c4d9..b11421bb5e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -620,6 +620,7 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws } final long bodyDecodeStart = System.currentTimeMillis(); + byte[] requestBody = null; final FullHttpRequest fullHttpRequest = (FullHttpRequest) httpRequest; final Map bodyMap = new HashMap<>(); @@ -636,8 +637,16 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws bodyMap.putAll(Objects.requireNonNull(JsonUtils.parseTypeReferenceObject(new String(body, Constants.DEFAULT_CHARSET), new TypeReference>() { }))); + requestBody = JsonUtils.toJSONString(bodyMap).getBytes(StandardCharsets.UTF_8); } - } else { + } else if (StringUtils.equals(httpRequest.headers().get("Content-Type"), "application/x-protobuf")) { + int length = fullHttpRequest.content().readableBytes(); + if (length > 0) { + byte[] body = new byte[length]; + fullHttpRequest.content().readBytes(body); + requestBody = body; + } + }else { final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest); for (final InterfaceHttpData parm : decoder.getBodyHttpDatas()) { @@ -647,13 +656,14 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws } } decoder.destroy(); + requestBody = JsonUtils.toJSONString(bodyMap).getBytes(StandardCharsets.UTF_8); } } else { throw new MethodNotSupportedException("UnSupported Method " + fullHttpRequest.method()); } - httpEventWrapper.setBody(Objects.requireNonNull(JsonUtils.toJSONString(bodyMap)).getBytes(StandardCharsets.UTF_8)); + httpEventWrapper.setBody(requestBody); metrics.getSummaryMetrics().recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart); From 9471624cc9a3a30391dfa3e0edbfbd0e18fc39d9 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Wed, 12 Jul 2023 09:54:40 +0800 Subject: [PATCH 10/13] fix checkstyle --- .../metrics/api/model/HttpSummaryMetrics.java | 12 ++-- .../resolver/HttpRequestProtocolResolver.java | 7 ++- .../http/consumer/EventMeshConsumer.java | 6 +- .../http/consumer/HandleMsgContext.java | 10 ++-- .../RemoteSubscribeEventProcessor.java | 55 +++++++++---------- .../http/push/AsyncHTTPPushRequest.java | 3 +- .../eventmesh/runtime/util/WebhookUtil.java | 2 +- 7 files changed, 49 insertions(+), 46 deletions(-) diff --git a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java index adaf9ff5e2..5a9a092f7f 100644 --- a/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java +++ b/eventmesh-metrics-plugin/eventmesh-metrics-api/src/main/java/org/apache/eventmesh/metrics/api/model/HttpSummaryMetrics.java @@ -142,7 +142,7 @@ public void recordHTTPDiscard() { } public void snapshotHTTPTPS() { - try{ + try { lock.lock(); Integer tps = httpRequestPerSecond.intValue(); httpRequestTPSSnapshots.add(tps); @@ -150,27 +150,27 @@ public void snapshotHTTPTPS() { if (httpRequestTPSSnapshots.size() > STATIC_PERIOD / 1000) { httpRequestTPSSnapshots.removeFirst(); } - }finally { + } finally { lock.unlock(); } } public float maxHTTPTPS() { - try{ + try { lock.lock(); float tps = Collections.max(httpRequestTPSSnapshots); return tps; - }finally { + } finally { lock.unlock(); } } public float avgHTTPTPS() { - try{ + try { lock.lock(); float tps = avg(httpRequestTPSSnapshots); return tps; - }finally { + } finally { lock.unlock(); } } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java index eaf3570f45..ac723cf858 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-http/src/main/java/org/apache/eventmesh/protocol/http/resolver/HttpRequestProtocolResolver.java @@ -58,7 +58,8 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr String subject = sysHeaderMap.getOrDefault(HttpProtocolConstant.CONSTANTS_KEY_SUBJECT, HttpProtocolConstant.CONSTANTS_DEFAULT_SUBJECT).toString(); - String dataContentType = requestHeaderMap.getOrDefault(HttpProtocolConstant.DATA_CONTENT_TYPE, HttpProtocolConstant.APPLICATION_JSON).toString(); + String dataContentType = requestHeaderMap.getOrDefault(HttpProtocolConstant.DATA_CONTENT_TYPE, + HttpProtocolConstant.APPLICATION_JSON).toString(); // with attributes builder.withId(id) .withType(type) @@ -81,8 +82,8 @@ public static CloudEvent buildEvent(HttpEventWrapper httpEventWrapper) throws Pr byte[] requestBody = httpEventWrapper.getBody(); if (StringUtils.equals(dataContentType, HttpProtocolConstant.APPLICATION_JSON)) { - Map requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody), new TypeReference>() { - }); + Map requestBodyMap = JsonUtils.parseTypeReferenceObject(new String(requestBody), + new TypeReference>() {}); String requestURI = httpEventWrapper.getRequestURI(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 664650c4e7..6a43e88ec3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -132,9 +132,11 @@ public synchronized void init() throws Exception { if (currentTopicConfig == null) { try { sendMessageBack(event, uniqueId, bizSeqNo); - log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId); + log.warn("no ConsumerGroupTopicConf found, sendMessageBack success, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", + consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId); } catch (Exception ex) { - log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex); + log.warn("sendMessageBack fail, consumerGroup:{}, topic:{}, bizSeqNo={}, uniqueId={}", + consumerGroupConf.getConsumerGroup(), topic, bizSeqNo, uniqueId, ex); } eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); return; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java index 6afa0f7696..e6af880a90 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java @@ -33,14 +33,13 @@ import java.util.Map; import java.util.Objects; -import io.cloudevents.CloudEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - @Slf4j public class HandleMsgContext { @@ -211,7 +210,8 @@ public EventMeshHTTPServer getEventMeshHTTPServer() { public void finish() { if (Objects.nonNull(eventMeshConsumer) && Objects.nonNull(context) && Objects.nonNull(event)) { MESSAGE_LOGGER.info("messageAcked|group={}|topic={}|bizSeq={}|uniqId={}|msgRandomNo={}|queueId={}|queueOffset={}", - consumerGroup, topic, bizSeqNo, uniqueId, msgRandomNo, event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_ID), event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)); + consumerGroup, topic, bizSeqNo, uniqueId, msgRandomNo, event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_ID), + event.getExtension(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)); eventMeshConsumer.updateOffset(topic, subscriptionItem.getMode(), Collections.singletonList(event), context); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java index cb797bd48b..7181038326 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java @@ -33,7 +33,6 @@ import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext; import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor; import org.apache.eventmesh.runtime.util.RemotingHelper; -import org.apache.eventmesh.runtime.util.WebhookUtil; import org.apache.commons.lang3.StringUtils; import org.apache.http.impl.client.CloseableHttpClient; @@ -117,7 +116,7 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest return; } -// String url = requestBodyMap.get(EventMeshConstants.URL).toString(); + //String url = requestBodyMap.get(EventMeshConstants.URL).toString(); String topic = JsonUtils.toJSONString(requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC)); // SubscriptionItem @@ -145,32 +144,32 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest } // validate URL -// try { -// if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(), -// eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) { -// httpLogger.error("subscriber url {} is not valid", url); -// handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, -// responseBodyMap, null); -// return; -// } -// } catch (Exception e) { -// httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage()); -// handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, -// responseBodyMap, null); -// return; -// } -// -// CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); -// // obtain webhook delivery agreement for Abuse Protection -// boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(closeableHttpClient, -// url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin()); -// -// if (!isWebhookAllowed) { -// httpLogger.error("subscriber url {} is not allowed by the target system", url); -// handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, -// responseBodyMap, null); -// return; -// } + // try { + // if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(), + // eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) { + // httpLogger.error("subscriber url {} is not valid", url); + // handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, + // responseBodyMap, null); + // return; + // } + // } catch (Exception e) { + // httpLogger.error("subscriber url {} is not valid, error {}", url, e.getMessage()); + // handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, + // responseBodyMap, null); + // return; + // } + // + // CloseableHttpClient closeableHttpClient = eventMeshHTTPServer.getHttpClientPool().getClient(); + // // obtain webhook delivery agreement for Abuse Protection + // boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(closeableHttpClient, + // url, eventMeshHttpConfiguration.getEventMeshWebhookOrigin()); + // + // if (!isWebhookAllowed) { + // httpLogger.error("subscriber url {} is not allowed by the target system", url); + // handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap, + // responseBodyMap, null); + // return; + // } long startTime = System.currentTimeMillis(); try { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index feb4fca069..2417cea174 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -146,7 +146,8 @@ public void tryHTTPRequest() { } } catch (Exception ex) { - LOGGER.warn("cloudevent to HttpEventWrapper occur except, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", this.handleMsgContext.getConsumerGroup(), + LOGGER.warn("cloudevent to HttpEventWrapper occur except, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", + this.handleMsgContext.getConsumerGroup(), this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), ex); return; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java index 68e9b8c025..8308fdad58 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java @@ -62,7 +62,7 @@ public static boolean obtainDeliveryAgreement(final CloseableHttpClient httpClie try (CloseableHttpResponse response = httpClient.execute(builder)) { String allowedOrigin = null; - if(response.getLastHeader(ALLOWED_ORIGIN_HEADER) != null){ + if (response.getLastHeader(ALLOWED_ORIGIN_HEADER) != null) { allowedOrigin = response.getLastHeader(ALLOWED_ORIGIN_HEADER).getValue(); } return StringUtils.isEmpty(allowedOrigin) From e4c97755983abdfbd852afa7b120a3f4da609d58 Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Wed, 12 Jul 2023 10:03:29 +0800 Subject: [PATCH 11/13] Add response status code setting for http request --- .../common/protocol/http/HttpCommand.java | 14 +++++++++++++ .../protocol/http/HttpEventWrapper.java | 8 ++++++- .../runtime/boot/AbstractHTTPServer.java | 21 ++++++++++++++++--- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java index 76cda7a70f..fa2057ae4c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpCommand.java @@ -176,6 +176,20 @@ public DefaultFullHttpResponse httpResponse() throws Exception { return response; } + public DefaultFullHttpResponse httpResponse(HttpResponseStatus httpResponseStatus) throws Exception { + if (cmdType == CmdType.REQ) { + return null; + } + DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, + Unpooled.wrappedBuffer(Objects.requireNonNull(JsonUtils.toJSONString(this.getBody())).getBytes(Constants.DEFAULT_CHARSET))); + HttpHeaders headers = response.headers(); + headers.add(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=" + Constants.DEFAULT_CHARSET); + headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + headers.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + Optional.of(this.getHeader().toMap()).ifPresent(customerHeader -> customerHeader.forEach(headers::add)); + return response; + } + public enum CmdType { REQ, RES diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java index 2779bb73d8..3b40d8c163 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/HttpEventWrapper.java @@ -66,6 +66,8 @@ public class HttpEventWrapper implements ProtocolTransportObject { //Command response time public long resTime; + private HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK; + public HttpEventWrapper() { this(null, null, null); } @@ -180,7 +182,7 @@ public void setBody(byte[] newBody) { } public DefaultFullHttpResponse httpResponse() throws Exception { - DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, + DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, Unpooled.wrappedBuffer(this.body)); HttpHeaders headers = response.headers(); headers.add(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=" + Constants.DEFAULT_CHARSET); @@ -225,4 +227,8 @@ public void buildSysHeaderForCE() { sysHeaderMap.put(ProtocolKey.CloudEventsKey.SUBJECT, topic); } + public void setHttpResponseStatus(HttpResponseStatus httpResponseStatus) { + this.httpResponseStatus = httpResponseStatus; + } + } \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java index b11421bb5e..5e67282225 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java @@ -414,7 +414,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception || !RequestCode.contains(Integer.valueOf(requestCode))) { responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID); - sendResponse(ctx, responseCommand.httpResponse()); + sendResponse(ctx, responseCommand.httpResponse(HttpResponseStatus.BAD_REQUEST)); span = TraceUtils.prepareServerSpan(headerMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); @@ -428,7 +428,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception requestCommand.setBody(Body.buildBody(requestCode, bodyMap)); } catch (Exception e) { responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR); - sendResponse(ctx, responseCommand.httpResponse()); + sendResponse(ctx, responseCommand.httpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR)); span = TraceUtils.prepareServerSpan(headerMap, EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false); @@ -448,6 +448,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } catch (Exception ex) { log.error("AbstractHTTPServer.HTTPHandler.channelRead error", ex); + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } finally { ReferenceCountUtil.release(httpRequest); } @@ -467,6 +468,17 @@ public void processHttpRequest(final ChannelHandlerContext ctx, final Pair choosed = eventProcessorTable.get(processorKey); try { + if (choosed == null) { + log.warn("eventProcessorTable not contains EventProcessor, processorKey:{}", processorKey); + sendError(ctx, HttpResponseStatus.BAD_REQUEST); + return; + } + if (choosed.getObject2() == null) { + log.warn("eventProcessorTable not contains ThreadPoolExecutor, processorKey:{}", processorKey); + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); + return; + } + choosed.getObject2().submit(() -> { try { final EventProcessor processor = choosed.getObject1(); @@ -474,6 +486,7 @@ public void processHttpRequest(final ChannelHandlerContext ctx, final HttpEventWrapper responseWrapper = requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR); + responseWrapper.setHttpResponseStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); asyncContext.onComplete(responseWrapper); if (asyncContext.isComplete()) { @@ -500,10 +513,12 @@ public void processHttpRequest(final ChannelHandlerContext ctx, sendResponse(ctx, asyncContext.getResponse().httpResponse()); } catch (Exception e) { log.error("process error", e); + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } }); } catch (RejectedExecutionException re) { final HttpEventWrapper responseWrapper = requestWrapper.createHttpResponse(EventMeshRetCode.OVERLOAD); + responseWrapper.setHttpResponseStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); asyncContext.onComplete(responseWrapper); metrics.getSummaryMetrics().recordHTTPDiscard(); metrics.getSummaryMetrics().recordHTTPReqResTimeCost( @@ -646,7 +661,7 @@ private HttpEventWrapper parseHttpRequest(final HttpRequest httpRequest) throws fullHttpRequest.content().readBytes(body); requestBody = body; } - }else { + } else { final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest); for (final InterfaceHttpData parm : decoder.getBodyHttpDatas()) { From d95a2db031278289e7f24eef9983b2de0bed302e Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Thu, 13 Jul 2023 09:35:02 +0800 Subject: [PATCH 12/13] fix license check problem --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index a30f89c3b5..cc3250c192 100644 --- a/build.gradle +++ b/build.gradle @@ -512,7 +512,7 @@ subprojects { dependency "com.github.seancfoley:ipaddress:5.3.3" dependency "com.google.code.gson:gson:2.8.2" - dependency "org.yaml:snakeyaml:1.30" + dependency "org.yaml:snakeyaml:2.0" dependency "org.javassist:javassist:3.24.0-GA" dependency "com.alibaba.nacos:nacos-client:2.2.1" From 81690eccd255cea367cb5574c2e07cfc69b044ca Mon Sep 17 00:00:00 2001 From: lrhkobe Date: Thu, 13 Jul 2023 12:24:42 +0800 Subject: [PATCH 13/13] fix checkstyle problem --- build.gradle | 2 +- tools/dependency-check/known-dependencies.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index cc3250c192..a30f89c3b5 100644 --- a/build.gradle +++ b/build.gradle @@ -512,7 +512,7 @@ subprojects { dependency "com.github.seancfoley:ipaddress:5.3.3" dependency "com.google.code.gson:gson:2.8.2" - dependency "org.yaml:snakeyaml:2.0" + dependency "org.yaml:snakeyaml:1.30" dependency "org.javassist:javassist:3.24.0-GA" dependency "com.alibaba.nacos:nacos-client:2.2.1" diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt index 612dd8dfa4..2711e43c7e 100644 --- a/tools/dependency-check/known-dependencies.txt +++ b/tools/dependency-check/known-dependencies.txt @@ -216,7 +216,7 @@ simpleclient-0.8.1.jar simpleclient_common-0.8.1.jar simpleclient_httpserver-0.8.1.jar slf4j-api-1.7.30.jar -snakeyaml-2.0.jar +snakeyaml-1.30.jar snappy-java-1.1.8.1.jar system-rules-1.16.1.jar truth-0.30.jar