/**
 * Extract the AWS account ID embedded in a role ARN.
 *
 * ARNs have the form "arn:partition:service:region:account-id:resource",
 * so the account ID is the fifth colon-separated field.
 *
 * @param {String} role - role ARN
 * @return {String} the account ID portion of the ARN
 */
function _extractAccountIdFromRole(role) {
    const fields = role.split(':');
    return fields[4];
}
method: 'ReplicateObject._refreshSourceEntry', + }); + return cb(errors.InternalError. + customizeDescription('error parsing metadata blob')); + } + const refreshedEntry = new ObjectQueueEntry(sourceEntry.getBucket(), + sourceEntry.getObjectVersionedKey(), parsedEntry.result); + return cb(null, refreshedEntry); + }); + } + + _checkSourceReplication(sourceEntry, log, cb) { + this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { + if (err) { + return cb(err); + } + const status = refreshedEntry.getReplicationSiteStatus(this.site); + if (status === 'COMPLETED') { + return cb(errorAlreadyCompleted); + } + return cb(); + }); + } + _getAndPutData(sourceEntry, destEntry, log, cb) { log.debug('replicating data', { entry: sourceEntry.getLogInfo() }); if (sourceEntry.getLocation().some(part => { @@ -700,6 +747,14 @@ class ReplicateObject extends BackbeatTask { (sourceRole, targetRole, next) => { this._setTargetAccountMd(destEntry, targetRole, log, next); }, + next => { + if (!mdOnly && + sourceEntry.getContentLength() / 1000000 >= + this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) { + return this._checkSourceReplication(sourceEntry, log, next); + } + return next(); + }, // Get data from source bucket and put it on the target bucket next => { if (!mdOnly) { @@ -768,6 +823,12 @@ class ReplicateObject extends BackbeatTask { }); return done(); } + if (err === errorAlreadyCompleted) { + log.warn('replication skipped: ' + + 'source object version already COMPLETED', + { entry: sourceEntry.getLogInfo() }); + return done(); + } if (err.ObjNotFound || err.code === 'ObjNotFound') { if (err.origin === 'source') { log.info('replication skipped: ' + diff --git a/tests/functional/replication/queueProcessor.js b/tests/functional/replication/queueProcessor.js index 29c83f3de..8156d874e 100644 --- a/tests/functional/replication/queueProcessor.js +++ b/tests/functional/replication/queueProcessor.js @@ -676,7 +676,6 @@ class S3Mock extends TestConfigurator { } 
_getMetadataSource(req, url, query, res) { - assert(query.versionId); res.writeHead(200); res.end(JSON.stringify({ Body: JSON.parse(this.getParam('kafkaEntry.value')).value, @@ -797,6 +796,7 @@ describe('queue processor functional tests with mocking', () => { }, groupId: 'backbeat-func-test-group-id', mpuPartsConcurrency: 10, + sourceCheckIfSizeGreaterThanMB: 10, }, }, { host: '127.0.0.1', @@ -984,6 +984,62 @@ describe('queue processor functional tests with mocking', () => { }), ], done); }); + + it('should check object MD if size is bigger than sourceCheckIfSizeGreaterThanMB', done => { + s3mock.setParam('contentLength', 100000000); + let checkMdCalled = false; + s3mock.setParam('routes.source.s3.getMetadata.handler', + (req, url, query, res) => { + checkMdCalled = true; + s3mock.resetParam('routes.source.s3.getMetadata.handler'); + s3mock._getMetadataSource(req, url, query, res); + }, { _static: true }); + + async.parallel([ + done => { + s3mock.onPutSourceMd = done; + }, + done => queueProcessorSF.processReplicationEntry( + s3mock.getParam('kafkaEntry'), err => { + assert.ifError(err); + assert.strictEqual(s3mock.hasPutTargetData, true); + assert(s3mock.hasPutTargetMd); + assert(checkMdCalled); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); + + it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => { + s3mock.setParam('contentLength', 1); + let checkMdCalled = false; + s3mock.setParam('routes.source.s3.getMetadata.handler', + (req, url, query, res) => { + checkMdCalled = true; + s3mock.resetParam('routes.source.s3.getMetadata.handler'); + s3mock._getMetadataSource(req, url, query, res); + }, { _static: true }); + + async.parallel([ + done => { + s3mock.onPutSourceMd = done; + }, + done => queueProcessorSF.processReplicationEntry( + s3mock.getParam('kafkaEntry'), err => { + assert.ifError(err); + assert.strictEqual(s3mock.hasPutTargetData, true); + assert(s3mock.hasPutTargetMd); + 
assert.strictEqual(checkMdCalled, false); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); }); describe('error paths', function errorPaths() { @@ -1451,6 +1507,25 @@ describe('queue processor functional tests with mocking', () => { ], done); }); }); + + it('should fail a replication if unable to get metadata', done => { + s3mock.setParam('contentLength', 100000000); + s3mock.installBackbeatErrorResponder('source.s3.getMetadata', + errors.ObjNotFound, + { once: true }); + async.parallel([ + done => queueProcessorSF.processReplicationEntry( + s3mock.getParam('kafkaEntry'), err => { + assert.ifError(err); + assert(!s3mock.hasPutTargetData); + assert(!s3mock.hasPutTargetMd); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); }); });