From 63cb1b8edc36fa9638ff4149174ce4a40ad90fe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Wed, 27 Nov 2019 08:40:10 +0100 Subject: [PATCH 1/2] Refs #6929. Applying uncrustify. --- include/fastrtps/rtps/history/ReaderHistory.h | 22 +++-- src/cpp/rtps/history/ReaderHistory.cpp | 88 +++++++++++-------- 2 files changed, 65 insertions(+), 45 deletions(-) diff --git a/include/fastrtps/rtps/history/ReaderHistory.h b/include/fastrtps/rtps/history/ReaderHistory.h index 77273efe23f..5bf06e45e39 100644 --- a/include/fastrtps/rtps/history/ReaderHistory.h +++ b/include/fastrtps/rtps/history/ReaderHistory.h @@ -24,7 +24,7 @@ #include "../common/CacheChange.h" namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { class WriterProxy; @@ -39,10 +39,12 @@ class ReaderHistory : public History friend class RTPSReader; public: + /** * Constructor of the ReaderHistory. It needs a HistoryAttributes. */ - RTPS_DllAPI ReaderHistory(const HistoryAttributes& att); + RTPS_DllAPI ReaderHistory( + const HistoryAttributes& att); RTPS_DllAPI virtual ~ReaderHistory(); /** @@ -52,28 +54,31 @@ class ReaderHistory : public History * @param change Pointer to the change * @return True if added. */ - RTPS_DllAPI virtual bool received_change(CacheChange_t* change, size_t); + RTPS_DllAPI virtual bool received_change(CacheChange_t * change, size_t); /** * Add a CacheChange_t to the ReaderHistory. * @param a_change Pointer to the CacheChange to add. * @return True if added. */ - RTPS_DllAPI bool add_change(CacheChange_t* a_change); + RTPS_DllAPI bool add_change( + CacheChange_t* a_change); /** * Remove a CacheChange_t from the ReaderHistory. * @param a_change Pointer to the CacheChange to remove. * @return True if removed. */ - RTPS_DllAPI bool remove_change(CacheChange_t* a_change) override; + RTPS_DllAPI bool remove_change( + CacheChange_t* a_change) override; /** * Remove all changes from the History that have a certain guid. * @param a_guid Pointer to the target guid to search for. * @return True if succesful, even if no changes have been removed. * */ - RTPS_DllAPI bool remove_changes_with_guid(const GUID_t& a_guid); + RTPS_DllAPI bool remove_changes_with_guid( + const GUID_t& a_guid); /** * Remove all fragmented changes from certain writer up to certain sequence number. @@ -94,9 +99,12 @@ class ReaderHistory : public History */ RTPS_DllAPI void updateMaxMinSeqNum() override; - RTPS_DllAPI bool get_min_change_from(CacheChange_t** min_change, const GUID_t& writerGuid); + RTPS_DllAPI bool get_min_change_from( + CacheChange_t** min_change, + const GUID_t& writerGuid); protected: + //!Pointer to the reader RTPSReader* mp_reader; }; diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index 6c7a594dd8a..b54f628b27f 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -27,10 +27,11 @@ #include namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { -ReaderHistory::ReaderHistory(const HistoryAttributes& att) +ReaderHistory::ReaderHistory( + const HistoryAttributes& att) : History(att) , mp_reader(nullptr) { @@ -40,62 +41,67 @@ ReaderHistory::~ReaderHistory() { } -bool ReaderHistory::received_change(CacheChange_t* change, size_t) +bool ReaderHistory::received_change( + CacheChange_t* change, + size_t) { return add_change(change); } -bool ReaderHistory::add_change(CacheChange_t* a_change) +bool ReaderHistory::add_change( + CacheChange_t* a_change) { - if(mp_reader == nullptr || mp_mutex == nullptr) + if (mp_reader == nullptr || mp_mutex == nullptr) { - logError(RTPS_HISTORY,"You need to create a Reader with this History before adding any changes"); + logError(RTPS_HISTORY, "You need to create a Reader with this History before adding any changes"); return false; } std::lock_guard guard(*mp_mutex); - if(m_att.memoryPolicy == PREALLOCATED_MEMORY_MODE && a_change->serializedPayload.length > m_att.payloadMaxSize) + if (m_att.memoryPolicy == PREALLOCATED_MEMORY_MODE && a_change->serializedPayload.length > m_att.payloadMaxSize) { logError(RTPS_HISTORY, - "Change payload size of '" << a_change->serializedPayload.length << - "' bytes is larger than the history payload size of '" << m_att.payloadMaxSize << - "' bytes and cannot be resized."); + "Change payload size of '" << a_change->serializedPayload.length << + "' bytes is larger than the history payload size of '" << m_att.payloadMaxSize << + "' bytes and cannot be resized."); return false; } - if(a_change->writerGUID == c_Guid_Unknown) + if (a_change->writerGUID == c_Guid_Unknown) { - logError(RTPS_HISTORY,"The Writer GUID_t must be defined"); + logError(RTPS_HISTORY, "The Writer GUID_t must be defined"); } m_changes.push_back(a_change); sortCacheChanges(); updateMaxMinSeqNum(); - logInfo(RTPS_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes"); + logInfo(RTPS_HISTORY, + "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes"); return true; } -bool ReaderHistory::remove_change(CacheChange_t* a_change) +bool ReaderHistory::remove_change( + CacheChange_t* a_change) { - if(mp_reader == nullptr || mp_mutex == nullptr) + if (mp_reader == nullptr || mp_mutex == nullptr) { - logError(RTPS_HISTORY,"You need to create a Reader with this History before removing any changes"); + logError(RTPS_HISTORY, "You need to create a Reader with this History before removing any changes"); return false; } std::lock_guard guard(*mp_mutex); - if(a_change == nullptr) + if (a_change == nullptr) { - logError(RTPS_HISTORY,"Pointer is not valid") + logError(RTPS_HISTORY, "Pointer is not valid") return false; } - for(std::vector::iterator chit = m_changes.begin(); - chit!=m_changes.end();++chit) + for (std::vector::iterator chit = m_changes.begin(); + chit != m_changes.end(); ++chit) { - if((*chit)->sequenceNumber == a_change->sequenceNumber && + if ((*chit)->sequenceNumber == a_change->sequenceNumber && (*chit)->writerGUID == a_change->writerGUID) { - logInfo(RTPS_HISTORY,"Removing change "<< a_change->sequenceNumber); + logInfo(RTPS_HISTORY, "Removing change " << a_change->sequenceNumber); mp_reader->change_removed_by_history(a_change); m_changePool.release_Cache(a_change); m_changes.erase(chit); @@ -104,36 +110,38 @@ bool ReaderHistory::remove_change(CacheChange_t* a_change) return true; } } - logWarning(RTPS_HISTORY,"SequenceNumber "<sequenceNumber << " not found"); + logWarning(RTPS_HISTORY, "SequenceNumber " << a_change->sequenceNumber << " not found"); return false; } -bool ReaderHistory::remove_changes_with_guid(const GUID_t& a_guid) +bool ReaderHistory::remove_changes_with_guid( + const GUID_t& a_guid) { std::vector changes_to_remove; - if(mp_reader == nullptr || mp_mutex == nullptr) + if (mp_reader == nullptr || mp_mutex == nullptr) { - logError(RTPS_HISTORY,"You need to create a Reader with History before removing any changes"); + logError(RTPS_HISTORY, "You need to create a Reader with History before removing any changes"); return false; } {//Lock scope std::lock_guard guard(*mp_mutex); - for(std::vector::iterator chit = m_changes.begin(); chit!=m_changes.end();++chit) + for (std::vector::iterator chit = m_changes.begin(); chit != m_changes.end(); ++chit) { - if((*chit)->writerGUID == a_guid) + if ((*chit)->writerGUID == a_guid) { changes_to_remove.push_back( (*chit) ); } } }//End lock scope - for(std::vector::iterator chit = changes_to_remove.begin(); chit != changes_to_remove.end(); ++chit) + for (std::vector::iterator chit = changes_to_remove.begin(); chit != changes_to_remove.end(); + ++chit) { - if(!remove_change(*chit)) + if (!remove_change(*chit)) { - logError(RTPS_HISTORY,"One of the cachechanged in the GUID removal bulk could not be removed"); + logError(RTPS_HISTORY, "One of the cachechanged in the GUID removal bulk could not be removed"); return false; } } @@ -190,13 +198,15 @@ bool ReaderHistory::remove_fragmented_changes_until( void ReaderHistory::sortCacheChanges() { std::sort(m_changes.begin(), - m_changes.end(), - [](CacheChange_t* c1, CacheChange_t* c2){ return c1->sourceTimestamp < c2->sourceTimestamp; }); + m_changes.end(), + [](CacheChange_t* c1, CacheChange_t* c2){ + return c1->sourceTimestamp < c2->sourceTimestamp; + }); } void ReaderHistory::updateMaxMinSeqNum() { - if(m_changes.size()==0) + if (m_changes.size() == 0) { mp_minSeqCacheChange = mp_invalidCache; mp_maxSeqCacheChange = mp_invalidCache; @@ -204,8 +214,10 @@ void ReaderHistory::updateMaxMinSeqNum() else { auto minmax = std::minmax_element(m_changes.begin(), - m_changes.end(), - [](CacheChange_t* c1, CacheChange_t* c2){ return c1->sequenceNumber < c2->sequenceNumber; }); + m_changes.end(), + [](CacheChange_t* c1, CacheChange_t* c2){ + return c1->sequenceNumber < c2->sequenceNumber; + }); mp_minSeqCacheChange = *(minmax.first); mp_maxSeqCacheChange = *(minmax.second); } @@ -218,9 +230,9 @@ bool ReaderHistory::get_min_change_from( bool ret = false; *min_change = nullptr; - for(auto it = m_changes.begin(); it != m_changes.end(); ++it) + for (auto it = m_changes.begin(); it != m_changes.end(); ++it) { - if((*it)->writerGUID == writerGuid) + if ((*it)->writerGUID == writerGuid) { *min_change = *it; ret = true; From c3e145dfd35453aa95f6c5ccbf0674832d5a6cec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Wed, 27 Nov 2019 08:52:16 +0100 Subject: [PATCH 2/2] Refs #6929. Improving readerhistory sorting. --- include/fastrtps/rtps/history/ReaderHistory.h | 4 --- src/cpp/rtps/history/ReaderHistory.cpp | 27 ++++++++++--------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/include/fastrtps/rtps/history/ReaderHistory.h b/include/fastrtps/rtps/history/ReaderHistory.h index 5bf06e45e39..dbdc906f7bc 100644 --- a/include/fastrtps/rtps/history/ReaderHistory.h +++ b/include/fastrtps/rtps/history/ReaderHistory.h @@ -90,10 +90,6 @@ class ReaderHistory : public History const SequenceNumber_t& seq_num, const GUID_t& writer_guid); - /** - * Sort the CacheChange_t from the History by timestamp - */ - RTPS_DllAPI void sortCacheChanges(); /** * Update the maximum and minimum sequenceNumber cacheChanges. */ diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index b54f628b27f..62de80ae14e 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -71,8 +71,20 @@ bool ReaderHistory::add_change( logError(RTPS_HISTORY, "The Writer GUID_t must be defined"); } - m_changes.push_back(a_change); - sortCacheChanges(); + if (!m_changes.empty() && a_change->sourceTimestamp < (*m_changes.rbegin())->sourceTimestamp) + { + auto it = std::lower_bound(m_changes.begin(), m_changes.end(), a_change, + [](const CacheChange_t* c1, const CacheChange_t* c2) -> bool + { + return c1->sourceTimestamp < c2->sourceTimestamp; + }); + m_changes.insert(it, a_change); + } + else + { + m_changes.push_back(a_change); + } + updateMaxMinSeqNum(); logInfo(RTPS_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes"); @@ -105,7 +117,6 @@ bool ReaderHistory::remove_change( mp_reader->change_removed_by_history(a_change); m_changePool.release_Cache(a_change); m_changes.erase(chit); - sortCacheChanges(); updateMaxMinSeqNum(); return true; } @@ -188,22 +199,12 @@ bool ReaderHistory::remove_fragmented_changes_until( if (at_least_one_removed) { - sortCacheChanges(); updateMaxMinSeqNum(); } return true; } -void ReaderHistory::sortCacheChanges() -{ - std::sort(m_changes.begin(), - m_changes.end(), - [](CacheChange_t* c1, CacheChange_t* c2){ - return c1->sourceTimestamp < c2->sourceTimestamp; - }); -} - void ReaderHistory::updateMaxMinSeqNum() { if (m_changes.size() == 0)