diff --git a/lib/config.joi.js b/lib/config.joi.js index e5ae2e1e6..303bc4ae7 100644 --- a/lib/config.joi.js +++ b/lib/config.joi.js @@ -42,6 +42,7 @@ const joiSchema = joi.object({ zookeeperPath: joi.string().required(), logSource: joi.alternatives().try(logSourcesJoi).required(), + exhaustLogSource: joi.bool().default(false), bucketd: hostPortJoi .keys({ transport: transportJoi }) .when('logSource', { is: 'bucketd', then: joi.required() }), diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 7c187d7ea..b941582f8 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -423,6 +423,7 @@ class LogReader { // continuously send 'data' events as they come. if (logRes.tailable && logStats.nbLogRecordsRead >= batchState.maxRead) { + logStats.hasMoreLog = true; logger.debug('ending batch', { method: 'LogReader._processPrepareEntries', reason: 'limit on number of read records reached', @@ -440,6 +441,9 @@ class LogReader { return shipBatchCb(err); }; const endEventHandler = () => { + if (logStats.nbLogRecordsRead >= batchState.maxRead) { + logStats.hasMoreLog = true; + } logger.debug('ending record stream', { method: 'LogReader._processPrepareEntries', }); diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js index a5ef48155..7fdc07350 100644 --- a/lib/queuePopulator/QueuePopulator.js +++ b/lib/queuePopulator/QueuePopulator.js @@ -524,7 +524,24 @@ class QueuePopulator { _processLogEntries(params, done) { return async.map( this.logReaders, - (logReader, cb) => logReader.processLogEntries(params, cb), + (logReader, readerDone) => { + async.doWhilst( + next => { + logReader.processLogEntries(params, (err, hasMoreLog) => { + if (err) { + this.log.error('error processing log entries', { + method: 'QueuePopulator._processLogEntries', + error: err, + }); + return next(err); + } + return next(null, hasMoreLog); + }); + }, + hasMoreLog => this.qpConfig.exhaustLogSource && hasMoreLog && 
!this.logReadersUpdate, + readerDone, + ); + }, done); } diff --git a/tests/unit/QueuePopulator.spec.js b/tests/unit/QueuePopulator.spec.js index cb9ef7a6f..bf2a60933 100644 --- a/tests/unit/QueuePopulator.spec.js +++ b/tests/unit/QueuePopulator.spec.js @@ -4,6 +4,7 @@ const sinon = require('sinon'); const zookeeper = require('node-zookeeper-client'); const QueuePopulator = require('../../lib/queuePopulator/QueuePopulator'); const constants = require('../../lib/constants'); +const { errors } = require('arsenal'); describe('QueuePopulator', () => { let qp; @@ -120,4 +121,73 @@ describe('QueuePopulator', () => { ); }); }); + + describe('_processLogEntries', () => { + it('should process log records once when no more logs are available', done => { + qp.qpConfig.exhaustLogSource = true; + qp.logReaders = [{ + processLogEntries: sinon.stub().yields(null, false), + }]; + qp._processLogEntries({}, err => { + assert.ifError(err); + assert(qp.logReaders[0].processLogEntries.calledOnce); + return done(); + }); + }); + + it('should process log records until no more logs are available', done => { + qp.qpConfig.exhaustLogSource = true; + qp.logReaders = [{ + processLogEntries: sinon.stub() + .onCall(0).yields(null, true) + .onCall(1).yields(null, false), + }]; + qp._processLogEntries({}, err => { + assert.ifError(err); + assert(qp.logReaders[0].processLogEntries.calledTwice); + return done(); + }); + }); + + it('should only process log records once if exhaustLogSource is set to false', done => { + qp.qpConfig.exhaustLogSource = false; + qp.logReaders = [{ + processLogEntries: sinon.stub() + .onCall(0).yields(null, true) + .onCall(1).yields(null, false), + }]; + qp._processLogEntries({}, err => { + assert.ifError(err); + assert(qp.logReaders[0].processLogEntries.calledOnce); + return done(); + }); + }); + + it('should only process log records once if the logReaders need to be updated', done => { + qp.qpConfig.exhaustLogSource = true; + qp.logReaders = [{ + processLogEntries: 
// Stand-in for a raft/record log stream in unit tests: an object-mode
// pass-through, so whole record objects written to it can be read back
// unchanged by the code under test.
class MockRecordStream extends stream.PassThrough {
    constructor() {
        // objectMode: records flow through as plain objects, not buffers
        const options = { objectMode: true };
        super(options);
    }
}
// Builds a fresh batchState for _processPrepareEntries() test cases;
// only the stream tailability, read limit and timeout vary per test.
const makeBatchState = ({ tailable, maxRead, timeoutMs }) => ({
    logRes: {
        log: new MockRecordStream(),
        tailable,
    },
    logStats: {
        nbLogRecordsRead: 0,
        nbLogEntriesRead: 0,
        hasMoreLog: false,
    },
    entriesToPublish: {},
    publishedEntries: {},
    currentRecords: [],
    maxRead,
    startTime: Date.now(),
    timeoutMs,
    logger: logReader.log,
});

// Writes `count` empty records into the batch's mock record stream.
const writeRecords = (batchState, count) => {
    for (let i = 0; i < count; ++i) {
        batchState.logRes.log.write({ entries: [] });
    }
};

it('should consume and return if tailable stream doesn\'t have enough records', done => {
    // Fewer records than maxRead on a tailable stream: the batch ends
    // on timeout and must not report more log available.
    const batchState = makeBatchState({
        tailable: true,
        maxRead: 30,
        timeoutMs: 100,
    });
    writeRecords(batchState, 3);
    logReader._processPrepareEntries(batchState, err => {
        assert.ifError(err);
        assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
        assert.strictEqual(batchState.logStats.hasMoreLog, false);
        done();
    });
});

it('should consume all logs from a non tailable streams', done => {
    // Record count equals maxRead: the end-event handler must flag
    // that more log may remain (nbLogRecordsRead >= maxRead).
    const batchState = makeBatchState({
        tailable: false,
        maxRead: 3,
        timeoutMs: 60000,
    });
    writeRecords(batchState, 3);
    batchState.logRes.log.end();
    logReader._processPrepareEntries(batchState, err => {
        assert.ifError(err);
        assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
        assert.strictEqual(batchState.logStats.hasMoreLog, true);
        done();
    });
});

it('should set hasMoreLog to false when a non tailable streams doesn\'t have enough records', done => {
    // Stream ends below maxRead: source is exhausted, hasMoreLog stays
    // false so the caller does not loop again.
    const batchState = makeBatchState({
        tailable: false,
        maxRead: 5,
        timeoutMs: 60000,
    });
    writeRecords(batchState, 3);
    batchState.logRes.log.end();
    logReader._processPrepareEntries(batchState, err => {
        assert.ifError(err);
        assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
        assert.strictEqual(batchState.logStats.hasMoreLog, false);
        done();
    });
});