Skip to content

Commit

Permalink
[fix][build] Remove unnecessary Oracle maven repository from pom.xml (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored and Denovo1998 committed Aug 17, 2024
1 parent 551bc2a commit b80cf4e
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 43 deletions.
8 changes: 0 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2804,13 +2804,5 @@ flexible messaging model and an intuitive client API.</description>
<enabled>false</enabled>
</snapshots>
</repository>
<!-- For the BDB JE dependency -->
<repository>
<id>oracle.releases</id>
<url>https://download.oracle.com/maven</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
14 changes: 14 additions & 0 deletions pulsar-io/rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@
<artifactId>qpid-broker</artifactId>
<version>9.2.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-derby-store</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.io.rabbitmq;

import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
Expand All @@ -42,28 +40,14 @@ public void stopBroker() {

Map<String, Object> getBrokerOptions(String port) throws Exception {
Path tmpFolder = Files.createTempDirectory("qpidWork");
Path homeFolder = Files.createTempDirectory("qpidHome");
File etc = new File(homeFolder.toFile(), "etc");
etc.mkdir();
FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
fos.write("guest:guest\n".getBytes());
fos.close();

Map<String, Object> config = new HashMap<>();
config.put("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
config.put("qpid.amqp_port", port);
config.put("qpid.home_dir", homeFolder.toAbsolutePath().toString());
String configPath = getFile("qpid.json").getAbsolutePath();

Map<String, Object> context = new HashMap<>();
context.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, configPath);
context.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:qpid.json");
context.put(SystemConfig.TYPE, "Memory");
context.put(SystemConfig.CONTEXT, config);
return context;
}

private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*/
package org.apache.pulsar.io.rabbitmq.sink;

import static org.mockito.Mockito.mock;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.rabbitmq.RabbitMQBrokerManager;
import org.apache.pulsar.io.rabbitmq.RabbitMQSink;
import org.awaitility.Awaitility;
Expand All @@ -46,7 +49,7 @@ public void tearDown() {
}

@Test
public void TestOpenAndWriteSink() throws Exception {
public void testOpenAndWriteSink() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "localhost");
configs.put("port", "5673");
Expand All @@ -66,7 +69,9 @@ public void TestOpenAndWriteSink() throws Exception {

// open should success
// rabbitmq service may need time to initialize
Awaitility.await().ignoreExceptions().untilAsserted(() -> sink.open(configs, null));
SinkContext sinkContext = mock(SinkContext.class);
Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1))
.untilAsserted(() -> sink.open(configs, sinkContext));

// write should success
Record<byte[]> record = build("test-topic", "fakeKey", "fakeValue", "fakeRoutingKey");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.io.rabbitmq.source;

import static org.mockito.Mockito.mock;
import java.time.Duration;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.rabbitmq.RabbitMQBrokerManager;
import org.apache.pulsar.io.rabbitmq.RabbitMQSource;
import org.awaitility.Awaitility;
Expand All @@ -44,7 +47,7 @@ public void tearDown() {
}

@Test
public void TestOpenAndWriteSink() throws Exception {
public void testOpenAndWriteSink() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "localhost");
configs.put("port", "5672");
Expand All @@ -66,8 +69,11 @@ public void TestOpenAndWriteSink() throws Exception {

// open should success
// rabbitmq service may need time to initialize
Awaitility.await().ignoreExceptions().untilAsserted(() -> source.open(configs, null));
SourceContext sourceContext = mock(SourceContext.class);
Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1))
.untilAsserted(() -> source.open(configs, sourceContext));
source.close();
}


}
59 changes: 45 additions & 14 deletions pulsar-io/rabbitmq/src/test/resources/qpid.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,57 @@
{
"name": "EmbeddedBroker",
"name": "${broker.name}",
"modelVersion": "2.0",
"storeVersion": 1,
"authenticationproviders": [
{
"name": "noPassword",
"type": "Anonymous",
"secureOnlyMechanisms": []
},
"name": "plain",
"type": "Plain",
"secureOnlyMechanisms": [],
"users": [
{
"name": "guest",
"password": "guest",
"type": "managed"
}
]
}
],
"brokerloggers": [
{
"name": "passwordFile",
"type": "PlainPasswordFile",
"path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
"secureOnlyMechanisms": []
"name": "console",
"type": "Console",
"brokerloginclusionrules": [
{
"name": "Root",
"type": "NameAndLevel",
"level": "WARN",
"loggerName": "ROOT"
},
{
"name": "Qpid",
"type": "NameAndLevel",
"level": "INFO",
"loggerName": "org.apache.qpid.*"
},
{
"name": "Operational",
"type": "NameAndLevel",
"level": "INFO",
"loggerName": "qpid.message.*"
},
{
"name": "Statistics",
"type": "NameAndLevel",
"level": "INFO",
"loggerName": "qpid.statistics.*"
}
]
}
],
"ports": [
{
"name": "AMQP",
"port": "${qpid.amqp_port}",
"authenticationProvider": "passwordFile",
"authenticationProvider": "plain",
"protocols": [
"AMQP_0_9_1"
]
Expand All @@ -28,10 +60,9 @@
"virtualhostnodes": [
{
"name": "default",
"type": "JSON",
"type": "Memory",
"defaultVirtualHostNode": "true",
"virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
"storeType": "DERBY"
"virtualHostInitialConfiguration": "{\"type\": \"Memory\"}"
}
]
}

0 comments on commit b80cf4e

Please sign in to comment.