Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List subscribed topics when running topic list #379

Merged
merged 6 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ci/packages.apt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
libgz-cmake3-dev
libgz-math7-dev
libgz-msgs9-dev
libgz-msgs10-dev
libgz-tools2-dev
libgz-utils2-cli-dev
libprotobuf-dev
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ set(GZ_UTILS_VER ${gz-utils2_VERSION_MAJOR})

#--------------------------------------
# Find gz-msgs
gz_find_package(gz-msgs9 REQUIRED)
set(GZ_MSGS_VER ${gz-msgs9_VERSION_MAJOR})
gz_find_package(gz-msgs10 REQUIRED)
set(GZ_MSGS_VER ${gz-msgs10_VERSION_MAJOR})

#--------------------------------------
# Find ifaddrs
Expand Down
2 changes: 1 addition & 1 deletion docker/gz-transport/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ RUN sudo /bin/sh -c 'echo "deb [trusted=yes] http://packages.osrfoundation.org/g
&& sudo apt-get install -y \
libgz-cmake3-dev \
libgz-math7-dev \
libgz-msgs9-dev \
libgz-msgs10-dev \
libgz-utils2-cli-dev \
&& sudo apt-get clean

Expand Down
82 changes: 80 additions & 2 deletions include/gz/transport/Discovery.hh
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,14 @@ namespace gz
return true;
}

/// \brief Send the response to a SUBSCRIBERS_REQ message.
/// \param[in] _pub Information to send.
public: void SendSubscribersRep(const MessagePublisher &_pub) const
{
this->SendMsg(
DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
}

/// \brief Register a node from this process as a remote subscriber.
/// \param[in] _pub Contains information about the subscriber.
public: void Register(const MessagePublisher &_pub) const
Expand Down Expand Up @@ -433,6 +441,17 @@ namespace gz
return this->info.Publishers(_topic, _publishers);
}

/// \brief Get all the subscribers' information known for a given topic.
/// \param[in] _topic Topic name.
/// \param[out] _subscribers All remote subscribers for this topic.
/// \return True if the topic is found and there is at least one publisher
public: bool RemoteSubscribers(const std::string &_topic,
Addresses_M<Pub> &_subscribers) const
{
std::lock_guard<std::mutex> lock(this->mutex);
return this->remoteSubscribers.Publishers(_topic, _subscribers);
}

/// \brief Unadvertise a new message. Broadcast a discovery
/// message that will cancel all the discovery information for the topic
/// advertised by a specific node.
Expand Down Expand Up @@ -573,6 +592,15 @@ namespace gz
this->unregistrationCb = _cb;
}

/// \brief Register a callback to receive an event when a node requests
/// the list of remote subscribers.
/// \param[in] _cb Function callback.
public: void SubscribersCb(const std::function<void()> &_cb)
{
std::lock_guard<std::mutex> lock(this->mutex);
this->subscribersCb = _cb;
}

/// \brief Print the current discovery state.
public: void PrintCurrentState() const
{
Expand Down Expand Up @@ -615,13 +643,33 @@ namespace gz
std::cout << "---------------" << std::endl;
}

/// \brief Get the list of topics currently advertised in the network.
/// \brief Get the list of topics currently advertised and subscribed
/// in the network.
/// \param[out] _topics List of advertised topics.
public: void TopicList(std::vector<std::string> &_topics) const
public: void TopicList(std::vector<std::string> &_topics)
{
this->remoteSubscribers.Clear();

// Request the list of subscribers.
Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
this->SendMsg(
DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);

this->WaitForInit();
std::lock_guard<std::mutex> lock(this->mutex);
this->info.TopicList(_topics);

std::vector<std::string> remoteSubs;
this->remoteSubscribers.TopicList(remoteSubs);

// Add the remote subscribers
for (auto const &t : remoteSubs)
{
if (std::find(_topics.begin(), _topics.end(), t) == _topics.end())
{
_topics.push_back(t);
}
}
}

