Skip to content

Commit

Permalink
Refs #20160: Applied suggestions.
Browse files Browse the repository at this point in the history
Signed-off-by: adriancampo <adriancampo@eprosima.com>
  • Loading branch information
adriancampo authored and JLBuenoLopez committed Apr 5, 2024
1 parent b45c43e commit 6d3bdd2
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 38 deletions.
84 changes: 53 additions & 31 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ void EDPServerPUBListener::onNewCacheChangeAdded(
EPROSIMA_LOG_WARNING(RTPS_EDP_LISTENER, "Received change with no Key");
}

// Get writer's GUID and EDP publications' reader history
GUID_t auxGUID = iHandle2GUID(change->instanceHandle);
// Get EDP publications' reader history
ReaderHistory* reader_history = reader->getHistory();

// Related_sample_identity could be lost in message delivered, so we set as sample_identity
Expand All @@ -78,7 +77,6 @@ void EDPServerPUBListener::onNewCacheChangeAdded(
change->writer_info.previous = nullptr;
change->writer_info.num_sent_submessages = 0;


// DATA(w) case: new writer or updated information about an existing writer
if (change->kind == ALIVE)
{
Expand All @@ -89,6 +87,9 @@ void EDPServerPUBListener::onNewCacheChangeAdded(
// return it to the pool
add_writer_from_change(reader, reader_history, change, sedp_, false, writer_added_callback);

// DATA(w) case: Retrieve the topic after creating the WriterProxyData (in add_writer_from_change()). This way, not matter
// whether the DATA(w) is a new one or an update, the WriterProxyData exists, and so the topic can be retrieved

// Stop and wait for callback in case of TypeLookupService needed time to process the types
return;
}
Expand All @@ -97,29 +98,24 @@ void EDPServerPUBListener::onNewCacheChangeAdded(
{
EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, "Disposed Remote Writer, removing...");

// DATA(Uw) case: Retrieve the topic before removing the WriterProxyData. We need it to add the DATA(Uw) to the database
GUID_t auxGUID = iHandle2GUID(change->instanceHandle);
std::string topic_name = get_writer_proxy_topic_name(auxGUID);

// Remove WriterProxy data information
get_pdp()->removeWriterProxyData(auxGUID);

// Removing change from history, not returning the change to the pool, since the ownership will be yielded to
// the database
// Removing change from history, not returning the change to the pool, since the ownership will be yielded to the database
reader_history->remove_change(reader_history->find_change(change), false);

// Continue without waiting
continue_with_writer(reader, change);
notify_discoverydatabase(topic_name, reader, change);
}
}

void EDPServerPUBListener::continue_with_writer(
RTPSReader* reader,
CacheChange_t* change)
std::string EDPServerPUBListener::get_writer_proxy_topic_name(
GUID_t auxGUID)
{
GUID_t auxGUID = iHandle2GUID(change->instanceHandle);
// String to store the topic of the writer
std::string topic_name = "";

// DATA(w) case: Retrieve the topic after creating the WriterProxyData (in add_writer_from_change()). This way, not matter
// whether the DATA(w) is a new one or an update, the WriterProxyData exists, and so the topic can be retrieved
// DATA(Uw) case: Retrieve the topic before removing the WriterProxyData. We need it to add the DATA(Uw) to the database
auto temp_writer_data = get_pdp()->get_temporary_writer_proxies_pool().get();
if (get_pdp()->lookupWriterProxyData(auxGUID, *temp_writer_data))
{
Expand All @@ -129,7 +125,14 @@ void EDPServerPUBListener::continue_with_writer(
{
EPROSIMA_LOG_WARNING(RTPS_EDP_LISTENER, "Writer Proxy Data missing for change " << auxGUID);
}
return topic_name;
}

void EDPServerPUBListener::notify_discoverydatabase(
std::string topic_name,
RTPSReader* reader,
CacheChange_t* change)
{
// Notify the DiscoveryDataBase if it is enabled already
// In case it is not enable, the change should not be updated or released because it is been
// updated from a backup
Expand All @@ -156,6 +159,14 @@ void EDPServerPUBListener::continue_with_writer(
EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, "");
}

void EDPServerPUBListener::continue_with_writer(
RTPSReader* reader,
CacheChange_t* change)
{
std::string topic_name = get_writer_proxy_topic_name(iHandle2GUID(change->instanceHandle));
notify_discoverydatabase(topic_name, reader, change);
}

PDPServer* EDPServerSUBListener::get_pdp()
{
return sedp_->get_pdp();
Expand Down Expand Up @@ -198,12 +209,9 @@ void EDPServerSUBListener::onNewCacheChangeAdded(
change->writer_info.previous = nullptr;
change->writer_info.num_sent_submessages = 0;

// Get readers's GUID and EDP subscriptions' reader history
GUID_t auxGUID = iHandle2GUID(change->instanceHandle);
// Get EDP subscriptions' reader history
ReaderHistory* reader_history = reader->getHistory();



// DATA(r) case: new reader or updated information about an existing reader
if (change->kind == ALIVE)
{
Expand All @@ -214,6 +222,9 @@ void EDPServerSUBListener::onNewCacheChangeAdded(
// return it to the pool
add_reader_from_change(reader, reader_history, change, sedp_, false, reader_added_callback);

// DATA(w) case: Retrieve the topic after creating the ReaderProxyData (in add_reader_from_change()). This way, not matter
// whether the DATA(r) is a new one or an update, the ReaderProxyData exists, and so the topic can be retrieved

// Stop and wait for callback in case of TypeLookupService needed time to process the types
return;
}
Expand All @@ -223,29 +234,25 @@ void EDPServerSUBListener::onNewCacheChangeAdded(
//REMOVE WRITER FROM OUR READERS:
EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, "Disposed Remote Reader, removing...");

// DATA(Uw) case: Retrieve the topic before removing the ReaderProxyData. We need it to add the DATA(Ur) to the database
GUID_t auxGUID = iHandle2GUID(change->instanceHandle);
std::string topic_name = get_reader_proxy_topic_name(auxGUID);

// Remove ReaderProxy data information
get_pdp()->removeReaderProxyData(auxGUID);

// Removing change from history, not returning the change to the pool, since the ownership will be yielded to
// the database
reader_history->remove_change(reader_history->find_change(change), false);

// Continue without waiting
continue_with_reader(reader, change);
notify_discoverydatabase(topic_name, reader, change);
}
}

void EDPServerSUBListener::continue_with_reader(
RTPSReader* reader,
CacheChange_t* change)
std::string EDPServerSUBListener::get_reader_proxy_topic_name(
GUID_t auxGUID)
{
GUID_t auxGUID = iHandle2GUID(change->instanceHandle);
// String to store the topic of the reader
std::string topic_name = "";

// DATA(w) case: Retrieve the topic after creating the ReaderProxyData (in add_reader_from_change()). This way, not matter
// whether the DATA(r) is a new one or an update, the ReaderProxyData exists, and so the topic can be retrieved
// DATA(Uw) case: Retrieve the topic before removing the ReaderProxyData. We need it to add the DATA(Ur) to the database
auto temp_reader_data = get_pdp()->get_temporary_reader_proxies_pool().get();
if (get_pdp()->lookupReaderProxyData(auxGUID, *temp_reader_data))
{
Expand All @@ -255,7 +262,14 @@ void EDPServerSUBListener::continue_with_reader(
{
EPROSIMA_LOG_WARNING(RTPS_EDP_LISTENER, "Reader Proxy Data missing for change " << auxGUID);
}
return topic_name;
}

void EDPServerSUBListener::notify_discoverydatabase(
std::string topic_name,
RTPSReader* reader,
CacheChange_t* change)
{
// Notify the DiscoveryDataBase if it is enabled already
// In case it is not enable, the change should not be updated or released because it is been
// updated from a backup
Expand All @@ -282,6 +296,14 @@ void EDPServerSUBListener::continue_with_reader(
EPROSIMA_LOG_INFO(RTPS_EDP_LISTENER, "");
}

void EDPServerSUBListener::continue_with_reader(
RTPSReader* reader,
CacheChange_t* change)
{
std::string topic_name = get_reader_proxy_topic_name(iHandle2GUID(change->instanceHandle));
notify_discoverydatabase(topic_name, reader, change);
}

} /* namespace rtps */
} // namespace fastdds
} /* namespace eprosima */
24 changes: 20 additions & 4 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPServerListeners.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ class EDPServerPUBListener : public fastrtps::rtps::EDPBasePUBListener
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;

void continue_with_writer(
private:

std::string get_writer_proxy_topic_name(
fastrtps::rtps::GUID_t auxGUID);

void notify_discoverydatabase(
std::string topic_name,
fastrtps::rtps::RTPSReader* reader,
fastrtps::rtps::CacheChange_t* change);

private:
void continue_with_writer(
fastrtps::rtps::RTPSReader* reader,
fastrtps::rtps::CacheChange_t* change);

//!Pointer to the EDPServer
EDPServer* sedp_;
Expand Down Expand Up @@ -110,11 +118,19 @@ class EDPServerSUBListener : public fastrtps::rtps::EDPBaseSUBListener
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;

void continue_with_reader(
private:

std::string get_reader_proxy_topic_name(
fastrtps::rtps::GUID_t auxGUID);

void notify_discoverydatabase(
std::string topic_name,
fastrtps::rtps::RTPSReader* reader,
fastrtps::rtps::CacheChange_t* change);

private:
void continue_with_reader(
fastrtps::rtps::RTPSReader* reader,
fastrtps::rtps::CacheChange_t* change);

//!Pointer to the EDPServer
EDPServer* sedp_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace fastrtps {
namespace rtps {

using EndpointAddedCallback = std::function<
void (RTPSReader* reader, const CacheChange_t* const change)>;
void (RTPSReader* reader, const CacheChange_t* change)>;

class RTPSReader;
struct CacheChange_t;
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <tuple>
#include <vector>

#include <fastdds/builtin/type_lookup_service/TypeLookupManager.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/history/ReaderHistory.h>
Expand All @@ -37,6 +36,7 @@
#include <fastdds/rtps/writer/ReaderProxy.h>
#include <fastdds/rtps/writer/StatefulWriter.h>

#include <fastdds/builtin/type_lookup_service/TypeLookupManager.hpp>
#include <rtps/builtin/BuiltinProtocols.h>
#include <rtps/builtin/discovery/endpoint/EDPClient.h>
#include <rtps/builtin/discovery/participant/DirectMessageSender.hpp>
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <mutex>
#include <set>

#include <fastdds/builtin/type_lookup_service/TypeLookupManager.hpp>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/history/History.h>
Expand All @@ -33,6 +32,7 @@
#include <fastdds/rtps/writer/StatefulWriter.h>
#include <fastdds/utils/TimedMutex.hpp>

#include <fastdds/builtin/type_lookup_service/TypeLookupManager.hpp>
#include <rtps/builtin/BuiltinProtocols.h>
#include <rtps/builtin/discovery/database/backup/SharedBackupFunctions.hpp>
#include <rtps/builtin/discovery/endpoint/EDPServer.hpp>
Expand Down

0 comments on commit 6d3bdd2

Please sign in to comment.