Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.9.x] Performance improvements #905

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 192 additions & 146 deletions include/fastrtps/rtps/history/History.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include <cassert>

namespace eprosima {
namespace fastrtps{
namespace fastrtps {
namespace rtps {

/**
Expand All @@ -41,151 +41,197 @@ namespace rtps {
*/
class History
{
protected:
History(const HistoryAttributes& att);
virtual ~History();
public:
//!Attributes of the History
HistoryAttributes m_att;
/**
* Reserve a CacheChange_t from the CacheChange pool.
* @param[out] change Pointer to pointer to the CacheChange_t to reserve
* @param[in] calculateSizeFunc Function to calculate the size of the change.
* @return True is reserved
*/
RTPS_DllAPI inline bool reserve_Cache(
CacheChange_t** change,
const std::function<uint32_t()>& calculateSizeFunc)
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changePool.reserve_Cache(change, calculateSizeFunc);
}

RTPS_DllAPI inline bool reserve_Cache(CacheChange_t** change, uint32_t dataSize)
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changePool.reserve_Cache(change, dataSize);
}

/**
* release a previously reserved CacheChange_t.
* @param ch Pointer to the CacheChange_t.
*/
RTPS_DllAPI inline void release_Cache(CacheChange_t* ch)
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changePool.release_Cache(ch);
}

/**
* Check if the history is full
* @return true if the History is full.
*/
RTPS_DllAPI bool isFull() { return m_isHistoryFull; }

/**
* Get the History size.
* @return Size of the history.
*/
RTPS_DllAPI size_t getHistorySize()
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changes.size();
}

/**
* Remove all changes from the History
* @return True if everything was correctly removed.
*/
RTPS_DllAPI bool remove_all_changes();

/**
* Update the maximum and minimum sequenceNumbers.
*/
virtual void updateMaxMinSeqNum()=0;

/**
* Remove a specific change from the history.
* @param ch Pointer to the CacheChange_t.
* @return True if removed.
*/
virtual bool remove_change(CacheChange_t* ch) = 0;

/**
* Get the beginning of the changes history iterator.
* @return Iterator to the beginning of the vector.
*/
RTPS_DllAPI std::vector<CacheChange_t*>::iterator changesBegin(){ return m_changes.begin(); }
RTPS_DllAPI std::vector<CacheChange_t*>::reverse_iterator changesRbegin() { return m_changes.rbegin(); }
/**
* Get the end of the changes history iterator.
* @return Iterator to the end of the vector.
*/
RTPS_DllAPI std::vector<CacheChange_t*>::iterator changesEnd(){ return m_changes.end(); }
RTPS_DllAPI std::vector<CacheChange_t*>::reverse_iterator changesRend() { return m_changes.rend(); }
/**
* Get the minimum CacheChange_t.
* @param min_change Pointer to pointer to the minimum change.
* @return True if correct.
*/
RTPS_DllAPI bool get_min_change(CacheChange_t** min_change);

/**
* Get the maximum CacheChange_t.
* @param max_change Pointer to pointer to the maximum change.
* @return True if correct.
*/
RTPS_DllAPI bool get_max_change(CacheChange_t** max_change);

/**
* Get the maximum serialized payload size
* @return Maximum serialized payload size
*/
RTPS_DllAPI inline uint32_t getTypeMaxSerialized(){ return m_changePool.getInitialPayloadSize(); }

/*!
* Get the mutex
* @return Mutex
*/
RTPS_DllAPI inline RecursiveTimedMutex* getMutex() { assert(mp_mutex != nullptr); return mp_mutex; }

RTPS_DllAPI bool get_change(
const SequenceNumber_t& seq,
const GUID_t& guid,
CacheChange_t** change) const;

/**
* @brief A method to get the change with the earliest timestamp
* @param change Pointer to pointer to earliest change
* @return True on success
*/
bool get_earliest_change(CacheChange_t** change);

protected:

//!Vector of pointers to the CacheChange_t.
std::vector<CacheChange_t*> m_changes;

//!Variable to know if the history is full without needing to block the History mutex.
bool m_isHistoryFull;

//!Pointer to and invalid cacheChange used to return the maximum and minimum when no changes are stored in the history.
CacheChange_t* mp_invalidCache;

//!Pool of cache changes reserved when the History is created.
CacheChangePool m_changePool;

//!Pointer to the minimum sequeceNumber CacheChange.
CacheChange_t* mp_minSeqCacheChange;

//!Pointer to the maximum sequeceNumber CacheChange.
CacheChange_t* mp_maxSeqCacheChange;

//!Print the seqNum of the changes in the History (for debuggisi, mng purposes).
void print_changes_seqNum2();

//!Mutex for the History.
RecursiveTimedMutex* mp_mutex;
protected:

History(
const HistoryAttributes& att);
virtual ~History();

public:

using iterator = std::vector<CacheChange_t*>::iterator;
using reverse_iterator = std::vector<CacheChange_t*>::reverse_iterator;
using const_iterator = std::vector<CacheChange_t*>::const_iterator;

//!Attributes of the History
HistoryAttributes m_att;
/**
* Reserve a CacheChange_t from the CacheChange pool.
* @param[out] change Pointer to pointer to the CacheChange_t to reserve
* @param[in] calculateSizeFunc Function to calculate the size of the change.
* @return True is reserved
*/
RTPS_DllAPI inline bool reserve_Cache(
CacheChange_t** change,
const std::function<uint32_t()>& calculateSizeFunc)
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changePool.reserve_Cache(change, calculateSizeFunc);
}

