From f845305a754629947acaaf9f7d608a40d974b79f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Wed, 27 Nov 2019 11:02:44 +0100 Subject: [PATCH 1/4] Refs #6929. Applying uncrustify. --- include/fastdds/rtps/history/History.h | 334 +++++++++++++---------- include/fastdds/rtps/reader/RTPSReader.h | 53 ++-- src/cpp/rtps/history/History.cpp | 65 ++--- src/cpp/rtps/reader/RTPSReader.cpp | 30 +- 4 files changed, 267 insertions(+), 215 deletions(-) diff --git a/include/fastdds/rtps/history/History.h b/include/fastdds/rtps/history/History.h index fd2bcf3ee5e..bb584a3c97f 100644 --- a/include/fastdds/rtps/history/History.h +++ b/include/fastdds/rtps/history/History.h @@ -32,7 +32,7 @@ #include namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { /** @@ -41,153 +41,191 @@ namespace rtps { */ class History { - protected: - History(const HistoryAttributes& att); - History(History&&) = delete; - History& operator=(History&&) = delete; - virtual ~History(); - public: - //!Attributes of the History - HistoryAttributes m_att; - /** - * Reserve a CacheChange_t from the CacheChange pool. - * @param[out] change Pointer to pointer to the CacheChange_t to reserve - * @param[in] calculateSizeFunc Function to calculate the size of the change. - * @return True is reserved - */ - RTPS_DllAPI inline bool reserve_Cache( - CacheChange_t** change, - const std::function& calculateSizeFunc) - { - std::lock_guard guard(*mp_mutex); - return m_changePool.reserve_Cache(change, calculateSizeFunc); - } - - RTPS_DllAPI inline bool reserve_Cache(CacheChange_t** change, uint32_t dataSize) - { - std::lock_guard guard(*mp_mutex); - return m_changePool.reserve_Cache(change, dataSize); - } - - /** - * release a previously reserved CacheChange_t. - * @param ch Pointer to the CacheChange_t. - */ - RTPS_DllAPI inline void release_Cache(CacheChange_t* ch) - { - std::lock_guard guard(*mp_mutex); - return m_changePool.release_Cache(ch); - } - - /** - * Check if the history is full - * @return true if the History is full. - */ - RTPS_DllAPI bool isFull() { return m_isHistoryFull; } - - /** - * Get the History size. - * @return Size of the history. - */ - RTPS_DllAPI size_t getHistorySize() - { - std::lock_guard guard(*mp_mutex); - return m_changes.size(); - } - - /** - * Remove all changes from the History - * @return True if everything was correctly removed. - */ - RTPS_DllAPI bool remove_all_changes(); - - /** - * Update the maximum and minimum sequenceNumbers. - */ - virtual void updateMaxMinSeqNum()=0; - - /** - * Remove a specific change from the history. - * @param ch Pointer to the CacheChange_t. - * @return True if removed. - */ - virtual bool remove_change(CacheChange_t* ch) = 0; - - /** - * Get the beginning of the changes history iterator. - * @return Iterator to the beginning of the vector. - */ - RTPS_DllAPI std::vector::iterator changesBegin(){ return m_changes.begin(); } - RTPS_DllAPI std::vector::reverse_iterator changesRbegin() { return m_changes.rbegin(); } - /** - * Get the end of the changes history iterator. - * @return Iterator to the end of the vector. - */ - RTPS_DllAPI std::vector::iterator changesEnd(){ return m_changes.end(); } - RTPS_DllAPI std::vector::reverse_iterator changesRend() { return m_changes.rend(); } - /** - * Get the minimum CacheChange_t. - * @param min_change Pointer to pointer to the minimum change. - * @return True if correct. - */ - RTPS_DllAPI bool get_min_change(CacheChange_t** min_change); - - /** - * Get the maximum CacheChange_t. - * @param max_change Pointer to pointer to the maximum change. - * @return True if correct. - */ - RTPS_DllAPI bool get_max_change(CacheChange_t** max_change); - - /** - * Get the maximum serialized payload size - * @return Maximum serialized payload size - */ - RTPS_DllAPI inline uint32_t getTypeMaxSerialized(){ return m_changePool.getInitialPayloadSize(); } - - /*! - * Get the mutex - * @return Mutex - */ - RTPS_DllAPI inline RecursiveTimedMutex* getMutex() { assert(mp_mutex != nullptr); return mp_mutex; } - - RTPS_DllAPI bool get_change( - const SequenceNumber_t& seq, - const GUID_t& guid, - CacheChange_t** change) const; - - /** - * @brief A method to get the change with the earliest timestamp - * @param change Pointer to pointer to earliest change - * @return True on success - */ - bool get_earliest_change(CacheChange_t** change); - - protected: - - //!Vector of pointers to the CacheChange_t. - std::vector m_changes; - - //!Variable to know if the history is full without needing to block the History mutex. - bool m_isHistoryFull; - - //!Pointer to and invalid cacheChange used to return the maximum and minimum when no changes are stored in the history. - CacheChange_t* mp_invalidCache; - - //!Pool of cache changes reserved when the History is created. - CacheChangePool m_changePool; - - //!Pointer to the minimum sequeceNumber CacheChange. - CacheChange_t* mp_minSeqCacheChange; - - //!Pointer to the maximum sequeceNumber CacheChange. - CacheChange_t* mp_maxSeqCacheChange; - - //!Print the seqNum of the changes in the History (for debuggisi, mng purposes). - void print_changes_seqNum2(); - - //!Mutex for the History. - RecursiveTimedMutex* mp_mutex; +protected: + + History( + const HistoryAttributes& att); + History( + History&&) = delete; + History& operator =( + History&&) = delete; + virtual ~History(); + +public: + + //!Attributes of the History + HistoryAttributes m_att; + /** + * Reserve a CacheChange_t from the CacheChange pool. + * @param[out] change Pointer to pointer to the CacheChange_t to reserve + * @param[in] calculateSizeFunc Function to calculate the size of the change. + * @return True is reserved + */ + RTPS_DllAPI inline bool reserve_Cache( + CacheChange_t** change, + const std::function& calculateSizeFunc) + { + std::lock_guard guard(*mp_mutex); + return m_changePool.reserve_Cache(change, calculateSizeFunc); + } + + RTPS_DllAPI inline bool reserve_Cache( + CacheChange_t** change, + uint32_t dataSize) + { + std::lock_guard guard(*mp_mutex); + return m_changePool.reserve_Cache(change, dataSize); + } + + /** + * release a previously reserved CacheChange_t. + * @param ch Pointer to the CacheChange_t. + */ + RTPS_DllAPI inline void release_Cache( + CacheChange_t* ch) + { + std::lock_guard guard(*mp_mutex); + return m_changePool.release_Cache(ch); + } + + /** + * Check if the history is full + * @return true if the History is full. + */ + RTPS_DllAPI bool isFull() + { + return m_isHistoryFull; + } + + /** + * Get the History size. + * @return Size of the history. + */ + RTPS_DllAPI size_t getHistorySize() + { + std::lock_guard guard(*mp_mutex); + return m_changes.size(); + } + + /** + * Remove all changes from the History + * @return True if everything was correctly removed. + */ + RTPS_DllAPI bool remove_all_changes(); + + /** + * Update the maximum and minimum sequenceNumbers. + */ + virtual void updateMaxMinSeqNum() = 0; + + /** + * Remove a specific change from the history. + * @param ch Pointer to the CacheChange_t. + * @return True if removed. + */ + virtual bool remove_change( + CacheChange_t* ch) = 0; + + /** + * Get the beginning of the changes history iterator. + * @return Iterator to the beginning of the vector. + */ + RTPS_DllAPI std::vector::iterator changesBegin() + { + return m_changes.begin(); + } + + RTPS_DllAPI std::vector::reverse_iterator changesRbegin() + { + return m_changes.rbegin(); + } + + /** + * Get the end of the changes history iterator. + * @return Iterator to the end of the vector. + */ + RTPS_DllAPI std::vector::iterator changesEnd() + { + return m_changes.end(); + } + + RTPS_DllAPI std::vector::reverse_iterator changesRend() + { + return m_changes.rend(); + } + + /** + * Get the minimum CacheChange_t. + * @param min_change Pointer to pointer to the minimum change. + * @return True if correct. + */ + RTPS_DllAPI bool get_min_change( + CacheChange_t** min_change); + + /** + * Get the maximum CacheChange_t. + * @param max_change Pointer to pointer to the maximum change. + * @return True if correct. + */ + RTPS_DllAPI bool get_max_change( + CacheChange_t** max_change); + + /** + * Get the maximum serialized payload size + * @return Maximum serialized payload size + */ + RTPS_DllAPI inline uint32_t getTypeMaxSerialized() + { + return m_changePool.getInitialPayloadSize(); + } + + /*! + * Get the mutex + * @return Mutex + */ + RTPS_DllAPI inline RecursiveTimedMutex* getMutex() + { + assert(mp_mutex != nullptr); return mp_mutex; + } + + RTPS_DllAPI bool get_change( + const SequenceNumber_t& seq, + const GUID_t& guid, + CacheChange_t** change) const; + + /** + * @brief A method to get the change with the earliest timestamp + * @param change Pointer to pointer to earliest change + * @return True on success + */ + bool get_earliest_change( + CacheChange_t** change); + +protected: + + //!Vector of pointers to the CacheChange_t. + std::vector m_changes; + + //!Variable to know if the history is full without needing to block the History mutex. + bool m_isHistoryFull; + + //!Pointer to and invalid cacheChange used to return the maximum and minimum when no changes are stored in the history. + CacheChange_t* mp_invalidCache; + + //!Pool of cache changes reserved when the History is created. + CacheChangePool m_changePool; + + //!Pointer to the minimum sequeceNumber CacheChange. + CacheChange_t* mp_minSeqCacheChange; + + //!Pointer to the maximum sequeceNumber CacheChange. + CacheChange_t* mp_maxSeqCacheChange; + + //!Print the seqNum of the changes in the History (for debuggisi, mng purposes). + void print_changes_seqNum2(); + + //!Mutex for the History. + RecursiveTimedMutex* mp_mutex; }; diff --git a/include/fastdds/rtps/reader/RTPSReader.h b/include/fastdds/rtps/reader/RTPSReader.h index f2348e16461..abc75a283fe 100644 --- a/include/fastdds/rtps/reader/RTPSReader.h +++ b/include/fastdds/rtps/reader/RTPSReader.h @@ -78,14 +78,16 @@ class RTPSReader : public Endpoint * @param writer_guid GUID of the writer to remove. * @return True if correctly removed. */ - RTPS_DllAPI virtual bool matched_writer_remove(const GUID_t& writer_guid) = 0; + RTPS_DllAPI virtual bool matched_writer_remove( + const GUID_t& writer_guid) = 0; /** * Tells us if a specific Writer is matched against this reader. * @param writer_guid GUID of the writer to check. * @return True if it is matched. */ - RTPS_DllAPI virtual bool matched_writer_is_matched(const GUID_t& writer_guid) = 0; + RTPS_DllAPI virtual bool matched_writer_is_matched( + const GUID_t& writer_guid) = 0; /** * Processes a new DATA message. Previously the message must have been accepted by function acceptMsgDirectedTo. @@ -93,7 +95,8 @@ class RTPSReader : public Endpoint * @param change Pointer to the CacheChange_t. * @return true if the reader accepts messages from the. */ - RTPS_DllAPI virtual bool processDataMsg(CacheChange_t* change) = 0; + RTPS_DllAPI virtual bool processDataMsg( + CacheChange_t* change) = 0; /** * Processes a new DATA FRAG message. @@ -164,7 +167,8 @@ class RTPSReader : public Endpoint * @param target Pointed to ReaderLister to attach * @return True is correctly set. */ - RTPS_DllAPI bool setListener(ReaderListener* target); + RTPS_DllAPI bool setListener( + ReaderListener* target); /** * Reserve a CacheChange_t. @@ -179,7 +183,8 @@ class RTPSReader : public Endpoint /** * Release a cacheChange. */ - RTPS_DllAPI void releaseCache(CacheChange_t* change); + RTPS_DllAPI void releaseCache( + CacheChange_t* change); /** * Read the next unread CacheChange_t from the history @@ -202,7 +207,7 @@ class RTPSReader : public Endpoint WriterProxy** wp) = 0; RTPS_DllAPI bool wait_for_unread_cache( - const eprosima::fastrtps::Duration_t &timeout); + const eprosima::fastrtps::Duration_t& timeout); RTPS_DllAPI uint64_t get_unread_count() const; @@ -218,7 +223,7 @@ class RTPSReader : public Endpoint RTPS_DllAPI inline ReaderHistory* getHistory() { return mp_history; - }; + } /*! * @brief Search if there is a CacheChange_t, giving SequenceNumber_t and writer GUID_t, @@ -242,12 +247,14 @@ class RTPSReader : public Endpoint //! The liveliness changed status struct as defined in the DDS LivelinessChangedStatus liveliness_changed_status_; - inline void enableMessagesFromUnkownWriters(bool enable) + inline void enableMessagesFromUnkownWriters( + bool enable) { m_acceptMessagesFromUnkownWriters = enable; } - void setTrustedWriter(const EntityId_t& writer) + void setTrustedWriter( + const EntityId_t& writer) { m_acceptMessagesFromUnkownWriters = false; m_trustedWriterEntityId = writer; @@ -279,25 +286,26 @@ class RTPSReader : public Endpoint * @return Last notified sequence number for input guid * @remarks Takes persistence_guid into consideration */ - SequenceNumber_t get_last_notified(const GUID_t& guid); + SequenceNumber_t get_last_notified( + const GUID_t& guid); /*! - * @brief Update the last notified sequence for a RTPS guid - * @param guid The RTPS guid of the writer - * @param seq Max sequence number available on writer - * @return Previous value of last notified sequence number for input guid - * @remarks Takes persistence_guid into consideration - */ + * @brief Update the last notified sequence for a RTPS guid + * @param guid The RTPS guid of the writer + * @param seq Max sequence number available on writer + * @return Previous value of last notified sequence number for input guid + * @remarks Takes persistence_guid into consideration + */ SequenceNumber_t update_last_notified( const GUID_t& guid, const SequenceNumber_t& seq); /*! - * @brief Set the last notified sequence for a persistence guid - * @param persistence_guid The persistence guid to update - * @param seq Sequence number to set for input guid - * @remarks Persistent readers will write to DB - */ + * @brief Set the last notified sequence for a persistence guid + * @param persistence_guid The persistence guid to update + * @param seq Sequence number to set for input guid + * @remarks Persistent readers will write to DB + */ virtual void set_last_notified( const GUID_t& persistence_guid, const SequenceNumber_t& seq); @@ -329,7 +337,8 @@ class RTPSReader : public Endpoint private: - RTPSReader& operator=(const RTPSReader&) = delete; + RTPSReader& operator =( + const RTPSReader&) = delete; }; } /* namespace rtps */ diff --git a/src/cpp/rtps/history/History.cpp b/src/cpp/rtps/history/History.cpp index 85ca9aa96b5..56dc7a273f0 100644 --- a/src/cpp/rtps/history/History.cpp +++ b/src/cpp/rtps/history/History.cpp @@ -28,47 +28,47 @@ #include namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { -History::History(const HistoryAttributes & att) +History::History( + const HistoryAttributes& att) : m_att(att) , m_isHistoryFull(false) , mp_invalidCache(nullptr) - , m_changePool(att.initialReservedCaches,att.payloadMaxSize,att.maximumReservedCaches,att.memoryPolicy) + , m_changePool(att.initialReservedCaches, att.payloadMaxSize, att.maximumReservedCaches, att.memoryPolicy) , mp_minSeqCacheChange(nullptr) , mp_maxSeqCacheChange(nullptr) , mp_mutex(nullptr) - { - m_changes.reserve((uint32_t)abs(att.initialReservedCaches)); - mp_invalidCache = new CacheChange_t(); - mp_invalidCache->writerGUID = c_Guid_Unknown; - mp_invalidCache->sequenceNumber = c_SequenceNumber_Unknown; - mp_minSeqCacheChange = mp_invalidCache; - mp_maxSeqCacheChange = mp_invalidCache; - } +{ + m_changes.reserve((uint32_t)abs(att.initialReservedCaches)); + mp_invalidCache = new CacheChange_t(); + mp_invalidCache->writerGUID = c_Guid_Unknown; + mp_invalidCache->sequenceNumber = c_SequenceNumber_Unknown; + mp_minSeqCacheChange = mp_invalidCache; + mp_maxSeqCacheChange = mp_invalidCache; +} History::~History() { - logInfo(RTPS_HISTORY,""); + logInfo(RTPS_HISTORY, ""); delete(mp_invalidCache); } - bool History::remove_all_changes() { - if(mp_mutex == nullptr) + if (mp_mutex == nullptr) { - logError(RTPS_HISTORY,"You need to create a RTPS Entity with this History before using it"); + logError(RTPS_HISTORY, "You need to create a RTPS Entity with this History before using it"); return false; } std::lock_guard guard(*mp_mutex); - if(!m_changes.empty()) + if (!m_changes.empty()) { - while(!m_changes.empty()) + while (!m_changes.empty()) { remove_change(m_changes.front()); } @@ -80,9 +80,10 @@ bool History::remove_all_changes() return false; } -bool History::get_min_change(CacheChange_t** min_change) +bool History::get_min_change( + CacheChange_t** min_change) { - if(mp_minSeqCacheChange->sequenceNumber != mp_invalidCache->sequenceNumber) + if (mp_minSeqCacheChange->sequenceNumber != mp_invalidCache->sequenceNumber) { *min_change = mp_minSeqCacheChange; return true; @@ -90,9 +91,11 @@ bool History::get_min_change(CacheChange_t** min_change) return false; } -bool History::get_max_change(CacheChange_t** max_change) + +bool History::get_max_change( + CacheChange_t** max_change) { - if(mp_maxSeqCacheChange->sequenceNumber != mp_invalidCache->sequenceNumber) + if (mp_maxSeqCacheChange->sequenceNumber != mp_invalidCache->sequenceNumber) { *max_change = mp_maxSeqCacheChange; return true; @@ -108,7 +111,7 @@ bool History::get_change( if (mp_mutex == nullptr) { - logError(RTPS_HISTORY,"You need to create a RTPS Entity with this History before using it"); + logError(RTPS_HISTORY, "You need to create a RTPS Entity with this History before using it"); return false; } @@ -123,7 +126,7 @@ bool History::get_change( *change = it; return true; } - else if(it->sequenceNumber > seq) + else if (it->sequenceNumber > seq) { break; } @@ -133,11 +136,12 @@ bool History::get_change( return false; } -bool History::get_earliest_change(CacheChange_t **change) +bool History::get_earliest_change( + CacheChange_t** change) { if (mp_mutex == nullptr) { - logError(RTPS_HISTORY,"You need to create a RTPS Entity with this History before using it"); + logError(RTPS_HISTORY, "You need to create a RTPS Entity with this History before using it"); return false; } @@ -160,15 +164,15 @@ bool History::get_earliest_change(CacheChange_t **change) //TODO Remove if you want. #include -namespace eprosima{ -namespace fastrtps{ -namespace rtps{ +namespace eprosima { +namespace fastrtps { +namespace rtps { void History::print_changes_seqNum2() { std::stringstream ss; - for(std::vector::iterator it = m_changes.begin(); - it!=m_changes.end();++it) + for (std::vector::iterator it = m_changes.begin(); + it != m_changes.end(); ++it) { ss << (*it)->sequenceNumber << "-"; } @@ -176,7 +180,6 @@ void History::print_changes_seqNum2() std::cout << ss.str(); } - } } /* namespace rtps */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index da217643a7d..0ed18fc1005 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -15,7 +15,7 @@ /* * RTPSReader.cpp * -*/ + */ #include #include @@ -55,12 +55,12 @@ RTPSReader::RTPSReader( mp_history->mp_reader = this; mp_history->mp_mutex = &mp_mutex; - logInfo(RTPS_READER,"RTPSReader created correctly"); + logInfo(RTPS_READER, "RTPSReader created correctly"); } RTPSReader::~RTPSReader() { - logInfo(RTPS_READER,"Removing reader "<getGuid().entityId;); + logInfo(RTPS_READER, "Removing reader " << this->getGuid().entityId; ); delete history_state_; mp_history->mp_reader = nullptr; mp_history->mp_mutex = nullptr; @@ -73,7 +73,8 @@ bool RTPSReader::reserveCache( return mp_history->reserve_Cache(change, dataCdrSerializedSize); } -void RTPSReader::releaseCache(CacheChange_t* change) +void RTPSReader::releaseCache( + CacheChange_t* change) { return mp_history->release_Cache(change); } @@ -83,7 +84,8 @@ ReaderListener* RTPSReader::getListener() const return mp_listener; } -bool RTPSReader::setListener(ReaderListener *target) +bool RTPSReader::setListener( + ReaderListener* target) { mp_listener = target; return true; @@ -156,7 +158,8 @@ SequenceNumber_t RTPSReader::update_last_notified( return ret_val; } -SequenceNumber_t RTPSReader::get_last_notified(const GUID_t& guid) +SequenceNumber_t RTPSReader::get_last_notified( + const GUID_t& guid) { SequenceNumber_t ret_val; std::lock_guard guard(mp_mutex); @@ -183,21 +186,20 @@ void RTPSReader::set_last_notified( history_state_->history_record[peristence_guid] = seq; } - bool RTPSReader::wait_for_unread_cache( - const eprosima::fastrtps::Duration_t &timeout) + const eprosima::fastrtps::Duration_t& timeout) { auto time_out = std::chrono::steady_clock::now() + std::chrono::seconds(timeout.seconds) + - std::chrono::nanoseconds(timeout.nanosec); + std::chrono::nanoseconds(timeout.nanosec); std::unique_lock lock(mp_mutex, std::defer_lock); - if(lock.try_lock_until(time_out)) + if (lock.try_lock_until(time_out)) { - if(new_notification_cv_.wait_until(lock, time_out, [&]() - { - return total_unread_ > 0; - })) + if (new_notification_cv_.wait_until(lock, time_out, [&]() + { + return total_unread_ > 0; + })) { return true; } From b2d1bea493792c0f4280ee9510ea2a18cb08908a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Wed, 27 Nov 2019 11:56:28 +0100 Subject: [PATCH 2/4] Refs #6929. Mechanism to improve next searchs in History. --- include/fastdds/rtps/history/History.h | 18 +- include/fastdds/rtps/reader/RTPSReader.h | 15 +- src/cpp/rtps/history/History.cpp | 26 +- src/cpp/rtps/reader/RTPSReader.cpp | 16 +- src/cpp/rtps/reader/StatefulReader.cpp | 294 +++++++++++++---------- 5 files changed, 224 insertions(+), 145 deletions(-) diff --git a/include/fastdds/rtps/history/History.h b/include/fastdds/rtps/history/History.h index bb584a3c97f..ea532cdcf34 100644 --- a/include/fastdds/rtps/history/History.h +++ b/include/fastdds/rtps/history/History.h @@ -53,6 +53,10 @@ class History public: + using iterator = std::vector::iterator; + using reverse_iterator = std::vector::reverse_iterator; + using const_iterator = std::vector::const_iterator; + //!Attributes of the History HistoryAttributes m_att; /** @@ -130,12 +134,12 @@ class History * Get the beginning of the changes history iterator. * @return Iterator to the beginning of the vector. */ - RTPS_DllAPI std::vector::iterator changesBegin() + RTPS_DllAPI iterator changesBegin() { return m_changes.begin(); } - RTPS_DllAPI std::vector::reverse_iterator changesRbegin() + RTPS_DllAPI reverse_iterator changesRbegin() { return m_changes.rbegin(); } @@ -144,12 +148,12 @@ class History * Get the end of the changes history iterator. * @return Iterator to the end of the vector. */ - RTPS_DllAPI std::vector::iterator changesEnd() + RTPS_DllAPI iterator changesEnd() { return m_changes.end(); } - RTPS_DllAPI std::vector::reverse_iterator changesRend() + RTPS_DllAPI reverse_iterator changesRend() { return m_changes.rend(); } @@ -193,6 +197,12 @@ class History const GUID_t& guid, CacheChange_t** change) const; + RTPS_DllAPI const_iterator get_change_nts( + const SequenceNumber_t& seq, + const GUID_t& guid, + CacheChange_t** change, + const_iterator hint) const; + /** * @brief A method to get the change with the earliest timestamp * @param change Pointer to pointer to earliest change diff --git a/include/fastdds/rtps/reader/RTPSReader.h b/include/fastdds/rtps/reader/RTPSReader.h index abc75a283fe..894822249f6 100644 --- a/include/fastdds/rtps/reader/RTPSReader.h +++ b/include/fastdds/rtps/reader/RTPSReader.h @@ -26,6 +26,7 @@ #include #include #include +#include "../history/ReaderHistory.h" namespace eprosima { namespace fastrtps { @@ -33,7 +34,6 @@ namespace rtps { // Forward declarations class LivelinessManager; -class ReaderHistory; class ReaderListener; class WriterProxy; struct CacheChange_t; @@ -230,11 +230,18 @@ class RTPSReader : public Endpoint * waiting to be completed because it is fragmented. * @param sequence_number SequenceNumber_t of the searched CacheChange_t. * @param writer_guid writer GUID_t of the searched CacheChange_t. - * @return If a CacheChange_t was found, it will be returned. In other case nullptr is returned. + * @param change If a CacheChange_t was found, this argument will fill with its pointer. + * In other case nullptr is returned. + * @param hint Iterator since the search will start. + * Used to improve the search. + * @return Iterator pointing to the position were CacheChange_t was found. + * It can be used to improve next search. */ - CacheChange_t* findCacheInFragmentedProcess( + History::const_iterator findCacheInFragmentedProcess( const SequenceNumber_t& sequence_number, - const GUID_t& writer_guid) const; + const GUID_t& writer_guid, + CacheChange_t** change, + History::const_iterator hint) const; /*! * @brief Returns there is a clean state with all Writers. diff --git a/src/cpp/rtps/history/History.cpp b/src/cpp/rtps/history/History.cpp index 56dc7a273f0..561f88ce40c 100644 --- a/src/cpp/rtps/history/History.cpp +++ b/src/cpp/rtps/history/History.cpp @@ -116,24 +116,36 @@ bool History::get_change( } std::lock_guard guard(*mp_mutex); + get_change_nts(seq, guid, change, m_changes.cbegin()); + return *change != nullptr; +} - for (CacheChange_t* it : m_changes) +History::const_iterator History::get_change_nts( + const SequenceNumber_t& seq, + const GUID_t& guid, + CacheChange_t** change, + History::const_iterator hint) const +{ + const_iterator returned_value = hint; + *change = nullptr; + + for (; returned_value != m_changes.end(); ++returned_value) { - if (it->writerGUID == guid) + if ((*returned_value)->writerGUID == guid) { - if (it->sequenceNumber == seq) + if ((*returned_value)->sequenceNumber == seq) { - *change = it; - return true; + *change = *returned_value; + break; } - else if (it->sequenceNumber > seq) + else if ((*returned_value)->sequenceNumber > seq) { break; } } } - return false; + return returned_value; } bool History::get_earliest_change( diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index 0ed18fc1005..cf5acd3240b 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -91,16 +91,20 @@ bool RTPSReader::setListener( return true; } -CacheChange_t* RTPSReader::findCacheInFragmentedProcess( +History::const_iterator RTPSReader::findCacheInFragmentedProcess( const SequenceNumber_t& sequence_number, - const GUID_t& writer_guid) const + const GUID_t& writer_guid, + CacheChange_t** change, + History::const_iterator hint) const { - CacheChange_t* ret_val = nullptr; - if (mp_history->get_change(sequence_number, writer_guid, &ret_val)) + History::const_iterator ret_val = mp_history->get_change_nts(sequence_number, writer_guid, change, hint); + + if (nullptr != *change && (*change)->is_fully_assembled()) { - return ret_val->is_fully_assembled() ? nullptr : ret_val; + *change = nullptr; } - return nullptr; + + return ret_val; } void RTPSReader::add_persistence_guid( diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 4cd6da1e0a5..c5fc442697c 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -36,13 +36,13 @@ #include -#define IDSTRING "(ID:"<< std::this_thread::get_id() <<") "<< +#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " << using namespace eprosima::fastrtps::rtps; StatefulReader::~StatefulReader() { - logInfo(RTPS_READER,"StatefulReader destructor."); + logInfo(RTPS_READER, "StatefulReader destructor."); // Only is_alive_ assignment needs to be protected, as // matched_writers_ and matched_writers_pool_ are only used @@ -52,7 +52,7 @@ StatefulReader::~StatefulReader() is_alive_ = false; } - for(WriterProxy* writer : matched_writers_) + for (WriterProxy* writer : matched_writers_) { delete(writer); } @@ -68,7 +68,7 @@ StatefulReader::StatefulReader( const ReaderAttributes& att, ReaderHistory* hist, ReaderListener* listen) - : RTPSReader(pimpl,guid,att,hist, listen) + : RTPSReader(pimpl, guid, att, hist, listen) , acknack_count_(0) , nackfrag_count_(0) , times_(att.times) @@ -124,7 +124,7 @@ bool StatefulReader::matched_writer_add( else { logWarning(RTPS_WRITER, "Maximum number of reader proxies (" << max_readers << \ - ") reached for writer " << m_guid << endl); + ") reached for writer " << m_guid << endl); return false; } } @@ -153,9 +153,9 @@ bool StatefulReader::matched_writer_add( if ( wlp != nullptr) { wlp->sub_liveliness_manager_->add_writer( - wdata.guid(), - liveliness_kind_, - liveliness_lease_duration_); + wdata.guid(), + liveliness_kind_, + liveliness_lease_duration_); } else { @@ -167,7 +167,8 @@ bool StatefulReader::matched_writer_add( return true; } -bool StatefulReader::matched_writer_remove(const GUID_t& writer_guid) +bool StatefulReader::matched_writer_remove( + const GUID_t& writer_guid) { std::unique_lock lock(mp_mutex); if (is_alive_) @@ -177,7 +178,8 @@ bool StatefulReader::matched_writer_remove(const GUID_t& writer_guid) //Remove cachechanges belonging to the unmatched writer mp_history->remove_changes_with_guid(writer_guid); - for (ResourceLimitedVector::iterator it = matched_writers_.begin(); it != matched_writers_.end(); ++it) + for (ResourceLimitedVector::iterator it = matched_writers_.begin(); it != matched_writers_.end(); + ++it) { if ((*it)->guid() == writer_guid) { @@ -189,13 +191,14 @@ bool StatefulReader::matched_writer_remove(const GUID_t& writer_guid) if ( wlp != nullptr) { wlp->sub_liveliness_manager_->remove_writer( - writer_guid, - liveliness_kind_, - liveliness_lease_duration_); + writer_guid, + liveliness_kind_, + liveliness_lease_duration_); } else { - logError(RTPS_LIVELINESS, "Finite liveliness lease duration but WLP not enabled, cannot remove writer"); + logError(RTPS_LIVELINESS, + "Finite liveliness lease duration but WLP not enabled, cannot remove writer"); } } @@ -218,14 +221,15 @@ bool StatefulReader::matched_writer_remove(const GUID_t& writer_guid) return false; } -bool StatefulReader::matched_writer_is_matched(const GUID_t& writer_guid) +bool StatefulReader::matched_writer_is_matched( + const GUID_t& writer_guid) { std::lock_guard guard(mp_mutex); if (is_alive_) { - for(WriterProxy* it : matched_writers_) + for (WriterProxy* it : matched_writers_) { - if(it->guid() == writer_guid && it->is_alive()) + if (it->guid() == writer_guid && it->is_alive()) { return true; } @@ -248,15 +252,15 @@ bool StatefulReader::matched_writer_lookup( bool returnedValue = findWriterProxy(writerGUID, WP); - if(returnedValue) + if (returnedValue) { logInfo(RTPS_READER, this->getGuid().entityId << " FINDS writerProxy " << writerGUID << " from " - << matched_writers_.size()); + << matched_writers_.size()); } else { logInfo(RTPS_READER, this->getGuid().entityId << " NOT FINDS writerProxy " << writerGUID << " from " - << matched_writers_.size()); + << matched_writers_.size()); } return returnedValue; @@ -268,9 +272,9 @@ bool StatefulReader::findWriterProxy( { assert(WP); - for(WriterProxy* it : matched_writers_) + for (WriterProxy* it : matched_writers_) { - if(it->guid() == writerGUID && it->is_alive()) + if (it->guid() == writerGUID && it->is_alive()) { *WP = it; return true; @@ -279,9 +283,10 @@ bool StatefulReader::findWriterProxy( return false; } -bool StatefulReader::processDataMsg(CacheChange_t *change) +bool StatefulReader::processDataMsg( + CacheChange_t* change) { - WriterProxy *pWP = nullptr; + WriterProxy* pWP = nullptr; assert(change); @@ -291,12 +296,12 @@ bool StatefulReader::processDataMsg(CacheChange_t *change) return false; } - if(acceptMsgFrom(change->writerGUID, &pWP)) + if (acceptMsgFrom(change->writerGUID, &pWP)) { if (liveliness_lease_duration_ < c_TimeInfinite) { if (liveliness_kind_ == MANUAL_BY_TOPIC_LIVELINESS_QOS || - pWP->attributes().m_qos.m_liveliness.kind == MANUAL_BY_TOPIC_LIVELINESS_QOS) + pWP->attributes().m_qos.m_liveliness.kind == MANUAL_BY_TOPIC_LIVELINESS_QOS) { auto wlp = this->mp_RTPSParticipant->wlp(); if (wlp != nullptr) @@ -314,20 +319,21 @@ bool StatefulReader::processDataMsg(CacheChange_t *change) } // Check if CacheChange was received or is framework data - if(!pWP || !pWP->change_was_received(change->sequenceNumber)) + if (!pWP || !pWP->change_was_received(change->sequenceNumber)) { - logInfo(RTPS_MSG_IN,IDSTRING"Trying to add change " << change->sequenceNumber <<" TO reader: "<< getGuid().entityId); + logInfo(RTPS_MSG_IN, + IDSTRING "Trying to add change " << change->sequenceNumber << " TO reader: " << getGuid().entityId); CacheChange_t* change_to_add; - if(reserveCache(&change_to_add, change->serializedPayload.length)) //Reserve a new cache from the corresponding cache pool + if (reserveCache(&change_to_add, change->serializedPayload.length)) //Reserve a new cache from the corresponding cache pool { #if HAVE_SECURITY - if(getAttributes().security_attributes().is_payload_protected) + if (getAttributes().security_attributes().is_payload_protected) { change_to_add->copy_not_memcpy(change); - if(!getRTPSParticipant()->security_manager().decode_serialized_payload(change->serializedPayload, - change_to_add->serializedPayload, m_guid, change->writerGUID)) + if (!getRTPSParticipant()->security_manager().decode_serialized_payload(change->serializedPayload, + change_to_add->serializedPayload, m_guid, change->writerGUID)) { releaseCache(change_to_add); logWarning(RTPS_MSG_IN, "Cannont decode serialized payload"); @@ -337,26 +343,27 @@ bool StatefulReader::processDataMsg(CacheChange_t *change) else { #endif - if (!change_to_add->copy(change)) - { - logWarning(RTPS_MSG_IN,IDSTRING"Problem copying CacheChange, received data is: " << change->serializedPayload.length - << " bytes and max size in reader " << getGuid().entityId << " is " << change_to_add->serializedPayload.max_size); - releaseCache(change_to_add); - return false; - } -#if HAVE_SECURITY + if (!change_to_add->copy(change)) + { + logWarning(RTPS_MSG_IN, IDSTRING "Problem copying CacheChange, received data is: " << change->serializedPayload.length + << " bytes and max size in reader " << getGuid().entityId << " is " << + change_to_add->serializedPayload.max_size); + releaseCache(change_to_add); + return false; } +#if HAVE_SECURITY + } #endif } else { - logError(RTPS_MSG_IN,IDSTRING"Problem reserving CacheChange in reader: " << getGuid().entityId); + logError(RTPS_MSG_IN, IDSTRING "Problem reserving CacheChange in reader: " << getGuid().entityId); return false; } - if(!change_received(change_to_add, pWP)) + if (!change_received(change_to_add, pWP)) { - logInfo(RTPS_MSG_IN,IDSTRING"MessageReceiver not add change "<sequenceNumber); + logInfo(RTPS_MSG_IN, IDSTRING "MessageReceiver not add change " << change_to_add->sequenceNumber); releaseCache(change_to_add); } } @@ -371,7 +378,7 @@ bool StatefulReader::processDataFragMsg( uint32_t fragmentStartingNum, uint16_t fragmentsInSubmessage) { - WriterProxy *pWP = nullptr; + WriterProxy* pWP = nullptr; assert(incomingChange); @@ -382,7 +389,7 @@ bool StatefulReader::processDataFragMsg( } // TODO: see if we need manage framework fragmented DATA message - if(acceptMsgFrom(incomingChange->writerGUID, &pWP) && pWP) + if (acceptMsgFrom(incomingChange->writerGUID, &pWP) && pWP) { if (liveliness_lease_duration_ < c_TimeInfinite) { @@ -393,9 +400,9 @@ bool StatefulReader::processDataFragMsg( if ( wlp != nullptr) { wlp->sub_liveliness_manager_->assert_liveliness( - incomingChange->writerGUID, - liveliness_kind_, - liveliness_lease_duration_); + incomingChange->writerGUID, + liveliness_kind_, + liveliness_lease_duration_); } else { @@ -405,20 +412,23 @@ bool StatefulReader::processDataFragMsg( } // Check if CacheChange was received. - if(!pWP->change_was_received(incomingChange->sequenceNumber)) + if (!pWP->change_was_received(incomingChange->sequenceNumber)) { - logInfo(RTPS_MSG_IN, IDSTRING"Trying to add fragment " << incomingChange->sequenceNumber.to64long() << " TO reader: " << getGuid().entityId); + logInfo(RTPS_MSG_IN, + IDSTRING "Trying to add fragment " << incomingChange->sequenceNumber.to64long() << " TO reader: " << + getGuid().entityId); CacheChange_t* change_to_add = incomingChange; #if HAVE_SECURITY - if(getAttributes().security_attributes().is_payload_protected) + if (getAttributes().security_attributes().is_payload_protected) { - if(reserveCache(&change_to_add, incomingChange->serializedPayload.length)) //Reserve a new cache from the corresponding cache pool + if (reserveCache(&change_to_add, incomingChange->serializedPayload.length)) //Reserve a new cache from the corresponding cache pool { change_to_add->copy_not_memcpy(incomingChange); - if(!getRTPSParticipant()->security_manager().decode_serialized_payload(incomingChange->serializedPayload, - change_to_add->serializedPayload, m_guid, incomingChange->writerGUID)) + if (!getRTPSParticipant()->security_manager().decode_serialized_payload(incomingChange-> + serializedPayload, + change_to_add->serializedPayload, m_guid, incomingChange->writerGUID)) { releaseCache(change_to_add); logWarning(RTPS_MSG_IN, "Cannont decode serialized payload"); @@ -457,17 +467,20 @@ bool StatefulReader::processDataFragMsg( } #if HAVE_SECURITY - if(getAttributes().security_attributes().is_payload_protected) + if (getAttributes().security_attributes().is_payload_protected) + { releaseCache(change_to_add); + } #endif // If this is the first time we have received fragments for this change, add it to history - if(change_created != nullptr) + if (change_created != nullptr) { - if(!change_received(change_created, pWP)) + if (!change_received(change_created, pWP)) { - logInfo(RTPS_MSG_IN, IDSTRING"MessageReceiver not add change " << change_created->sequenceNumber.to64long()); + logInfo(RTPS_MSG_IN, + IDSTRING "MessageReceiver not add change " << change_created->sequenceNumber.to64long()); releaseCache(change_created); work_change = nullptr; @@ -494,7 +507,7 @@ bool StatefulReader::processHeartbeatMsg( bool finalFlag, bool livelinessFlag) { - WriterProxy *writer = nullptr; + WriterProxy* writer = nullptr; std::unique_lock lock(mp_mutex); if (!is_alive_) @@ -502,11 +515,11 @@ bool StatefulReader::processHeartbeatMsg( return false; } - if(acceptMsgFrom(writerGUID, &writer) && writer) + if (acceptMsgFrom(writerGUID, &writer) && writer) { bool assert_liveliness = false; if (writer->process_heartbeat( - hbCount, firstSN, lastSN, finalFlag, livelinessFlag, disable_positive_acks_, assert_liveliness)) + hbCount, firstSN, lastSN, finalFlag, livelinessFlag, disable_positive_acks_, assert_liveliness)) { mp_history->remove_fragmented_changes_until(firstSN, writerGUID); @@ -522,9 +535,9 @@ bool StatefulReader::processHeartbeatMsg( if ( wlp != nullptr) { wlp->sub_liveliness_manager_->assert_liveliness( - writerGUID, - liveliness_kind_, - liveliness_lease_duration_); + writerGUID, + liveliness_kind_, + liveliness_lease_duration_); } else { @@ -547,7 +560,7 @@ bool StatefulReader::processGapMsg( const SequenceNumber_t& gapStart, const SequenceNumberSet_t& gapList) { - WriterProxy *pWP = nullptr; + WriterProxy* pWP = nullptr; std::unique_lock lock(mp_mutex); if (!is_alive_) @@ -555,16 +568,22 @@ bool StatefulReader::processGapMsg( return false; } - if(acceptMsgFrom(writerGUID, &pWP) && pWP) + if (acceptMsgFrom(writerGUID, &pWP) && pWP) { // TODO (Miguel C): Refactor this inside WriterProxy SequenceNumber_t auxSN; SequenceNumber_t finalSN = gapList.base() - 1; - for(auxSN = gapStart; auxSN<=finalSN;auxSN++) + History::const_iterator history_iterator = mp_history->changesBegin(); + for (auxSN = gapStart; auxSN <= finalSN; auxSN++) { - if(pWP->irrelevant_change_set(auxSN)) + if (pWP->irrelevant_change_set(auxSN)) { - CacheChange_t* to_remove = findCacheInFragmentedProcess(auxSN, pWP->guid()); + CacheChange_t* to_remove = nullptr; + auto ret_iterator = findCacheInFragmentedProcess(auxSN, pWP->guid(), &to_remove, history_iterator); + if (ret_iterator != mp_history->changesEnd()) + { + history_iterator = ret_iterator; + } if (to_remove != nullptr) { mp_history->remove_change(to_remove); @@ -574,9 +593,14 @@ bool StatefulReader::processGapMsg( gapList.for_each([&](SequenceNumber_t it) { - if(pWP->irrelevant_change_set(it)) + if (pWP->irrelevant_change_set(it)) { - CacheChange_t* to_remove = findCacheInFragmentedProcess(auxSN, pWP->guid()); + CacheChange_t* to_remove = nullptr; + auto ret_iterator = findCacheInFragmentedProcess(auxSN, pWP->guid(), &to_remove, history_iterator); + if (ret_iterator != mp_history->changesEnd()) + { + history_iterator = ret_iterator; + } if (to_remove != nullptr) { mp_history->remove_change(to_remove); @@ -593,13 +617,13 @@ bool StatefulReader::processGapMsg( bool StatefulReader::acceptMsgFrom( const GUID_t& writerId, - WriterProxy **wp) const + WriterProxy** wp) const { assert(wp != nullptr); - for(WriterProxy* it : matched_writers_) + for (WriterProxy* it : matched_writers_) { - if(it->guid() == writerId && it->is_alive()) + if (it->guid() == writerId && it->is_alive()) { *wp = it; return true; @@ -609,7 +633,7 @@ bool StatefulReader::acceptMsgFrom( // Check if it's a framework's one. In this case, m_acceptMessagesFromUnkownWriters // is an enabler for the trusted entity comparison if (m_acceptMessagesFromUnkownWriters - && (writerId.entityId == m_trustedWriterEntityId)) + && (writerId.entityId == m_trustedWriterEntityId)) { *wp = nullptr; return true; @@ -626,7 +650,7 @@ bool StatefulReader::change_removed_by_history( if (is_alive_) { - if(wp != nullptr || matched_writer_lookup(a_change->writerGUID,&wp)) + if (wp != nullptr || matched_writer_lookup(a_change->writerGUID, &wp)) { if (a_change->is_fully_assembled()) { @@ -643,10 +667,10 @@ bool StatefulReader::change_removed_by_history( } return true; } - else if(a_change->writerGUID.entityId != this->m_trustedWriterEntityId) + else if (a_change->writerGUID.entityId != this->m_trustedWriterEntityId) { // trusted entities messages mean no havoc - logError(RTPS_READER," You should always find the WP associated with a change, something is very wrong"); + logError(RTPS_READER, " You should always find the WP associated with a change, something is very wrong"); } } @@ -658,14 +682,15 @@ bool StatefulReader::change_received( WriterProxy* prox) { //First look for WriterProxy in case is not provided - if(prox == nullptr) + if (prox == nullptr) { - if(!findWriterProxy(a_change->writerGUID, &prox)) + if (!findWriterProxy(a_change->writerGUID, &prox)) { // discard non framework messages from unknown writer if (a_change->writerGUID.entityId != m_trustedWriterEntityId) { - logInfo(RTPS_READER, "Writer Proxy " << a_change->writerGUID << " not matched to this Reader " << m_guid.entityId); + logInfo(RTPS_READER, + "Writer Proxy " << a_change->writerGUID << " not matched to this Reader " << m_guid.entityId); return false; } else @@ -696,13 +721,13 @@ bool StatefulReader::change_received( // NOTE: Depending on QoS settings, one change can be removed from history // inside the call to mp_history->received_change - if(mp_history->received_change(a_change, unknown_missing_changes_up_to)) + if (mp_history->received_change(a_change, unknown_missing_changes_up_to)) { GUID_t proxGUID = prox->guid(); // If KEEP_LAST and history full, make older changes as lost. CacheChange_t* aux_change = nullptr; - if(mp_history->isFull() && mp_history->get_min_change_from(&aux_change, proxGUID)) + if (mp_history->isFull() && mp_history->get_min_change_from(&aux_change, proxGUID)) { prox->lost_changes_update(aux_change->sequenceNumber); } @@ -722,7 +747,8 @@ bool StatefulReader::change_received( return false; } -void StatefulReader::NotifyChanges(WriterProxy* prox) +void StatefulReader::NotifyChanges( + WriterProxy* prox) { GUID_t proxGUID = prox->guid(); update_last_notified(proxGUID, prox->available_changes_max()); @@ -748,7 +774,9 @@ void StatefulReader::NotifyChanges(WriterProxy* prox) // Search again the WriterProxy because could be removed after the unlock. if (!findWriterProxy(proxGUID, &prox)) + { break; + } nextChangeToNotify = prox->next_cache_change_to_be_notified(); } @@ -766,19 +794,19 @@ bool StatefulReader::nextUntakenCache( std::vector toremove; bool takeok = false; - for(std::vector::iterator it = mp_history->changesBegin(); - it!=mp_history->changesEnd();++it) + for (std::vector::iterator it = mp_history->changesBegin(); + it != mp_history->changesEnd(); ++it) { WriterProxy* wp; - if(this->matched_writer_lookup((*it)->writerGUID, &wp)) + if (this->matched_writer_lookup((*it)->writerGUID, &wp)) { // TODO Revisar la comprobacion SequenceNumber_t seq = wp->available_changes_max(); - if(seq >= (*it)->sequenceNumber) + if (seq >= (*it)->sequenceNumber) { *change = *it; - if(!(*change)->isRead) + if (!(*change)->isRead) { if (0 < total_unread_) { @@ -788,8 +816,10 @@ bool StatefulReader::nextUntakenCache( (*change)->isRead = true; - if(wpout !=nullptr) + if (wpout != nullptr) + { *wpout = wp; + } takeok = true; break; @@ -801,10 +831,12 @@ bool StatefulReader::nextUntakenCache( } } - for(std::vector::iterator it = toremove.begin(); - it!=toremove.end();++it) + for (std::vector::iterator it = toremove.begin(); + it != toremove.end(); ++it) { - logWarning(RTPS_READER,"Removing change "<<(*it)->sequenceNumber << " from " << (*it)->writerGUID << " because is no longer paired"); + logWarning(RTPS_READER, + "Removing change " << (*it)->sequenceNumber << " from " << (*it)->writerGUID << + " because is no longer paired"); mp_history->remove_change(*it); } return takeok; @@ -823,18 +855,20 @@ bool StatefulReader::nextUnreadCache( std::vector toremove; bool readok = false; - for(std::vector::iterator it = mp_history->changesBegin(); - it!=mp_history->changesEnd();++it) + for (std::vector::iterator it = mp_history->changesBegin(); + it != mp_history->changesEnd(); ++it) { - if((*it)->isRead) + if ((*it)->isRead) + { continue; + } WriterProxy* wp; - if(this->matched_writer_lookup((*it)->writerGUID,&wp)) + if (this->matched_writer_lookup((*it)->writerGUID, &wp)) { SequenceNumber_t seq; seq = wp->available_changes_max(); - if(seq >= (*it)->sequenceNumber) + if (seq >= (*it)->sequenceNumber) { *change = *it; @@ -845,8 +879,10 @@ bool StatefulReader::nextUnreadCache( (*change)->isRead = true; - if(wpout !=nullptr) + if (wpout != nullptr) + { *wpout = wp; + } readok = true; break; @@ -858,25 +894,28 @@ bool StatefulReader::nextUnreadCache( } } - for(std::vector::iterator it = toremove.begin(); - it!=toremove.end();++it) + for (std::vector::iterator it = toremove.begin(); + it != toremove.end(); ++it) { - logWarning(RTPS_READER,"Removing change "<<(*it)->sequenceNumber << " from " << (*it)->writerGUID << " because is no longer paired"); + logWarning(RTPS_READER, + "Removing change " << (*it)->sequenceNumber << " from " << (*it)->writerGUID << + " because is no longer paired"); mp_history->remove_change(*it); } return readok; } -bool StatefulReader::updateTimes(const ReaderTimes& ti) +bool StatefulReader::updateTimes( + const ReaderTimes& ti) { std::lock_guard guard(mp_mutex); if (is_alive_) { - if(times_.heartbeatResponseDelay != ti.heartbeatResponseDelay) + if (times_.heartbeatResponseDelay != ti.heartbeatResponseDelay) { times_ = ti; - for(WriterProxy* writer : matched_writers_) + for (WriterProxy* writer : matched_writers_) { writer->update_heartbeat_response_interval(times_.heartbeatResponseDelay); } @@ -950,40 +989,47 @@ void StatefulReader::send_acknack( { GUID_t guid = sender.remote_guids().at(0); SequenceNumberSet_t sns(writer->available_changes_max() + 1); + History::const_iterator history_iterator = mp_history->changesBegin(); missing_changes.for_each( [&](const SequenceNumber_t& seq) + { + // Check if the CacheChange_t is uncompleted. + CacheChange_t* uncomplete_change = nullptr; + auto ret_iterator = findCacheInFragmentedProcess(seq, guid, &uncomplete_change, history_iterator); + if (ret_iterator != mp_history->changesEnd()) { - // Check if the CacheChange_t is uncompleted. - CacheChange_t* uncomplete_change = findCacheInFragmentedProcess(seq, guid); - if (uncomplete_change == nullptr) + history_iterator = ret_iterator; + } + if (uncomplete_change == nullptr) + { + if (!sns.add(seq)) { - if (!sns.add(seq)) - { - logInfo(RTPS_READER, "Sequence number " << seq - << " exceeded bitmap limit of AckNack. SeqNumSet Base: " << sns.base()); - } + logInfo(RTPS_READER, "Sequence number " << seq + << " exceeded bitmap limit of AckNack. SeqNumSet Base: " + << sns.base()); } - else - { - FragmentNumberSet_t frag_sns; - uncomplete_change->get_missing_fragments(frag_sns); - ++nackfrag_count_; - logInfo(RTPS_READER, "Sending NACKFRAG for sample" << seq << ": " << frag_sns;); + } + else + { + FragmentNumberSet_t frag_sns; + uncomplete_change->get_missing_fragments(frag_sns); + ++nackfrag_count_; + logInfo(RTPS_READER, "Sending NACKFRAG for sample" << cit->sequenceNumber << ": " << frag_sns; ); - group.add_nackfrag(seq, frag_sns, nackfrag_count_); - } + group.add_nackfrag(seq, frag_sns, nackfrag_count_); + } - }); + }); acknack_count_++; - logInfo(RTPS_READER, "Sending ACKNACK: " << sns;); + logInfo(RTPS_READER, "Sending ACKNACK: " << sns; ); bool final = sns.empty(); group.add_acknack(sns, acknack_count_, final); } } - catch(const RTPSMessageGroup::timeout&) + catch (const RTPSMessageGroup::timeout&) { logError(RTPS_WRITER, "Max blocking time reached"); } From a83e776f0dc95bfbb1a47a1fdd6f0fbb351d4b8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Thu, 28 Nov 2019 08:50:27 +0100 Subject: [PATCH 3/4] Refs #6929. Take advantage of changes to remove faster. --- include/fastdds/rtps/history/ReaderHistory.h | 10 ++++++++++ src/cpp/rtps/history/ReaderHistory.cpp | 10 ++++++++++ src/cpp/rtps/reader/StatefulReader.cpp | 16 ++++++++-------- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/include/fastdds/rtps/history/ReaderHistory.h b/include/fastdds/rtps/history/ReaderHistory.h index 4681e0430db..b3cf6d90e99 100644 --- a/include/fastdds/rtps/history/ReaderHistory.h +++ b/include/fastdds/rtps/history/ReaderHistory.h @@ -77,6 +77,16 @@ class ReaderHistory : public History RTPS_DllAPI bool remove_change( CacheChange_t* a_change) override; + /** + * Remove a specific change from the history. + * @param ch Pointer to the CacheChange_t. + * @param hint Iterator where the CacheChange_t is located in the history. + * @return An iterator pointing to the new location of the element that followed the removed CacheChange_t. + */ + const_iterator remove_change_nts( + CacheChange_t* ch, + const_iterator position); + /** * Remove all changes from the History that have a certain guid. * @param a_guid Pointer to the target guid to search for. diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index f6b8ba97622..828b8b50e3e 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -125,6 +125,16 @@ bool ReaderHistory::remove_change( return false; } +History::const_iterator ReaderHistory::remove_change_nts( + CacheChange_t* a_change, + History::const_iterator position) +{ + (void)a_change; + assert(nullptr != a_change); + assert((*position) == a_change); + return m_changes.erase(position); +} + bool ReaderHistory::remove_changes_with_guid( const GUID_t& a_guid) { diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index c5fc442697c..b5fea0ef21e 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -580,13 +580,13 @@ bool StatefulReader::processGapMsg( { CacheChange_t* to_remove = nullptr; auto ret_iterator = findCacheInFragmentedProcess(auxSN, pWP->guid(), &to_remove, history_iterator); - if (ret_iterator != mp_history->changesEnd()) + if (to_remove != nullptr) { - history_iterator = ret_iterator; + ret_iterator = mp_history->remove_change_nts(to_remove, ret_iterator); } - if (to_remove != nullptr) + else if (ret_iterator != mp_history->changesEnd()) { - mp_history->remove_change(to_remove); + history_iterator = ret_iterator; } } } @@ -597,13 +597,13 @@ bool StatefulReader::processGapMsg( { CacheChange_t* to_remove = nullptr; auto ret_iterator = findCacheInFragmentedProcess(auxSN, pWP->guid(), &to_remove, history_iterator); - if (ret_iterator != mp_history->changesEnd()) + if (to_remove != nullptr) { - history_iterator = ret_iterator; + ret_iterator = mp_history->remove_change_nts(to_remove, ret_iterator); } - if (to_remove != nullptr) + else if (ret_iterator != mp_history->changesEnd()) { - mp_history->remove_change(to_remove); + history_iterator = ret_iterator; } } }); From 3f0047d7ebb1839dcd08126777404f3016f8ae4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Fri, 29 Nov 2019 11:34:04 +0100 Subject: [PATCH 4/4] Refs #6929. Applying suggestions. --- include/fastdds/rtps/history/History.h | 2 +- include/fastdds/rtps/reader/RTPSReader.h | 36 ++++++++++++------------ src/cpp/rtps/reader/StatefulReader.cpp | 4 +-- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/include/fastdds/rtps/history/History.h b/include/fastdds/rtps/history/History.h index ea532cdcf34..27487f9c40c 100644 --- a/include/fastdds/rtps/history/History.h +++ b/include/fastdds/rtps/history/History.h @@ -197,7 +197,7 @@ class History const GUID_t& guid, CacheChange_t** change) const; - RTPS_DllAPI const_iterator get_change_nts( + const_iterator get_change_nts( const SequenceNumber_t& seq, const GUID_t& guid, CacheChange_t** change, diff --git a/include/fastdds/rtps/reader/RTPSReader.h b/include/fastdds/rtps/reader/RTPSReader.h index 894822249f6..b62b768651f 100644 --- a/include/fastdds/rtps/reader/RTPSReader.h +++ b/include/fastdds/rtps/reader/RTPSReader.h @@ -225,24 +225,6 @@ class RTPSReader : public Endpoint return mp_history; } - /*! - * @brief Search if there is a CacheChange_t, giving SequenceNumber_t and writer GUID_t, - * waiting to be completed because it is fragmented. - * @param sequence_number SequenceNumber_t of the searched CacheChange_t. - * @param writer_guid writer GUID_t of the searched CacheChange_t. - * @param change If a CacheChange_t was found, this argument will fill with its pointer. - * In other case nullptr is returned. - * @param hint Iterator since the search will start. - * Used to improve the search. - * @return Iterator pointing to the position were CacheChange_t was found. - * It can be used to improve next search. - */ - History::const_iterator findCacheInFragmentedProcess( - const SequenceNumber_t& sequence_number, - const GUID_t& writer_guid, - CacheChange_t** change, - History::const_iterator hint) const; - /*! * @brief Returns there is a clean state with all Writers. * It occurs when the Reader received all samples sent by Writers. In other words, @@ -317,6 +299,24 @@ class RTPSReader : public Endpoint const GUID_t& persistence_guid, const SequenceNumber_t& seq); + /*! + * @brief Search if there is a CacheChange_t, giving SequenceNumber_t and writer GUID_t, + * waiting to be completed because it is fragmented. + * @param sequence_number SequenceNumber_t of the searched CacheChange_t. + * @param writer_guid writer GUID_t of the searched CacheChange_t. + * @param change If a CacheChange_t was found, this argument will fill with its pointer. + * In other case nullptr is returned. + * @param hint Iterator since the search will start. + * Used to improve the search. + * @return Iterator pointing to the position were CacheChange_t was found. + * It can be used to improve next search. + */ + History::const_iterator findCacheInFragmentedProcess( + const SequenceNumber_t& sequence_number, + const GUID_t& writer_guid, + CacheChange_t** change, + History::const_iterator hint) const; + //!ReaderHistory ReaderHistory* mp_history; //!Listener diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index b5fea0ef21e..7ddff0afd7b 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -582,7 +582,7 @@ bool StatefulReader::processGapMsg( auto ret_iterator = findCacheInFragmentedProcess(auxSN, pWP->guid(), &to_remove, history_iterator); if (to_remove != nullptr) { - ret_iterator = mp_history->remove_change_nts(to_remove, ret_iterator); + history_iterator = mp_history->remove_change_nts(to_remove, ret_iterator); } else if (ret_iterator != mp_history->changesEnd()) { @@ -599,7 +599,7 @@ bool StatefulReader::processGapMsg( auto ret_iterator = findCacheInFragmentedProcess(auxSN, pWP->guid(), &to_remove, history_iterator); if (to_remove != nullptr) { - ret_iterator = mp_history->remove_change_nts(to_remove, ret_iterator); + history_iterator = mp_history->remove_change_nts(to_remove, ret_iterator); } else if (ret_iterator != mp_history->changesEnd()) {