Skip to content

Commit

Permalink
HPCC-32946 Capture and report lookahead timings for hash distributer
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.com>
  • Loading branch information
shamser committed Nov 13, 2024
1 parent 16e1ad1 commit 683cc20
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 11 deletions.
12 changes: 12 additions & 0 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ class LookAheadTimer : public SimpleActivityTimer
: SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled) { }
};

#define MEASURE_CYCLES_ATOMIC(atomicvar, enabled, code_block) \
{ \
auto start = (enabled) ? get_cycles_now() : 0; \
code_block; \
if (enabled) \
atomicvar.add(get_cycles_now()-start); \
}

#else
class ActivityTimer
{
Expand All @@ -371,6 +379,10 @@ struct LookAheadTimer
inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled){ }
};

#define MEASURE_CYCLES_ATOMIC(atomicvar, enabled, code_block) \
{\
code_block; \
}
#endif

class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext
Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,14 @@ class CDiskGroupAggregateSlave
merging = false;
appendOutputLinked(this);
}

// CSlaveActivity overloaded methods
virtual unsigned __int64 queryLookAheadCycles() const override
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
18 changes: 17 additions & 1 deletion thorlcr/activities/fetch/thfetchslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
if (distributor)
distributor->abort();
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
if (distributor)
return distributor->queryLookAheadCycles();
return 0;
}
// IStopInput
virtual void stopInput()
{
Expand Down Expand Up @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
{
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
CriticalBlock b(fetchStreamCS);
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (fetchStream)
lookAheadCycles += fetchStream->queryLookAheadCycles();
return lookAheadCycles;
}

// IThorDataLink impl.
virtual void start() override
{
Expand Down Expand Up @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
OwnedRoxieString fileName = fetchBaseHelper->getFileName();
{
CriticalBlock b(fetchStreamCS);
if (fetchStream)
slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles();
fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp));
}
fetchStreamOut = fetchStream->queryOutput();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/fetch/thfetchslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface
virtual void abort() = 0;
virtual void getStats(CRuntimeStatisticCollection & stats) const = 0;
virtual void getFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & fileStats, unsigned fileTableStart) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
};

IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);
Expand Down
48 changes: 39 additions & 9 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
RelaxedAtomic<stat_type> numLocalRows {0};
RelaxedAtomic<stat_type> numRemoteRows {0};
RelaxedAtomic<size_t> sizeRemoteWrite {0};
RelaxedAtomic<cycle_t> lookAheadCycles {0};

void init()
{
Expand Down Expand Up @@ -859,10 +860,14 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
}
if (aborted)
break;
const void *row = input->ungroupedNextRow();
const void *row;
MEASURE_CYCLES_ATOMIC(lookAheadCycles, owner.activity->queryTimeActivities(),
{
row = input->ungroupedNextRow();
}
);
if (!row)
break;

CTarget *target = nullptr;
if (owner.isAll)
target = targets.item(0);
Expand Down Expand Up @@ -947,6 +952,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
stats.setStatistic(StNumRemoteRows, numRemoteRows.load());
stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load());
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return lookAheadCycles.load();
}
// IThreadFactory impl.
virtual IPooledThread *createNew()
{
Expand Down Expand Up @@ -1167,6 +1176,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
return ssz.size();
}

// IHashDistributor
virtual void setBufferSizes(unsigned _inputBufferSize, unsigned _bucketSendSize, unsigned _pullBufferSize)
{
if (_inputBufferSize) inputBufferSize = _inputBufferSize;
Expand Down Expand Up @@ -1257,6 +1267,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
ihash = NULL;
iCompare = NULL;
}
virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return sender.queryLookAheadCycles();
}
virtual void abort()
{
if (!aborted)
Expand Down Expand Up @@ -1451,13 +1472,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
virtual void stopRecv() = 0;
virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;

virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
// IExceptionHandler impl.
virtual bool fireException(IException *e)
{
Expand Down Expand Up @@ -4103,6 +4117,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
}
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
};
#ifdef _MSC_VER
#pragma warning(pop)
Expand Down Expand Up @@ -4584,6 +4607,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato
info.canStall = true;
// maybe more?
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorRowAggregator impl
virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); }
virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); }
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/hashdistrib/thhashdistribslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface IHashDistributor : extends IInterface
virtual void join()=0;
virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
virtual void abort()=0;
};

Expand Down
7 changes: 7 additions & 0 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
merging = false;
appendOutputLinked(this);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
9 changes: 9 additions & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2837,6 +2837,15 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
PARENT::start();
dbgassertex(isSmart() || (leftITDL->isGrouped() == grouped)); // std. lookup join expects these to match
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
virtual void reset() override
{
PARENT::reset();
Expand Down

0 comments on commit 683cc20

Please sign in to comment.