Skip to content

Commit

Permalink
allow consuming log records without waiting when more logs are available
Browse files Browse the repository at this point in the history
The cooldown between batches is removed when more logs are available
to consume.

This is mostly needed for S3C to keep good replication rates.

Issue: BB-532
  • Loading branch information
Kerkesni committed Oct 22, 2024
1 parent 93d499f commit 4a93e99
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/config.joi.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const joiSchema = joi.object({
zookeeperPath: joi.string().required(),

logSource: joi.alternatives().try(logSourcesJoi).required(),
exhaustLogSource: joi.bool().default(false),
bucketd: hostPortJoi
.keys({ transport: transportJoi })
.when('logSource', { is: 'bucketd', then: joi.required() }),
Expand Down
4 changes: 4 additions & 0 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ class LogReader {
// continuously send 'data' events as they come.
if (logRes.tailable &&
logStats.nbLogRecordsRead >= batchState.maxRead) {
logStats.hasMoreLog = true;
logger.debug('ending batch', {
method: 'LogReader._processPrepareEntries',
reason: 'limit on number of read records reached',
Expand All @@ -421,6 +422,9 @@ class LogReader {
return shipBatchCb(err);
};
const endEventHandler = () => {
if (logStats.nbLogRecordsRead >= batchState.maxRead) {
logStats.hasMoreLog = true;
}
logger.debug('ending record stream', {
method: 'LogReader._processPrepareEntries',
});
Expand Down
19 changes: 18 additions & 1 deletion lib/queuePopulator/QueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,24 @@ class QueuePopulator {
_processLogEntries(params, done) {
return async.map(
this.logReaders,
(logReader, cb) => logReader.processLogEntries(params, cb),
(logReader, readerDone) => {
async.doWhilst(
next => {
logReader.processLogEntries(params, (err, hasMoreLog) => {
if (err) {
this.log.error('error processing log entries', {
method: 'QueuePopulator._processLogEntries',
error: err,
});
return next(err);
}
return next(null, hasMoreLog);
});
},
hasMoreLog => this.qpConfig.exhaustLogSource && hasMoreLog && !this.logReadersUpdate,
readerDone,
);
},
done);
}

Expand Down
70 changes: 70 additions & 0 deletions tests/unit/QueuePopulator.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const sinon = require('sinon');
const zookeeper = require('node-zookeeper-client');
const QueuePopulator = require('../../lib/queuePopulator/QueuePopulator');
const constants = require('../../lib/constants');
const { errors } = require('arsenal');

describe('QueuePopulator', () => {
let qp;
Expand Down Expand Up @@ -92,4 +93,73 @@ describe('QueuePopulator', () => {
);
});
});

describe('_processLogEntries', () => {
    // Installs a single mock log reader whose processLogEntries is `stub`.
    const installReader = stub => {
        qp.logReaders = [{ processLogEntries: stub }];
    };

    // Shorthand for asserting how many times the mock reader was invoked.
    const readerStub = () => qp.logReaders[0].processLogEntries;

    it('should process log records once when no more logs are available', done => {
        qp.qpConfig.exhaustLogSource = true;
        installReader(sinon.stub().yields(null, false));
        qp._processLogEntries({}, err => {
            assert.ifError(err);
            assert(readerStub().calledOnce);
            return done();
        });
    });

    it('should process log records until no more logs are available', done => {
        qp.qpConfig.exhaustLogSource = true;
        installReader(sinon.stub()
            .onCall(0).yields(null, true)
            .onCall(1).yields(null, false));
        qp._processLogEntries({}, err => {
            assert.ifError(err);
            assert(readerStub().calledTwice);
            return done();
        });
    });

    it('should only process log records once if exhaustLogSource is set to false', done => {
        qp.qpConfig.exhaustLogSource = false;
        installReader(sinon.stub()
            .onCall(0).yields(null, true)
            .onCall(1).yields(null, false));
        qp._processLogEntries({}, err => {
            assert.ifError(err);
            assert(readerStub().calledOnce);
            return done();
        });
    });

    it('should only process log records once if the logReaders need to be updated', done => {
        qp.qpConfig.exhaustLogSource = true;
        installReader(sinon.stub()
            .onCall(0).yields(null, true)
            .onCall(1).yields(null, false));
        qp.logReadersUpdate = true;
        qp._processLogEntries({}, err => {
            assert.ifError(err);
            assert(readerStub().calledOnce);
            return done();
        });
    });

    it('should forward logReader errors', done => {
        qp.qpConfig.exhaustLogSource = true;
        installReader(sinon.stub().yields(errors.InternalError, false));
        qp._processLogEntries({}, err => {
            assert.deepEqual(err, errors.InternalError);
            assert(readerStub().calledOnce);
            return done();
        });
    });
});
});
139 changes: 139 additions & 0 deletions tests/unit/lib/queuePopulator/LogReader.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const assert = require('assert');
const sinon = require('sinon');
const stream = require('stream');

const ZookeeperMock = require('zookeeper-mock');

Expand All @@ -26,6 +27,12 @@ class MockLogConsumer {
}
}

/**
 * Minimal stand-in for a log record stream: a PassThrough stream running
 * in object mode so tests can write plain record objects through it.
 */
class MockRecordStream extends stream.PassThrough {
    constructor() {
        // records are objects (not buffers/strings), hence objectMode
        const streamOptions = { objectMode: true };
        super(streamOptions);
    }
}

describe('LogReader', () => {
let zkMock;
let logReader;
Expand Down Expand Up @@ -297,4 +304,136 @@ describe('LogReader', () => {
});
});
});

describe('_processPrepareEntries', () => {
    // Builds a batchState fixture around a fresh MockRecordStream,
    // replacing the object literal previously duplicated in every test.
    const makeBatchState = ({ tailable, maxRead, timeoutMs }) => ({
        logRes: {
            log: new MockRecordStream(),
            tailable,
        },
        logStats: {
            nbLogRecordsRead: 0,
            nbLogEntriesRead: 0,
            hasMoreLog: false,
        },
        entriesToPublish: {},
        publishedEntries: {},
        currentRecords: [],
        maxRead,
        startTime: Date.now(),
        timeoutMs,
        logger: logReader.log,
    });

    // Writes `count` empty log records into the mock stream.
    const writeRecords = (log, count) => {
        for (let i = 0; i < count; ++i) {
            log.write({ entries: [] });
        }
    };

    it('should consume "batchState.maxRead" logs from tailable stream', done => {
        const batchState = makeBatchState({ tailable: true, maxRead: 3, timeoutMs: 60000 });
        // more records available than maxRead: expect hasMoreLog to be set
        writeRecords(batchState.logRes.log, 5);
        logReader._processPrepareEntries(batchState, err => {
            assert.ifError(err);
            assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
            assert.strictEqual(batchState.logStats.hasMoreLog, true);
            done();
        });
    });

    it('should consume and return if tailable stream doesn\'t have enough records', done => {
        // short timeout so the tailable stream gives up waiting for more records
        const batchState = makeBatchState({ tailable: true, maxRead: 30, timeoutMs: 100 });
        writeRecords(batchState.logRes.log, 3);
        logReader._processPrepareEntries(batchState, err => {
            assert.ifError(err);
            assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
            assert.strictEqual(batchState.logStats.hasMoreLog, false);
            done();
        });
    });

    it('should consume all logs from a non-tailable stream', done => {
        const batchState = makeBatchState({ tailable: false, maxRead: 3, timeoutMs: 60000 });
        // exactly maxRead records: the end handler should flag more logs
        writeRecords(batchState.logRes.log, 3);
        batchState.logRes.log.end();
        logReader._processPrepareEntries(batchState, err => {
            assert.ifError(err);
            assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
            assert.strictEqual(batchState.logStats.hasMoreLog, true);
            done();
        });
    });

    it('should set hasMoreLog to false when a non-tailable stream doesn\'t have enough records', done => {
        const batchState = makeBatchState({ tailable: false, maxRead: 5, timeoutMs: 60000 });
        // fewer records than maxRead: the source is exhausted
        writeRecords(batchState.logRes.log, 3);
        batchState.logRes.log.end();
        logReader._processPrepareEntries(batchState, err => {
            assert.ifError(err);
            assert.strictEqual(batchState.logStats.nbLogRecordsRead, 3);
            assert.strictEqual(batchState.logStats.hasMoreLog, false);
            done();
        });
    });
});
});

0 comments on commit 4a93e99

Please sign in to comment.