Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added basic part of new wildcard mechanism. #685

Merged
merged 1 commit into from
Oct 15, 2020
Merged

Added basic part of new wildcard mechanism. #685

merged 1 commit into from
Oct 15, 2020

Conversation

redboltz
Copy link
Owner

No description provided.

@codecov
Copy link

codecov bot commented Oct 15, 2020

Codecov Report

Merging #685 into master will not change coverage.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master     #685   +/-   ##
=======================================
  Coverage   82.70%   82.70%           
=======================================
  Files          46       46           
  Lines        7082     7082           
=======================================
  Hits         5857     5857           
  Misses       1225     1225           

@redboltz
Copy link
Owner Author

@kleunen , I have a question.
What is subscription_map and what is retained_topic_map ?

I guess that subscription_map is trie map that contains subscription information and I can search it by topic_filter that could contain wildcard.

I'm not sure about retaind_topic_map. Perhaps subscription_map is for online subscription and retaind_topic_map is offline subscription map ?

Offline subscription means when client subscribe some topic filter and disconnect but the session remains case. It happens if client connect with CleanSession:false on MQTT v3.1.1, and SessionExpiryInterval is not zero on MQTT v5.

@redboltz
Copy link
Owner Author

Or both online and offline session are managed by subscription_map ? And retaind_topic_map is for retained message ?

I mean,

  1. publisher publish a/b/c message1 retain:true
  2. publisher publish a/x/c message2 retain:true
  3. subscriber subscribe a/+/c, then broker needs to deliver message1 and message2

retained_topic_map is used for do this efficiently ?

@kleunen
Copy link
Contributor

kleunen commented Oct 15, 2020

Or both online and offline session are managed by subscription_map ? And retaind_topic_map is for retained message ?

yes, exactly. retained_topic_map stored the retained messages. The searching or filterering, also works the other way around:

subscription_map: Stores the filters ( or subscriptions) with wildcards, and when a topic is published, the respective values are looked up in the map.

retained_topic_map: Stores the retained topics (without wildcards), and when a new subscription is created (with wildcards), the retained_topic_map is searched for matching topics.

@redboltz
Copy link
Owner Author

Thanks you the answer. I understand.
subscription_map contains both online offline session(subscription)s. They are different types and have different members.
Now, I added the same signature named deliver() and from_me().

mqtt_cpp/test/test_broker.hpp

Lines 1497 to 1546 in 67b1953

// Mapping between connection object and subscription topics
struct sub_con_online {
sub_con_online(
MQTT_NS::buffer topic,
con_sp_t con,
MQTT_NS::subscribe_options subopts,
MQTT_NS::optional<std::size_t> sid)
:topic(MQTT_NS::force_move(topic)),
con(MQTT_NS::force_move(con)),
subopts(subopts),
sid(sid) {}
void deliver(
MQTT_NS::buffer pub_topic,
MQTT_NS::buffer contents,
MQTT_NS::qos pub_qos_value,
MQTT_NS::retain retain,
MQTT_NS::v5::properties props) {
MQTT_NS::publish_options pubopts = std::min(subopts.get_qos(), pub_qos_value);
if (subopts.get_rap() == MQTT_NS::rap::retain && retain == MQTT_NS::retain::yes) {
pubopts |= MQTT_NS::retain::yes;
}
if (sid) {
props.push_back(MQTT_NS::v5::property::subscription_identifier(*sid));
}
// TODO: Probably this should be switched to async_publish?
// Given the async_client / sync_client seperation
// and the way they have different function names,
// it wouldn't be possible for test_broker.hpp to be
// used with some hypothetical "async_server" in the future.
con->publish(
MQTT_NS::force_move(pub_topic),
MQTT_NS::force_move(contents),
pubopts,
MQTT_NS::force_move(props)
);
}
bool from_me(endpoint_t const& ep) const {
return con.get() == &ep;
}
MQTT_NS::buffer topic;
con_sp_t con;
MQTT_NS::subscribe_options subopts;
MQTT_NS::optional<std::size_t> sid;
};

https://github.com/redboltz/mqtt_cpp/blob/67b19538578eab9199153781125a3cd1effc6f0a/test/test_broker.hpp#L1621-1665

