Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supported mqtt::buffer sequence publish. #724

Merged
merged 1 commit into from
Nov 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 91 additions & 2 deletions include/mqtt/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
#include <utility>
#include <type_traits>

#include <boost/asio/buffer.hpp>

#include <mqtt/namespace.hpp>
#include <mqtt/string_view.hpp>
#include <mqtt/shared_ptr_array.hpp>
#include <mqtt/move.hpp>

namespace MQTT_NS {

namespace as = boost::asio;

/**
* @brief buffer that has string_view interface
* This class provides string_view interface.
Expand Down Expand Up @@ -134,10 +138,95 @@ inline buffer allocate_buffer(string_view sv) {
return allocate_buffer(sv.begin(), sv.end());
}

inline buffer const* buffer_sequence_begin(buffer const& buf) {
return std::addressof(buf);
}

} // namespace MQTT_NS
inline buffer const* buffer_sequence_end(buffer const& buf) {
return std::addressof(buf) + 1;
}

#include <boost/asio/buffer.hpp>
template <typename Col>
inline typename Col::const_iterator buffer_sequence_begin(Col const& col) {
return col.cbegin();
}

template <typename Col>
inline typename Col::const_iterator buffer_sequence_end(Col const& col) {
return col.cend();
}

namespace detail {

template <typename>
char buffer_sequence_begin_helper(...);

template <typename T>
char (&buffer_sequence_begin_helper(
T* t,
typename std::enable_if<
!std::is_same<
decltype(buffer_sequence_begin(*t)),
void
>::value
>::type*)
)[2];

template <typename>
char buffer_sequence_end_helper(...);

template <typename T>
char (&buffer_sequence_end_helper(
T* t,
typename std::enable_if<
!std::is_same<
decltype(buffer_sequence_end(*t)),
void
>::value
>::type*)
)[2];

template <typename, typename>
char (&buffer_sequence_element_type_helper(...))[2];

template <typename T, typename Buffer>
char buffer_sequence_element_type_helper(
T* t,
typename std::enable_if<
std::is_convertible<
decltype(*buffer_sequence_begin(*t)),
Buffer
>::value
>::type*
);

template <typename T, typename Buffer>
struct is_buffer_sequence_class
: std::integral_constant<bool,
sizeof(buffer_sequence_begin_helper<T>(0, 0)) != 1 &&
sizeof(buffer_sequence_end_helper<T>(0, 0)) != 1 &&
sizeof(buffer_sequence_element_type_helper<T, Buffer>(0, 0)) == 1>
{
};

} // namespace detail

template <typename T>
struct is_buffer_sequence :
std::conditional<
std::is_class<T>::value,
detail::is_buffer_sequence_class<T, buffer>,
std::false_type
>::type
{
};

template <>
struct is_buffer_sequence<buffer> : std::true_type
{
};

} // namespace MQTT_NS

namespace boost {
namespace asio {
Expand Down
102 changes: 78 additions & 24 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents or the range of contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param props
Expand Down Expand Up @@ -1134,7 +1134,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents or the range of contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param life_keeper
Expand Down Expand Up @@ -1187,7 +1187,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param life_keeper
Expand All @@ -1201,10 +1201,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* If topic_name and contents don't manage their lifetimes, then life_keeper should be used to keep
* their lifetimes.
*/
void publish(
template <typename BufferSequence>
typename std::enable_if<
is_buffer_sequence<BufferSequence>::value
>::type
publish(
packet_id_t packet_id,
buffer topic_name,
buffer contents,
BufferSequence contents,
publish_options pubopts = {},
any life_keeper = {}
) {
Expand All @@ -1220,11 +1224,21 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
BOOST_ASSERT((pubopts.get_qos() == qos::at_most_once && packet_id == 0) || (pubopts.get_qos() != qos::at_most_once && packet_id != 0));

auto topic_name_buf = as::buffer(topic_name);
auto contents_buf = as::buffer(contents);

std::vector<as::const_buffer> cbs;
{
auto b = mqtt::buffer_sequence_begin(contents);
auto e = mqtt::buffer_sequence_end(contents);
cbs.reserve(static_cast<std::size_t>(std::distance(b, e)));
for (; b != e; ++b) {
cbs.emplace_back(as::buffer(*b));
}
}

send_publish(
packet_id,
topic_name_buf,
contents_buf,
force_move(cbs),
pubopts,
v5::properties{},
std::make_tuple(
Expand All @@ -1244,7 +1258,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param props
Expand All @@ -1260,10 +1274,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* internally until the broker has confirmed delivery, which may involve resends, and as such the
* life_keeper parameter is important.
*/
void publish(
template <typename BufferSequence>
typename std::enable_if<
is_buffer_sequence<BufferSequence>::value
>::type
publish(
packet_id_t packet_id,
buffer topic_name,
buffer contents,
BufferSequence contents,
publish_options pubopts,
v5::properties props,
any life_keeper = {}
Expand All @@ -1280,11 +1298,21 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
BOOST_ASSERT((pubopts.get_qos() == qos::at_most_once && packet_id == 0) || (pubopts.get_qos() != qos::at_most_once && packet_id != 0));

auto topic_name_buf = as::buffer(topic_name);
auto contents_buf = as::buffer(contents);

std::vector<as::const_buffer> cbs;
{
auto b = mqtt::buffer_sequence_begin(contents);
auto e = mqtt::buffer_sequence_end(contents);
cbs.reserve(static_cast<std::size_t>(std::distance(b, e)));
for (; b != e; ++b) {
cbs.emplace_back(as::buffer(*b));
}
}

send_publish(
packet_id,
topic_name_buf,
contents_buf,
force_move(cbs),
pubopts,
force_move(props),
std::make_tuple(
Expand Down Expand Up @@ -2282,7 +2310,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents or the range of contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param life_keeper
Expand Down Expand Up @@ -2339,7 +2367,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents or the range of contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param props
Expand Down Expand Up @@ -2397,7 +2425,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param life_keeper
Expand All @@ -2407,10 +2435,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param func
* functor object who's operator() will be called when the async operation completes.
*/
void async_publish(
template <typename BufferSequence>
typename std::enable_if<
is_buffer_sequence<BufferSequence>::value
>::type
async_publish(
packet_id_t packet_id,
buffer topic_name,
buffer contents,
BufferSequence contents,
publish_options pubopts = {},
any life_keeper = {},
async_handler_t func = {}
Expand All @@ -2427,12 +2459,21 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
BOOST_ASSERT((pubopts.get_qos() == qos::at_most_once && packet_id == 0) || (pubopts.get_qos() != qos::at_most_once && packet_id != 0));

auto topic_name_buf = as::buffer(topic_name);
auto contents_buf = as::buffer(contents);

std::vector<as::const_buffer> cbs;
{
auto b = mqtt::buffer_sequence_begin(contents);
auto e = mqtt::buffer_sequence_end(contents);
cbs.reserve(static_cast<std::size_t>(std::distance(b, e)));
for (; b != e; ++b) {
cbs.emplace_back(as::buffer(*b));
}
}

async_send_publish(
packet_id,
topic_name_buf,
contents_buf,
force_move(cbs),
pubopts,
v5::properties{},
std::make_tuple(
Expand All @@ -2453,7 +2494,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* @param topic_name
* A topic name to publish
* @param contents
* The contents to publish
* The contents or the range of the contents to publish
* @param pubopts
* qos, retain flag, and dup flag.
* @param props
Expand All @@ -2471,10 +2512,14 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
* internally until the broker has confirmed delivery, which may involve resends, and as such the
* life_keeper parameter is important.
*/
void async_publish(
template <typename BufferSequence>
typename std::enable_if<
is_buffer_sequence<BufferSequence>::value
>::type
async_publish(
packet_id_t packet_id,
buffer topic_name,
buffer contents,
BufferSequence contents,
publish_options pubopts,
v5::properties props,
any life_keeper = {},
Expand All @@ -2492,12 +2537,21 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
BOOST_ASSERT((pubopts.get_qos() == qos::at_most_once && packet_id == 0) || (pubopts.get_qos() != qos::at_most_once && packet_id != 0));

auto topic_name_buf = as::buffer(topic_name);
auto contents_buf = as::buffer(contents);

std::vector<as::const_buffer> cbs;
{
auto b = mqtt::buffer_sequence_begin(contents);
auto e = mqtt::buffer_sequence_end(contents);
cbs.reserve(static_cast<std::size_t>(std::distance(b, e)));
for (; b != e; ++b) {
cbs.emplace_back(as::buffer(*b));
}
}

async_send_publish(
packet_id,
topic_name_buf,
contents_buf,
force_move(cbs),
pubopts,
force_move(props),
std::make_tuple(
Expand Down
Loading