Skip to content

Commit

Permalink
[ISSUE apache#4618] Add HTTP Source Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Fungx committed Dec 10, 2023
1 parent ee9ee4c commit 86d0a78
Show file tree
Hide file tree
Showing 13 changed files with 571 additions and 0 deletions.
27 changes: 27 additions & 0 deletions eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
implementation 'io.vertx:vertx-web:4.4.6'

testImplementation "org.apache.httpcomponents:httpclient"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
18 changes: 18 additions & 0 deletions eventmesh-connectors/eventmesh-connector-http/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pluginType=connector
pluginName=http
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.config;

import org.apache.eventmesh.openconnect.api.config.Config;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class HttpServerConfig extends Config {

private boolean sourceEnable;

private boolean sinkEnable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.server;

import org.apache.eventmesh.connector.http.config.HttpServerConfig;
import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
import org.apache.eventmesh.openconnect.Application;
import org.apache.eventmesh.openconnect.util.ConfigUtil;

public class HttpConnectServer {

public static void main(String[] args) throws Exception {
HttpServerConfig serverConfig = ConfigUtil.parse(HttpServerConfig.class, "server-config.yml");

if (serverConfig.isSourceEnable()) {
Application httpSourceApp = new Application();
httpSourceApp.run(HttpSourceConnector.class);
}

if (serverConfig.isSinkEnable()) {
// TODO support sink connector
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.config;

import org.apache.eventmesh.openconnect.api.config.SourceConfig;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class HttpSourceConfig extends SourceConfig {

public SourceConnectorConfig connectorConfig;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.config;

import lombok.Data;

@Data
public class SourceConnectorConfig {

private String connectorName;

private String path;

private int port;

private int idleTimeout;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.connector;

import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HttpSourceConnector implements Source {

private static final int DEFAULT_BATCH_SIZE = 10;

private HttpSourceConfig sourceConfig;
private BlockingQueue<CloudEvent> queue;
private HttpServer server;

@Override
public Class<? extends Config> configClass() {
return HttpSourceConfig.class;
}

@Override
public void init(Config config) throws Exception {
this.sourceConfig = (HttpSourceConfig) config;
doInit();
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
this.sourceConfig = (HttpSourceConfig) sourceConnectorContext.getSourceConfig();
doInit();
}

private void doInit() throws Exception {
this.queue = new LinkedBlockingQueue<>(1000);

final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
router.route()
.path(this.sourceConfig.connectorConfig.getPath())
.method(HttpMethod.POST)
.handler(LoggerHandler.create())
.handler(ctx -> {
VertxMessageFactory.createReader(ctx.request())
.map(MessageReader::toEvent)
.onSuccess(event -> {
queue.add(event);
log.info("[HttpSourceConnector] Succeed to convert payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
})
.onFailure(t -> {
log.error("[HttpSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).setStatusMessage(t.getMessage()).end();
});
});
this.server = vertx.createHttpServer(new HttpServerOptions()
.setPort(this.sourceConfig.connectorConfig.getPort())
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
}

@Override
public void start() throws Exception {
Throwable t = this.server.listen().cause();
if (t != null) {
log.error("[HttpSourceConnector] Failed to start Vertx server.", t);
throw new EventMeshException("Failed to start Vertx server");
}
}

@Override
public void commit(ConnectRecord record) {

}

@Override
public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void stop() throws Exception {
Throwable t = this.server.close().cause();
if (t != null) {
log.error("[HttpSourceConnector] Failed to stop Vertx server.", t);
throw new EventMeshException("Failed to stop Vertx server");
}
}

@Override
public List<ConnectRecord> poll() {
List<ConnectRecord> connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE);
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
try {
CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
if (event == null) {
break;
}
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
} catch (InterruptedException e) {
break;
}
}
return connectRecords;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

sourceEnable: true
sinkEnable: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pubSubConfig:
meshAddress: 127.0.0.1:10000
subject: TopicTest
idc: FT
env: PRD
group: httpSource
appId: 5032
userName: httpSourceUser
passWord: httpmqPassWord
connectorConfig:
connectorName: httpSource
path: /test
port: 3755
idleTimeout: 5
Loading

0 comments on commit 86d0a78

Please sign in to comment.