From e8063b7f5cf7ab862a35a31413a2dfeee7a707a7 Mon Sep 17 00:00:00 2001 From: Fungx <38498093+Fungx@users.noreply.github.com> Date: Thu, 23 Nov 2023 15:49:24 +0800 Subject: [PATCH] [ISSUE #4618] Add HTTP source connector --- eventmesh-connectors/README.md | 2 +- .../eventmesh-connector-http/build.gradle | 27 ++++ .../gradle.properties | 18 +++ .../http/config/HttpServerConfig.java | 32 ++++ .../http/server/HttpConnectServer.java | 40 +++++ .../http/source/config/HttpSourceConfig.java | 30 ++++ .../source/config/SourceConnectorConfig.java | 32 ++++ .../source/connector/HttpSourceConnector.java | 147 ++++++++++++++++++ .../src/main/resources/server-config.yml | 19 +++ .../src/main/resources/source-config.yml | 31 ++++ .../connector/HttpSourceConnectorTest.java | 145 +++++++++++++++++ .../src/test/resources/server-config.yml | 19 +++ .../src/test/resources/source-config.yml | 30 ++++ settings.gradle | 1 + 14 files changed, 572 insertions(+), 1 deletion(-) create mode 100644 eventmesh-connectors/eventmesh-connector-http/build.gradle create mode 100644 eventmesh-connectors/eventmesh-connector-http/gradle.properties create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md index 1fc836f3ae..9ad4d86c6e 100644 --- a/eventmesh-connectors/README.md +++ b/eventmesh-connectors/README.md @@ -36,7 +36,7 @@ Add a new connector by implementing the source/sink interface using : | [File](eventmesh-connector-file) | Sink | ✅ | | Github | Source | ⬜ | | Github | Sink | ⬜ | -| Http | Source | ⬜ | +| [Http](eventmesh-connector-http) | Source | ✅ | | Http | Sink | ⬜ | | Jdbc | Source | ⬜ | | [Jdbc](eventmesh-connector-jdbc) | Sink | ✅ | diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle new file mode 100644 index 0000000000..5e57e86e37 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + api project(":eventmesh-openconnect:eventmesh-openconnect-java") + implementation project(":eventmesh-common") + implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0' + implementation 'io.vertx:vertx-web:4.4.5' + + testImplementation "org.apache.httpcomponents:httpclient" + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/gradle.properties b/eventmesh-connectors/eventmesh-connector-http/gradle.properties new file mode 100644 index 0000000000..cc0e7324ca --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/gradle.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +pluginType=connector +pluginName=http \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java new file mode 100644 index 0000000000..81a9f20923 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.config; + +import org.apache.eventmesh.openconnect.api.config.Config; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class HttpServerConfig extends Config { + + private boolean sourceEnable; + + private boolean sinkEnable; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java new file mode 100644 index 0000000000..bd94fed126 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.server; + +import org.apache.eventmesh.connector.http.config.HttpServerConfig; +import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector; +import org.apache.eventmesh.openconnect.Application; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +public class HttpConnectServer { + + public static void main(String[] args) throws Exception { + HttpServerConfig serverConfig = ConfigUtil.parse(HttpServerConfig.class, "server-config.yml"); + + if (serverConfig.isSourceEnable()) { + Application httpSourceApp = new Application(); + httpSourceApp.run(HttpSourceConnector.class); + } + + if (serverConfig.isSinkEnable()) { + // TODO support sink connector + } + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java new file mode 100644 index 0000000000..bee870cb1a --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.source.config; + +import org.apache.eventmesh.openconnect.api.config.SourceConfig; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class HttpSourceConfig extends SourceConfig { + + public SourceConnectorConfig connectorConfig; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java new file mode 100644 index 0000000000..873a0d1922 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.source.config; + +import lombok.Data; + +@Data +public class SourceConnectorConfig { + + private String connectorName; + + private String path; + + private int port; + + private int idleTimeout; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java new file mode 100644 index 0000000000..4dddb752ab --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.source.connector; + +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; +import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.CloudEventUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.http.vertx.VertxMessageFactory; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.LoggerHandler; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HttpSourceConnector implements Source { + + private static final int DEFAULT_BATCH_SIZE = 10; + + private HttpSourceConfig sourceConfig; + private BlockingQueue queue; + private HttpServer server; + + @Override + public Class configClass() { + return HttpSourceConfig.class; + } + + @Override + public void init(Config config) throws Exception { + this.sourceConfig = (HttpSourceConfig) config; + doInit(); + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; + this.sourceConfig = (HttpSourceConfig) sourceConnectorContext.getSourceConfig(); + doInit(); + } + + private void doInit() throws Exception { + this.queue = new LinkedBlockingQueue<>(1000); + + final Vertx vertx = Vertx.vertx(); + final Router router = Router.router(vertx); + router.route() + .path(this.sourceConfig.connectorConfig.getPath()) + .method(HttpMethod.POST) + .handler(LoggerHandler.create()) + .handler(ctx -> { + VertxMessageFactory.createReader(ctx.request()) + .map(MessageReader::toEvent) + .onSuccess(event -> { + queue.add(event); + log.info("[HttpSourceConnector] Succeed to convert payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code()); + ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end(); + }) + .onFailure(t -> { + log.error("[HttpSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t); + ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).setStatusMessage(t.getMessage()).end(); + }); + }); + this.server = vertx.createHttpServer(new HttpServerOptions() + .setPort(this.sourceConfig.connectorConfig.getPort()) + .setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router); + } + + @Override + public void start() throws Exception { + Throwable t = this.server.listen().cause(); + if (t != null) { + log.error("[HttpSourceConnector] Failed to start Vertx server.", t); + throw new EventMeshException("Failed to start Vertx server"); + } + } + + @Override + public void commit(ConnectRecord record) { + + } + + @Override + public String name() { + return this.sourceConfig.getConnectorConfig().getConnectorName(); + } + + @Override + public void stop() throws Exception { + Throwable t = this.server.close().cause(); + if (t != null) { + log.error("[HttpSourceConnector] Failed to stop Vertx server.", t); + throw new EventMeshException("Failed to stop Vertx server"); + } + } + + @Override + public List poll() { + List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); + for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) { + try { + CloudEvent event = queue.poll(3, TimeUnit.SECONDS); + if (event == null) { + break; + } + connectRecords.add(CloudEventUtil.convertEventToRecord(event)); + } catch (InterruptedException e) { + break; + } + } + return connectRecords; + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml new file mode 100644 index 0000000000..0cd7b5b5ab --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +sourceEnable: true +sinkEnable: false diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml new file mode 100644 index 0000000000..415c924b1c --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: httpSource + appId: 5032 + userName: httpSourceUser + passWord: httpmqPassWord +connectorConfig: + connectorName: httpSource + path: /test + port: 3755 + idleTimeout: 5 \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java new file mode 100644 index 0000000000..35d58b75c1 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.source.connector; + +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig; +import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.util.List; +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class HttpSourceConnectorTest { + + private HttpSourceConnector connector; + private SourceConnectorConfig config; + private CloseableHttpClient httpClient; + private String uri; + private final String expectedMessage = "testHttpMessage"; + + @BeforeEach + void setUp() throws Exception { + connector = new HttpSourceConnector(); + HttpSourceConfig sourceConfig = (HttpSourceConfig) ConfigUtil.parse(connector.configClass()); + config = sourceConfig.getConnectorConfig(); + connector.init(sourceConfig); + connector.start(); + + uri = new URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString(); + + httpClient = HttpClients.createDefault(); + } + + @Test + void testPoll() throws Exception { + final int batchSize = 10; + // test binary content mode + for (int i = 0; i < batchSize; i++) { + HttpResponse resp = mockBinaryRequest(); + Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK); + + } + List res = connector.poll(); + Assertions.assertEquals(batchSize, res.size()); + for (ConnectRecord r : res) { + Assertions.assertEquals(expectedMessage, new String((byte[]) r.getData())); + } + + // test structured content mode + for (int i = 0; i < batchSize; i++) { + HttpResponse resp = mockStructuredRequest(); + Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK); + } + res = connector.poll(); + Assertions.assertEquals(batchSize, res.size()); + for (ConnectRecord r : res) { + Assertions.assertEquals(expectedMessage, new String((byte[]) r.getData())); + } + + // test invalid requests + HttpPost invalidPost = new HttpPost(uri); + invalidPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); + invalidPost.setHeader("ce-id", String.valueOf(UUID.randomUUID())); + HttpResponse resp = httpClient.execute(invalidPost); + Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, resp.getStatusLine().getStatusCode()); + } + + HttpResponse mockBinaryRequest() throws Exception { + HttpPost httpPost = new HttpPost(uri); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); + httpPost.setHeader("ce-id", String.valueOf(UUID.randomUUID())); + httpPost.setHeader("ce-specversion", "1.0"); + httpPost.setHeader("ce-type", "com.example.someevent"); + httpPost.setHeader("ce-source", "/mycontext"); + httpPost.setHeader("ce-subject", "test"); + httpPost.setEntity(new StringEntity(expectedMessage)); + + return httpClient.execute(httpPost); + } + + HttpResponse mockStructuredRequest() throws Exception { + HttpPost httpPost = new HttpPost(uri); + // according to the CloudEvent specification, a json format event MUST use the media type `application/cloudevents+json` + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/cloudevents+json"); + TestEvent event = new TestEvent(); + event.id = String.valueOf(UUID.randomUUID()); + event.specversion = "1.0"; + event.type = "com.example.someevent"; + event.source = "/mycontext"; + event.subject = "test"; + event.datacontenttype = "text/plain"; + event.data = expectedMessage; + httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event))); + + return httpClient.execute(httpPost); + } + + @AfterEach + void tearDown() throws Exception { + connector.stop(); + httpClient.close(); + } + + class TestEvent { + + public String specversion; + public String type; + public String source; + public String subject; + public String datacontenttype; + public String id; + + public String data; + } +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml new file mode 100644 index 0000000000..0cd7b5b5ab --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +sourceEnable: true +sinkEnable: false diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml new file mode 100644 index 0000000000..0a3e68d070 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: httpSource + appId: 5032 + userName: httpSourceUser + passWord: httpmqPassWord +connectorConfig: + connectorName: httpSource + path: /test + port: 3755 \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index a60f8d8624..9a6eed1f2f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -49,6 +49,7 @@ include 'eventmesh-connectors:eventmesh-connector-feishu' include 'eventmesh-connectors:eventmesh-connector-wecom' include 'eventmesh-connectors:eventmesh-connector-slack' include 'eventmesh-connectors:eventmesh-connector-wechat' +include 'eventmesh-connectors:eventmesh-connector-http' include 'eventmesh-storage-plugin:eventmesh-storage-api' include 'eventmesh-storage-plugin:eventmesh-storage-standalone'