diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnectorTest.java new file mode 100644 index 0000000000..5f5e3410bf --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/source/connector/RocketMQSourceConnectorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.source.connector; + +import org.apache.eventmesh.connector.rocketmq.source.config.RocketMQSourceConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.common.message.MessageExt; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.platform.commons.support.HierarchyTraversalMode; +import org.junit.platform.commons.support.ReflectionSupport; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class RocketMQSourceConnectorTest { + + @InjectMocks + private RocketMQSourceConnector sourceConnector; + + @Mock + private DefaultLitePullConsumer consumer; + + private RocketMQSourceConfig sourceConfig; + + private static final String EXPECTED_MESSAGE = "testMessage"; + + @BeforeEach + public void setUp() throws Exception { + sourceConfig = (RocketMQSourceConfig) ConfigUtil.parse(sourceConnector.configClass()); + Mockito.doReturn(generateMockedMessages()).when(consumer).poll(); + Field field = ReflectionSupport.findFields(sourceConnector.getClass(), + (f) -> f.getName().equals("consumer"), HierarchyTraversalMode.BOTTOM_UP).get(0); + field.setAccessible(true); + field.set(sourceConnector, consumer); + } + + @Test + public void testRocketMQSourceConnectorPoll() { + List poll = sourceConnector.poll(); + poll.forEach(connectRecord -> { + Assertions.assertNotNull(connectRecord); + Assertions.assertEquals(EXPECTED_MESSAGE, connectRecord.getData()); + Assertions.assertNotNull(connectRecord.getExtension("topic")); + Assertions.assertNotNull(connectRecord.getPosition()); + Assertions.assertEquals(connectRecord.getExtension("topic"), sourceConfig.getConnectorConfig().getTopic()); + }); + } + + private List generateMockedMessages() { + final int mockCount = 5; + List messageExts = new ArrayList<>(); + for (int i = 0; i < mockCount; i++) { + MessageExt messageExt = new MessageExt(); + messageExt.setTopic(sourceConfig.getConnectorConfig().getTopic()); + messageExt.setBody(EXPECTED_MESSAGE.getBytes(StandardCharsets.UTF_8)); + messageExt.setQueueOffset(1L); + messageExt.setQueueId(2); + messageExt.setBrokerName("testBroker"); + messageExts.add(messageExt); + } + return messageExts; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/source-config.yml new file mode 100644 index 0000000000..7a7880b877 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/test/resources/source-config.yml @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: rocketmqSource + appId: 5032 + userName: rocketmqSourceUser + passWord: rocketmqPassWord +connectorConfig: + connectorName: rocketmqSource + nameserver: 127.0.0.1:9876 + topic: TopicTest + commitOffsetIntervalMs: 5000 +offsetStorageConfig: + offsetStorageType: nacos + offsetStorageAddr: 127.0.0.1:8848 + extensions: { + #same with topic + dataId: TopicTest, + #same with group + group: rocketmqSource + }