From cd3c9e42f9ff18f86a2e3ac01d9572cf6cf04e11 Mon Sep 17 00:00:00 2001 From: jpforevers Date: Thu, 26 Sep 2024 10:55:21 +0800 Subject: [PATCH] Fix MqttSubscribeMessage.properties() always be empty --- .../vertx/mqtt/impl/MqttServerConnection.java | 6 +++-- .../test/server/Mqtt5ServerSubscribeTest.java | 15 ++++++++--- .../server/Mqtt5ServerUnsubscribeTest.java | 27 ++++++++++++++++--- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java b/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java index 09ad5ba3..744a30e3 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java @@ -128,7 +128,8 @@ void handleMessage(Object msg) { MqttSubscribeMessage mqttSubscribeMessage = MqttSubscribeMessage.create( subscribe.variableHeader().messageId(), - subscribe.payload().topicSubscriptions()); + subscribe.payload().topicSubscriptions(), + subscribe.idAndPropertiesVariableHeader().properties()); this.handleSubscribe(mqttSubscribeMessage); break; @@ -138,7 +139,8 @@ void handleMessage(Object msg) { MqttUnsubscribeMessage mqttUnsubscribeMessage = MqttUnsubscribeMessage.create( unsubscribe.variableHeader().messageId(), - unsubscribe.payload().topics()); + unsubscribe.payload().topics(), + unsubscribe.idAndPropertiesVariableHeader().properties()); this.handleUnsubscribe(mqttUnsubscribeMessage); break; diff --git a/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerSubscribeTest.java b/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerSubscribeTest.java index 2df89c8c..23958c4b 100644 --- a/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerSubscribeTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerSubscribeTest.java @@ -29,7 +29,8 @@ import io.vertx.mqtt.MqttTopicSubscription; import io.vertx.mqtt.messages.codes.MqttReasonCode; import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; -import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttAsyncClient; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttSubscription; @@ -61,6 +62,7 @@ public class Mqtt5ServerSubscribeTest extends MqttServerBaseTest { private static final String MQTT_TOPIC = "/my_topic"; private static final String MQTT_TOPIC_FAILURE = "/my_topic/failure"; private static final String MQTT_FAILURE_REASON = "test reason"; + private static final int SUBSCRIPTION_IDENTIFIER = 42; @Before public void before(TestContext context) { @@ -104,8 +106,9 @@ private void subscribe(TestContext context, String topic, int expectedQos, boole try { MemoryPersistence persistence = new MemoryPersistence(); - MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence); - client.connect(); + MqttAsyncClient client = new MqttAsyncClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence); + IMqttToken token = client.connect(); + token.waitForCompletion(); MqttSubscription subscription = new MqttSubscription(topic, expectedQos); this.requestedQos = expectedQos; @@ -115,7 +118,10 @@ private void subscribe(TestContext context, String topic, int expectedQos, boole this.requestedNoLocal = noLocal; this.requestedRetainAsPublished = retainAsPublished; this.requestedRetainHandling = retainHandling; - client.subscribe(new MqttSubscription[]{ subscription }); + org.eclipse.paho.mqttv5.common.packet.MqttProperties mqttProperties = new org.eclipse.paho.mqttv5.common.packet.MqttProperties(); + mqttProperties.setSubscriptionIdentifier(SUBSCRIPTION_IDENTIFIER); + token = client.subscribe(new MqttSubscription[]{ subscription }, null, null, mqttProperties); + token.waitForCompletion(); this.async.await(); } catch (MqttException e) { @@ -173,6 +179,7 @@ protected void endpointHandler(MqttEndpoint endpoint, TestContext context) { context.assertEquals(requestedNoLocal, subscription.subscriptionOption().isNoLocal()); context.assertEquals(requestedRetainAsPublished, subscription.subscriptionOption().isRetainAsPublished()); context.assertEquals(requestedRetainHandling, subscription.subscriptionOption().retainHandling().value()); + context.assertEquals(SUBSCRIPTION_IDENTIFIER, subscribe.properties().getProperty(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value()).value()); List reasonCodes = new ArrayList<>(); MqttProperties subackProperties = new MqttProperties(); diff --git a/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerUnsubscribeTest.java b/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerUnsubscribeTest.java index 9a626cc8..4c197daf 100644 --- a/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerUnsubscribeTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/Mqtt5ServerUnsubscribeTest.java @@ -26,10 +26,13 @@ import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; import io.vertx.mqtt.messages.codes.MqttUnsubAckReasonCode; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttAsyncClient; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -53,6 +56,9 @@ public class Mqtt5ServerUnsubscribeTest extends MqttServerBaseTest { private static final String MQTT_REASON_STRING = "because I've said so"; + private static final String USER_PROPERTY_KEY = "key"; + private static final String USER_PROPERTY_VALUE = "value"; + @Before public void before(TestContext context) { @@ -70,15 +76,22 @@ public void unsubscribe(TestContext context) { try { MemoryPersistence persistence = new MemoryPersistence(); - MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence); - client.connect(); + MqttAsyncClient client = new MqttAsyncClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence); + IMqttToken token = client.connect(); + token.waitForCompletion(); expectedReasonCodes = Collections.singletonList(MqttUnsubAckReasonCode.SUCCESS); String[] topics = new String[]{MQTT_TOPIC}; int[] qos = new int[]{0}; - client.subscribe(topics, qos); + token = client.subscribe(topics, qos); + token.waitForCompletion(); - client.unsubscribe(topics); + org.eclipse.paho.mqttv5.common.packet.MqttProperties mqttProperties = new org.eclipse.paho.mqttv5.common.packet.MqttProperties(); + List userProperties = new ArrayList<>(); + userProperties.add(new UserProperty(USER_PROPERTY_KEY, USER_PROPERTY_VALUE)); + mqttProperties.setUserProperties(userProperties); + token = client.unsubscribe(topics, null, null, mqttProperties); + token.waitForCompletion(); context.assertTrue(true); @@ -121,6 +134,12 @@ protected void endpointHandler(MqttEndpoint endpoint, TestContext context) { }).unsubscribeHandler(unsubscribe -> { + if(expectedReasonCodes.get(0) == MqttUnsubAckReasonCode.SUCCESS) { + MqttProperties.UserProperties userProperties = (MqttProperties.UserProperties) unsubscribe.properties().getProperty(MqttProperties.MqttPropertyType.USER_PROPERTY.value()); + context.assertEquals(userProperties.value().get(0).key, USER_PROPERTY_KEY); + context.assertEquals(userProperties.value().get(0).value, USER_PROPERTY_VALUE); + } + MqttProperties props = new MqttProperties(); if(expectedReasonCodes.get(0) != MqttUnsubAckReasonCode.SUCCESS) { props.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.REASON_STRING.value(), MQTT_REASON_STRING));