From 6438b9378d1292b31009c904d7c7bf766d1ab640 Mon Sep 17 00:00:00 2001
From: Kerkesni
Date: Tue, 15 Oct 2024 12:12:53 +0200
Subject: [PATCH] allow consuming log records without waiting when more logs
 are available

The cooldown between batches is skipped when more logs are available to
consume, so that the next batch is processed right away. This is mainly
needed for S3C to sustain good replication rates. The behavior is opt-in
through the new exhaustLogSource configuration option, which defaults to
false.
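
For reference, a minimal sketch of how the flag could be enabled, assuming
the usual queuePopulator section of the configuration file (the section
name and surrounding fields are illustrative; the exact layout depends on
the deployment):

    "queuePopulator": {
        "logSource": "bucketd",
        "exhaustLogSource": true
    }

With the flag left at its default (false), a single batch is still
processed per run, so existing deployments keep the previous behavior.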

Issue: BB-532
---
 lib/config.joi.js                             |   1 +
 lib/queuePopulator/LogReader.js               |   4 +
 lib/queuePopulator/QueuePopulator.js          |  19 ++-
 tests/unit/QueuePopulator.spec.js             |  70 +++++++++
 .../unit/lib/queuePopulator/LogReader.spec.js | 139 ++++++++++++++++++
 5 files changed, 232 insertions(+), 1 deletion(-)

diff --git a/lib/config.joi.js b/lib/config.joi.js
index e5ae2e1e6..303bc4ae7 100644
--- a/lib/config.joi.js
+++ b/lib/config.joi.js
@@ -42,6 +42,7 @@ const joiSchema = joi.object({
     zookeeperPath: joi.string().required(),
     logSource: joi.alternatives().try(logSourcesJoi).required(),
+    exhaustLogSource: joi.bool().default(false),
     bucketd: hostPortJoi
         .keys({ transport: transportJoi })
         .when('logSource', { is: 'bucketd', then: joi.required() }),
diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js
index 7c187d7ea..b941582f8 100644
--- a/lib/queuePopulator/LogReader.js
+++ b/lib/queuePopulator/LogReader.js
@@ -423,6 +423,7 @@ class LogReader {
             // continuously send 'data' events as they come.
             if (logRes.tailable &&
                 logStats.nbLogRecordsRead >= batchState.maxRead) {
+                logStats.hasMoreLog = true;
                 logger.debug('ending batch', {
                     method: 'LogReader._processPrepareEntries',
                     reason: 'limit on number of read records reached',
@@ -440,6 +441,9 @@ class LogReader {
             return shipBatchCb(err);
         };
         const endEventHandler = () => {
+            if (logStats.nbLogRecordsRead >= batchState.maxRead) {
+                logStats.hasMoreLog = true;
+            }
             logger.debug('ending record stream', {
                 method: 'LogReader._processPrepareEntries',
             });
diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js
index a5ef48155..7fdc07350 100644
--- a/lib/queuePopulator/QueuePopulator.js
+++ b/lib/queuePopulator/QueuePopulator.js
@@ -524,7 +524,24 @@ class QueuePopulator {
     _processLogEntries(params, done) {
         return async.map(
             this.logReaders,
-            (logReader, cb) => logReader.processLogEntries(params, cb),
+            (logReader, readerDone) => {
+                async.doWhilst(
+                    next => {
+                        logReader.processLogEntries(params, (err, hasMoreLog) => {
+                            if (err) {
+                                this.log.error('error processing log entries', {
+                                    method: 'QueuePopulator._processLogEntries',
+                                    error: err,
+                                });
+                                return next(err);
+                            }
+                            return next(null, hasMoreLog);
+                        });
+                    },
+                    hasMoreLog => this.qpConfig.exhaustLogSource && hasMoreLog && !this.logReadersUpdate,
+                    readerDone,
+                );
+            },
             done);
     }
 
diff --git a/tests/unit/QueuePopulator.spec.js b/tests/unit/QueuePopulator.spec.js
index cb9ef7a6f..bf2a60933 100644
--- a/tests/unit/QueuePopulator.spec.js
+++ b/tests/unit/QueuePopulator.spec.js
@@ -4,6 +4,7 @@ const sinon = require('sinon');
 const zookeeper = require('node-zookeeper-client');
 const QueuePopulator = require('../../lib/queuePopulator/QueuePopulator');
 const constants = require('../../lib/constants');
+const { errors } = require('arsenal');
 
 describe('QueuePopulator', () => {
     let qp;
@@ -120,4 +121,73 @@ describe('QueuePopulator', () => {
             );
         });
     });
+
+    describe('_processLogEntries', () => {
+        it('should process log records once when no more logs are available', done => {
+            qp.qpConfig.exhaustLogSource = true;
+            qp.logReaders = [{
+                processLogEntries: sinon.stub().yields(null, false),
+            }];
+            qp._processLogEntries({}, err => {
+                assert.ifError(err);
+                assert(qp.logReaders[0].processLogEntries.calledOnce);
+                return done();
+            });
+        });
+
+        it('should process log records until no more logs are available', done => {
+            qp.qpConfig.exhaustLogSource = true;
+            qp.logReaders = [{
+                processLogEntries: sinon.stub()
+                    .onCall(0).yields(null, true)
+                    .onCall(1).yields(null, false),
+            }];
+            qp._processLogEntries({}, err => {
+                assert.ifError(err);
+                assert(qp.logReaders[0].processLogEntries.calledTwice);
+                return done();
+            });
+        });
+
+        it('should only process log records once if exhaustLogSource is set to false', done => {
+            qp.qpConfig.exhaustLogSource = false;
+            qp.logReaders = [{
+                processLogEntries: sinon.stub()
+                    .onCall(0).yields(null, true)
+                    .onCall(1).yields(null, false),
+            }];
+            qp._processLogEntries({}, err => {
+                assert.ifError(err);
+                assert(qp.logReaders[0].processLogEntries.calledOnce);
+                return done();
+            });
+        });
+
+        it('should only process log records once if the logReaders need to be updated', done => {
+            qp.qpConfig.exhaustLogSource = true;
+            qp.logReaders = [{
+                processLogEntries: sinon.stub()
+                    .onCall(0).yields(null, true)
+                    .onCall(1).yields(null, false),
+            }];
+            qp.logReadersUpdate = true;
+            qp._processLogEntries({}, err => {
+                assert.ifError(err);
+                assert(qp.logReaders[0].processLogEntries.calledOnce);
+                return done();
+            });
+        });
+
+        it('should forward logReader errors', done => {
+            qp.qpConfig.exhaustLogSource = true;
+            qp.logReaders = [{
+                processLogEntries: sinon.stub().yields(errors.InternalError, false),
+            }];
+            qp._processLogEntries({}, err => {
+                assert.deepEqual(err, errors.InternalError);
+                assert(qp.logReaders[0].processLogEntries.calledOnce);
+                return done();
+            });
+        });
+    });
 });
diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js
index 2c515d1dd..87b45a905 100644
--- a/tests/unit/lib/queuePopulator/LogReader.spec.js
+++ b/tests/unit/lib/queuePopulator/LogReader.spec.js
@@ -1,5 +1,6 @@
 const assert = require('assert');
 const sinon = require('sinon');
+const stream = require('stream');
 
 const ZookeeperMock = require('zookeeper-mock');
 
@@ -26,6 +27,12 @@ class MockLogConsumer {
     }
 }
 
+class MockRecordStream extends stream.PassThrough {
+    constructor() {
+        super({ objectMode: true });
+    }
+}
+
 describe('LogReader', () => {
     let zkMock;
     let logReader;
@@ -366,4 +373,136 @@
             });
         });
     });
+
+    describe('_processPrepareEntries', () => {
+        it('should consume "batchState.maxRead" logs from a tailable stream', done => {
+            const batchState = {
+                logRes: {
+                    log: new MockRecordStream(),
+                    tailable: true,
+                },
+                logStats: {
+                    nbLogRecordsRead: 0,
+                    nbLogEntriesRead: 0,
+                    hasMoreLog: false,
+                },
+                entriesToPublish: {},
+                publishedEntries: {},
+                currentRecords: [],
+                maxRead: 3,
+                startTime: Date.now(),
+                timeoutMs: 60000,
+                logger: logReader.log,
+            };
+            for (let i = 0; i < 5; ++i) {
+                batchState.logRes.log.write({
+                    entries: [],
+                });
+            }
+            logReader._processPrepareEntries(batchState, err => {
+                assert.ifError(err);
+                assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
+                assert.strictEqual(batchState.logStats.hasMoreLog, true);
+                done();
+            });
+        });
+
+        it('should consume and return if the tailable stream doesn\'t have enough records', done => {
+            const batchState = {
+                logRes: {
+                    log: new MockRecordStream(),
+                    tailable: true,
+                },
+                logStats: {
+                    nbLogRecordsRead: 0,
+                    nbLogEntriesRead: 0,
+                    hasMoreLog: false,
+                },
+                entriesToPublish: {},
+                publishedEntries: {},
+                currentRecords: [],
+                maxRead: 30,
+                startTime: Date.now(),
+                timeoutMs: 100,
+                logger: logReader.log,
+            };
+            for (let i = 0; i < 3; ++i) {
+                batchState.logRes.log.write({
+                    entries: [],
+                });
+            }
+            logReader._processPrepareEntries(batchState, err => {
+                assert.ifError(err);
+                assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
+                assert.strictEqual(batchState.logStats.hasMoreLog, false);
+                done();
+            });
+        });
+
+        it('should consume all logs from a non-tailable stream', done => {
+            const batchState = {
+                logRes: {
+                    log: new MockRecordStream(),
+                    tailable: false,
+                },
+                logStats: {
+                    nbLogRecordsRead: 0,
+                    nbLogEntriesRead: 0,
+                    hasMoreLog: false,
+                },
+                entriesToPublish: {},
+                publishedEntries: {},
+                currentRecords: [],
+                maxRead: 3,
+                startTime: Date.now(),
+                timeoutMs: 60000,
+                logger: logReader.log,
+            };
+            for (let i = 0; i < 3; ++i) {
+                batchState.logRes.log.write({
+                    entries: [],
+                });
+            }
+            batchState.logRes.log.end();
+            logReader._processPrepareEntries(batchState, err => {
+                assert.ifError(err);
+                assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
+                assert.strictEqual(batchState.logStats.hasMoreLog, true);
+                done();
+            });
+        });
+
+        it('should set hasMoreLog to false when a non-tailable stream doesn\'t have enough records', done => {
+            const batchState = {
+                logRes: {
+                    log: new MockRecordStream(),
+                    tailable: false,
+                },
+                logStats: {
+                    nbLogRecordsRead: 0,
+                    nbLogEntriesRead: 0,
+                    hasMoreLog: false,
+                },
+                entriesToPublish: {},
+                publishedEntries: {},
+                currentRecords: [],
+                maxRead: 5,
+                startTime: Date.now(),
+                timeoutMs: 60000,
+                logger: logReader.log,
+            };
+            for (let i = 0; i < 3; ++i) {
+                batchState.logRes.log.write({
+                    entries: [],
+                });
+            }
+            batchState.logRes.log.end();
+            logReader._processPrepareEntries(batchState, err => {
+                assert.ifError(err);
+                assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
+                assert.strictEqual(batchState.logStats.hasMoreLog, false);
+                done();
+            });
+        });
+    });
 });