Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat http source2 #2

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eventmesh-connectors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Add a new connector by implementing the source/sink interface using :
| [File](eventmesh-connector-file) | Sink | ✅ |
| Github | Source | ⬜ |
| Github | Sink | ⬜ |
| Http | Source | |
| [Http](eventmesh-connector-http) | Source | |
| Http | Sink | ⬜ |
| Jdbc | Source | ⬜ |
| [Jdbc](eventmesh-connector-jdbc) | Sink | ✅ |
Expand Down
27 changes: 27 additions & 0 deletions eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
implementation 'io.vertx:vertx-web:4.4.6'

testImplementation "org.apache.httpcomponents:httpclient"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
18 changes: 18 additions & 0 deletions eventmesh-connectors/eventmesh-connector-http/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pluginType=connector
pluginName=http
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.config;

import org.apache.eventmesh.openconnect.api.config.Config;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class HttpServerConfig extends Config {

private boolean sourceEnable;

private boolean sinkEnable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.server;

import org.apache.eventmesh.connector.http.config.HttpServerConfig;
import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
import org.apache.eventmesh.openconnect.Application;
import org.apache.eventmesh.openconnect.util.ConfigUtil;

public class HttpConnectServer {

public static void main(String[] args) throws Exception {
HttpServerConfig serverConfig = ConfigUtil.parse(HttpServerConfig.class, "server-config.yml");

if (serverConfig.isSourceEnable()) {
Application httpSourceApp = new Application();
httpSourceApp.run(HttpSourceConnector.class);
}

if (serverConfig.isSinkEnable()) {
// TODO support sink connector
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.config;

import org.apache.eventmesh.openconnect.api.config.SourceConfig;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = true)
public class HttpSourceConfig extends SourceConfig {

public SourceConnectorConfig connectorConfig;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.config;

import lombok.Data;

@Data
public class SourceConnectorConfig {

private String connectorName;

private String path;

private int port;

private int idleTimeout;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.http.source.connector;

import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HttpSourceConnector implements Source {

private static final int DEFAULT_BATCH_SIZE = 10;

private HttpSourceConfig sourceConfig;
private BlockingQueue<CloudEvent> queue;
private HttpServer server;

@Override
public Class<? extends Config> configClass() {
return HttpSourceConfig.class;
}

@Override
public void init(Config config) throws Exception {
this.sourceConfig = (HttpSourceConfig) config;
doInit();
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
this.sourceConfig = (HttpSourceConfig) sourceConnectorContext.getSourceConfig();
doInit();
}

private void doInit() throws Exception {
this.queue = new LinkedBlockingQueue<>(1000);

final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
router.route()
.path(this.sourceConfig.connectorConfig.getPath())
.method(HttpMethod.POST)
.handler(LoggerHandler.create())
.handler(ctx -> {
VertxMessageFactory.createReader(ctx.request())
.map(MessageReader::toEvent)
.onSuccess(event -> {
queue.add(event);
log.info("[HttpSourceConnector] Succeed to convert payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
})
.onFailure(t -> {
log.error("[HttpSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).setStatusMessage(t.getMessage()).end();
});
});
this.server = vertx.createHttpServer(new HttpServerOptions()
.setPort(this.sourceConfig.connectorConfig.getPort())
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
}

@Override
public void start() throws Exception {
Throwable t = this.server.listen().cause();
if (t != null) {
log.error("[HttpSourceConnector] Failed to start Vertx server.", t);
throw new EventMeshException("Failed to start Vertx server");
}
}

@Override
public void commit(ConnectRecord record) {

}

@Override
public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void stop() throws Exception {
Throwable t = this.server.close().cause();
if (t != null) {
log.error("[HttpSourceConnector] Failed to stop Vertx server.", t);
throw new EventMeshException("Failed to stop Vertx server");
}
}

@Override
public List<ConnectRecord> poll() {
List<ConnectRecord> connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE);
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
try {
CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
if (event == null) {
break;
}
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
} catch (InterruptedException e) {
break;
}
}
return connectRecords;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

sourceEnable: true
sinkEnable: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pubSubConfig:
meshAddress: 127.0.0.1:10000
subject: TopicTest
idc: FT
env: PRD
group: httpSource
appId: 5032
userName: httpSourceUser
passWord: httpmqPassWord
connectorConfig:
connectorName: httpSource
path: /test
port: 3755
idleTimeout: 5
Loading
Loading