diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java index f5a0d88e..0cae724e 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java @@ -45,12 +45,11 @@ import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.PromiseInternal; -import io.vertx.core.net.impl.NetClientBuilder; +import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.net.NetClient; -import io.vertx.core.net.impl.VertxHandler; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import io.vertx.mqtt.MqttConnectionException; @@ -177,7 +176,7 @@ public Future connect(int port, String host, String serverNa private Future doConnect(int port, String host, String serverName) { ContextInternal ctx = vertx.getOrCreateContext(); - NetClient client = new NetClientBuilder(vertx, options).build(); + NetClient client = vertx.createNetClient(options); PromiseInternal connectPromise = ctx.promise(); PromiseInternal disconnectPromise = ctx.promise(); @@ -861,7 +860,7 @@ private void handleMessage(ChannelHandlerContext chctx, Object msg) { case PUBLISH: io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage; - ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload()); + Buffer newBuf = BufferInternal.safeBuffer(publish.payload()); MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create( publish.variableHeader().packetId(), diff --git a/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java b/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java index bfc89c6b..48380faf 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java @@ -16,7 +16,6 @@ package io.vertx.mqtt.impl; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderResult; @@ -30,11 +29,10 @@ import io.vertx.core.MultiMap; import io.vertx.core.VertxException; import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.impl.headers.HeadersAdaptor; +import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.internal.net.NetSocketInternal; -import io.vertx.core.net.impl.VertxHandler; import io.vertx.mqtt.MqttAuth; import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.MqttServerOptions; @@ -137,7 +135,7 @@ void handleMessage(Object msg) { case PUBLISH: io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage; - ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload()); + Buffer newBuf = BufferInternal.safeBuffer(publish.payload()); MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create( publish.variableHeader().packetId(), @@ -441,7 +439,7 @@ void handleDisconnect(MqttDisconnectReasonCode code, MqttProperties properties) } void handleHandshakeComplete(WebSocketServerProtocolHandler.HandshakeComplete handshake) { - httpHeaders = new HeadersAdaptor(handshake.requestHeaders()); + httpHeaders = io.vertx.core.internal.http.HttpHeadersInternal.headers(handshake.requestHeaders()); httpRequestUri = handshake.requestUri(); } diff --git a/src/main/java/io/vertx/mqtt/messages/MqttPublishMessage.java b/src/main/java/io/vertx/mqtt/messages/MqttPublishMessage.java index 429c3c2d..f548c1a8 100644 --- a/src/main/java/io/vertx/mqtt/messages/MqttPublishMessage.java +++ b/src/main/java/io/vertx/mqtt/messages/MqttPublishMessage.java @@ -16,7 +16,6 @@ package io.vertx.mqtt.messages; -import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.codegen.annotations.CacheReturn; @@ -43,11 +42,11 @@ public interface MqttPublishMessage extends MqttMessage { * @return Vert.x publish message */ @GenIgnore - static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, ByteBuf payload) { + static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, Buffer payload) { return new MqttPublishMessageImpl(messageId, qosLevel, isDup, isRetain, topicName, payload, MqttProperties.NO_PROPERTIES); } - + /** * Create a concrete instance of a Vert.x publish message * @@ -61,7 +60,7 @@ static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, * @return Vert.x publish message */ @GenIgnore(GenIgnore.PERMITTED_TYPE) - static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, ByteBuf payload, MqttProperties properties) { + static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, Buffer payload, MqttProperties properties) { return new MqttPublishMessageImpl(messageId, qosLevel, isDup, isRetain, topicName, payload, properties); } @@ -95,7 +94,7 @@ static MqttPublishMessage create(int messageId, MqttQoS qosLevel, boolean isDup, */ @CacheReturn Buffer payload(); - + /** * Send the PUBACK/PUBCOMP to the broker. Use this method only if autoAck option is set to false. * @throws IllegalStateException if you are ack a message (with QoS > 0) when the Auto Ack is true diff --git a/src/main/java/io/vertx/mqtt/messages/impl/MqttPublishMessageImpl.java b/src/main/java/io/vertx/mqtt/messages/impl/MqttPublishMessageImpl.java index d3f6e9b0..15de631a 100644 --- a/src/main/java/io/vertx/mqtt/messages/impl/MqttPublishMessageImpl.java +++ b/src/main/java/io/vertx/mqtt/messages/impl/MqttPublishMessageImpl.java @@ -16,11 +16,9 @@ package io.vertx.mqtt.messages.impl; -import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.buffer.Buffer; -import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.mqtt.messages.MqttPublishMessage; /** @@ -50,13 +48,13 @@ public class MqttPublishMessageImpl implements MqttPublishMessage { * @param payload payload message * @param properties MQTT properties */ - public MqttPublishMessageImpl(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, ByteBuf payload, MqttProperties properties) { + public MqttPublishMessageImpl(int messageId, MqttQoS qosLevel, boolean isDup, boolean isRetain, String topicName, Buffer payload, MqttProperties properties) { this.messageId = messageId; this.qosLevel = qosLevel; this.isDup = isDup; this.isRetain = isRetain; this.topicName = topicName; - this.payload = BufferInternal.buffer(payload); + this.payload = payload; this.properties = properties; } diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java new file mode 100644 index 00000000..ee8ef6f3 --- /dev/null +++ b/src/main/java/module-info.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +module io.vertx.mqtt { + + requires static io.vertx.docgen; + requires static io.vertx.codegen.api; + requires static io.vertx.codegen.json; + + requires io.netty.common; + requires io.netty.buffer; + requires io.netty.handler; + requires io.netty.transport; + requires io.netty.codec; + requires io.netty.codec.mqtt; + requires io.netty.codec.compression; + requires io.netty.codec.http; + requires io.vertx.core; + requires io.vertx.core.logging; + + exports io.vertx.mqtt; + exports io.vertx.mqtt.messages; + exports io.vertx.mqtt.messages.codes; + +} diff --git a/src/main/resources/META-INF/MANIFEST.MF b/src/main/resources/META-INF/MANIFEST.MF index 75f999c6..e69de29b 100644 --- a/src/main/resources/META-INF/MANIFEST.MF +++ b/src/main/resources/META-INF/MANIFEST.MF @@ -1,2 +0,0 @@ -Automatic-Module-Name: io.vertx.mqtt - diff --git a/src/test/java/io/vertx/mqtt/test/client/MqttClientSslTest.java b/src/test/java/io/vertx/mqtt/test/client/MqttClientSslTest.java index 35e40eee..184182d8 100644 --- a/src/test/java/io/vertx/mqtt/test/client/MqttClientSslTest.java +++ b/src/test/java/io/vertx/mqtt/test/client/MqttClientSslTest.java @@ -20,12 +20,15 @@ import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.net.JksOptions; import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.core.net.SelfSignedCertificate; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import io.vertx.mqtt.MqttServer; import io.vertx.mqtt.MqttServerOptions; +import io.vertx.test.tls.Cert; +import io.vertx.test.tls.Trust; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -41,38 +44,35 @@ public class MqttClientSslTest { private static final int MQTT_SERVER_TLS_PORT = 8883; private static final String MQTT_SERVER_HOST = "localhost"; - Vertx vertx = Vertx.vertx(); + Vertx vertx; MqttServer server; - TestContext context; + MqttClient client; @Test - public void clientSslTrustAllTest(TestContext context) { + public void clientSslTrustAllTest() { MqttClientOptions clientOptions = new MqttClientOptions() .setSsl(true) .setHostnameVerificationAlgorithm("") .setTrustAll(true); - MqttClient client = MqttClient.create(vertx, clientOptions); - client.exceptionHandler(t -> context.assertTrue(false)); + client = MqttClient.create(vertx, clientOptions); - this.context = context; client.connect(MQTT_SERVER_TLS_PORT, MQTT_SERVER_HOST) .compose(msg -> client.disconnect()) - .onComplete(context.asyncAssertSuccess()); + .await(); } @Test public void clientSslClientTruststoreTest(TestContext context) { - this.context = context; - JksOptions jksOptions = new JksOptions().setPath("tls/client-truststore.jks"); + JksOptions jksOptions = Trust.SERVER_JKS.get(); MqttClientOptions clientOptions = new MqttClientOptions() .setSsl(true) .setHostnameVerificationAlgorithm("") .setTrustOptions(jksOptions); - MqttClient client = MqttClient.create(vertx, clientOptions); + client = MqttClient.create(vertx, clientOptions); client.exceptionHandler(t -> context.assertTrue(false)); client.connect(MQTT_SERVER_TLS_PORT, MQTT_SERVER_HOST) @@ -81,10 +81,10 @@ public void clientSslClientTruststoreTest(TestContext context) { } @Before - public void before(TestContext ctx) { - PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() - .setKeyPath("tls/server-key.pem") - .setCertPath("tls/server-cert.pem"); + public void before() { + this.vertx = Vertx.vertx(); + + PemKeyCertOptions pemKeyCertOptions = Cert.SERVER_PEM.get(); MqttServerOptions serverOptions = new MqttServerOptions() .setPort(MQTT_SERVER_TLS_PORT) @@ -93,12 +93,11 @@ public void before(TestContext ctx) { .setSsl(true); server = MqttServer.create(vertx, serverOptions); - server.exceptionHandler(t -> context.assertTrue(false)); server.endpointHandler(e -> { log.info("Client connected"); e.disconnectHandler(d -> log.info("Client disconnected")); e.accept(false); - }).listen().onComplete(ctx.asyncAssertSuccess()); + }).listen().await(); } @After diff --git a/src/test/java/io/vertx/mqtt/test/server/MqttServerBaseTest.java b/src/test/java/io/vertx/mqtt/test/server/MqttServerBaseTest.java index 689772b4..f263b466 100644 --- a/src/test/java/io/vertx/mqtt/test/server/MqttServerBaseTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/MqttServerBaseTest.java @@ -25,6 +25,8 @@ import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.MqttServer; import io.vertx.mqtt.MqttServerOptions; +import io.vertx.test.tls.Cert; +import io.vertx.test.tls.Trust; import org.junit.runner.RunWith; import javax.net.ssl.*; @@ -112,7 +114,7 @@ protected void endpointHandler(MqttEndpoint endpoint, TestContext context) { */ protected SSLSocketFactory getSocketFactory(String trustStoreName, String keyStoreName) throws Exception { - InputStream clientTrustStoreInput = this.getClass().getResourceAsStream(trustStoreName); + InputStream clientTrustStoreInput = Trust.class.getResourceAsStream(trustStoreName); KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); trustStore.load(clientTrustStoreInput, "wibble".toCharArray()); @@ -123,7 +125,7 @@ protected SSLSocketFactory getSocketFactory(String trustStoreName, String keySto KeyManager[] keyManagers = null; if (keyStoreName != null) { - InputStream clientKeyStoreInput = this.getClass().getResourceAsStream(keyStoreName); + InputStream clientKeyStoreInput = Cert.class.getResourceAsStream(keyStoreName); KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); keyStore.load(clientKeyStoreInput, "wibble".toCharArray()); diff --git a/src/test/java/io/vertx/mqtt/test/server/MqttServerClientCertSslTest.java b/src/test/java/io/vertx/mqtt/test/server/MqttServerClientCertSslTest.java index 70f10830..afce3692 100644 --- a/src/test/java/io/vertx/mqtt/test/server/MqttServerClientCertSslTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/MqttServerClientCertSslTest.java @@ -19,6 +19,8 @@ import io.vertx.core.http.ClientAuth; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.core.net.PemTrustOptions; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java new file mode 100644 index 00000000..fa4ec549 --- /dev/null +++ b/src/test/java/module-info.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +open module io.vertx.tests.mqtt { + requires io.netty.common; + requires io.netty.buffer; + requires io.netty.handler; + requires io.netty.transport; + requires io.netty.codec.mqtt; + requires io.netty.codec.compression; + requires io.netty.codec; + requires io.vertx.core.logging; + requires io.vertx.core; + requires io.vertx.mqtt; + requires io.vertx.testing.unit; + requires io.vertx.core.tests; + requires junit; + requires org.eclipse.paho.client.mqttv3; + requires org.eclipse.paho.mqttv5.client; + requires testcontainers; +}