We can treat them like as follows:

mqtt_cpp/test/test_broker.hpp

Lines 1186 to 1217 in 67b1953

auto deliver_proc =
[&](auto& col) {
// For each active subscription registered for this topic
auto& idx = col.template get<tag_topic>();
for(auto& item : idx) {
if(compare_topic_filter(item.topic, topic)) {
// If NL (no local) subscription option is set and
// publisher is the same as subscriber, then skip it.
if (item.subopts.get_nl() == MQTT_NS::nl::yes && item.from_me(ep)) continue;
// publish the message to subscribers.
// retain is delivered as the original only if rap_value is rap::retain.
// On MQTT v3.1.1, rap_value is always rap::dont.
idx.modify(
idx.iterator_to(item),
[&](auto& val) {
val.deliver(
topic,
contents,
pubopts.get_qos(),
pubopts.get_retain(),
props // TODO: Copying the properties vector for each subscription.
);
},
[](auto&) { BOOST_ASSERT(false); }
);
}
}
};
deliver_proc(subs_online_);
deliver_proc(subs_offline_);
.

So far, in the do_publish(), first deliver to online and then deliver to offline. But when we update to trie version, offline and online can be mixed. ( It is up to the design policy)

We can introduce the following variant type.

using sub_con_variant = MQTT_NS::variant<sub_con_offline, sub_con_online>;

@redboltz redboltz merged commit a897c81 into master Oct 15, 2020
@kleunen
Copy link
Contributor

kleunen commented Oct 15, 2020

So far, in the do_publish(), first deliver to online and then deliver to offline. But when we update to trie version, offline and online can be mixed. ( It is up to the design policy)

We can introduce the following variant type.

using sub_con_variant = MQTT_NS::variant<sub_con_offline, sub_con_online>;

It has been a while since I looked at the test_broker. You said in another comment the design is not very clear. I remember when I looked at the broker last year, the design did not make completely sense to me. It was strange for me the subscriptions were passed between online and offline clients, this is not necessary. A client will get a certain "session" when it connects to the broker, subscriptions are stored there. If the connection is lost, the session may remain active. Subscriptions are still handled. A client may reconnect to this session or request a new clean session upon reconnect. That way it is not necessary to shift subscriptions between offline and online connections.

But ok, it is small piece of code, and design can be improved maybe. I would be interested to see a benchmark of the broker, i also made a small test broker last year. And it had very good performance.

@redboltz
Copy link
Owner Author

So far, in the do_publish(), first deliver to online and then deliver to offline. But when we update to trie version, offline and online can be mixed. ( It is up to the design policy)
We can introduce the following variant type.

using sub_con_variant = MQTT_NS::variant<sub_con_offline, sub_con_online>;

It has been a while since I looked at the test_broker. You said in another comment the design is not very clear. I remember when I looked at the broker last year, the design did not make completely sense to me. It was strange for me the subscriptions were passed between online and offline clients, this is not necessary. A client will get a certain "session" when it connects to the broker, subscriptions are stored there. If the connection is lost, the session may remain active. Subscriptions are still handled. A client may reconnect to this session or request a new clean session upon reconnect. That way it is not necessary to shift subscriptions between offline and online connections.

But ok, it is small piece of code, and design can be improved maybe. I would be interested to see a benchmark of the broker, i also made a small test broker last year. And it had very good performance.

Yes we have some design choice.
The current test_broker has only memory storage. Offline message is sometimes store to another device such as SSD.
Consider the following scenario.

There are 4 subscription. All of them subscribe topic1.

If online and offline elements are mixed, then the order of delivery could be as follows:

  1. offline
  2. online
  3. offline
  4. online

1 and 3 might have a longer time than 2 and 4 due to storage access (even if it is asynchronous access).
If broker has 1, 3 as sub_con_offline, and 2,4 as sub_con_online, then the broker can process 2,4 first and then 1,3. It can minimize delivery latency for online elements.

Of course, we can achieve it using only con container. Adding offline/online member to sub_con and use it as multi_index_key.
Then I can get only offline/online elements efficiently.

I'm not sure the works well on subscription_map.
Anyway, let's discuss at #686.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants