Skip to content

Commit

Permalink
JPMS support
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 18, 2024
1 parent 7333d56 commit 27ce1e2
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 38 deletions.
7 changes: 3 additions & 4 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttConnectionException;
Expand Down Expand Up @@ -177,7 +176,7 @@ public Future<MqttConnAckMessage> connect(int port, String host, String serverNa
private Future<MqttConnAckMessage> doConnect(int port, String host, String serverName) {

ContextInternal ctx = vertx.getOrCreateContext();
NetClient client = new NetClientBuilder(vertx, options).build();
NetClient client = vertx.createNetClient(options);
PromiseInternal<MqttConnAckMessage> connectPromise = ctx.promise();
PromiseInternal<Void> disconnectPromise = ctx.promise();

Expand Down Expand Up @@ -861,7 +860,7 @@ private void handleMessage(ChannelHandlerContext chctx, Object msg) {
case PUBLISH:

io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage;
ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload());
Buffer newBuf = BufferInternal.safeBuffer(publish.payload());

MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create(
publish.variableHeader().packetId(),
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.vertx.mqtt.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderResult;
Expand All @@ -30,11 +29,10 @@
import io.vertx.core.MultiMap;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServerOptions;
Expand Down Expand Up @@ -137,7 +135,7 @@ void handleMessage(Object msg) {
case PUBLISH:

io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage;
ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload());
Buffer newBuf = BufferInternal.safeBuffer(publish.payload());

MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create(
publish.variableHeader().packetId(),
Expand Down Expand Up @@ -441,7 +439,7 @@ void handleDisconnect(MqttDisconnectReasonCode code, MqttProperties properties)
}

void handleHandshakeComplete(WebSocketServerProtocolHandler.HandshakeComplete handshake) {
httpHeaders = new HeadersAdaptor(handshake.requestHeaders());
httpHeaders = io.vertx.core.internal.http.HttpHeadersInternal.headers(handshake.requestHeaders());
httpRequestUri = handshake.requestUri();
}

Expand Down
9 changes: 4 additions & 5 deletions src/main/java/io/vertx/mqtt/messages/MqttPublishMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.vertx.mqtt.messages;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.codegen.annotations.CacheReturn;
Expand All @@ -43,11 +42,11 @@ public interface MqttPublishMessage extends MqttMessage {
* @return Vert.x publish message
*/
@GenIgnore
static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, ByteBuf payload) {
static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, Buffer payload) {

return new MqttPublishMessageImpl(messageId, qosLevel, isDup, isRetain, topicName, payload, MqttProperties.NO_PROPERTIES);
}

/**
* Create a concrete instance of a Vert.x publish message
*
Expand All @@ -61,7 +60,7 @@ static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup,
* @return Vert.x publish message
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, ByteBuf payload, MqttProperties properties) {
static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, Buffer payload, MqttProperties properties) {

return new MqttPublishMessageImpl(messageId, qosLevel, isDup, isRetain, topicName, payload, properties);
}
Expand Down Expand Up @@ -95,7 +94,7 @@ static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup,
*/
@CacheReturn
Buffer payload();

/**
* Send the PUBACK/PUBCOMP to the broker. Use this method only if autoAck option is set to false.
* @throws IllegalStateException if you are ack a message (with QoS > 0) when the Auto Ack is true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package io.vertx.mqtt.messages.impl;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.mqtt.messages.MqttPublishMessage;

/**
Expand Down Expand Up @@ -50,13 +48,13 @@ public class MqttPublishMessageImpl implements MqttPublishMessage {
* @param payload payload message
* @param properties MQTT properties
*/
public MqttPublishMessageImpl(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, ByteBuf payload, MqttProperties properties) {
public MqttPublishMessageImpl(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, Buffer payload, MqttProperties properties) {
this.messageId = messageId;
this.qosLevel = qosLevel;
this.isDup = isDup;
this.isRetain = isRetain;
this.topicName = topicName;
this.payload = BufferInternal.buffer(payload);
this.payload = payload;
this.properties = properties;
}

Expand Down
37 changes: 37 additions & 0 deletions src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
module io.vertx.mqtt {

requires static io.vertx.docgen;
requires static io.vertx.codegen.api;
requires static io.vertx.codegen.json;

requires io.netty.common;
requires io.netty.buffer;
requires io.netty.handler;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.mqtt;
requires io.netty.codec.compression;
requires io.netty.codec.http;
requires io.vertx.core;
requires io.vertx.core.logging;

exports io.vertx.mqtt;
exports io.vertx.mqtt.messages;
exports io.vertx.mqtt.messages.codes;

}
2 changes: 0 additions & 2 deletions src/main/resources/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
Automatic-Module-Name: io.vertx.mqtt

31 changes: 15 additions & 16 deletions src/test/java/io/vertx/mqtt/test/client/MqttClientSslTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.SelfSignedCertificate;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.test.tls.Cert;
import io.vertx.test.tls.Trust;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -41,38 +44,35 @@ public class MqttClientSslTest {
private static final int MQTT_SERVER_TLS_PORT = 8883;
private static final String MQTT_SERVER_HOST = "localhost";

Vertx vertx = Vertx.vertx();
Vertx vertx;
MqttServer server;
TestContext context;
MqttClient client;

@Test
public void clientSslTrustAllTest(TestContext context) {
public void clientSslTrustAllTest() {
MqttClientOptions clientOptions = new MqttClientOptions()
.setSsl(true)
.setHostnameVerificationAlgorithm("")
.setTrustAll(true);

MqttClient client = MqttClient.create(vertx, clientOptions);
client.exceptionHandler(t -> context.assertTrue(false));
client = MqttClient.create(vertx, clientOptions);

this.context = context;
client.connect(MQTT_SERVER_TLS_PORT, MQTT_SERVER_HOST)
.compose(msg -> client.disconnect())
.onComplete(context.asyncAssertSuccess());
.await();
}

@Test
public void clientSslClientTruststoreTest(TestContext context) {

this.context = context;
JksOptions jksOptions = new JksOptions().setPath("tls/client-truststore.jks");
JksOptions jksOptions = Trust.SERVER_JKS.get();

MqttClientOptions clientOptions = new MqttClientOptions()
.setSsl(true)
.setHostnameVerificationAlgorithm("")
.setTrustOptions(jksOptions);

MqttClient client = MqttClient.create(vertx, clientOptions);
client = MqttClient.create(vertx, clientOptions);
client.exceptionHandler(t -> context.assertTrue(false));

client.connect(MQTT_SERVER_TLS_PORT, MQTT_SERVER_HOST)
Expand All @@ -81,10 +81,10 @@ public void clientSslClientTruststoreTest(TestContext context) {
}

@Before
public void before(TestContext ctx) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath("tls/server-key.pem")
.setCertPath("tls/server-cert.pem");
public void before() {
this.vertx = Vertx.vertx();

PemKeyCertOptions pemKeyCertOptions = Cert.SERVER_PEM.get();

MqttServerOptions serverOptions = new MqttServerOptions()
.setPort(MQTT_SERVER_TLS_PORT)
Expand All @@ -93,12 +93,11 @@ public void before(TestContext ctx) {
.setSsl(true);

server = MqttServer.create(vertx, serverOptions);
server.exceptionHandler(t -> context.assertTrue(false));
server.endpointHandler(e -> {
log.info("Client connected");
e.disconnectHandler(d -> log.info("Client disconnected"));
e.accept(false);
}).listen().onComplete(ctx.asyncAssertSuccess());
}).listen().await();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.test.tls.Cert;
import io.vertx.test.tls.Trust;
import org.junit.runner.RunWith;

import javax.net.ssl.*;
Expand Down Expand Up @@ -112,7 +114,7 @@ protected void endpointHandler(MqttEndpoint endpoint, TestContext context) {
*/
protected SSLSocketFactory getSocketFactory(String trustStoreName, String keyStoreName) throws Exception {

InputStream clientTrustStoreInput = this.getClass().getResourceAsStream(trustStoreName);
InputStream clientTrustStoreInput = Trust.class.getResourceAsStream(trustStoreName);

KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(clientTrustStoreInput, "wibble".toCharArray());
Expand All @@ -123,7 +125,7 @@ protected SSLSocketFactory getSocketFactory(String trustStoreName, String keySto
KeyManager[] keyManagers = null;

if (keyStoreName != null) {
InputStream clientKeyStoreInput = this.getClass().getResourceAsStream(keyStoreName);
InputStream clientKeyStoreInput = Cert.class.getResourceAsStream(keyStoreName);

KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(clientKeyStoreInput, "wibble".toCharArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.vertx.core.http.ClientAuth;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
open module io.vertx.tests.mqtt {
requires io.netty.common;
requires io.netty.buffer;
requires io.netty.handler;
requires io.netty.transport;
requires io.netty.codec.mqtt;
requires io.netty.codec.compression;
requires io.netty.codec;
requires io.vertx.core.logging;
requires io.vertx.core;
requires io.vertx.mqtt;
requires io.vertx.testing.unit;
requires io.vertx.core.tests;
requires junit;
requires org.eclipse.paho.client.mqttv3;
requires org.eclipse.paho.mqttv5.client;
requires testcontainers;
}

0 comments on commit 27ce1e2

Please sign in to comment.