Skip to content

Commit

Permalink
Refs #20815: Only apply filter to ALIVE changes
Browse files Browse the repository at this point in the history
Signed-off-by: eduponz <eduardoponz@eprosima.com>
  • Loading branch information
EduPonz committed May 23, 2024
1 parent 2405910 commit 51eac25
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ set(${PROJECT_NAME}_source_files
rtps/participant/RTPSParticipant.cpp
rtps/participant/RTPSParticipantImpl.cpp
rtps/persistence/PersistenceFactory.cpp
rtps/reader/reader_utils.cpp
rtps/reader/RTPSReader.cpp
rtps/reader/StatefulPersistentReader.cpp
rtps/reader/StatefulReader.cpp
Expand Down
14 changes: 9 additions & 5 deletions src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,17 @@ class ReaderFilterCollection
// Copy the signature
std::copy(entry.filter_signature.begin(), entry.filter_signature.end(), signature);

// Evaluate filter and update filtered_out_readers
bool filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
// Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant
bool filter_result = true;
if (fastrtps::rtps::ALIVE == change.kind)
{
change.filtered_out_readers.emplace_back(it->first);
// Evaluate filter and update filtered_out_readers
filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
{
change.filtered_out_readers.emplace_back(it->first);
}
}

return filter_result;
};

Expand Down
11 changes: 6 additions & 5 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/reader/StatefulReader.h>

#include "reader_utils.hpp"
#include "rtps/RTPSDomainImpl.hpp"
#include <rtps/builtin/BuiltinProtocols.h>
#include <rtps/builtin/liveliness/WLP.h>
#include <rtps/DataSharing/DataSharingListener.hpp>
Expand All @@ -36,9 +38,6 @@
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <rtps/writer/LivelinessManager.hpp>

#include "rtps/RTPSDomainImpl.hpp"

#ifdef FASTDDS_STATISTICS
#include <statistics/types/monitorservice_types.hpp>
#endif // FASTDDS_STATISTICS
Expand Down Expand Up @@ -587,14 +586,15 @@ bool StatefulReader::processDataMsg(
return false;
}

if (data_filter_ && !data_filter_->is_relevant(*change, m_guid))
if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_))
{
if (pWP)
{
pWP->irrelevant_change_set(change->sequenceNumber);
NotifyChanges(pWP);
send_ack_if_datasharing(this, mp_history, pWP, change->sequenceNumber);
}
// Change was filtered out, so there isn't anything else to do
return true;
}

Expand Down Expand Up @@ -770,7 +770,8 @@ bool StatefulReader::processDataFragMsg(

// Temporarilly assign the inline qos while evaluating the data filter
work_change->inline_qos = incomingChange->inline_qos;
bool filtered_out = data_filter_ && !data_filter_->is_relevant(*work_change, m_guid);
bool filtered_out =
!fastdds::rtps::change_is_relevant_for_filter(*work_change, m_guid, data_filter_);
work_change->inline_qos = SerializedPayload_t();

if (filtered_out)
Expand Down
11 changes: 6 additions & 5 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/reader/StatelessReader.h>

#include "reader_utils.hpp"
#include "rtps/RTPSDomainImpl.hpp"
#include <rtps/builtin/BuiltinProtocols.h>
#include <rtps/builtin/liveliness/WLP.h>
#include <rtps/DataSharing/DataSharingListener.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/writer/LivelinessManager.hpp>

#include "rtps/RTPSDomainImpl.hpp"

#ifdef FASTDDS_STATISTICS
#include <statistics/types/monitorservice_types.hpp>
#endif // FASTDDS_STATISTICS
Expand Down Expand Up @@ -582,9 +581,10 @@ bool StatelessReader::processDataMsg(
return false;
}

if (data_filter_ && !data_filter_->is_relevant(*change, m_guid))
if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_))
{
update_last_notified(change->writerGUID, change->sequenceNumber);
// Change was filtered out, so there isn't anything else to do
return true;
}

Expand Down Expand Up @@ -797,7 +797,8 @@ bool StatelessReader::processDataFragMsg(
{
// Temporarilly assign the inline qos while evaluating the data filter
change_completed->inline_qos = incomingChange->inline_qos;
bool filtered_out = data_filter_ && !data_filter_->is_relevant(*change_completed, m_guid);
bool filtered_out = !fastdds::rtps::change_is_relevant_for_filter(*change_completed, m_guid,
data_filter_);
change_completed->inline_qos = SerializedPayload_t();

if (filtered_out)
Expand Down
45 changes: 45 additions & 0 deletions src/cpp/rtps/reader/reader_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file reader_utils.cpp
*/

#include "reader_utils.hpp"

#include <fastdds/rtps/common/ChangeKind_t.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

bool change_is_relevant_for_filter(
const CacheChange& change,
const GUID& guid,
const IReaderDataFilter* filter)
{
bool ret = true;

// Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant
if ((nullptr != filter) && (fastrtps::rtps::ALIVE == change.kind) && (!filter->is_relevant(change, guid)))
{
ret = false;
}

return ret;
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
53 changes: 53 additions & 0 deletions src/cpp/rtps/reader/reader_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file reader_utils.hpp
*/

#ifndef _FASTDDS_RTPS_READER_READERUTILS_H_
#define _FASTDDS_RTPS_READER_READERUTILS_H_

#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/common/ChangeKind_t.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

using CacheChange = fastrtps::rtps::CacheChange_t;
using GUID = fastrtps::rtps::GUID_t;

/**
* @brief Check if a change is relevant for a reader.
*
* @param change The CacheChange_t to be evaluated.
* @param reader_guid Remote reader GUID_t.
* @param filter The IReaderDataFilter to be used.
*
* @return true if relevant, false otherwise.
*/
bool change_is_relevant_for_filter(
const CacheChange& change,
const GUID& guid,
const IReaderDataFilter* filter);

} // namespace rtps
} // namespace fastdds
} // namespace eprosima


#endif // _FASTDDS_RTPS_READER_READERUTILS_H_
1 change: 1 addition & 0 deletions test/unittest/dds/publisher/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/participant/RTPSParticipant.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/participant/RTPSParticipantImpl.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp
Expand Down
2 changes: 2 additions & 0 deletions test/unittest/statistics/dds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS AND NOT QNX)
${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/sqlite3.c
${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/SQLite3PersistenceService.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp
Expand Down Expand Up @@ -410,6 +411,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS AND NOT QNX)
${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/sqlite3.c
${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/SQLite3PersistenceService.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp
Expand Down

0 comments on commit 51eac25

Please sign in to comment.