Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[17283] Fix issues in Dynamic Network Interfaces (backport #5282) (backport #5304) #5390

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/fastdds/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
Expand Down Expand Up @@ -61,6 +64,7 @@ class RTPSWriter;
class RTPSReader;
class WriterHistory;
class ReaderHistory;
struct RTPSParticipantAllocationAttributes;
class RTPSParticipantImpl;
class RTPSParticipantListener;
class BuiltinProtocols;
Expand All @@ -70,6 +74,7 @@ class ReaderProxyData;
class WriterProxyData;
class ParticipantProxyData;
class ReaderListener;
class PDPEndpoints;
class PDPListener;
class PDPServerListener;
class ITopicPayloadPool;
Expand Down
265 changes: 202 additions & 63 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,17 @@ class RTPSParticipantImpl
*/
void get_default_metatraffic_locators();

void get_default_metatraffic_locators(
RTPSParticipantAttributes& att);

/**
* Get default unicast locators when not provided by the user.
*/
void get_default_unicast_locators();

void get_default_unicast_locators(
RTPSParticipantAttributes& att);

bool match_local_endpoints_ = true;

bool should_match_local_endpoints(
Expand Down
4 changes: 4 additions & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,7 @@ if(Python3_Interpreter_FOUND)
endif()
endforeach()
endif()

if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID))
add_subdirectory(dyn_network)
endif()
6 changes: 3 additions & 3 deletions test/dds/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void publisher_run(
publisher->wait_discovery(wait);
}

publisher->run(samples, loops, interval);
publisher->run(samples, 0, loops, interval);
}

int main(
Expand Down Expand Up @@ -197,7 +197,7 @@ int main(
DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file);
}

SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy);
SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false);
PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy);

uint32_t result = 1;
Expand All @@ -208,7 +208,7 @@ int main(

if (subscriber.init(seed, magic))
{
result = subscriber.run(notexit, timeout) ? 0 : -1;
result = subscriber.run(notexit, 0, timeout) ? 0 : -1;
}

publisher_thread.join();
Expand Down
28 changes: 26 additions & 2 deletions test/dds/communication/PublisherMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ using namespace eprosima::fastdds::dds;
* --seed <int>
* --wait <int>
* --samples <int>
* --loops <int>
* --interval <int>
* --magic <str>
* --xmlfile <path>
* --interval <int>
* --rescan <int>
*/

int main(
Expand All @@ -46,7 +48,9 @@ int main(
uint32_t wait = 0;
char* xml_file = nullptr;
uint32_t samples = 4;
uint32_t loops = 0;
uint32_t interval = 250;
uint32_t rescan_interval_seconds = 0;
std::string magic;

while (arg_count < argc)
Expand Down Expand Up @@ -93,6 +97,16 @@ int main(

samples = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--loops") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--loops expects a parameter" << std::endl;
return -1;
}

loops = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--interval") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -123,6 +137,16 @@ int main(

xml_file = argv[arg_count];
}
else if (strcmp(argv[arg_count], "--rescan") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--rescan expects a parameter" << std::endl;
return -1;
}

rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10);
}
else
{
std::cout << "Wrong argument " << argv[arg_count] << std::endl;
Expand All @@ -146,7 +170,7 @@ int main(
publisher.wait_discovery(wait);
}

publisher.run(samples, 0, interval);
publisher.run(samples, rescan_interval_seconds, loops, interval);
return 0;
}