/// \brief Check if ready/initialized. If not, then wait on the
Expand Down Expand Up @@ -926,13 +974,15 @@ namespace gz
DiscoveryCallback<Pub> disconnectCb;
DiscoveryCallback<Pub> registerCb;
DiscoveryCallback<Pub> unregisterCb;
std::function<void()> subscribersReqCb;
{
std::lock_guard<std::mutex> lock(this->mutex);
this->activity[recvPUuid] = std::chrono::steady_clock::now();
connectCb = this->connectionCb;
disconnectCb = this->disconnectionCb;
registerCb = this->registrationCb;
unregisterCb = this->unregistrationCb;
subscribersReqCb = this->subscribersCb;
}

switch (msg.type())
Expand Down Expand Up @@ -1011,6 +1061,25 @@ namespace gz

break;
}
case msgs::Discovery::SUBSCRIBERS_REQ:
{
if (subscribersReqCb)
subscribersReqCb();

break;
}
case msgs::Discovery::SUBSCRIBERS_REP:
{
// Save the remote subscriber.
Pub publisher;
publisher.SetFromDiscovery(msg);

{
std::lock_guard<std::mutex> lock(this->mutex);
this->remoteSubscribers.AddPublisher(publisher);
}
break;
}
case msgs::Discovery::NEW_CONNECTION:
{
// Read the rest of the fields.
Expand Down Expand Up @@ -1114,6 +1183,7 @@ namespace gz
discoveryMsg.set_version(this->Version());
discoveryMsg.set_type(_type);
discoveryMsg.set_process_uuid(this->pUuid);
_pub.FillDiscovery(discoveryMsg);

switch (_type)
{
Expand All @@ -1132,6 +1202,8 @@ namespace gz
}
case msgs::Discovery::HEARTBEAT:
case msgs::Discovery::BYE:
case msgs::Discovery::SUBSCRIBERS_REQ:
case msgs::Discovery::SUBSCRIBERS_REP:
break;
default:
std::cerr << "Discovery::SendMsg() error: Unrecognized message"
Expand Down Expand Up @@ -1450,9 +1522,15 @@ namespace gz
/// \brief Callback executed when a new remote subscriber is unregistered.
private: DiscoveryCallback<Pub> unregistrationCb;

/// \brief Callback executed when a SUBSCRIBERS_REQ message is received.
private: std::function<void()> subscribersCb;

/// \brief Addressing information.
private: TopicStorage<Pub> info;

/// \brief Remote subscribers.
private: TopicStorage<Pub> remoteSubscribers;

/// \brief Activity information. Every time there is a message from a
/// remote node, its activity information is updated. If we do not hear
/// from a node in a while, its entries in 'info' will be invalided. The
Expand Down
9 changes: 8 additions & 1 deletion include/gz/transport/HandlerStorage.hh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace gz
using UUIDHandler_M = std::map<std::string, std::shared_ptr<T>>;
using UUIDHandler_Collection_M = std::map<std::string, UUIDHandler_M>;

/// \brief key is a topic name and value is UUIDHandler_M
/// \brief key is a topic name and value is UUIDHandler_Collection_M
using TopicServiceCalls_M =
std::map<std::string, UUIDHandler_Collection_M>;

Expand Down Expand Up @@ -159,6 +159,13 @@ namespace gz
return true;
}

/// \brief Get a reference to all the handlers.
/// \return All the handlers.
public: TopicServiceCalls_M &AllHandlers()
{
return this->data;
}

/// \brief Add a request handler to a topic. A request handler stores
/// the callback and types associated to a service call request.
/// \param[in] _topic Topic name.
Expand Down
11 changes: 11 additions & 0 deletions include/gz/transport/NodeShared.hh
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ namespace gz
/// \param[in] _pub Information of the remote subscriber.
public: void OnEndRegistration(const MessagePublisher &_pub);

/// \brief Callback executed when a SUBSCRIBERS request is received.
/// \param[in] _pub Request.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// \param[in] _pub Request.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in a1af664

public: void OnSubscribers();

/// \brief Pass through to bool Publishers(const std::string &_topic,
/// Addresses_M<Pub> &_publishers) const
/// \param[in] _topic Service name.
Expand Down Expand Up @@ -395,6 +399,13 @@ namespace gz
const std::string &_fullyQualifiedTopic,
const std::string &_nUuid);

/// \brief Convert all the HandlerStorages into a vector of publishers.
/// \param[in] _addr The pub/sub address.
/// \param[in] _pUuid The process UUID.
/// \return The vector of message publishers.
public: std::vector<MessagePublisher> Convert(const std::string &_addr,
const std::string &_pUuid);

