Check large objects before replication

Issue: BB-566

KillianG committed Oct 25, 2024
1 parent 31d9ca6 commit 54216a6
Showing 3 changed files with 143 additions and 1 deletion.
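For orientation, here is a minimal sketch (not the actual Backbeat code) of the behaviour this commit adds to the replication queue processor: entries whose size exceeds a configurable threshold get their source metadata re-read, and replication is skipped when that site's status is already COMPLETED. The helper names and the entry shape below are assumptions for illustration only.

const MB = 1000 * 1000; // the diff divides content length by 1,000,000 (decimal MB)

// Hypothetical helper, not Backbeat's API: decide whether replication can be skipped.
function shouldSkipReplication(entry, config, fetchSourceSiteStatus, cb) {
    const thresholdMB = config.sourceCheckIfSizeGreaterThanMB || 100;
    if (entry.contentLength / MB < thresholdMB) {
        // small object: replicate without the extra metadata round-trip
        return cb(null, false);
    }
    // large object: refresh the source entry and check its per-site status
    return fetchSourceSiteStatus(entry, (err, status) =>
        (err ? cb(err) : cb(null, status === 'COMPLETED')));
}

// Example usage with a stubbed status fetcher:
shouldSkipReplication(
    { contentLength: 200 * MB },
    { sourceCheckIfSizeGreaterThanMB: 100 },
    (entry, cb) => cb(null, 'COMPLETED'),
    (err, skip) => console.log(err, skip), // prints: null true
);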
1 change: 1 addition & 0 deletions extensions/replication/ReplicationConfigValidator.js
@@ -73,6 +73,7 @@ const joiSchema = joi.object({
minMPUSizeMB: joi.number().greater(0).default(20),
probeServer: probeServerJoi.default(),
circuitBreaker: joi.object().optional(),
sourceCheckIfSizeGreaterThanMB: joi.number().positive().default(100),
}).required(),
replicationStatusProcessor: {
groupId: joi.string().required(),
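For context, the new sourceCheckIfSizeGreaterThanMB key lives under the replication queueProcessor config; a hypothetical config excerpt follows (field placement mirrors the joi schema above, the other values are only illustrative).

// Hypothetical excerpt of a replication config; only the new key comes from
// this commit, the surrounding values are illustrative defaults.
const replicationConfig = {
    queueProcessor: {
        minMPUSizeMB: 20,
        // re-check source metadata before copying objects of at least 100 MB
        sourceCheckIfSizeGreaterThanMB: 100,
    },
};

module.exports = replicationConfig;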
66 changes: 66 additions & 0 deletions extensions/replication/tasks/ReplicateObject.js
@@ -16,6 +16,10 @@ const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials');
const RoleCredentials = require('../../../lib/credentials/RoleCredentials');
const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants');

const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

const errorAlreadyCompleted = {};

function _extractAccountIdFromRole(role) {
return role.split(':')[4];
}
@@ -331,6 +335,49 @@ class ReplicateObject extends BackbeatTask {
});
}

_refreshSourceEntry(sourceEntry, log, cb) {
const params = {
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
VersionId: sourceEntry.getEncodedVersionId(),
};
return this.backbeatSource.getMetadata(params, (err, blob) => {
if (err) {
err.origin = 'source';
log.error('error getting metadata blob from S3', {
method: 'ReplicateObject._refreshSourceEntry',
error: err,
});
return cb(err);
}
const parsedEntry = ObjectQueueEntry.createFromBlob(blob.Body);
if (parsedEntry.error) {
log.error('error parsing metadata blob', {
error: parsedEntry.error,
method: 'ReplicateObject._refreshSourceEntry',
});
return cb(errors.InternalError.
customizeDescription('error parsing metadata blob'));
}
const refreshedEntry = new ObjectQueueEntry(sourceEntry.getBucket(),
sourceEntry.getObjectVersionedKey(), parsedEntry.result);
return cb(null, refreshedEntry);
});
}

_checkSourceReplication(sourceEntry, log, cb) {
this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => {
if (err) {
return cb(err);
}
const status = refreshedEntry.getReplicationSiteStatus(this.site);
if (status === 'COMPLETED') {
return cb(errorAlreadyCompleted);
}
return cb();
});
}

_getAndPutData(sourceEntry, destEntry, log, cb) {
log.debug('replicating data', { entry: sourceEntry.getLogInfo() });
if (sourceEntry.getLocation().some(part => {
@@ -701,6 +748,19 @@ class ReplicateObject extends BackbeatTask {
this._setTargetAccountMd(destEntry, targetRole, log, next);
},
// For large objects, first check whether the source version still needs replication
next => {
// log the configured threshold and the entry size for troubleshooting
log.debug('checking content length against sourceCheckIfSizeGreaterThanMB', {
sourceCheckIfSizeGreaterThanMB: this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB,
contentLength: sourceEntry.getContentLength(),
});

if (!mdOnly &&
sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) {
return this._checkSourceReplication(sourceEntry, log, next);
}
return next();
},
// Get data from source bucket and put it on the target bucket
next => {
if (!mdOnly) {
const extMetrics = getExtMetrics(this.site,
@@ -768,6 +828,12 @@ class ReplicateObject extends BackbeatTask {
});
return done();
}
if (err === errorAlreadyCompleted) {
log.warn('replication skipped: ' +
'source object version already COMPLETED',
{ entry: sourceEntry.getLogInfo() });
return done();
}
if (err.ObjNotFound || err.code === 'ObjNotFound') {
if (err.origin === 'source') {
log.info('replication skipped: ' +
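Note that the size gate in the hunk above compares contentLength / 1000000 against the threshold, i.e. decimal megabytes rather than MiB; a tiny self-contained check of that conversion (the names here are illustrative, not Backbeat's):

const thresholdMB = 100; // e.g. the sourceCheckIfSizeGreaterThanMB default
const exceedsThreshold = bytes => bytes / 1000000 >= thresholdMB;

console.log(exceedsThreshold(99 * 1000 * 1000));  // false: 99 MB stays below the gate
console.log(exceedsThreshold(100 * 1000 * 1000)); // true: exactly 100 MB triggers the check
console.log(exceedsThreshold(100 * 1024 * 1024)); // true: 100 MiB is about 104.86 MB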
77 changes: 76 additions & 1 deletion tests/functional/replication/queueProcessor.js
@@ -674,7 +674,6 @@ class S3Mock extends TestConfigurator {
}

_getMetadataSource(req, url, query, res) {
assert(query.versionId);
res.writeHead(200);
res.end(JSON.stringify({
Body: JSON.parse(this.getParam('kafkaEntry.value')).value,
@@ -797,6 +796,7 @@ describe('queue processor functional tests with mocking', () => {
},
groupId: 'backbeat-func-test-group-id',
mpuPartsConcurrency: 10,
sourceCheckIfSizeGreaterThanMB: 10,
},
},
{ host: '127.0.0.1',
@@ -984,6 +984,62 @@ describe('queue processor functional tests with mocking', () => {
}),
], done);
});

it('should check object MD if size is bigger than sourceCheckIfSizeGreaterThanMB', done => {
s3mock.setParam('contentLength', 100000000);
let checkMdCalled = false;
s3mock.setParam('routes.source.s3.getMetadata.handler',
(req, url, query, res) => {
checkMdCalled = true;
s3mock.resetParam('routes.source.s3.getMetadata.handler');
s3mock._getMetadataSource(req, url, query, res);
}, { _static: true });

async.parallel([
done => {
s3mock.onPutSourceMd = done;
},
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert.strictEqual(s3mock.hasPutTargetData, true);
assert(s3mock.hasPutTargetMd);
assert(checkMdCalled);
done();
}),
], () => {
s3mock.resetParam('contentLength');
done();
});
});

it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => {
s3mock.setParam('contentLength', 1);
let checkMdCalled = false;
s3mock.setParam('routes.source.s3.getMetadata.handler',
(req, url, query, res) => {
checkMdCalled = true;
s3mock.resetParam('routes.source.s3.getMetadata.handler');
s3mock._getMetadataSource(req, url, query, res);
}, { _static: true });

async.parallel([
done => {
s3mock.onPutSourceMd = done;
},
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert.strictEqual(s3mock.hasPutTargetData, true);
assert(s3mock.hasPutTargetMd);
assert.strictEqual(checkMdCalled, false);
done();
}),
], () => {
s3mock.resetParam('contentLength');
done();
});
});
});

describe('error paths', function errorPaths() {
@@ -1447,5 +1503,24 @@ describe('queue processor functional tests with mocking', () => {
], done);
});
});

it('should fail a replication if unable to get metadata', done => {
s3mock.setParam('contentLength', 100000000);
s3mock.installBackbeatErrorResponder('source.s3.getMetadata',
errors.ObjNotFound,
{ once: true });
async.parallel([
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert(!s3mock.hasPutTargetData);
assert(!s3mock.hasPutTargetMd);
done();
}),
], () => {
s3mock.resetParam('contentLength');
done();
});
});
});
});