Expand Down
20 changes: 20 additions & 0 deletions test/dds/communication/PublisherModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,30 @@ void PublisherModule::wait_discovery(

void PublisherModule::run(
uint32_t samples,
const uint32_t rescan_interval,
const uint32_t loops,
uint32_t interval)
{
uint32_t current_loop = 0;
uint16_t index = 1;
void* sample = nullptr;

std::thread net_rescan_thread([this, rescan_interval]()
{
if (rescan_interval > 0)
{
auto interval = std::chrono::seconds(rescan_interval);
while (run_)
{
std::this_thread::sleep_for(interval);
if (run_)
{
participant_->set_qos(participant_->get_qos());
}
}
}
});

while (run_ && (loops == 0 || loops > current_loop))
{
if (zero_copy_)
Expand Down Expand Up @@ -184,6 +201,9 @@ void PublisherModule::run(

std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}

run_ = false;
net_rescan_thread.join();
}

void PublisherModule::on_publication_matched(
Expand Down
19 changes: 11 additions & 8 deletions test/dds/communication/PublisherModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
#ifndef TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP
#define TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
#include <fastdds/dds/publisher/PublisherListener.hpp>
Expand All @@ -27,9 +32,6 @@
#include "types/FixedSizedPubSubTypes.h"
#include "types/HelloWorldPubSubTypes.h"

#include <mutex>
#include <condition_variable>

namespace eprosima {
namespace fastdds {
namespace dds {
Expand All @@ -41,8 +43,8 @@ class PublisherModule

PublisherModule(
bool exit_on_lost_liveliness,
bool fixed_type = false,
bool zero_copy = false)
bool fixed_type,
bool zero_copy)
: exit_on_lost_liveliness_(exit_on_lost_liveliness)
, fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required
, zero_copy_(zero_copy)
Expand Down Expand Up @@ -80,8 +82,9 @@ class PublisherModule

void run(
uint32_t samples,
uint32_t loops = 0,
uint32_t interval = 250);
const uint32_t rescan_interval,
uint32_t loops,
uint32_t interval);

private:

Expand All @@ -93,7 +96,7 @@ class PublisherModule
bool exit_on_lost_liveliness_ = false;
bool fixed_type_ = false;
bool zero_copy_ = false;
bool run_ = true;
std::atomic_bool run_{true};
DomainParticipant* participant_ = nullptr;
TypeSupport type_;
Publisher* publisher_ = nullptr;
Expand Down
18 changes: 15 additions & 3 deletions test/dds/communication/SubscriberMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ using namespace eprosima::fastdds::dds;
* --notexit
* --fixed_type
* --zero_copy
* --succeed_on_timeout
* --seed <int>
* --samples <int>
* --magic <str>
* --timeout <int>
* --xmlfile <path>
* --publishers <int>
* --succeed_on_timeout
* --timeout <int>
* --rescan <int>
*/

int main(
Expand All @@ -48,6 +49,7 @@ int main(
uint32_t samples = 4;
uint32_t publishers = 1;
uint32_t timeout = 86400000; // 24 h in ms
uint32_t rescan_interval_seconds = 0;
char* xml_file = nullptr;
std::string magic;

Expand Down Expand Up @@ -129,6 +131,16 @@ int main(

publishers = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--rescan") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--rescan expects a parameter" << std::endl;
return -1;
}

rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10);
}
else
{
std::cout << "Wrong argument " << argv[arg_count] << std::endl;
Expand All @@ -147,7 +159,7 @@ int main(

if (subscriber.init(seed, magic))
{
return subscriber.run(notexit, timeout) ? 0 : -1;
return subscriber.run(notexit, rescan_interval_seconds, timeout) ? 0 : -1;
}

return -1;
Expand Down
25 changes: 23 additions & 2 deletions test/dds/communication/SubscriberModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,35 @@ bool SubscriberModule::init(

bool SubscriberModule::run(
bool notexit,
const uint32_t rescan_interval,
uint32_t timeout)
{
return run_for(notexit, std::chrono::milliseconds(timeout));
return run_for(notexit, rescan_interval, std::chrono::milliseconds(timeout));
}

bool SubscriberModule::run_for(
bool notexit,
const uint32_t rescan_interval,
const std::chrono::milliseconds& timeout)
{
bool returned_value = false;

std::thread net_rescan_thread([this, rescan_interval]()
{
if (rescan_interval > 0)
{
auto interval = std::chrono::seconds(rescan_interval);
while (run_)
{
std::this_thread::sleep_for(interval);
if (run_)
{
participant_->set_qos(participant_->get_qos());
}
}
}
});

while (notexit && run_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(250));
Expand All @@ -152,7 +170,7 @@ bool SubscriberModule::run_for(
std::unique_lock<std::mutex> lock(mutex_);
returned_value = cv_.wait_for(lock, timeout, [&]
{
if (succeeed_on_timeout_ && (std::chrono::steady_clock::now() - t0) > timeout)
if (succeed_on_timeout_ && (std::chrono::steady_clock::now() - t0) > timeout)
{
return true;
}
Expand Down Expand Up @@ -190,6 +208,9 @@ bool SubscriberModule::run_for(
returned_value = false;
}

run_ = false;
net_rescan_thread.join();

return returned_value;
}

Expand Down
Loading
Loading