forked from apache/eventmesh
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[ISSUE apache#3913] Add Source Connector RocketMQ Module
- Loading branch information
Showing
8 changed files
with
454 additions
and
0 deletions.
There are no files selected for viewing
41 changes: 41 additions & 0 deletions
41
eventmesh-connectors/source-connector-rocketmq/build.gradle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
List rocketmq = [ | ||
"org.apache.rocketmq:rocketmq-client:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-broker:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-common:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-store:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-tools:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-remoting:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-logging:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-test:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-filter:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-acl:$rocketmq_version", | ||
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version", | ||
|
||
] | ||
|
||
dependencies { | ||
implementation project(":eventmesh-connectors:eventmesh-connector-api") | ||
implementation project(":eventmesh-sdk-java") | ||
implementation rocketmq | ||
compileOnly 'org.projectlombok:lombok' | ||
annotationProcessor 'org.projectlombok:lombok' | ||
} |
17 changes: 17 additions & 0 deletions
17
eventmesh-connectors/source-connector-rocketmq/gradle.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
rocketmq_version=4.9.5 |
113 changes: 113 additions & 0 deletions
113
...etmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/EventMeshTestUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.eventmesh.source.connector.rocketmq; | ||
|
||
import static org.apache.eventmesh.common.protocol.tcp.Command.RESPONSE_TO_SERVER; | ||
|
||
import io.cloudevents.CloudEvent; | ||
import io.cloudevents.core.builder.CloudEventBuilder; | ||
import java.net.URI; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.UUID; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import org.apache.eventmesh.client.tcp.common.EventMeshCommon; | ||
import org.apache.eventmesh.client.tcp.common.MessageUtils; | ||
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; | ||
import org.apache.eventmesh.common.protocol.tcp.Header; | ||
import org.apache.eventmesh.common.protocol.tcp.Package; | ||
import org.apache.eventmesh.common.protocol.tcp.UserAgent; | ||
import org.apache.eventmesh.common.utils.JsonUtils; | ||
|
||
public class EventMeshTestUtils { | ||
|
||
private static final int SEQ_LENGTH = 10; | ||
|
||
private static final String ASYNC_MSG_BODY = "testAsyncMessage"; | ||
|
||
private static final String DEFAULT_TTL_MS = "30000"; | ||
|
||
// generate pub-client | ||
public static UserAgent generateClient1() { | ||
final UserAgent agent = UserAgent.builder() | ||
.env(UtilsConstants.ENV) | ||
.host(UtilsConstants.HOST) | ||
.password(generateRandomString(UtilsConstants.PASSWORD_LENGTH)) | ||
.username(UtilsConstants.USER_NAME) | ||
.group(UtilsConstants.GROUP) | ||
.path(UtilsConstants.PATH) | ||
.port(UtilsConstants.PORT_1) | ||
.subsystem(UtilsConstants.SUB_SYSTEM_1) | ||
.pid(UtilsConstants.PID_1) | ||
.version(UtilsConstants.VERSION) | ||
.idc(UtilsConstants.IDC) | ||
.build(); | ||
return MessageUtils.generatePubClient(agent); | ||
} | ||
|
||
// generate sub-client | ||
public static UserAgent generateClient2() { | ||
final UserAgent agent = UserAgent.builder() | ||
.env(UtilsConstants.ENV) | ||
.host(UtilsConstants.HOST) | ||
.password(generateRandomString(UtilsConstants.PASSWORD_LENGTH)) | ||
.username(UtilsConstants.USER_NAME) | ||
.group(UtilsConstants.GROUP) | ||
.path(UtilsConstants.PATH) | ||
.port(UtilsConstants.PORT_2) | ||
.subsystem(UtilsConstants.SUB_SYSTEM_2) | ||
.pid(UtilsConstants.PID_2) | ||
.version(UtilsConstants.VERSION) | ||
.idc(UtilsConstants.IDC) | ||
.build(); | ||
return MessageUtils.generateSubClient(agent); | ||
} | ||
|
||
public static Package rrResponse(final EventMeshMessage request) { | ||
final Package msg = new Package(); | ||
msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, generateRandomString(SEQ_LENGTH))); | ||
msg.setBody(request); | ||
return msg; | ||
} | ||
|
||
private static String generateRandomString(final int length) { | ||
final StringBuilder builder = new StringBuilder(length); | ||
for (int i = 0; i < length; i++) { | ||
builder.append((char) ThreadLocalRandom.current().nextInt(48, 57)); | ||
} | ||
return builder.toString(); | ||
} | ||
|
||
public static CloudEvent generateCloudEventV1(String destination, String message) { | ||
final Map<String, String> content = new HashMap<>(); | ||
content.put(UtilsConstants.CONTENT, message); | ||
|
||
return CloudEventBuilder.v1() | ||
.withId(UUID.randomUUID().toString()) | ||
.withSubject(destination) | ||
.withSource(URI.create("/")) | ||
.withDataContentType("application/cloudevents+json") | ||
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) | ||
.withData(Objects.requireNonNull(JsonUtils.toJSONString(content)).getBytes(StandardCharsets.UTF_8)) | ||
.withExtension(UtilsConstants.TTL, DEFAULT_TTL_MS) | ||
.build(); | ||
} | ||
|
||
} |
60 changes: 60 additions & 0 deletions
60
...mq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/RocketMQSourceWorker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package org.apache.eventmesh.source.connector.rocketmq; | ||
|
||
import io.cloudevents.CloudEvent; | ||
import java.util.List; | ||
import org.apache.eventmesh.client.tcp.EventMeshTCPClient; | ||
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory; | ||
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig; | ||
import org.apache.eventmesh.common.protocol.tcp.UserAgent; | ||
import org.apache.eventmesh.connector.api.data.ConnectRecord; | ||
import org.apache.eventmesh.source.connector.rocketmq.config.RocketMQSourceConfig; | ||
import org.apache.eventmesh.source.connector.rocketmq.connector.RocketMQSourceConnector; | ||
|
||
public class RocketMQSourceWorker { | ||
|
||
public static final String SOURCE_CONSUMER_GROUP = "DEFAULT-CONSUMER-GROUP"; | ||
public static final String SOURCE_CONNECT_NAMESRVADDR = "127.0.0.1:9877"; | ||
public static final String SOURCE_TOPIC = "TopicTest"; | ||
|
||
public static final String DESTINATION = "SourceTopic"; | ||
|
||
|
||
public static void main(String[] args) throws Exception { | ||
|
||
UserAgent userAgent = EventMeshTestUtils.generateClient1(); | ||
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder() | ||
.host("127.0.0.1") | ||
.port(10002) | ||
.userAgent(userAgent) | ||
.build(); | ||
|
||
final EventMeshTCPClient<CloudEvent> client = | ||
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); | ||
|
||
client.init(); | ||
|
||
RocketMQSourceConnector rocketMQSourceConnector = new RocketMQSourceConnector(); | ||
|
||
RocketMQSourceConfig rocketMQSourceConfig = new RocketMQSourceConfig(); | ||
|
||
rocketMQSourceConfig.setSourceNameserver(SOURCE_CONNECT_NAMESRVADDR); | ||
rocketMQSourceConfig.setSourceTopic(SOURCE_TOPIC); | ||
rocketMQSourceConfig.setSourceGroup(SOURCE_CONSUMER_GROUP); | ||
|
||
rocketMQSourceConnector.init(rocketMQSourceConfig); | ||
|
||
rocketMQSourceConnector.start(); | ||
|
||
while(true) { | ||
List<ConnectRecord> connectorRecordList = rocketMQSourceConnector.poll(); | ||
for(ConnectRecord connectRecord : connectorRecordList) { | ||
// todo:connectorRecord 转换 cloudEvents | ||
CloudEvent event = EventMeshTestUtils.generateCloudEventV1(connectRecord.getExtension("topic"), connectRecord.getData().toString()); | ||
client.publish(event, 3000); | ||
Thread.sleep(500); | ||
} | ||
} | ||
|
||
} | ||
|
||
} |
52 changes: 52 additions & 0 deletions
52
...rocketmq/src/main/java/org/apache/eventmesh/source/connector/rocketmq/UtilsConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.eventmesh.source.connector.rocketmq; | ||
|
||
import lombok.AccessLevel; | ||
import lombok.NoArgsConstructor; | ||
|
||
|
||
@NoArgsConstructor(access = AccessLevel.PRIVATE) | ||
public class UtilsConstants { | ||
|
||
public static final String ENV = "test"; | ||
public static final String HOST = "localhost"; | ||
public static final Integer PASSWORD_LENGTH = 8; | ||
public static final String USER_NAME = "PU4283"; | ||
public static final String GROUP = "EventmeshTestGroup"; | ||
public static final String PATH = "/data/app/umg_proxy"; | ||
public static final Integer PORT_1 = 8362; | ||
public static final Integer PORT_2 = 9362; | ||
public static final String SUB_SYSTEM_1 = "5023"; | ||
public static final String SUB_SYSTEM_2 = "5017"; | ||
public static final Integer PID_1 = 32_893; | ||
public static final Integer PID_2 = 42_893; | ||
public static final String VERSION = "2.0.11"; | ||
public static final String IDC = "FT"; | ||
/** | ||
* PROPERTY KEY NAME . | ||
*/ | ||
public static final String MSG_TYPE = "msgtype"; | ||
public static final String TTL = "ttl"; | ||
public static final String KEYS = "keys"; | ||
public static final String REPLY_TO = "replyto"; | ||
public static final String PROPERTY_MESSAGE_REPLY_TO = "propertymessagereplyto"; | ||
public static final String CONTENT = "content"; | ||
|
||
|
||
} |
63 changes: 63 additions & 0 deletions
63
...main/java/org/apache/eventmesh/source/connector/rocketmq/config/RocketMQSourceConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.eventmesh.source.connector.rocketmq.config; | ||
|
||
import org.apache.eventmesh.connector.api.config.SourceConfig; | ||
|
||
public class RocketMQSourceConfig extends SourceConfig { | ||
|
||
String connectorName; | ||
|
||
String sourceNameserver; | ||
|
||
String sourceTopic; | ||
|
||
String sourceGroup; | ||
|
||
public String getConnectorName() { | ||
return connectorName; | ||
} | ||
|
||
public void setConnectorName(String connectorName) { | ||
this.connectorName = connectorName; | ||
} | ||
|
||
public String getSourceNameserver() { | ||
return sourceNameserver; | ||
} | ||
|
||
public void setSourceNameserver(String sourceNameserver) { | ||
this.sourceNameserver = sourceNameserver; | ||
} | ||
|
||
public String getSourceTopic() { | ||
return sourceTopic; | ||
} | ||
|
||
public void setSourceTopic(String sourceTopic) { | ||
this.sourceTopic = sourceTopic; | ||
} | ||
|
||
public String getSourceGroup() { | ||
return sourceGroup; | ||
} | ||
|
||
public void setSourceGroup(String sourceGroup) { | ||
this.sourceGroup = sourceGroup; | ||
} | ||
} |
Oops, something went wrong.