Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving ReaderHistory sorting [1.9.x] #894

Merged
merged 2 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions include/fastrtps/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "../common/CacheChange.h"

namespace eprosima {
namespace fastrtps{
namespace fastrtps {
namespace rtps {

class WriterProxy;
Expand All @@ -39,10 +39,12 @@ class ReaderHistory : public History
friend class RTPSReader;

public:

/**
* Constructor of the ReaderHistory. It needs a HistoryAttributes.
*/
RTPS_DllAPI ReaderHistory(const HistoryAttributes& att);
RTPS_DllAPI ReaderHistory(
const HistoryAttributes& att);
RTPS_DllAPI virtual ~ReaderHistory();

/**
Expand All @@ -52,28 +54,31 @@ class ReaderHistory : public History
* @param change Pointer to the change
* @return True if added.
*/
RTPS_DllAPI virtual bool received_change(CacheChange_t* change, size_t);
RTPS_DllAPI virtual bool received_change(CacheChange_t * change, size_t);

/**
* Add a CacheChange_t to the ReaderHistory.
* @param a_change Pointer to the CacheChange to add.
* @return True if added.
*/
RTPS_DllAPI bool add_change(CacheChange_t* a_change);
RTPS_DllAPI bool add_change(
CacheChange_t* a_change);

/**
* Remove a CacheChange_t from the ReaderHistory.
* @param a_change Pointer to the CacheChange to remove.
* @return True if removed.
*/
RTPS_DllAPI bool remove_change(CacheChange_t* a_change) override;
RTPS_DllAPI bool remove_change(
CacheChange_t* a_change) override;

/**
* Remove all changes from the History that have a certain guid.
* @param a_guid Pointer to the target guid to search for.
* @return True if succesful, even if no changes have been removed.
* */
RTPS_DllAPI bool remove_changes_with_guid(const GUID_t& a_guid);
RTPS_DllAPI bool remove_changes_with_guid(
const GUID_t& a_guid);

/**
* Remove all fragmented changes from certain writer up to certain sequence number.
Expand All @@ -85,18 +90,17 @@ class ReaderHistory : public History
const SequenceNumber_t& seq_num,
const GUID_t& writer_guid);

/**
* Sort the CacheChange_t from the History by timestamp
*/
RTPS_DllAPI void sortCacheChanges();
/**
* Update the maximum and minimum sequenceNumber cacheChanges.
*/
RTPS_DllAPI void updateMaxMinSeqNum() override;

RTPS_DllAPI bool get_min_change_from(CacheChange_t** min_change, const GUID_t& writerGuid);
RTPS_DllAPI bool get_min_change_from(
CacheChange_t** min_change,
const GUID_t& writerGuid);

protected:

//!Pointer to the reader
RTPSReader* mp_reader;
};
Expand Down
107 changes: 60 additions & 47 deletions src/cpp/rtps/history/ReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
#include <mutex>

namespace eprosima {
namespace fastrtps{
namespace fastrtps {
namespace rtps {

ReaderHistory::ReaderHistory(const HistoryAttributes& att)
ReaderHistory::ReaderHistory(
const HistoryAttributes& att)
: History(att)
, mp_reader(nullptr)
{
Expand All @@ -40,100 +41,118 @@ ReaderHistory::~ReaderHistory()
{
}

bool ReaderHistory::received_change(CacheChange_t* change, size_t)
bool ReaderHistory::received_change(
CacheChange_t* change,
size_t)
{
return add_change(change);
}

bool ReaderHistory::add_change(CacheChange_t* a_change)
bool ReaderHistory::add_change(
CacheChange_t* a_change)
{
if(mp_reader == nullptr || mp_mutex == nullptr)
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Reader with this History before adding any changes");
logError(RTPS_HISTORY, "You need to create a Reader with this History before adding any changes");
return false;
}

std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
if(m_att.memoryPolicy == PREALLOCATED_MEMORY_MODE && a_change->serializedPayload.length > m_att.payloadMaxSize)
if (m_att.memoryPolicy == PREALLOCATED_MEMORY_MODE && a_change->serializedPayload.length > m_att.payloadMaxSize)
{
logError(RTPS_HISTORY,
"Change payload size of '" << a_change->serializedPayload.length <<
"' bytes is larger than the history payload size of '" << m_att.payloadMaxSize <<
"' bytes and cannot be resized.");
"Change payload size of '" << a_change->serializedPayload.length <<
"' bytes is larger than the history payload size of '" << m_att.payloadMaxSize <<
"' bytes and cannot be resized.");
return false;
}
if(a_change->writerGUID == c_Guid_Unknown)
if (a_change->writerGUID == c_Guid_Unknown)
{
logError(RTPS_HISTORY, "The Writer GUID_t must be defined");
}

if (!m_changes.empty() && a_change->sourceTimestamp < (*m_changes.rbegin())->sourceTimestamp)
{
auto it = std::lower_bound(m_changes.begin(), m_changes.end(), a_change,
[](const CacheChange_t* c1, const CacheChange_t* c2) -> bool
{
return c1->sourceTimestamp < c2->sourceTimestamp;
});
m_changes.insert(it, a_change);
}
else
{
logError(RTPS_HISTORY,"The Writer GUID_t must be defined");
m_changes.push_back(a_change);
}

m_changes.push_back(a_change);
sortCacheChanges();
updateMaxMinSeqNum();
logInfo(RTPS_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes");
logInfo(RTPS_HISTORY,
"Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes");

return true;
}

bool ReaderHistory::remove_change(CacheChange_t* a_change)
bool ReaderHistory::remove_change(
CacheChange_t* a_change)
{
if(mp_reader == nullptr || mp_mutex == nullptr)
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Reader with this History before removing any changes");
logError(RTPS_HISTORY, "You need to create a Reader with this History before removing any changes");
return false;
}

std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
if(a_change == nullptr)
if (a_change == nullptr)
{
logError(RTPS_HISTORY,"Pointer is not valid")
logError(RTPS_HISTORY, "Pointer is not valid")
return false;
}
for(std::vector<CacheChange_t*>::iterator chit = m_changes.begin();
chit!=m_changes.end();++chit)
for (std::vector<CacheChange_t*>::iterator chit = m_changes.begin();
chit != m_changes.end(); ++chit)
{
if((*chit)->sequenceNumber == a_change->sequenceNumber &&
if ((*chit)->sequenceNumber == a_change->sequenceNumber &&
(*chit)->writerGUID == a_change->writerGUID)
{
logInfo(RTPS_HISTORY,"Removing change "<< a_change->sequenceNumber);
logInfo(RTPS_HISTORY, "Removing change " << a_change->sequenceNumber);
mp_reader->change_removed_by_history(a_change);
m_changePool.release_Cache(a_change);
m_changes.erase(chit);
sortCacheChanges();
updateMaxMinSeqNum();
return true;
}
}
logWarning(RTPS_HISTORY,"SequenceNumber "<<a_change->sequenceNumber << " not found");
logWarning(RTPS_HISTORY, "SequenceNumber " << a_change->sequenceNumber << " not found");
return false;
}

bool ReaderHistory::remove_changes_with_guid(const GUID_t& a_guid)
bool ReaderHistory::remove_changes_with_guid(
const GUID_t& a_guid)
{
std::vector<CacheChange_t*> changes_to_remove;

if(mp_reader == nullptr || mp_mutex == nullptr)
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Reader with History before removing any changes");
logError(RTPS_HISTORY, "You need to create a Reader with History before removing any changes");
return false;
}

{//Lock scope
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
for(std::vector<CacheChange_t*>::iterator chit = m_changes.begin(); chit!=m_changes.end();++chit)
for (std::vector<CacheChange_t*>::iterator chit = m_changes.begin(); chit != m_changes.end(); ++chit)
{
if((*chit)->writerGUID == a_guid)
if ((*chit)->writerGUID == a_guid)
{
changes_to_remove.push_back( (*chit) );
}
}
}//End lock scope

for(std::vector<CacheChange_t*>::iterator chit = changes_to_remove.begin(); chit != changes_to_remove.end(); ++chit)
for (std::vector<CacheChange_t*>::iterator chit = changes_to_remove.begin(); chit != changes_to_remove.end();
++chit)
{
if(!remove_change(*chit))
if (!remove_change(*chit))
{
logError(RTPS_HISTORY,"One of the cachechanged in the GUID removal bulk could not be removed");
logError(RTPS_HISTORY, "One of the cachechanged in the GUID removal bulk could not be removed");
return false;
}
}
Expand Down Expand Up @@ -180,32 +199,26 @@ bool ReaderHistory::remove_fragmented_changes_until(

if (at_least_one_removed)
{
sortCacheChanges();
updateMaxMinSeqNum();
}

return true;
}

void ReaderHistory::sortCacheChanges()
{
std::sort(m_changes.begin(),
m_changes.end(),
[](CacheChange_t* c1, CacheChange_t* c2){ return c1->sourceTimestamp < c2->sourceTimestamp; });
}

void ReaderHistory::updateMaxMinSeqNum()
{
if(m_changes.size()==0)
if (m_changes.size() == 0)
{
mp_minSeqCacheChange = mp_invalidCache;
mp_maxSeqCacheChange = mp_invalidCache;
}
else
{
auto minmax = std::minmax_element(m_changes.begin(),
m_changes.end(),
[](CacheChange_t* c1, CacheChange_t* c2){ return c1->sequenceNumber < c2->sequenceNumber; });
m_changes.end(),
[](CacheChange_t* c1, CacheChange_t* c2){
return c1->sequenceNumber < c2->sequenceNumber;
});
mp_minSeqCacheChange = *(minmax.first);
mp_maxSeqCacheChange = *(minmax.second);
}
Expand All @@ -218,9 +231,9 @@ bool ReaderHistory::get_min_change_from(
bool ret = false;
*min_change = nullptr;

for(auto it = m_changes.begin(); it != m_changes.end(); ++it)
for (auto it = m_changes.begin(); it != m_changes.end(); ++it)
{
if((*it)->writerGUID == writerGuid)
if ((*it)->writerGUID == writerGuid)
{
*min_change = *it;
ret = true;
Expand Down