Check large objects before replication #2558

Merged
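
This change makes the replication queue processor re-check the source object before copying data for large objects: when an entry's content length is at or above a configurable threshold, the processor fetches the source object's metadata again and skips the replication if the version is already marked COMPLETED for the target site. The threshold is exposed as a new sourceCheckIfSizeGreaterThanMB option on the replication queueProcessor configuration, defaulting to 100 MB.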
1 change: 1 addition & 0 deletions extensions/replication/ReplicationConfigValidator.js
@@ -73,6 +73,7 @@ const joiSchema = joi.object({
minMPUSizeMB: joi.number().greater(0).default(20),
probeServer: probeServerJoi.default(),
circuitBreaker: joi.object().optional(),
sourceCheckIfSizeGreaterThanMB: joi.number().positive().default(100),
}).required(),
replicationStatusProcessor: {
groupId: joi.string().required(),
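For reference, a minimal sketch of where the new option could sit in a queue processor configuration; the surrounding keys are taken from the test fixture further down, and the exact nesting of a production config is an assumption:

    // Hypothetical queueProcessor configuration excerpt (nesting and values are illustrative):
    const queueProcessorConfig = {
        groupId: 'backbeat-replication-group',
        mpuPartsConcurrency: 10,
        // Objects whose size in MB is at or above this value get their source
        // metadata re-checked before any data is copied; defaults to 100.
        sourceCheckIfSizeGreaterThanMB: 100,
    };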
61 changes: 61 additions & 0 deletions extensions/replication/tasks/ReplicateObject.js
@@ -16,6 +16,10 @@
const RoleCredentials = require('../../../lib/credentials/RoleCredentials');
const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants');

const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

const errorAlreadyCompleted = {};

function _extractAccountIdFromRole(role) {
return role.split(':')[4];
}
@@ -331,6 +335,49 @@
});
}

_refreshSourceEntry(sourceEntry, log, cb) {
const params = {
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
VersionId: sourceEntry.getEncodedVersionId(),
};
return this.backbeatSource.getMetadata(params, (err, blob) => {
if (err) {
err.origin = 'source';
log.error('error getting metadata blob from S3', {
method: 'ReplicateObject._refreshSourceEntry',
error: err,
});
return cb(err);
}
const parsedEntry = ObjectQueueEntry.createFromBlob(blob.Body);
if (parsedEntry.error) {
log.error('error parsing metadata blob', {
error: parsedEntry.error,
method: 'ReplicateObject._refreshSourceEntry',
});
return cb(errors.InternalError.
customizeDescription('error parsing metadata blob'));
}
const refreshedEntry = new ObjectQueueEntry(sourceEntry.getBucket(),
sourceEntry.getObjectVersionedKey(), parsedEntry.result);
return cb(null, refreshedEntry);
});
}

_checkSourceReplication(sourceEntry, log, cb) {
this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => {
if (err) {
return cb(err);
}
const status = refreshedEntry.getReplicationSiteStatus(this.site);
if (status === 'COMPLETED') {
return cb(errorAlreadyCompleted);
}
return cb();
});
}

_getAndPutData(sourceEntry, destEntry, log, cb) {
log.debug('replicating data', { entry: sourceEntry.getLogInfo() });
if (sourceEntry.getLocation().some(part => {
@@ -700,6 +747,14 @@
(sourceRole, targetRole, next) => {
this._setTargetAccountMd(destEntry, targetRole, log, next);
},
next => {
if (!mdOnly &&
sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) {
return this._checkSourceReplication(sourceEntry, log, next);
}
return next();
},
// Get data from source bucket and put it on the target bucket
next => {
if (!mdOnly) {
@@ -768,6 +823,12 @@
});
return done();
}
if (err === errorAlreadyCompleted) {
log.warn('replication skipped: ' +
'source object version already COMPLETED',
{ entry: sourceEntry.getLogInfo() });
return done();
}
if (err.ObjNotFound || err.code === 'ObjNotFound') {
if (err.origin === 'source') {
log.info('replication skipped: ' +
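Note that the size gate divides getContentLength() by 1,000,000, so the threshold is expressed in decimal megabytes: with the default of 100, only objects of at least 100,000,000 bytes pay the extra metadata round-trip, which matches the contentLength of 100000000 used in the tests below. A standalone sketch of the same predicate (the helper name is illustrative, not part of the diff):

    // Illustrative helper mirroring the check added in the waterfall step above.
    function shouldCheckSource(contentLengthBytes, thresholdMB) {
        return contentLengthBytes / 1000000 >= thresholdMB;
    }

    shouldCheckSource(100000000, 100); // true  -> refresh source metadata first
    shouldCheckSource(1, 10);          // false -> replicate data directly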
77 changes: 76 additions & 1 deletion tests/functional/replication/queueProcessor.js
@@ -674,7 +674,6 @@ class S3Mock extends TestConfigurator {
}

_getMetadataSource(req, url, query, res) {
assert(query.versionId);
res.writeHead(200);
res.end(JSON.stringify({
Body: JSON.parse(this.getParam('kafkaEntry.value')).value,
@@ -797,6 +796,7 @@ describe('queue processor functional tests with mocking', () => {
},
groupId: 'backbeat-func-test-group-id',
mpuPartsConcurrency: 10,
sourceCheckIfSizeGreaterThanMB: 10,
},
},
{ host: '127.0.0.1',
@@ -984,6 +984,62 @@ describe('queue processor functional tests with mocking', () => {
}),
], done);
});

it('should check object MD if size is bigger than sourceCheckIfSizeGreaterThanMB', done => {
s3mock.setParam('contentLength', 100000000);
let checkMdCalled = false;
s3mock.setParam('routes.source.s3.getMetadata.handler',
(req, url, query, res) => {
checkMdCalled = true;
s3mock.resetParam('routes.source.s3.getMetadata.handler');
s3mock._getMetadataSource(req, url, query, res);
}, { _static: true });

async.parallel([
done => {
s3mock.onPutSourceMd = done;
},
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert.strictEqual(s3mock.hasPutTargetData, true);
assert(s3mock.hasPutTargetMd);
assert(checkMdCalled);
done();
}),
], () => {
s3mock.resetParam('contentLength');
done();
});
});

it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => {
s3mock.setParam('contentLength', 1);
let checkMdCalled = false;
s3mock.setParam('routes.source.s3.getMetadata.handler',
(req, url, query, res) => {
checkMdCalled = true;
s3mock.resetParam('routes.source.s3.getMetadata.handler');
s3mock._getMetadataSource(req, url, query, res);
}, { _static: true });

async.parallel([
done => {
s3mock.onPutSourceMd = done;
},
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert.strictEqual(s3mock.hasPutTargetData, true);
assert(s3mock.hasPutTargetMd);
assert.strictEqual(checkMdCalled, false);
done();
}),
], () => {
s3mock.resetParam('contentLength');
done();
});
});
});

describe('error paths', function errorPaths() {
@@ -1447,5 +1503,24 @@ describe('queue processor functional tests with mocking', () => {
], done);
});
});

it('should fail a replication if unable to get metadata', done => {
s3mock.setParam('contentLength', 100000000);
s3mock.installBackbeatErrorResponder('source.s3.getMetadata',
errors.ObjNotFound,
{ once: true });
async.parallel([
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert(!s3mock.hasPutTargetData);
assert(!s3mock.hasPutTargetMd);
done();
}),
], () => {
s3mock.resetParam('contentLength');
done();
});
});
});
});