Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supported ConstBufferSequence payload publish. #723

Merged
merged 1 commit into from
Nov 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 53 additions & 29 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param props
Expand All @@ -1092,10 +1092,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* internally until the broker has confirmed delivery, which may involve resends, and as such the
* life_keeper parameter is important.
*/
void publish(
template <typename ConstBufferSequence>
typename std::enable_if<
as::is_const_buffer_sequence<ConstBufferSequence>::value
>::type
publish(
packet_id_t packet_id,
as::const_buffer topic_name,
as::const_buffer contents,
ConstBufferSequence contents,
publish_options pubopts,
v5::properties props,
any life_keeper
Expand All @@ -1114,7 +1118,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
send_publish(
packet_id,
topic_name,
contents,
force_move(contents),
pubopts,
force_move(props),
force_move(life_keeper)
Expand All @@ -1130,7 +1134,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param life_keeper
Expand All @@ -1142,10 +1146,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* internally until the broker has confirmed delivery, which may involve resends, and as such the
* life_keeper parameter is important.
*/
void publish(
template <typename ConstBufferSequence>
typename std::enable_if<
as::is_const_buffer_sequence<ConstBufferSequence>::value
>::type
publish(
packet_id_t packet_id,
as::const_buffer topic_name,
as::const_buffer contents,
ConstBufferSequence contents,
publish_options pubopts,
any life_keeper
) {
Expand All @@ -1163,7 +1171,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
send_publish(
packet_id,
topic_name,
contents,
force_move(contents),
pubopts,
v5::properties{},
force_move(life_keeper)
Expand Down Expand Up @@ -2274,7 +2282,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param life_keeper
Expand All @@ -2288,10 +2296,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* internally until the broker has confirmed delivery, which may involve resends, and as such the
* life_keeper parameter is important.
*/
void async_publish(
template <typename ConstBufferSequence>
typename std::enable_if<
as::is_const_buffer_sequence<ConstBufferSequence>::value
>::type
async_publish(
packet_id_t packet_id,
as::const_buffer topic_name,
as::const_buffer contents,
ConstBufferSequence contents,
publish_options pubopts = {},
any life_keeper = {},
async_handler_t func = {}
Expand All @@ -2310,7 +2322,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
async_send_publish(
packet_id,
topic_name,
contents,
force_move(contents),
pubopts,
v5::properties{},
force_move(life_keeper),
Expand All @@ -2327,7 +2339,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param props
Expand All @@ -2341,10 +2353,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param func
* functor object who's operator() will be called when the async operation completes.
*/
void async_publish(
template <typename ConstBufferSequence>
typename std::enable_if<
as::is_const_buffer_sequence<ConstBufferSequence>::value
>::type
async_publish(
packet_id_t packet_id,
as::const_buffer topic_name,
as::const_buffer contents,
ConstBufferSequence contents,
publish_options pubopts,
v5::properties props,
any life_keeper = {},
Expand All @@ -2364,7 +2380,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
async_send_publish(
packet_id,
topic_name,
contents,
force_move(contents),
pubopts,
force_move(props),
force_move(life_keeper),
Expand Down Expand Up @@ -8820,13 +8836,17 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
}

void send_publish(
packet_id_t packet_id,
as::const_buffer topic_name,
as::const_buffer payload,
publish_options pubopts,
v5::properties props,
any life_keeper) {
template <typename ConstBufferSequence>
typename std::enable_if<
as::is_const_buffer_sequence<ConstBufferSequence>::value
>::type
send_publish(
packet_id_t packet_id,
as::const_buffer topic_name,
ConstBufferSequence payloads,
publish_options pubopts,
v5::properties props,
any life_keeper) {

auto do_send_publish =
[&](auto msg, auto const& serialize_publish) {
Expand Down Expand Up @@ -8854,7 +8874,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v3_1_1::basic_publish_message<PacketIdBytes>(
packet_id,
topic_name,
payload,
force_move(payloads),
pubopts
),
&endpoint::on_serialize_publish_message
Expand All @@ -8865,7 +8885,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::basic_publish_message<PacketIdBytes>(
packet_id,
topic_name,
payload,
force_move(payloads),
pubopts,
force_move(props)
),
Expand Down Expand Up @@ -9296,10 +9316,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
}
}

void async_send_publish(
template <typename ConstBufferSequence>
typename std::enable_if<
as::is_const_buffer_sequence<ConstBufferSequence>::value
>::type
async_send_publish(
packet_id_t packet_id,
as::const_buffer topic_name,
as::const_buffer payload,
ConstBufferSequence payloads,
publish_options pubopts,
v5::properties props,
any life_keeper,
Expand Down Expand Up @@ -9339,7 +9363,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v3_1_1::basic_publish_message<PacketIdBytes>(
packet_id,
topic_name,
payload,
force_move(payloads),
pubopts
),
&endpoint::on_serialize_publish_message
Expand All @@ -9350,7 +9374,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
v5::basic_publish_message<PacketIdBytes>(
packet_id,
topic_name,
payload,
force_move(payloads),
pubopts,
force_move(props)
),
Expand Down
83 changes: 54 additions & 29 deletions include/mqtt/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <vector>
#include <memory>
#include <algorithm>
#include <numeric>

#include <boost/asio/buffer.hpp>
#include <boost/container/static_vector.hpp>
Expand Down Expand Up @@ -503,25 +504,40 @@ class connect_message {
template <std::size_t PacketIdBytes>
class basic_publish_message {
public:
template <
typename ConstBufferSequence,
typename std::enable_if<
as::is_const_buffer_sequence<ConstBufferSequence>::value,
std::nullptr_t
>::type = nullptr
>
basic_publish_message(
typename packet_id_type<PacketIdBytes>::type packet_id,
as::const_buffer topic_name,
as::const_buffer payload,
ConstBufferSequence payloads,
publish_options pubopts
)
: fixed_header_(make_fixed_header(control_packet_type::publish, 0b0000) | pubopts.operator std::uint8_t()),
topic_name_(topic_name),
topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast<std::uint16_t>(topic_name.size())) },
payload_(payload),
remaining_length_(
2 // topic name length
+ topic_name_.size() // topic name
+ payload_.size() // payload
+ ( (pubopts.get_qos() == qos::at_least_once || pubopts.get_qos() == qos::exactly_once)
? PacketIdBytes // packet_id
: 0)
)
{
auto b = as::buffer_sequence_begin(payloads);
auto e = as::buffer_sequence_end(payloads);
auto num_of_payloads = static_cast<std::size_t>(std::distance(b, e));
payloads_.reserve(num_of_payloads);
for (; b != e; ++b) {
auto const& payload = *b;
remaining_length_ += payload.size();
payloads_.push_back(payload);
}

utf8string_check(topic_name_);

auto rb = remaining_bytes(remaining_length_);
Expand Down Expand Up @@ -578,7 +594,9 @@ class basic_publish_message {
break;
};

payload_ = as::buffer(buf);
if (!buf.empty()) {
payloads_.emplace_back(as::buffer(buf));
}
}

/**
Expand All @@ -587,27 +605,20 @@ class basic_publish_message {
* @return const buffer sequence
*/
std::vector<as::const_buffer> const_buffer_sequence() const {
if (packet_id_.empty()) {
return
{
as::buffer(&fixed_header_, 1),
as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()),
as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()),
as::buffer(topic_name_),
as::buffer(payload_)
};
}
else {
return
{
as::buffer(&fixed_header_, 1),
as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()),
as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()),
as::buffer(topic_name_),
as::buffer(packet_id_.data(), packet_id_.size()),
as::buffer(payload_)
};
std::vector<as::const_buffer> ret;
ret.reserve(
5 + // fixed_header, remaining_length_buf, topic_name_length_buf, topic_name, packet_id
payloads_.size()
);
ret.emplace_back(as::buffer(&fixed_header_, 1));
ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
ret.emplace_back(as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()));
ret.emplace_back(as::buffer(topic_name_));
if (!packet_id_.empty()) {
ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
}
std::copy(payloads_.begin(), payloads_.end(), std::back_inserter(ret));
return ret;
}

/**
Expand All @@ -631,7 +642,14 @@ class basic_publish_message {
1 + // remaining length
2 + // topic name length, topic name
(packet_id_.empty() ? 0 : 1) + // packet_id
1; // payload
std::accumulate(
payloads_.begin(),
payloads_.end(),
std::size_t(0),
[](std::size_t s, as::const_buffer const& payload) {
return s + payload.size();
}
);
}

/**
Expand All @@ -652,7 +670,9 @@ class basic_publish_message {
ret.append(get_pointer(topic_name_), get_size(topic_name_));

ret.append(packet_id_.data(), packet_id_.size());
ret.append(get_pointer(payload_), get_size(payload_));
for (auto const& payload : payloads_) {
ret.append(get_pointer(payload), get_size(payload));
}

return ret;
}
Expand Down Expand Up @@ -709,8 +729,13 @@ class basic_publish_message {
* @brief Get payload
* @return payload
*/
constexpr string_view payload() const {
return string_view(get_pointer(payload_), get_size(payload_));
std::vector<string_view> payload() const {
std::vector<string_view> ret;
ret.reserve(payloads_.size());
for (auto const& payload : payloads_) {
ret.emplace_back(get_pointer(payload), get_size(payload));
}
return ret;
}

/**
Expand All @@ -726,7 +751,7 @@ class basic_publish_message {
as::const_buffer topic_name_;
boost::container::static_vector<char, 2> topic_name_length_buf_;
boost::container::static_vector<char, PacketIdBytes> packet_id_;
as::const_buffer payload_;
std::vector<as::const_buffer> payloads_;
std::size_t remaining_length_;
boost::container::static_vector<char, 4> remaining_length_buf_;
};
Expand Down
Loading