RTPS_DllAPI inline bool reserve_Cache(
CacheChange_t** change,
uint32_t dataSize)
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changePool.reserve_Cache(change, dataSize);
}

/**
* release a previously reserved CacheChange_t.
* @param ch Pointer to the CacheChange_t.
*/
RTPS_DllAPI inline void release_Cache(
CacheChange_t* ch)
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changePool.release_Cache(ch);
}

/**
* Check if the history is full
* @return true if the History is full.
*/
RTPS_DllAPI bool isFull()
{
return m_isHistoryFull;
}

/**
* Get the History size.
* @return Size of the history.
*/
RTPS_DllAPI size_t getHistorySize()
{
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
return m_changes.size();
}

/**
* Remove all changes from the History
* @return True if everything was correctly removed.
*/
RTPS_DllAPI bool remove_all_changes();

/**
* Update the maximum and minimum sequenceNumbers.
*/
virtual void updateMaxMinSeqNum() = 0;

/**
* Remove a specific change from the history.
* @param ch Pointer to the CacheChange_t.
* @return True if removed.
*/
virtual bool remove_change(
CacheChange_t* ch) = 0;

/**
* Get the beginning of the changes history iterator.
* @return Iterator to the beginning of the vector.
*/
RTPS_DllAPI iterator changesBegin()
{
return m_changes.begin();
}

RTPS_DllAPI reverse_iterator changesRbegin()
{
return m_changes.rbegin();
}

/**
* Get the end of the changes history iterator.
* @return Iterator to the end of the vector.
*/
RTPS_DllAPI iterator changesEnd()
{
return m_changes.end();
}

RTPS_DllAPI reverse_iterator changesRend()
{
return m_changes.rend();
}

/**
* Get the minimum CacheChange_t.
* @param min_change Pointer to pointer to the minimum change.
* @return True if correct.
*/
RTPS_DllAPI bool get_min_change(
CacheChange_t** min_change);

/**
* Get the maximum CacheChange_t.
* @param max_change Pointer to pointer to the maximum change.
* @return True if correct.
*/
RTPS_DllAPI bool get_max_change(
CacheChange_t** max_change);

/**
* Get the maximum serialized payload size
* @return Maximum serialized payload size
*/
RTPS_DllAPI inline uint32_t getTypeMaxSerialized()
{
return m_changePool.getInitialPayloadSize();
}

/*!
* Get the mutex
* @return Mutex
*/
RTPS_DllAPI inline RecursiveTimedMutex* getMutex()
{
assert(mp_mutex != nullptr); return mp_mutex;
}

RTPS_DllAPI bool get_change(
const SequenceNumber_t& seq,
const GUID_t& guid,
CacheChange_t** change) const;

const_iterator get_change_nts(
const SequenceNumber_t& seq,
const GUID_t& guid,
CacheChange_t** change,
const_iterator hint) const;

/**
* @brief A method to get the change with the earliest timestamp
* @param change Pointer to pointer to earliest change
* @return True on success
*/
bool get_earliest_change(
CacheChange_t** change);

protected:

//!Vector of pointers to the CacheChange_t.
std::vector<CacheChange_t*> m_changes;

//!Variable to know if the history is full without needing to block the History mutex.
bool m_isHistoryFull;

//!Pointer to and invalid cacheChange used to return the maximum and minimum when no changes are stored in the history.
CacheChange_t* mp_invalidCache;

//!Pool of cache changes reserved when the History is created.
CacheChangePool m_changePool;

//!Pointer to the minimum sequeceNumber CacheChange.
CacheChange_t* mp_minSeqCacheChange;

//!Pointer to the maximum sequeceNumber CacheChange.
CacheChange_t* mp_maxSeqCacheChange;

//!Print the seqNum of the changes in the History (for debuggisi, mng purposes).
void print_changes_seqNum2();

//!Mutex for the History.
RecursiveTimedMutex* mp_mutex;

};

Expand Down
Loading