Returns a promise that will resolve to an array of workers currently listening or processing jobs.
+ +--- + ### Queue#clean ```ts diff --git a/index.js b/index.js index 6f908bc92..34fb79f02 100644 --- a/index.js +++ b/index.js @@ -2,3 +2,4 @@ module.exports = require('./lib/queue'); module.exports.Job = require('./lib/job'); +module.exports.utils = require('./lib/utils'); diff --git a/lib/commands/moveToFinished-8.lua b/lib/commands/moveToFinished-8.lua deleted file mode 100644 index ad496d933..000000000 --- a/lib/commands/moveToFinished-8.lua +++ /dev/null @@ -1,118 +0,0 @@ ---[[ - Move job from active to a finished status (completed or failed) - A job can only be moved to completed if it was active. - The job must be locked before it can be moved to a finished status, - and the lock must be released in this script. - - Input: - KEYS[1] active key - KEYS[2] completed/failed key - KEYS[3] jobId key - - KEYS[4] wait key - KEYS[5] priority key - KEYS[6] active event key - - KEYS[7] delayed key - KEYS[8] stalled key - - ARGV[1] jobId - ARGV[2] timestamp - ARGV[3] msg property - ARGV[4] return value / failed reason - ARGV[5] token - ARGV[6] shouldRemove - ARGV[7] event data (? maybe just send jobid). - ARGV[8] should fetch next job - ARGV[9] base key - - Output: - 0 OK - -1 Missing key. - -2 Missing lock. - - Events: - 'completed/failed' -]] -local rcall = redis.call - -if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists - if ARGV[5] ~= "0" then - local lockKey = KEYS[3] .. ':lock' - if rcall("GET", lockKey) == ARGV[5] then - rcall("DEL", lockKey) - rcall("SREM", KEYS[8], ARGV[1]) - else - return -2 - end - end - - -- Remove from active list - rcall("LREM", KEYS[1], -1, ARGV[1]) - - -- Remove job? 
- local keepJobs = cmsgpack.unpack(ARGV[6]) - local maxCount = keepJobs['count'] - local maxAge = keepJobs['age'] - local targetSet = KEYS[2] - local timestamp = ARGV[2] - - if maxCount ~= 0 then - - -- Add to complete/failed set - rcall("ZADD", targetSet, timestamp, ARGV[1]) - rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", timestamp) -- "returnvalue" / "failedReason" and "finishedOn" - - local function removeJobs(jobIds) - for i, jobId in ipairs(jobIds) do - local jobKey = ARGV[9] .. jobId - local jobLogKey = jobKey .. ':logs' - rcall("DEL", jobKey, jobLogKey) - end - end - - -- Remove old jobs? - if maxAge ~= nil then - local start = timestamp - maxAge * 1000 - local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") - removeJobs(jobIds) - rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) - end - - if maxCount ~= nil and maxCount > 0 then - local start = maxCount - local jobIds = rcall("ZREVRANGE", targetSet, start, -1) - removeJobs(jobIds) - rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)); - end - else - local jobLogKey = KEYS[3] .. ':logs' - rcall("DEL", KEYS[3], jobLogKey) - end - - rcall("PUBLISH", targetSet, ARGV[7]) - - -- Try to get next job to avoid an extra roundtrip if the queue is not closing, - -- and not rate limited. - if(ARGV[8] == "1") then - -- move from wait to active - local jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1]) - if jobId then - local jobKey = ARGV[9] .. jobId - local lockKey = jobKey .. 
+ ARGV[10] lock duration in milliseconds
+ ARGV[11] lock token
+ local prevTS = rcall("HGET", metaKey, "prevTS") + + if not prevTS then + -- If prevTS is nil, set it to the current timestamp + rcall("HSET", metaKey, "prevTS", timestamp, "prevCount", 0) + return + end + + local N = math.floor((timestamp - prevTS) / 60000) + + if N > 0 then + local delta = count - rcall("HGET", metaKey, "prevCount") + -- If N > 1, add N-1 zeros to the list + if N > 1 then + local points = {} + points[1] = delta + for i = 2, N do points[i] = 0 end + rcall("LPUSH", dataPointsList, unpack(points)) + else + -- LPUSH delta to the list + rcall("LPUSH", dataPointsList, delta) + end + + -- LTRIM to keep list to its max size + rcall("LTRIM", dataPointsList, 0, maxDataPoints - 1) + + -- update prev count with current count + rcall("HSET", metaKey, "prevCount", count, "prevTS", timestamp) + end +end + +if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists + if ARGV[5] ~= "0" then + local lockKey = KEYS[3] .. ':lock' + if rcall("GET", lockKey) == ARGV[5] then + rcall("DEL", lockKey) + rcall("SREM", KEYS[8], ARGV[1]) + else + return -2 + end + end + + -- Remove from active list + rcall("LREM", KEYS[1], -1, ARGV[1]) + + -- Remove job? + local keepJobs = cmsgpack.unpack(ARGV[6]) + local maxCount = keepJobs['count'] + local maxAge = keepJobs['age'] + local targetSet = KEYS[2] + local timestamp = ARGV[2] + + if maxCount ~= 0 then + + -- Add to complete/failed set + rcall("ZADD", targetSet, timestamp, ARGV[1]) + rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", timestamp) -- "returnvalue" / "failedReason" and "finishedOn" + + local function removeJobs(jobIds) + for i, jobId in ipairs(jobIds) do + local jobKey = ARGV[9] .. jobId + local jobLogKey = jobKey .. ':logs' + rcall("DEL", jobKey, jobLogKey) + end + end + + -- Remove old jobs? 
+ if maxAge ~= nil then + local start = timestamp - maxAge * 1000 + local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") + removeJobs(jobIds) + rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) + end + + if maxCount ~= nil and maxCount > 0 then + local start = maxCount + local jobIds = rcall("ZREVRANGE", targetSet, start, -1) + removeJobs(jobIds) + rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)); + end + else + local jobLogKey = KEYS[3] .. ':logs' + rcall("DEL", KEYS[3], jobLogKey) + end + + -- Collect metrics + if ARGV[12] ~= "" then + collectMetrics(KEYS[9], KEYS[9]..':data', ARGV[12], timestamp) + end + + rcall("PUBLISH", targetSet, ARGV[7]) + + -- Try to get next job to avoid an extra roundtrip if the queue is not closing, + -- and not rate limited. + if (ARGV[8] == "1") then + -- move from wait to active + local jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1]) + if jobId then + local jobKey = ARGV[9] .. jobId + local lockKey = jobKey .. ':lock' + + -- get a lock + rcall("SET", lockKey, ARGV[11], "PX", ARGV[10]) + + rcall("ZREM", KEYS[5], jobId) -- remove from priority + rcall("PUBLISH", KEYS[6], jobId) + rcall("HSET", jobKey, "processedOn", ARGV[2]) + + return {rcall("HGETALL", jobKey), jobId} -- get job data + end + end + + return 0 +else + return -1 +end diff --git a/lib/commands/obliterate-2.lua b/lib/commands/obliterate-2.lua index 6055f5caa..b1e609239 100644 --- a/lib/commands/obliterate-2.lua +++ b/lib/commands/obliterate-2.lua @@ -7,12 +7,10 @@ ARGV[1] count ARGV[2] force -]] - +]] -- This command completely destroys a queue including all of its jobs, current or past -- leaving no trace of its existence. Since this script needs to iterate to find all the job -- keys, consider that this call may be slow for very large queues. 
- -- The queue needs to be "paused" or it will return an error -- If the queue has currently active jobs then the script by default will return error, -- however this behaviour can be overrided using the `force` option. @@ -45,15 +43,11 @@ end local function removeZSetJobs(keyName, max) local jobs = getZSetItems(keyName, max) removeJobs(keyName, jobs) - if(#jobs > 0) then - rcall("ZREM", keyName, unpack(jobs)) - end + if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end end local function removeLockKeys(keys) - for i, key in ipairs(keys) do - rcall("DEL", baseKey .. key .. ':lock') - end + for i, key in ipairs(keys) do rcall("DEL", baseKey .. key .. ':lock') end end -- 1) Check if paused, if not return with error. @@ -65,7 +59,7 @@ end local activeKey = baseKey .. 'active' local activeJobs = getListItems(activeKey, maxCount) if (#activeJobs > 0) then - if(ARGV[2] == "") then + if (ARGV[2] == "") then return -2 -- Error, ExistsActiveJobs end end @@ -73,35 +67,25 @@ end removeLockKeys(activeJobs) removeJobs(activeKey, activeJobs) rcall("LTRIM", activeKey, #activeJobs, -1) -if(maxCount <= 0) then - return 1 -end +if (maxCount <= 0) then return 1 end local waitKey = baseKey .. 'paused' removeListJobs(waitKey, maxCount) -if(maxCount <= 0) then - return 1 -end +if (maxCount <= 0) then return 1 end local delayedKey = baseKey .. 'delayed' removeZSetJobs(delayedKey, maxCount) -if(maxCount <= 0) then - return 1 -end +if (maxCount <= 0) then return 1 end local completedKey = baseKey .. 'completed' removeZSetJobs(completedKey, maxCount) -if(maxCount <= 0) then - return 1 -end +if (maxCount <= 0) then return 1 end local failedKey = baseKey .. 'failed' removeZSetJobs(failedKey, maxCount) -if(maxCount <= 0) then - return 1 -end +if (maxCount <= 0) then return 1 end -if(maxCount > 0) then +if (maxCount > 0) then rcall("DEL", baseKey .. 'priority') rcall("DEL", baseKey .. 'stalled-check') rcall("DEL", baseKey .. 
+ * @param end - End point of the metrics, where -1 is the
+ */ + Queue.prototype.getMetrics = async function(type, start = 0, end = -1) { + const metricsKey = this.toKey(`metrics:${type}`); + const dataKey = `${metricsKey}:data`; + + const multi = this.multi(); + multi.hmget(metricsKey, 'count', 'prevTS', 'prevCount'); + multi.lrange(dataKey, start, end); + multi.llen(dataKey); + + const [hmget, range, len] = await multi.exec(); + const [err, [count, prevTS, prevCount]] = hmget; + const [err2, data] = range; + const [err3, numPoints] = len; + if (err || err2) { + throw err || err2 || err3; + } + + return { + meta: { + count: parseInt(count || '0', 10), + prevTS: parseInt(prevTS || '0', 10), + prevCount: parseInt(prevCount || '0', 10) + }, + data, + count: numPoints + }; + }; }; function parseTypeArg(args) { diff --git a/lib/queue.js b/lib/queue.js index 2869100cd..1c5ac9ab8 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -219,6 +219,8 @@ const Queue = function Queue(name, url, opts) { isSharedChildPool: false }); + this.metrics = opts.metrics; + this.settings.lockRenewTime = this.settings.lockRenewTime || this.settings.lockDuration / 2; diff --git a/lib/scripts.js b/lib/scripts.js index cc14da1c8..bd2e1ac08 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -114,27 +114,20 @@ const scripts = { }); }, - retryJobsArgs( - queue, - count, - ) { - const keys = [ - queue.toKey(''), - queue.toKey('failed'), - queue.toKey('wait'), - ]; + retryJobsArgs(queue, count) { + const keys = [queue.toKey(''), queue.toKey('failed'), queue.toKey('wait')]; const args = [count]; return keys.concat(args); }, - async retryJobs(queue, count = 1000){ + async retryJobs(queue, count = 1000) { const client = await queue.client; const args = this.retryJobsArgs(queue, count); - return (client).retryJobs(args); + return client.retryJobs(args); }, moveToFinishedArgs( @@ -149,6 +142,8 @@ const scripts = { const queue = job.queue; const queueKeys = queue.keys; + const metricsKey = queue.toKey(`metrics:${target}`); + const keys = [ queueKeys.active, 
queueKeys[target], @@ -157,7 +152,8 @@ const scripts = { queueKeys.priority, queueKeys.active + '@' + queue.token, queueKeys.delayed, - queueKeys.stalled + queueKeys.stalled, + metricsKey ]; const keepJobs = pack( @@ -179,7 +175,8 @@ const scripts = { notFetch || queue.paused || queue.closing || queue.limiter ? 0 : 1, queueKeys[''], queue.settings.lockDuration, - queue.token + queue.token, + queue.metrics?.maxDataPoints ]; return keys.concat(args); diff --git a/lib/utils.js b/lib/utils.js index d659269aa..721dd89e9 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -57,3 +57,14 @@ module.exports.emitSafe = function(emitter, event, ...args) { } } }; + +module.exports.MetricsTime = { + ONE_MINUTE: 1, + FIVE_MINUTES: 5, + FIFTEEN_MINUTES: 15, + THIRTY_MINUTES: 30, + ONE_HOUR: 60, + ONE_WEEK: 60 * 24 * 7, + TWO_WEEKS: 60 * 24 * 7 * 2, + ONE_MONTH: 60 * 24 * 7 * 2 * 4 +}; diff --git a/test/test_metrics.js b/test/test_metrics.js new file mode 100644 index 000000000..4afaf2ffa --- /dev/null +++ b/test/test_metrics.js @@ -0,0 +1,376 @@ +'use strict'; + +const expect = require('chai').expect; +const utils = require('./utils'); +const sinon = require('sinon'); +const redis = require('ioredis'); + +const ONE_SECOND = 1000; +const ONE_MINUTE = 60 * ONE_SECOND; +const ONE_HOUR = 60 * ONE_MINUTE; + +const { MetricsTime } = require('../lib/utils'); + +describe('metrics', function() { + beforeEach(async function() { + this.clock = sinon.useFakeTimers(); + const client = new redis(); + //await client.flushdb(); + return client.quit(); + }); + + it('should gather metrics for completed jobs', async function() { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + this.clock.tick(0); + + const timmings = [ + 0, + 0, // For the fixtures to work we need to use 0 as first timing + ONE_MINUTE / 2, + ONE_MINUTE / 2, + 0, + 0, + ONE_MINUTE, + ONE_MINUTE, + ONE_MINUTE * 3, + ONE_HOUR, + ONE_MINUTE + ]; + + const fixture = [ + '1', + '0', + '0', + '0', + '0', + '0', 
+ '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '1', + '0', + '0', + '1', + '1', + '3', + '3' + ]; + + const numJobs = timmings.length; + + const queue = utils.buildQueue('metrics', { + metrics: { + maxDataPoints: MetricsTime.ONE_HOUR * 2 + } + }); + + queue.process(job => { + this.clock.tick(timmings[job.data.index]); + }); + + let processed = 0; + const completing = new Promise(resolve => { + queue.on('completed', async () => { + processed++; + if (processed === numJobs) { + resolve(); + } + }); + }); + + for (let i = 0; i < numJobs; i++) { + await queue.add({ index: i }); + } + + await completing; + + const metrics = await queue.getMetrics('completed'); + + const numPoints = Math.floor( + timmings.reduce((sum, timing) => sum + timing, 0) / ONE_MINUTE + ); + + expect(metrics.meta.count).to.be.equal(numJobs); + expect(metrics.data.length).to.be.equal(numPoints); + expect(metrics.count).to.be.equal(metrics.data.length); + expect(processed).to.be.equal(numJobs); + expect(metrics.data).to.be.deep.equal(fixture); + + this.clock.restore(); + await queue.close(); + }); + + it('should only keep metrics for "maxDataPoints"', async function() { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + this.clock.tick(0); + + const timmings = [ + 0, // For the fixtures to work we need to use 0 as first timing + 0, + ONE_MINUTE / 2, + ONE_MINUTE / 2, + 0, + 0, + ONE_MINUTE, + ONE_MINUTE, + ONE_MINUTE * 3, + ONE_HOUR, + 0, + 0, + ONE_MINUTE, + ONE_MINUTE + ]; + + const fixture = [ + '1', + '3', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0', + '0' + ]; + + const numJobs = timmings.length; + 
+ const queue = utils.buildQueue('metrics', { + metrics: { + maxDataPoints: MetricsTime.FIFTEEN_MINUTES + } + }); + + queue.process(job => { + this.clock.tick(timmings[job.data.index]); + }); + + let processed = 0; + const completing = new Promise(resolve => { + queue.on('completed', async () => { + processed++; + if (processed === numJobs) { + resolve(); + } + }); + }); + + for (let i = 0; i < numJobs; i++) { + await queue.add({ index: i }); + } + + await completing; + + const metrics = await queue.getMetrics('completed'); + + expect(metrics.meta.count).to.be.equal(numJobs); + expect(metrics.data.length).to.be.equal(MetricsTime.FIFTEEN_MINUTES); + expect(metrics.count).to.be.equal(metrics.data.length); + expect(processed).to.be.equal(numJobs); + expect(metrics.data).to.be.deep.equal(fixture); + + this.clock.restore(); + await queue.close(); + }); + + it('should gather metrics for failed jobs', async function() { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + this.clock.tick(0); + + const timmings = [ + 0, // For the fixtures to work we need to use 0 as first timing + ONE_MINUTE, + ONE_MINUTE / 5, + ONE_MINUTE / 2, + 0, + ONE_MINUTE, + ONE_MINUTE * 3, + 0 + ]; + + const fixture = ['0', '0', '1', '4', '1']; + + const numJobs = timmings.length; + + const queue = utils.buildQueue('metrics', { + metrics: { + maxDataPoints: MetricsTime.ONE_HOUR * 2 + } + }); + + queue.process(async job => { + this.clock.tick(timmings[job.data.index]); + throw new Error('test'); + }); + + let processed = 0; + const completing = new Promise(resolve => { + queue.on('failed', async () => { + processed++; + if (processed === numJobs) { + resolve(); + } + }); + }); + + for (let i = 0; i < numJobs; i++) { + await queue.add({ index: i }); + } + + await completing; + + const metrics = await queue.getMetrics('failed'); + + const numPoints = Math.floor( + timmings.reduce((sum, timing) => sum + timing, 0) / ONE_MINUTE + ); + + 
expect(metrics.meta.count).to.be.equal(numJobs); + expect(metrics.data.length).to.be.equal(numPoints); + expect(metrics.count).to.be.equal(metrics.data.length); + expect(processed).to.be.equal(numJobs); + expect(metrics.data).to.be.deep.equal(fixture); + + this.clock.restore(); + await queue.close(); + }); + + it('should get metrics with pagination', async function() { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + this.clock.tick(0); + + const timmings = [ + 0, + 0, // For the fixtures to work we need to use 0 as first timing + ONE_MINUTE / 2, + ONE_MINUTE / 2, + 0, + 0, + ONE_MINUTE, + ONE_MINUTE, + ONE_MINUTE * 3, + ONE_HOUR, + ONE_MINUTE + ]; + + const numJobs = timmings.length; + + const queue = utils.buildQueue('metrics', { + metrics: { + maxDataPoints: MetricsTime.ONE_HOUR * 2 + } + }); + + queue.process(async job => { + this.clock.tick(timmings[job.data.index]); + }); + + let processed = 0; + const completing = new Promise(resolve => { + queue.on('completed', async () => { + processed++; + if (processed === numJobs) { + resolve(); + } + }); + }); + + for (let i = 0; i < numJobs; i++) { + await queue.add({ index: i }); + } + + await completing; + + expect(processed).to.be.equal(numJobs); + + const numPoints = Math.floor( + timmings.reduce((sum, timing) => sum + timing, 0) / ONE_MINUTE + ); + + const pageSize = 10; + const data = []; + let skip = 0; + + while (skip < numPoints) { + const metrics = await queue.getMetrics( + 'completed', + skip, + skip + pageSize - 1 + ); + expect(metrics.meta.count).to.be.equal(numJobs); + expect(metrics.data.length).to.be.equal( + Math.min(numPoints - skip, pageSize) + ); + + data.push(...metrics.data); + skip += pageSize; + } + + const metrics = await queue.getMetrics('completed'); + expect(data).to.be.deep.equal(metrics.data); + + this.clock.restore(); + await queue.close(); + }); +});