Skip to content

Commit

Permalink
Fix MqttSubscribeMessage.properties() always be empty
Browse files Browse the repository at this point in the history
  • Loading branch information
jpforevers committed Sep 26, 2024
1 parent 559d25a commit cd3c9e4
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
6 changes: 4 additions & 2 deletions src/main/java/io/vertx/mqtt/impl/MqttServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ void handleMessage(Object msg) {

MqttSubscribeMessage mqttSubscribeMessage = MqttSubscribeMessage.create(
subscribe.variableHeader().messageId(),
subscribe.payload().topicSubscriptions());
subscribe.payload().topicSubscriptions(),
subscribe.idAndPropertiesVariableHeader().properties());
this.handleSubscribe(mqttSubscribeMessage);
break;

Expand All @@ -138,7 +139,8 @@ void handleMessage(Object msg) {

MqttUnsubscribeMessage mqttUnsubscribeMessage = MqttUnsubscribeMessage.create(
unsubscribe.variableHeader().messageId(),
unsubscribe.payload().topics());
unsubscribe.payload().topics(),
unsubscribe.idAndPropertiesVariableHeader().properties());
this.handleUnsubscribe(mqttUnsubscribeMessage);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.codes.MqttReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class Mqtt5ServerSubscribeTest extends MqttServerBaseTest {
private static final String MQTT_TOPIC = "/my_topic";
private static final String MQTT_TOPIC_FAILURE = "/my_topic/failure";
private static final String MQTT_FAILURE_REASON = "test reason";
private static final int SUBSCRIPTION_IDENTIFIER = 42;

@Before
public void before(TestContext context) {
Expand Down Expand Up @@ -104,8 +106,9 @@ private void subscribe(TestContext context, String topic, int expectedQos, boole

try {
MemoryPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence);
client.connect();
MqttAsyncClient client = new MqttAsyncClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence);
IMqttToken token = client.connect();
token.waitForCompletion();

MqttSubscription subscription = new MqttSubscription(topic, expectedQos);
this.requestedQos = expectedQos;
Expand All @@ -115,7 +118,10 @@ private void subscribe(TestContext context, String topic, int expectedQos, boole
this.requestedNoLocal = noLocal;
this.requestedRetainAsPublished = retainAsPublished;
this.requestedRetainHandling = retainHandling;
client.subscribe(new MqttSubscription[]{ subscription });
org.eclipse.paho.mqttv5.common.packet.MqttProperties mqttProperties = new org.eclipse.paho.mqttv5.common.packet.MqttProperties();
mqttProperties.setSubscriptionIdentifier(SUBSCRIPTION_IDENTIFIER);
token = client.subscribe(new MqttSubscription[]{ subscription }, null, null, mqttProperties);
token.waitForCompletion();

this.async.await();
} catch (MqttException e) {
Expand Down Expand Up @@ -173,6 +179,7 @@ protected void endpointHandler(MqttEndpoint endpoint, TestContext context) {
context.assertEquals(requestedNoLocal, subscription.subscriptionOption().isNoLocal());
context.assertEquals(requestedRetainAsPublished, subscription.subscriptionOption().isRetainAsPublished());
context.assertEquals(requestedRetainHandling, subscription.subscriptionOption().retainHandling().value());
context.assertEquals(SUBSCRIPTION_IDENTIFIER, subscribe.properties().getProperty(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value()).value());

List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
MqttProperties subackProperties = new MqttProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttUnsubAckReasonCode;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -53,6 +56,9 @@ public class Mqtt5ServerUnsubscribeTest extends MqttServerBaseTest {

private static final String MQTT_REASON_STRING = "because I've said so";

private static final String USER_PROPERTY_KEY = "key";
private static final String USER_PROPERTY_VALUE = "value";

@Before
public void before(TestContext context) {

Expand All @@ -70,15 +76,22 @@ public void unsubscribe(TestContext context) {

try {
MemoryPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence);
client.connect();
MqttAsyncClient client = new MqttAsyncClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence);
IMqttToken token = client.connect();
token.waitForCompletion();

expectedReasonCodes = Collections.singletonList(MqttUnsubAckReasonCode.SUCCESS);
String[] topics = new String[]{MQTT_TOPIC};
int[] qos = new int[]{0};
client.subscribe(topics, qos);
token = client.subscribe(topics, qos);
token.waitForCompletion();

client.unsubscribe(topics);
org.eclipse.paho.mqttv5.common.packet.MqttProperties mqttProperties = new org.eclipse.paho.mqttv5.common.packet.MqttProperties();
List<UserProperty> userProperties = new ArrayList<>();
userProperties.add(new UserProperty(USER_PROPERTY_KEY, USER_PROPERTY_VALUE));
mqttProperties.setUserProperties(userProperties);
token = client.unsubscribe(topics, null, null, mqttProperties);
token.waitForCompletion();

context.assertTrue(true);

Expand Down Expand Up @@ -121,6 +134,12 @@ protected void endpointHandler(MqttEndpoint endpoint, TestContext context) {

}).unsubscribeHandler(unsubscribe -> {

if(expectedReasonCodes.get(0) == MqttUnsubAckReasonCode.SUCCESS) {
MqttProperties.UserProperties userProperties = (MqttProperties.UserProperties) unsubscribe.properties().getProperty(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
context.assertEquals(userProperties.value().get(0).key, USER_PROPERTY_KEY);
context.assertEquals(userProperties.value().get(0).value, USER_PROPERTY_VALUE);
}

MqttProperties props = new MqttProperties();
if(expectedReasonCodes.get(0) != MqttUnsubAckReasonCode.SUCCESS) {
props.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.REASON_STRING.value(), MQTT_REASON_STRING));
Expand Down

0 comments on commit cd3c9e4

Please sign in to comment.