Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20351] Fix on_sample_lost notification on best-effort readers for framented samples (backport #4187) #4606

Merged
merged 3 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,26 @@ bool StatelessReader::processDataFragMsg(
{
if (work_change->sequenceNumber < change_to_add->sequenceNumber)
{
SequenceNumber_t updated_seq = work_change->sequenceNumber;
SequenceNumber_t previous_seq{ 0, 0 };
previous_seq = update_last_notified(writer_guid, updated_seq);

// Notify lost samples
auto listener = getListener();
if (listener != nullptr)
{
if (SequenceNumber_t{ 0, 0 } != previous_seq)
{
assert(previous_seq < updated_seq);
uint64_t tmp = (updated_seq - previous_seq).to64long();
int32_t lost_samples =
tmp > static_cast<uint64_t>(std::numeric_limits<int32_t>::max()) ?
std::numeric_limits<int32_t>::max() : static_cast<int32_t>(tmp);
assert (0 < lost_samples);
listener->on_sample_lost(this, lost_samples);
}
}

// Pending change should be dropped. Check if it can be reused
if (sampleSize <= work_change->serializedPayload.max_size)
{
Expand Down
1 change: 1 addition & 0 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,7 @@ class PubSubReader
uint32_t sockerBufferSize)
{
participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize;
participant_qos_.transport().send_socket_buffer_size = sockerBufferSize;
return *this;
}

Expand Down
1 change: 1 addition & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ class PubSubWriter
uint32_t sockerBufferSize)
{
participant_qos_.transport().listen_socket_buffer_size = sockerBufferSize;
participant_qos_.transport().send_socket_buffer_size = sockerBufferSize;
return *this;
}

Expand Down
137 changes: 132 additions & 5 deletions test/blackbox/common/DDSBlackboxTestsListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,16 +674,29 @@ TEST_P(DDSStatus, DataAvailableConditions)
subscriber_reader.wait_waitset_timeout();
}

// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init.
// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any
// other possible loss.
static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE =
300ul * 1024ul // sample size
* 13ul // number of samples
* 2ul; // 2x to avoid any possible loss

template<typename T>
void sample_lost_test_dw_init(
PubSubWriter<HelloWorldPubSubType>& writer)
PubSubWriter<T>& writer)
{
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE;
testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE;

testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.3 Data Submessage
EntityId_t readerID, writerID;
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;

msg.pos += 2; // flags
Expand Down Expand Up @@ -711,6 +724,43 @@ void sample_lost_test_dw_init(

return false;
};
testTransport->drop_data_frag_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.4 DataFrag Submessage
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;
uint32_t first_fragment = 0;

msg.pos += 2; // flags
msg.pos += 2; // octets to inline quos
CDRMessage::readEntityId(&msg, &readerID);
CDRMessage::readEntityId(&msg, &writerID);
CDRMessage::readSequenceNumber(&msg, &sn);
CDRMessage::readUInt32(&msg, &first_fragment);

// restore buffer pos
msg.pos = old_pos;

// generate losses
if ((writerID.value[3] & 0xC0) == 0 // only user endpoints
&& (1 == first_fragment) // only first fragment
&& (sn == SequenceNumber_t{0, 2} ||
sn == SequenceNumber_t(0, 3) ||
sn == SequenceNumber_t(0, 4) ||
sn == SequenceNumber_t(0, 6) ||
sn == SequenceNumber_t(0, 8) ||
sn == SequenceNumber_t(0, 10) ||
sn == SequenceNumber_t(0, 11) ||
sn == SequenceNumber_t(0, 13)))
{
return true;
}

return false;
};


writer.disable_builtin_transport()
Expand All @@ -721,8 +771,9 @@ void sample_lost_test_dw_init(

}

template<typename T>
void sample_lost_test_dr_init(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubReader<T>& reader,
std::function<void(const eprosima::fastdds::dds::SampleLostStatus& status)> functor)
{
reader.sample_lost_status_functor(functor)
Expand All @@ -731,11 +782,15 @@ void sample_lost_test_dr_init(
ASSERT_TRUE(reader.isInitialized());
}

template<typename T>
void sample_lost_test_init(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
PubSubReader<T>& reader,
PubSubWriter<T>& writer,
std::function<void(const eprosima::fastdds::dds::SampleLostStatus& status)> functor)
{
reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE);
writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE);

sample_lost_test_dw_init(writer);
sample_lost_test_dr_init(reader, functor);

Expand Down Expand Up @@ -802,6 +857,78 @@ TEST(DDSStatus, sample_lost_be_dw_be_dr)
});
}

/*!
* \test DDS-STS-SLS-01 Test `SampleLostStatus` in a Best-Effort DataWriter and a Best-Effort DataReader communication.
* This is also a regression test for bug redmine 20162
*/
TEST(DDSStatus, sample_lost_be_dw_be_dr_fragments)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);

std::mutex test_step_mtx;
std::condition_variable test_step_cv;
uint8_t test_step = 0;

writer.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);
reader.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);

sample_lost_test_init(reader, writer, [&test_step_mtx, &test_step_cv, &test_step](
const eprosima::fastdds::dds::SampleLostStatus& status)
{
{
std::unique_lock<std::mutex> lock(test_step_mtx);
std::cout << status.total_count << " " << status.total_count_change << std::endl;
if (0 == test_step && 1 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (1 == test_step && 2 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (2 == test_step && 3 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (3 == test_step && 4 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (4 == test_step && 5 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (5 == test_step && 6 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (6 == test_step && 7 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else
{
test_step = 0;
}
}

test_step_cv.notify_all();
});


auto data = default_data300kb_data_generator(13);

reader.startReception(data);
writer.send(data, 100);

std::unique_lock<std::mutex> lock(test_step_mtx);
test_step_cv.wait(lock, [&test_step]()
{
return 7 == test_step;
});
}

/*!
* \test DDS-STS-SLS-02 Test `SampleLostStatus` in a Best-Effort DataWriter and a late-joiner Best-Effort DataReader
* communication.
Expand Down
Loading