/// \brief Normal local subscriptions.
public: HandlerStorage<ISubscriptionHandler> normal;

Expand Down
66 changes: 0 additions & 66 deletions include/gz/transport/Packet.hh

This file was deleted.

6 changes: 6 additions & 0 deletions include/gz/transport/TopicStorage.hh
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@ namespace gz
}
}

/// \brief Clear the content.
public: void Clear()
{
this->data.clear();
}

/// \brief The keys are topics. The values are another map, where the key
/// is the process UUID and the value a vector of publishers.
private: std::map<std::string,
Expand Down
2 changes: 2 additions & 0 deletions src/HandlerStorage_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ TEST(RepStorageTest, RepStorageAPI)
EXPECT_FALSE(reps.HasHandlersForTopic(topic));
EXPECT_FALSE(reps.RemoveHandlersForNode(topic, nUuid1));
EXPECT_FALSE(reps.HasHandlersForNode(topic, nUuid1));
EXPECT_TRUE(reps.AllHandlers().empty());

// Create a REP handler.
std::shared_ptr<transport::RepHandler<msgs::Vector3d,
Expand All @@ -104,6 +105,7 @@ TEST(RepStorageTest, RepStorageAPI)
EXPECT_TRUE(reps.Handlers(topic, m));
EXPECT_EQ(m.size(), 1u);
EXPECT_EQ(m.begin()->first, nUuid1);
EXPECT_EQ(1u, reps.AllHandlers().size());

reset();

Expand Down
47 changes: 47 additions & 0 deletions src/NodeShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ NodeShared::NodeShared()
this->dataPtr->msgDiscovery->UnregistrationsCb(
std::bind(&NodeShared::OnEndRegistration, this, std::placeholders::_1));

this->dataPtr->msgDiscovery->SubscribersCb(
std::bind(&NodeShared::OnSubscribers, this));

// Set the callback to notify svc discovery updates (new services).
this->dataPtr->srvDiscovery->ConnectionsCb(
std::bind(&NodeShared::OnNewSrvConnection, this, std::placeholders::_1));
Expand Down Expand Up @@ -1391,6 +1394,18 @@ void NodeShared::OnEndRegistration(const MessagePublisher &_pub)
this->remoteSubscribers.DelPublisherByNode(topic, procUuid, nodeUuid);
}

//////////////////////////////////////////////////
void NodeShared::OnSubscribers()
{
// Get the list of local subscribers.
std::lock_guard<std::recursive_mutex> lock(this->mutex);
auto pubs = this->localSubscribers.Convert(this->myAddress, this->pUuid);

// Reply to the SUBSCRIBERS_REQ with multiple SUBSCRIBERS_REP.
for (auto const &publisher : pubs)
this->dataPtr->msgDiscovery->SendSubscribersRep(publisher);
}

//////////////////////////////////////////////////
bool NodeShared::InitializeSockets()
{
Expand Down Expand Up @@ -1660,6 +1675,38 @@ bool NodeShared::HandlerWrapper::RemoveHandlersForNode(
return removed;
}

//////////////////////////////////////////////////
std::vector<MessagePublisher> NodeShared::HandlerWrapper::Convert(
const std::string &_addr, const std::string &_pUuid)
{
std::vector<MessagePublisher> res;

for (const auto &[topic, handlerCollection] : this->normal.AllHandlers())
{
for (const auto &[nUuid, handlerNode] : handlerCollection)
{
for (const auto &[hUuid, handler] : handlerNode)
{
res.push_back(MessagePublisher(topic, _addr, "", _pUuid, nUuid,
handler->TypeName(), AdvertiseMessageOptions()));
}
}
}

for (const auto &[topic, handlerCollection] : this->raw.AllHandlers())
{
for (const auto &[nUuid, handlerNode] : handlerCollection)
{
for (const auto &[hUuid, handler] : handlerNode)
{
res.push_back(MessagePublisher(topic, _addr, "", _pUuid, nUuid,
handler->TypeName(), AdvertiseMessageOptions()));
}
}
}

return res;
}

//////////////////////////////////////////////////
void NodeSharedPrivate::SecurityOnNewConnection()
Expand Down
Loading