From c325e76181355865cdda03eb7272adfcd90fe1fa Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Fri, 21 Feb 2014 11:33:59 -0800 Subject: [PATCH 1/3] Refactor Job tracking and statistics gathering --- src/ripple_core/functional/Job.cpp | 72 ---- src/ripple_core/functional/Job.h | 88 ++-- src/ripple_core/functional/JobQueue.cpp | 493 ++++++++--------------- src/ripple_core/functional/JobQueue.h | 3 - src/ripple_core/functional/JobTypeData.h | 99 +++++ src/ripple_core/functional/JobTypeInfo.h | 101 +++++ src/ripple_core/functional/JobTypes.h | 218 ++++++++++ 7 files changed, 634 insertions(+), 440 deletions(-) create mode 100644 src/ripple_core/functional/JobTypeData.h create mode 100644 src/ripple_core/functional/JobTypeInfo.h create mode 100644 src/ripple_core/functional/JobTypes.h diff --git a/src/ripple_core/functional/Job.cpp b/src/ripple_core/functional/Job.cpp index 47e8c3bb408..14a5ab1e3ca 100644 --- a/src/ripple_core/functional/Job.cpp +++ b/src/ripple_core/functional/Job.cpp @@ -31,19 +31,6 @@ Job::Job (JobType type, uint64 index) { } -#if 0 -Job::Job (Job const& other) - : m_cancelCallback (other.m_cancelCallback) - , mType (other.mType) - , mJobIndex (other.mJobIndex) - , mJob (other.mJob) - , m_loadEvent (other.m_loadEvent) - , mName (other.mName) - , m_queue_time (other.m_queue_time) -{ -} -#endif - Job::Job (JobType type, std::string const& name, uint64 index, @@ -60,19 +47,6 @@ Job::Job (JobType type, m_loadEvent = boost::make_shared (boost::ref (lm), name, false); } -/* -Job& Job::operator= (Job const& other) -{ - mType = other.mType; - mJobIndex = other.mJobIndex; - mJob = other.mJob; - m_loadEvent = other.m_loadEvent; - mName = other.mName; - m_cancelCallback = other.m_cancelCallback; - return *this; -} -*/ - JobType Job::getType () const { return mType; @@ -109,52 +83,6 @@ void Job::rename (std::string const& newName) mName = newName; } -const char* Job::toString (JobType t) -{ - switch (t) - { - case jtINVALID: return "invalid"; - case jtPACK: return "peerLedgerReq"; - case jtPUBOLDLEDGER: return "publishAcqLedger"; - case jtVALIDATION_ut: return "untrustedValidation"; - case jtPROOFWORK: return "proofOfWork"; - case jtTRANSACTION_l: return "localTransaction"; - case jtPROPOSAL_ut: return "untrustedProposal"; - case jtLEDGER_DATA: return "ledgerData"; - case jtUPDATE_PF: return "updatePaths"; - case jtCLIENT: return "clientCommand"; - case jtRPC: return "RPC"; - case jtTRANSACTION: return "transaction"; - case jtUNL: return "unl"; - case jtADVANCE: return "advanceLedger"; - case jtPUBLEDGER: return "publishNewLedger"; - case jtTXN_DATA: return "fetchTxnData"; - case jtWAL: return "writeAhead"; - case jtVALIDATION_t: return "trustedValidation"; - case jtWRITE: return "writeObjects"; - case jtACCEPT: return "acceptLedger"; - case jtPROPOSAL_t: return "trustedProposal"; - case jtSWEEP: return "sweep"; - case jtNETOP_CLUSTER: return "clusterReport"; - case jtNETOP_TIMER: return "heartbeat"; - - case jtADMIN: return "administration"; - - // special types not dispatched by the job pool - case jtPEER: return "peerCommand"; - case jtDISK: return "diskAccess"; - case jtTXN_PROC: return "processTransaction"; - case jtOB_SETUP: return "orderBookSetup"; - case jtPATH_FIND: return "pathFind"; - case jtHO_READ: return "nodeRead"; - case jtHO_WRITE: return "nodeWrite"; - case jtGENERIC: return "generic"; - default: - assert (false); - return "unknown"; - } -} - bool Job::operator> (const Job& j) const { if (mType < j.mType) diff --git a/src/ripple_core/functional/Job.h b/src/ripple_core/functional/Job.h index 0c817a8e742..695e3a91467 100644 --- a/src/ripple_core/functional/Job.h +++ b/src/ripple_core/functional/Job.h @@ -24,48 +24,51 @@ namespace ripple { // Note that this queue should only be used for CPU-bound jobs // It is primarily intended for signature checking + enum JobType { - // must be in priority order, low to high - jtINVALID = -1, - jtPACK = 1, // Make a fetch pack for a peer - jtPUBOLDLEDGER = 2, // An old ledger has been accepted - jtVALIDATION_ut = 3, // A validation from an untrusted source - jtPROOFWORK = 4, // A proof of work demand from another server - jtTRANSACTION_l = 5, // A local transaction - jtPROPOSAL_ut = 6, // A proposal from an untrusted source - jtLEDGER_DATA = 7, // Received data for a ledger we're acquiring - jtUPDATE_PF = 8, // Update pathfinding requests - jtCLIENT = 9, // A websocket command from the client - jtRPC = 10, // A websocket command from the client - jtTRANSACTION = 11, // A transaction received from the network - jtUNL = 12, // A Score or Fetch of the UNL (DEPRECATED) - jtADVANCE = 13, // Advance validated/acquired ledgers - jtPUBLEDGER = 14, // Publish a fully-accepted ledger - jtTXN_DATA = 15, // Fetch a proposed set - jtWAL = 16, // Write-ahead logging - jtVALIDATION_t = 17, // A validation from a trusted source - jtWRITE = 18, // Write out hashed objects - jtACCEPT = 19, // Accept a consensus ledger - jtPROPOSAL_t = 20, // A proposal from a trusted source - jtSWEEP = 21, // Sweep for stale structures - jtNETOP_CLUSTER = 22, // NetworkOPs cluster peer report - jtNETOP_TIMER = 23, // NetworkOPs net timer processing - jtADMIN = 24, // An administrative operation - - // special types not dispatched by the job pool - jtPEER = 30, - jtDISK = 31, - jtTXN_PROC = 32, - jtOB_SETUP = 33, - jtPATH_FIND = 34, - jtHO_READ = 35, - jtHO_WRITE = 36, - jtGENERIC = 37, // Used just to measure time -}; // CAUTION: If you add new types, add them to Job.cpp too - -// VFALCO TODO move this into the enum so it calculates itself? -#define NUM_JOB_TYPES 48 // why 48 and not 38? + // Special type indicating an invalid job - will go away soon. + jtINVALID = -1, + + // Job types - the position in this enum indicates the job priority with + // earlier jobs having lower priority than later jobs. If you wish to + // insert a job at a specific priority, simply add it at the right location. + + jtPACK, // Make a fetch pack for a peer + jtPUBOLDLEDGER, // An old ledger has been accepted + jtVALIDATION_ut, // A validation from an untrusted source + jtPROOFWORK, // A proof of work demand from another server + jtTRANSACTION_l, // A local transaction + jtPROPOSAL_ut, // A proposal from an untrusted source + jtLEDGER_DATA, // Received data for a ledger we're acquiring + jtUPDATE_PF, // Update pathfinding requests + jtCLIENT, // A websocket command from the client + jtRPC, // A websocket command from the client + jtTRANSACTION, // A transaction received from the network + jtUNL, // A Score or Fetch of the UNL (DEPRECATED) + jtADVANCE, // Advance validated/acquired ledgers + jtPUBLEDGER, // Publish a fully-accepted ledger + jtTXN_DATA, // Fetch a proposed set + jtWAL, // Write-ahead logging + jtVALIDATION_t, // A validation from a trusted source + jtWRITE, // Write out hashed objects + jtACCEPT, // Accept a consensus ledger + jtPROPOSAL_t, // A proposal from a trusted source + jtSWEEP, // Sweep for stale structures + jtNETOP_CLUSTER, // NetworkOPs cluster peer report + jtNETOP_TIMER, // NetworkOPs net timer processing + jtADMIN, // An administrative operation + + // Special job types which are not dispatched by the job pool + jtPEER , + jtDISK , + jtTXN_PROC , + jtOB_SETUP , + jtPATH_FIND , + jtHO_READ , + jtHO_WRITE , + jtGENERIC , // Used just to measure time +}; class Job { @@ -113,14 +116,13 @@ class Job void rename (const std::string& n); - // These comparison operators make the jobs sort in priority order in the job set + // These comparison operators make the jobs sort in priority order + // in the job set bool operator< (const Job& j) const; bool operator> (const Job& j) const; bool operator<= (const Job& j) const; bool operator>= (const Job& j) const; - static const char* toString (JobType); - private: CancelCallback m_cancelCallback; JobType mType; diff --git a/src/ripple_core/functional/JobQueue.cpp b/src/ripple_core/functional/JobQueue.cpp index 7f2e44643dd..d8a5b4d33f7 100644 --- a/src/ripple_core/functional/JobQueue.cpp +++ b/src/ripple_core/functional/JobQueue.cpp @@ -18,6 +18,9 @@ //============================================================================== #include "JobQueue.h" +#include "JobTypes.h" +#include "JobTypeInfo.h" +#include "JobTypeData.h" #include "beast/beast/make_unique.h" #include "beast/beast/chrono/chrono_util.h" @@ -31,186 +34,96 @@ class JobQueueImp , private Workers::Callback { public: - struct Stats - { - insight::Hook hook; - insight::Gauge job_count; - - // VFALCO TODO should enumerate the map of jobtypes instead - explicit Stats (insight::Collector::ptr const& collector) - : m_collector (collector) - { - add (jtPACK , "make_pack"); - add (jtPUBOLDLEDGER , "pub_oldledgx"); - add (jtVALIDATION_ut, "ut_validx"); - add (jtPROOFWORK , "proof_of_work"); - add (jtTRANSACTION_l, "local_tx"); - add (jtPROPOSAL_ut , "ut_proposal"); - add (jtLEDGER_DATA , "ledgx_data"); - add (jtUPDATE_PF , "upd_paths"); - add (jtCLIENT , "client_cmd"); - add (jtRPC , "rpc_cmd"); - add (jtTRANSACTION , "recv_tx"); - add (jtUNL , "unl_op"); - add (jtADVANCE , "next_ledgx"); - add (jtPUBLEDGER , "pub_ledgx"); - add (jtTXN_DATA , "tx_data"); - add (jtWAL , "wal"); - add (jtVALIDATION_t , "t_validx"); - add (jtWRITE , "write"); - add (jtACCEPT , "accept_ledgx"); - add (jtPROPOSAL_t , "t_prop"); - add (jtSWEEP , "sweep"); - add (jtNETOP_CLUSTER, "netop_clust"); - add (jtNETOP_TIMER , "netop_heart"); - add (jtADMIN , "admin"); - } - - template - void on_dequeue (JobType type, - std::chrono::duration const& value) const - { - auto const ms (ceil (value)); - if (ms.count() >= 10) - m_dequeue.find (type)->second.notify (ms); - } - - template - void on_execute (JobType type, - std::chrono::duration const& value) const - { - auto const ms (ceil (value)); - if (ms.count() >= 10) - m_execute.find (type)->second.notify (ms); - } - - private: - void add (JobType type, std::string const& label) - { - m_dequeue.emplace (type, m_collector->make_event (label + "_q")); - m_execute.emplace (type, m_collector->make_event (label)); - } - - typedef std::unordered_map > JobEvents; - - insight::Collector::ptr m_collector; - JobEvents m_dequeue; - JobEvents m_execute; - }; - - //-------------------------------------------------------------------------- - - struct Count - { - Count () noexcept - : type (jtINVALID) - , waiting (0) - , running (0) - , deferred (0) - { - } - - Count (JobType type_) noexcept - : type (type_) - , waiting (0) - , running (0) - , deferred (0) - { - } - - JobType type; // The type of Job these counts reflect - int waiting; // The number waiting - int running; // How many are running - int deferred; // Number of jobs we didn't signal due to limits - }; - typedef std::set JobSet; - typedef std::map MapType; + typedef std::map JobDataMap; typedef CriticalSection::ScopedLockType ScopedLock; Journal m_journal; - Stats m_stats; CriticalSection m_mutex; uint64 m_lastJob; JobSet m_jobSet; - MapType m_jobCounts; + JobDataMap m_jobData; + JobTypeData m_invalidJobData; - // The number of jobs running through processTask() + // The number of jobs currently in processTask() int m_processCount; Workers m_workers; - LoadMonitor m_loads [NUM_JOB_TYPES]; CancelCallback m_cancelCallback; + // statistics tracking + insight::Collector::ptr m_collector; + insight::Gauge job_count; + insight::Hook hook; + //-------------------------------------------------------------------------- + static JobTypes const& getJobTypes () + { + static JobTypes types; + return types; + } + + //-------------------------------------------------------------------------- JobQueueImp (insight::Collector::ptr const& collector, Stoppable& parent, Journal journal) : JobQueue ("JobQueue", parent) , m_journal (journal) - , m_stats (collector) , m_lastJob (0) + , m_invalidJobData (getJobTypes ().getInvalid (), collector) , m_processCount (0) , m_workers (*this, "JobQueue", 0) , m_cancelCallback (boost::bind (&Stoppable::isStopping, this)) + , m_collector (collector) { - m_stats.hook = collector->make_hook (std::bind ( + hook = m_collector->make_hook (std::bind ( &JobQueueImp::collect, this)); - m_stats.job_count = collector->make_gauge ("job_count"); + job_count = m_collector->make_gauge ("job_count"); { ScopedLock lock (m_mutex); - // Initialize the job counts. - // The 'limit' field in particular will be set based on the limit - for (int i = 0; i < NUM_JOB_TYPES; ++i) + for (auto const& x : getJobTypes ()) { - JobType const type (static_cast (i)); - m_jobCounts [type] = Count (type); + JobTypeInfo const& jt = x.second; + + // And create dynamic information for all jobs + auto const result (m_jobData.emplace (std::piecewise_construct, + std::forward_as_tuple (jt.type ()), + std::forward_as_tuple (jt, m_collector))); + assert (result.second == true); } } - - m_loads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000); - m_loads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000); - m_loads [ jtPROOFWORK ].setTargetLatency (2000, 5000); - m_loads [ jtTRANSACTION ].setTargetLatency (250, 1000); - m_loads [ jtPROPOSAL_ut ].setTargetLatency (500, 1250); - m_loads [ jtPUBLEDGER ].setTargetLatency (3000, 4500); - m_loads [ jtWAL ].setTargetLatency (1000, 2500); - m_loads [ jtVALIDATION_t ].setTargetLatency (500, 1500); - m_loads [ jtWRITE ].setTargetLatency (1750, 2500); - m_loads [ jtTRANSACTION_l ].setTargetLatency (100, 500); - m_loads [ jtPROPOSAL_t ].setTargetLatency (100, 500); - - m_loads [ jtCLIENT ].setTargetLatency (2000, 5000); - m_loads [ jtPEER ].setTargetLatency (200, 2500); - m_loads [ jtDISK ].setTargetLatency (500, 1000); - - m_loads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds - m_loads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second } ~JobQueueImp () { // Must unhook before destroying - m_stats.hook = insight::Hook (); + hook = insight::Hook (); } void collect () { ScopedLock lock (m_mutex); - m_stats.job_count = m_jobSet.size (); + job_count = m_jobSet.size (); } void addJob (JobType type, std::string const& name, boost::function const& jobFunc) { - bassert (type != jtINVALID); + assert (type != jtINVALID); + + JobDataMap::iterator iter (m_jobData.find (type)); + assert (iter != m_jobData.end ()); + + if (iter == m_jobData.end ()) + return; + + JobTypeData& data (iter->second); // FIXME: Workaround incorrect client shutdown ordering // do not add jobs to a queue with no threads - bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0); + assert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0); { // If this goes off it means that a child didn't follow @@ -225,7 +138,7 @@ class JobQueueImp // * Not all children are stopped // ScopedLock lock (m_mutex); - bassert (! isStopped() && ( + assert (! isStopped() && ( m_processCount>0 || ! m_jobSet.empty () || ! areChildrenStopped())); @@ -246,7 +159,7 @@ class JobQueueImp std::pair ::iterator, bool> result ( m_jobSet.insert (Job (type, name, ++m_lastJob, - m_loads[type], jobFunc, m_cancelCallback))); + data.load (), jobFunc, m_cancelCallback))); queueJob (*result.first, lock); } } @@ -255,18 +168,22 @@ class JobQueueImp { ScopedLock lock (m_mutex); - MapType::const_iterator c = m_jobCounts.find (t); + JobDataMap::const_iterator c = m_jobData.find (t); - return (c == m_jobCounts.end ()) ? 0 : c->second.waiting; + return (c == m_jobData.end ()) + ? 0 + : c->second.waiting; } int getJobCountTotal (JobType t) { ScopedLock lock (m_mutex); - MapType::const_iterator c = m_jobCounts.find (t); + JobDataMap::const_iterator c = m_jobData.find (t); - return (c == m_jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running); + return (c == m_jobData.end ()) + ? 0 + : (c->second.waiting + c->second.running); } int getJobCountGE (JobType t) @@ -276,32 +193,10 @@ class JobQueueImp ScopedLock lock (m_mutex); - typedef MapType::value_type jt_int_pair; - - BOOST_FOREACH (jt_int_pair const& it, m_jobCounts) + for (auto const& x : m_jobData) { - if (it.first >= t) - ret += it.second.waiting; - } - - return ret; - } - - std::vector< std::pair > > getJobCounts () - { - // return all jobs at all priority levels - std::vector< std::pair > > ret; - - ScopedLock lock (m_mutex); - - ret.reserve (m_jobCounts.size ()); - - typedef MapType::value_type jt_int_pair; - - BOOST_FOREACH (const jt_int_pair & it, m_jobCounts) - { - ret.push_back (std::make_pair (it.second.type, - std::make_pair (it.second.waiting, it.second.running))); + if (x.first >= t) + ret += x.second.waiting; } return ret; @@ -338,7 +233,8 @@ class JobQueueImp c += 2; - m_journal.info << "Auto-tuning to " << c << " validation/transaction/proposal threads"; + m_journal.info << "Auto-tuning to " << c << + " validation/transaction/proposal threads"; } m_workers.setNumberOfThreads (c); @@ -347,21 +243,37 @@ class JobQueueImp LoadEvent::pointer getLoadEvent (JobType t, const std::string& name) { - return boost::make_shared (boost::ref (m_loads[t]), name, true); + JobDataMap::iterator iter (m_jobData.find (t)); + assert (iter != m_jobData.end ()); + + if (iter == m_jobData.end ()) + return boost::shared_ptr (); + + return boost::make_shared ( + boost::ref (iter-> second.load ()), name, true); } LoadEvent::autoptr getLoadEventAP (JobType t, const std::string& name) { - return LoadEvent::autoptr (new LoadEvent (m_loads[t], name, true)); + JobDataMap::iterator iter (m_jobData.find (t)); + assert (iter != m_jobData.end ()); + + if (iter == m_jobData.end ()) + return LoadEvent::autoptr (); + + return LoadEvent::autoptr ( + new LoadEvent (iter-> second.load (), name, true)); } bool isOverloaded () { int count = 0; - for (int i = 0; i < NUM_JOB_TYPES; ++i) - if (m_loads[i].isOver ()) + for (auto& x : m_jobData) + { + if (x.second.load ().isOver ()) ++count; + } return count > 0; } @@ -377,42 +289,32 @@ class JobQueueImp ScopedLock lock (m_mutex); - for (int i = 0; i < NUM_JOB_TYPES; ++i) + for (auto& x : m_jobData) { - JobType const type (static_cast (i)); + assert (x.first != jtINVALID); - if (type == jtGENERIC) + if (x.first == jtGENERIC) continue; - LoadMonitor::Stats stats = m_loads [i].getStats (); - int jobCount; - int threadCount; + JobTypeData& data (x.second); - MapType::const_iterator it = m_jobCounts.find (type); + LoadMonitor::Stats stats (data.stats ()); + + int waiting (data.running); + int running (data.waiting); - if (it == m_jobCounts.end ()) - { - jobCount = 0; - threadCount = 0; - } - else - { - jobCount = it->second.waiting; - threadCount = it->second.running; - } - - if ((stats.count != 0) || (jobCount != 0) || - (stats.latencyPeak != 0) || (threadCount != 0)) + if ((stats.count != 0) || (waiting != 0) || + (stats.latencyPeak != 0) || (running != 0)) { Json::Value& pri = priorities.append (Json::objectValue); - pri["job_type"] = Job::toString (type); + pri["job_type"] = data.name (); if (stats.isOverloaded) pri["over_target"] = true; - if (jobCount != 0) - pri["waiting"] = jobCount; + if (waiting != 0) + pri["waiting"] = waiting; if (stats.count != 0) pri["per_second"] = static_cast (stats.count); @@ -423,8 +325,8 @@ class JobQueueImp if (stats.latencyAvg != 0) pri["avg_time"] = static_cast (stats.latencyAvg); - if (threadCount != 0) - pri["in_progress"] = threadCount; + if (running != 0) + pri["in_progress"] = running; } } @@ -434,7 +336,21 @@ class JobQueueImp } private: - //------------------------------------------------------------------------------ + //-------------------------------------------------------------------------- + JobTypeData& getJobTypeData (JobType type) + { + JobDataMap::iterator c (m_jobData.find (type)); + assert (c != m_jobData.end ()); + + // NIKB: This is ugly and I hate it. We must remove jtINVALID completely + // and use something sane. + if (c == m_jobData.end ()) + return m_invalidJobData; + + return c->second; + } + + //-------------------------------------------------------------------------- // Signals the service stopped if the stopped condition is met. // @@ -456,7 +372,7 @@ class JobQueueImp } } - //------------------------------------------------------------------------------ + //-------------------------------------------------------------------------- // // Signals an added Job for processing. // @@ -478,9 +394,9 @@ class JobQueueImp assert (type != jtINVALID); assert (m_jobSet.find (job) != m_jobSet.end ()); - Count& count (m_jobCounts [type]); + JobTypeData& data (getJobTypeData (type)); - if (count.waiting + count.running < getJobLimit (type)) + if (data.waiting + data.running < getJobLimit (type)) { m_workers.addTask (); } @@ -488,9 +404,9 @@ class JobQueueImp { // defer the task until we go below the limit // - ++count.deferred; + ++data.deferred; } - ++count.waiting; + ++data.waiting; } //------------------------------------------------------------------------------ @@ -515,35 +431,35 @@ class JobQueueImp // void getNextJob (Job& job, ScopedLock const& lock) { - bassert (! m_jobSet.empty ()); + assert (! m_jobSet.empty ()); JobSet::const_iterator iter; for (iter = m_jobSet.begin (); iter != m_jobSet.end (); ++iter) { - Count& count (m_jobCounts [iter->getType ()]); + JobTypeData& data (getJobTypeData (iter->getType ())); - bassert (count.running <= getJobLimit (count.type)); + assert (data.running <= getJobLimit (data.type ())); // Run this job if we're running below the limit. - if (count.running < getJobLimit (count.type)) + if (data.running < getJobLimit (data.type ())) { - bassert (count.waiting > 0); + assert (data.waiting > 0); break; } } - bassert (iter != m_jobSet.end ()); + assert (iter != m_jobSet.end ()); JobType const type = iter->getType (); - Count& count (m_jobCounts [type]); + JobTypeData& data (getJobTypeData (type)); - bassert (type != jtINVALID); + assert (type != jtINVALID); job = *iter; m_jobSet.erase (iter); - --count.waiting; - ++count.running; + --data.waiting; + ++data.running; } //------------------------------------------------------------------------------ @@ -565,24 +481,45 @@ class JobQueueImp { JobType const type = job.getType (); - bassert (m_jobSet.find (job) == m_jobSet.end ()); - bassert (type != jtINVALID); + assert (m_jobSet.find (job) == m_jobSet.end ()); + assert (type != jtINVALID); - Count& count (m_jobCounts [type]); + JobTypeData& data (getJobTypeData (type)); // Queue a deferred task if possible - if (count.deferred > 0) + if (data.deferred > 0) { - bassert (count.running + count.waiting >= getJobLimit (type)); + assert (data.running + data.waiting >= getJobLimit (type)); - --count.deferred; + --data.deferred; m_workers.addTask (); } - --count.running; + --data.running; } - //------------------------------------------------------------------------------ + //-------------------------------------------------------------------------- + template + void on_dequeue (JobType type, + std::chrono::duration const& value) + { + auto const ms (ceil (value)); + + if (ms.count() >= 10) + getJobTypeData (type).dequeue.notify (ms); + } + + template + void on_execute (JobType type, + std::chrono::duration const& value) + { + auto const ms (ceil (value)); + + if (ms.count() >= 10) + getJobTypeData (type).execute.notify (ms); + } + + //-------------------------------------------------------------------------- // // Runs the next appropriate waiting Job. // @@ -605,29 +542,26 @@ class JobQueueImp ++m_processCount; } - JobType const type (job.getType ()); - String const name (Job::toString (type)); + JobTypeData& data (getJobTypeData (job.getType ())); // Skip the job if we are stopping and the // skipOnStop flag is set for the job type // - if (!isStopping() || !skipOnStop (type)) + if (!isStopping() || !data.info.skip ()) { - Thread::setCurrentThreadName (name); - m_journal.trace << "Doing " << name << " job"; + Thread::setCurrentThreadName (data.name ()); + m_journal.trace << "Doing " << data.name () << " job"; Job::clock_type::time_point const start_time ( Job::clock_type::now()); - m_stats.on_dequeue (type, start_time - job.queue_time ()); - + on_dequeue (job.getType (), start_time - job.queue_time ()); job.doJob (); - - m_stats.on_execute (type, Job::clock_type::now() - start_time); + on_execute (job.getType (), Job::clock_type::now() - start_time); } else { - m_journal.trace << "Skipping processTask ('" << name << "')"; + m_journal.trace << "Skipping processTask ('" << data.name () << "')"; } { @@ -647,109 +581,24 @@ class JobQueueImp // the JobQueue receives a stop notification. If the job type isn't // skipped, the Job will be called and the job must call Job::shouldCancel // to determine if a long running or non-mandatory operation should be canceled. - static bool skipOnStop (JobType type) + bool skipOnStop (JobType type) { - switch (type) - { - // These are skipped when a stop notification is received - case jtPACK: - case jtPUBOLDLEDGER: - case jtVALIDATION_ut: - case jtPROOFWORK: - case jtTRANSACTION_l: - case jtPROPOSAL_ut: - case jtLEDGER_DATA: - case jtUPDATE_PF: - case jtCLIENT: - case jtTRANSACTION: - case jtUNL: - case jtADVANCE: - case jtPUBLEDGER: - case jtTXN_DATA: - case jtVALIDATION_t: - case jtPROPOSAL_t: - case jtSWEEP: - case jtNETOP_CLUSTER: - case jtNETOP_TIMER: - case jtADMIN: - //case jtACCEPT: - return true; - - default: - bassertfalse; - case jtWAL: - case jtWRITE: - break; - } + JobTypeInfo const& j (getJobTypes ().get (type)); + assert (j.type () != jtINVALID); - return false; + return j.skip (); } // Returns the limit of running jobs for the given job type. // For jobs with no limit, we return the largest int. Hopefully that // will be enough. // - static int getJobLimit (JobType type) + int getJobLimit (JobType type) { - int limit = std::numeric_limits ::max (); - - switch (type) - { - // These are not dispatched by JobQueue - case jtPEER: - case jtDISK: - case jtTXN_PROC: - case jtOB_SETUP: - case jtPATH_FIND: - case jtHO_READ: - case jtHO_WRITE: - case jtGENERIC: - limit = 0; - break; - - default: - // Someone added a JobType but forgot to set a limit. - // Did they also forget to add it to Job.cpp? - bassertfalse; - break; - - case jtVALIDATION_ut: - case jtPROOFWORK: - case jtTRANSACTION_l: - case jtPROPOSAL_ut: - case jtUPDATE_PF: - case jtCLIENT: - case jtRPC: - case jtTRANSACTION: - case jtPUBLEDGER: - case jtADVANCE: - case jtWAL: - case jtVALIDATION_t: - case jtWRITE: - case jtPROPOSAL_t: - case jtSWEEP: - case jtADMIN: - case jtACCEPT: - limit = std::numeric_limits ::max (); - break; - - case jtLEDGER_DATA: limit = 2; break; - case jtPACK: limit = 1; break; - case jtPUBOLDLEDGER: limit = 2; break; - case jtTXN_DATA: limit = 1; break; - case jtUNL: limit = 1; break; - - // If either of the next two are processing so slowly - // or we are so busy we have two of them at once, it - // indicates a serious problem! - // - case jtNETOP_TIMER: - case jtNETOP_CLUSTER: - limit = 1; - break; - }; + JobTypeInfo const& j (getJobTypes ().get (type)); + assert (j.type () != jtINVALID); - return limit; + return j.limit (); } //-------------------------------------------------------------------------- @@ -765,8 +614,8 @@ class JobQueueImp ScopedLock lock (m_mutex); // Remove all jobs whose type is skipOnStop - typedef boost::unordered_map MapType; - MapType counts; + typedef boost::unordered_map JobDataMap; + JobDataMap counts; bool const report (m_journal.debug.active()); for (JobSet::const_iterator iter (m_jobSet.begin()); @@ -776,7 +625,7 @@ class JobQueueImp { if (report) { - std::pair result ( + std::pair result ( counts.insert (std::make_pair (iter->getType(), 1))); if (! result.second) ++(result.first->second); @@ -794,7 +643,7 @@ class JobQueueImp { Journal::ScopedStream s (m_journal.debug); - for (MapType::const_iterator iter (counts.begin()); + for (JobDataMap::const_iterator iter (counts.begin()); iter != counts.end(); ++iter) { s << std::endl << diff --git a/src/ripple_core/functional/JobQueue.h b/src/ripple_core/functional/JobQueue.h index 0fc64b19d89..6f8fe315ee6 100644 --- a/src/ripple_core/functional/JobQueue.h +++ b/src/ripple_core/functional/JobQueue.h @@ -47,9 +47,6 @@ class JobQueue : public Stoppable // All waiting jobs at or greater than this priority virtual int getJobCountGE (JobType t) = 0; - // jobs waiting, threads doing - virtual std::vector< std::pair > > getJobCounts () = 0; - virtual void shutdown () = 0; virtual void setThreadCount (int c, bool const standaloneMode) = 0; diff --git a/src/ripple_core/functional/JobTypeData.h b/src/ripple_core/functional/JobTypeData.h new file mode 100644 index 00000000000..7a21015f622 --- /dev/null +++ b/src/ripple_core/functional/JobTypeData.h @@ -0,0 +1,99 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_CORE_JOBTYPEDATA_H_INCLUDED +#define RIPPLE_CORE_JOBTYPEDATA_H_INCLUDED + +#include "JobTypeInfo.h" + +namespace ripple +{ + +struct JobTypeData +{ +private: + LoadMonitor m_load; + + /* Support for insight */ + insight::Collector::ptr m_collector; + +public: + /* The job category which we represent */ + JobTypeInfo const& info; + + /* The number of jobs waiting */ + int waiting; + + /* The number presently running */ + int running; + + /* And the number we deferred executing because of job limits */ + int deferred; + + /* Notification callbacks */ + insight::Event dequeue; + insight::Event execute; + + explicit JobTypeData (JobTypeInfo const& info_, + insight::Collector::ptr const& collector) noexcept + : m_collector (collector) + , info (info_) + , waiting (0) + , running (0) + , deferred (0) + { + m_load.setTargetLatency ( + info.getAverageLatency (), + info.getPeakLatency()); + + if (!info.special ()) + { + dequeue = m_collector->make_event (info.name () + "_q"); + execute = m_collector->make_event (info.name ()); + } + } + + /* Not copy-constructible or assignable */ + JobTypeData (JobTypeData const& other) = delete; + JobTypeData& operator= (JobTypeData const& other) = delete; + + std::string name () const + { + return info.name (); + } + + JobType type () const + { + return info.type (); + } + + LoadMonitor& load () + { + return m_load; + } + + LoadMonitor::Stats stats () + { + return m_load.getStats (); + } +}; + +} + +#endif diff --git a/src/ripple_core/functional/JobTypeInfo.h b/src/ripple_core/functional/JobTypeInfo.h new file mode 100644 index 00000000000..0db57aa5f05 --- /dev/null +++ b/src/ripple_core/functional/JobTypeInfo.h @@ -0,0 +1,101 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_CORE_JOBTYPEINFO_H_INCLUDED +#define RIPPLE_CORE_JOBTYPEINFO_H_INCLUDED + +namespace ripple +{ + +/** Holds all the 'static' information about a job, which does not change */ +class JobTypeInfo +{ +private: + JobType const m_type; + std::string const m_name; + + /** The limit on the number of running jobs for this job type. */ + int const m_limit; + + /** Can be skipped */ + bool const m_skip; + + /** Special jobs are not dispatched via the job queue */ + bool const m_special; + + /** Average and peak latencies for this job type. 0 is none specified */ + uint64 const m_avgLatency; + uint64 const m_peakLatency; + +public: + // Not default constructible + JobTypeInfo () = delete; + + JobTypeInfo (JobType type, std::string name, int limit, + bool skip, bool special, uint64 avgLatency, uint64 peakLatency) + : m_type (type) + , m_name (name) + , m_limit (limit) + , m_skip (skip) + , m_special (special) + , m_avgLatency (avgLatency) + , m_peakLatency (peakLatency) + { + + } + + JobType type () const + { + return m_type; + } + + std::string name () const + { + return m_name; + } + + int limit () const + { + return m_limit; + } + + bool skip () const + { + return m_skip; + } + + bool special () const + { + return m_special; + } + + uint64 getAverageLatency () const + { + return m_avgLatency; + } + + uint64 getPeakLatency () const + { + return m_peakLatency; + } +}; + +} + +#endif diff --git a/src/ripple_core/functional/JobTypes.h b/src/ripple_core/functional/JobTypes.h new file mode 100644 index 00000000000..250dcbc7b84 --- /dev/null +++ b/src/ripple_core/functional/JobTypes.h @@ -0,0 +1,218 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "Job.h" +#include "JobTypeInfo.h" + +namespace ripple +{ + +class JobTypes +{ +public: + typedef std::map Map; + typedef Map::const_iterator const_iterator; + + + JobTypes () + : m_unknown (jtINVALID, "invalid", 0, true, true, 0, 0) + { + int maxLimit = std::numeric_limits ::max (); + + // Make a fetch pack for a peer + add (jtPACK, "makeFetchPack", + 1, true, false, 0, 0); + + // An old ledger has been accepted + add (jtPUBOLDLEDGER, "publishAcqLedger", + 2, true, false, 10000, 15000); + + // A validation from an untrusted source + add (jtVALIDATION_ut, "untrustedValidation", + maxLimit, true, false, 2000, 5000); + + // A proof of work demand from another server + add (jtPROOFWORK, "proofOfWork", + maxLimit, true, false, 2000, 5000); + + // A local transaction + add (jtTRANSACTION_l, "localTransaction", + maxLimit, true, false, 100, 500); + + // A proposal from an untrusted source + add (jtPROPOSAL_ut, "untrustedProposal", + maxLimit, true, false, 500, 1250); + + // Received data for a ledger we're acquiring + add (jtLEDGER_DATA, "ledgerData", + 2, true, false, 0, 0); + + // Update pathfinding requests + add (jtUPDATE_PF, "updatePaths", + maxLimit, true, false, 0, 0); + + // A websocket command from the client + add (jtCLIENT, "clientCommand", + maxLimit, true, false, 2000, 5000); + + // A websocket command from the client + add (jtRPC, "RPC", + maxLimit, false, false, 0, 0); + + // A transaction received from the network + add (jtTRANSACTION, "transaction", + maxLimit, true, false, 250, 1000); + + // A Score or Fetch of the UNL (DEPRECATED) + add (jtUNL, "unl", + 1, true, false, 0, 0); + + // Advance validated/acquired ledgers + add (jtADVANCE, "advanceLedger", + maxLimit, true, false, 0, 0); + + // Publish a fully-accepted ledger + add (jtPUBLEDGER, "publishNewLedger", + maxLimit, true, false, 3000, 4500); + + // Fetch a proposed set + add (jtTXN_DATA, "fetchTxnData", + 1, true, false, 0, 0); + + // Write-ahead logging + add (jtWAL, "writeAhead", + maxLimit, false, false, 1000, 2500); + + // A validation from a trusted source + add (jtVALIDATION_t, "trustedValidation", + maxLimit, true, false, 500, 1500); + + // Write out hashed objects + add (jtWRITE, "writeObjects", + maxLimit, false, false, 1750, 2500); + + // Accept a consensus ledger + add (jtACCEPT, "acceptLedger", + maxLimit, false, false, 0, 0); + + // A proposal from a trusted source + add (jtPROPOSAL_t, "trustedProposal", + maxLimit, false, false, 100, 500); + + // Sweep for stale structures + add (jtSWEEP, "sweep", + maxLimit, true, false, 0, 0); + + // NetworkOPs cluster peer report + add (jtNETOP_CLUSTER, "clusterReport", + 1, true, false, 9999, 9999); + + // NetworkOPs net timer processing + add (jtNETOP_TIMER, "heartbeat", + 1, true, false, 999, 999); + + // An administrative operation + add (jtADMIN, "administration", + maxLimit, true, false, 0, 0); + + // The rest are special job types that are not dispatched + // by the job pool. The "limit" and "skip" attributes are + // not applicable to these types of jobs. + + add (jtPEER, "peerCommand", + 0, false, true, 200, 2500); + + add (jtDISK, "diskAccess", + 0, false, true, 500, 1000); + + add (jtTXN_PROC, "processTransaction", + 0, false, true, 0, 0); + + add (jtOB_SETUP, "orderBookSetup", + 0, false, true, 0, 0); + + add (jtPATH_FIND, "pathFind", + 0, false, true, 0, 0); + + add (jtHO_READ, "nodeRead", + 0, false, true, 0, 0); + + add (jtHO_WRITE, "nodeWrite", + 0, false, true, 0, 0); + + add (jtGENERIC, "generic", + 0, false, true, 0, 0); + } + + JobTypeInfo const& get (JobType jt) const + { + Map::const_iterator const iter (m_map.find (jt)); + assert (iter != m_map.end ()); + + if (iter != m_map.end()) + return iter->second; + + return m_unknown; + } + + JobTypeInfo const& getInvalid () const + { + return m_unknown; + } + + const_iterator begin () const + { + return m_map.cbegin (); + } + + const_iterator cbegin () const + { + return m_map.cbegin (); + } + + const_iterator end () const + { + return m_map.cend (); + } + + const_iterator cend () const + { + return m_map.cend (); + } + +private: + void add(JobType jt, std::string name, int limit, + bool skip, bool special, uint64 avgLatency, uint64 peakLatency) + { + assert (m_map.find (jt) == m_map.end ()); + + std::pair result (m_map.emplace ( + std::piecewise_construct, + std::forward_as_tuple (jt), + std::forward_as_tuple (jt, name, limit, skip, special, + avgLatency, peakLatency))); + + assert (result.second == true); + } + + JobTypeInfo m_unknown; + Map m_map; +}; + +} From fa5b5c375f54dfed73143c4d86461cf6451ed2d0 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Sat, 22 Feb 2014 14:57:45 -0800 Subject: [PATCH 2/3] When not on Travis, pretty-print but show sample cmds and generate log --- .gitignore | 3 +++ .travis.yml | 2 +- SConstruct | 21 +++++++++++++++++++-- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 193646c332a..6a1c74898ba 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,9 @@ db/*.db-* # Ignore debug logs debug_log.txt +# Ignore build log +build.log + # Ignore customized configs rippled.cfg validators.txt diff --git a/.travis.yml b/.travis.yml index 8bd3f7e170d..0dd97660bd2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,7 +32,7 @@ script: # Set so any failing command will abort the build - set -e # If only we could do -j12 ;) - - scons + - scons # See what we've actually built - ldd ./build/rippled # Run unittests (under gdb) diff --git a/SConstruct b/SConstruct index de58cd3ad26..b89910dbbd3 100644 --- a/SConstruct +++ b/SConstruct @@ -41,7 +41,7 @@ HONOR_ENVS = ['CC', 'CXX', 'PATH'] env = Environment( tools = ['default', 'protoc'], #ENV = dict((k, os.environ[k]) for k in HONOR_ENVS) - ENV = dict((k, os.environ[k]) for k in HONOR_ENVS if k in os.environ) + ENV = dict((k, os.environ[k]) for k in HONOR_ENVS if k in os.environ) ) # Use a newer gcc on FreeBSD @@ -138,18 +138,35 @@ else: # output #------------------------------------------------------------------------------- +BuildLogFile = None +CmdLineShown = 0 + def print_cmd_line(s, target, src, env): + global BuildLogFile + global CmdLineShown + + if BuildLogFile is None: + BuildLogFile = open('build.log', 'w') + Tgt = (''.join([str(x) for x in target])) + if BuildLogFile is not None: + BuildLogFile.write("%s\n" % s); + if ('build/rippled' == Tgt): + sys.stdout.write("Link Cmd: '\033[37m%s\033[0m'\n" % s) sys.stdout.write("Linking '\033[94m%s\033[0m'...\n" % Tgt) elif ('tags' == Tgt): sys.stdout.write("Generating tags...\n") else: + if CmdLineShown == 0: + sys.stdout.write("Build Cmd: '\033[37m%s\033[0m'\n" % s) + CmdLineShown = 1 sys.stdout.write("Compiling '\033[94m%s\033[0m'...\n" % \ (' and '.join([str(x) for x in src]))) -env['PRINT_CMD_LINE_FUNC'] = print_cmd_line +if os.environ.get('TRAVIS') is None: + env['PRINT_CMD_LINE_FUNC'] = print_cmd_line #------------------------------------------------------------------------------- # From 69053123d854519d735a87542815fdd5cc85e5e6 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Sat, 22 Feb 2014 22:08:47 -0800 Subject: [PATCH 3/3] Show command lines during compilation --- SConstruct | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/SConstruct b/SConstruct index b89910dbbd3..8a87bd15ca9 100644 --- a/SConstruct +++ b/SConstruct @@ -139,11 +139,9 @@ else: #------------------------------------------------------------------------------- BuildLogFile = None -CmdLineShown = 0 def print_cmd_line(s, target, src, env): global BuildLogFile - global CmdLineShown if BuildLogFile is None: BuildLogFile = open('build.log', 'w') @@ -154,16 +152,14 @@ def print_cmd_line(s, target, src, env): BuildLogFile.write("%s\n" % s); if ('build/rippled' == Tgt): - sys.stdout.write("Link Cmd: '\033[37m%s\033[0m'\n" % s) sys.stdout.write("Linking '\033[94m%s\033[0m'...\n" % Tgt) + sys.stdout.write("%s\n" % s) elif ('tags' == Tgt): sys.stdout.write("Generating tags...\n") else: - if CmdLineShown == 0: - sys.stdout.write("Build Cmd: '\033[37m%s\033[0m'\n" % s) - CmdLineShown = 1 sys.stdout.write("Compiling '\033[94m%s\033[0m'...\n" % \ (' and '.join([str(x) for x in src]))) + sys.stdout.write("%s\n" % s) if os.environ.get('TRAVIS') is None: env['PRINT_CMD_LINE_FUNC'] = print_cmd_line