Skip to content

Commit

Permalink
Added NL (no local) subscription option support.
Browse files Browse the repository at this point in the history
  • Loading branch information
redboltz committed Oct 14, 2020
1 parent c4d5ae5 commit 894bc75
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 32 deletions.
196 changes: 196 additions & 0 deletions test/multi_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,200 @@ BOOST_AUTO_TEST_CASE( multi_client_qos1 ) {
th.join();
}

BOOST_AUTO_TEST_CASE( multi_client_nl ) {
boost::asio::io_context iocb;
test_broker b(iocb);
MQTT_NS::optional<test_server_no_tls> s;
std::promise<void> p;
auto f = p.get_future();
std::thread th(
[&] {
s.emplace(iocb, b);
p.set_value();
iocb.run();
}
);
f.wait();
auto finish =
[&] {
as::post(
iocb,
[&] {
s->close();
}
);
};

int close_count = 0;
auto server_close = [&] {
if (++close_count == 2) finish();
};

boost::asio::io_context ioc;

auto c1 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5);
c1->set_clean_start(true);
c1->set_client_id("cid1");

auto c2 = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v5);
c2->set_clean_start(true);
c2->set_client_id("cid2");

using packet_id_t = typename std::remove_reference_t<decltype(*c1)>::packet_id_t;

checker chk = {
// connect
cont("h_connack_1"),
cont("h_connack_2"),

// subscribe topic1 QoS0 NL:yes
cont("h_suback_1"),
// subscribe topic1 QoS0 NL:no
cont("h_suback_2"),

// c1 publish topic1 QoS0
cont("h_publish_2_1"),
// c2 publish topic1 QoS0
cont("h_publish_1"),
cont("h_publish_2_2"),

// disconnect
cont("h_close_1"),
cont("h_close_2"),
};

c1->set_v5_connack_handler(
[&]
(bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_connack_1");
BOOST_TEST(sp == false);
BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success);
c2->connect();
return true;
}
);

c2->set_v5_connack_handler(
[&]
(bool sp, MQTT_NS::v5::connect_reason_code connack_return_code, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_connack_2");
BOOST_TEST(sp == false);
BOOST_TEST(connack_return_code == MQTT_NS::v5::connect_reason_code::success);
c1->subscribe("topic1", MQTT_NS::qos::at_most_once | MQTT_NS::nl::yes);
return true;
}
);

c1->set_v5_suback_handler(
[&]
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_suback_1");
BOOST_TEST(reasons.size() == 1U);
BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0);
c2->subscribe("topic1", MQTT_NS::qos::at_most_once | MQTT_NS::nl::no);
return true;
}
);

c2->set_v5_suback_handler(
[&]
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_suback_2");
BOOST_TEST(reasons.size() == 1U);
BOOST_TEST(reasons[0] == MQTT_NS::v5::suback_reason_code::granted_qos_0);
c1->publish("topic1", "topic1_contents1", MQTT_NS::qos::at_most_once);
return true;
}
);

c1->set_v5_publish_handler(
[&]
(MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic,
MQTT_NS::buffer contents,
MQTT_NS::v5::properties /*props*/) {
MQTT_CHK("h_publish_1");
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_most_once);
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
BOOST_CHECK(!packet_id);
BOOST_TEST(topic == "topic1");
BOOST_TEST(contents == "topic1_contents2");
c1->disconnect();
return true;
}
);

c2->set_v5_publish_handler(
[&]
(MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic,
MQTT_NS::buffer contents,
MQTT_NS::v5::properties /*props*/) {
auto ret = chk.match(
"h_suback_2",
[&] {
MQTT_CHK("h_publish_2_1");
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_most_once);
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
BOOST_CHECK(!packet_id);
BOOST_TEST(topic == "topic1");
BOOST_TEST(contents == "topic1_contents1");
c2->publish("topic1", "topic1_contents2", MQTT_NS::qos::at_most_once);
},
"h_publish_2_1",
[&] {
MQTT_CHK("h_publish_2_2");
BOOST_TEST(pubopts.get_dup() == MQTT_NS::dup::no);
BOOST_TEST(pubopts.get_qos() == MQTT_NS::qos::at_most_once);
BOOST_TEST(pubopts.get_retain() == MQTT_NS::retain::no);
BOOST_CHECK(!packet_id);
BOOST_TEST(topic == "topic1");
BOOST_TEST(contents == "topic1_contents2");
c2->disconnect();
}
);
BOOST_TEST(ret);

return true;
}
);

c1->set_close_handler(
[&]
() {
MQTT_CHK("h_close_1");
server_close();
});
c2->set_close_handler(
[&]
() {
MQTT_CHK("h_close_2");
server_close();
});

c1->set_error_handler(
[]
(MQTT_NS::error_code) {
BOOST_CHECK(false);
});

c2->set_puback_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
return true;
});

c1->connect();

ioc.run();
BOOST_TEST(chk.all());
th.join();
}


BOOST_AUTO_TEST_SUITE_END()
Loading

0 comments on commit 894bc75

Please sign in to comment.