From f74d08430315ceb15d18b0080f902d2a5c2f5e28 Mon Sep 17 00:00:00 2001 From: KillianG Date: Tue, 15 Oct 2024 15:41:35 +0200 Subject: [PATCH] Check large objects before replication Issue: BB-566 --- .../replication/ReplicationConfigValidator.js | 1 + .../replication/tasks/ReplicateObject.js | 61 +++++++++++++++ .../functional/replication/queueProcessor.js | 77 ++++++++++++++++++- 3 files changed, 138 insertions(+), 1 deletion(-) diff --git a/extensions/replication/ReplicationConfigValidator.js b/extensions/replication/ReplicationConfigValidator.js index 310d5a2e9..a0ea0dc03 100644 --- a/extensions/replication/ReplicationConfigValidator.js +++ b/extensions/replication/ReplicationConfigValidator.js @@ -73,6 +73,7 @@ const joiSchema = joi.object({ minMPUSizeMB: joi.number().greater(0).default(20), probeServer: probeServerJoi.default(), circuitBreaker: joi.object().optional(), + sourceCheckIfSizeGreaterThanMB: joi.number().positive().default(100), }).required(), replicationStatusProcessor: { groupId: joi.string().required(), diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index 677acf696..d0b926e1c 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -16,6 +16,10 @@ const { getAccountCredentials } = require('../../../lib/credentials/AccountCrede const RoleCredentials = require('../../../lib/credentials/RoleCredentials'); const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants'); +const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); + +const errorAlreadyCompleted = {}; + function _extractAccountIdFromRole(role) { return role.split(':')[4]; } @@ -331,6 +335,49 @@ class ReplicateObject extends BackbeatTask { }); } + _refreshSourceEntry(sourceEntry, log, cb) { + const params = { + Bucket: sourceEntry.getBucket(), + Key: sourceEntry.getObjectKey(), + VersionId: sourceEntry.getEncodedVersionId(), + }; + return this.backbeatSource.getMetadata(params, (err, blob) => { + if (err) { + err.origin = 'source'; + log.error('error getting metadata blob from S3', { + method: 'ReplicateObject._refreshSourceEntry', + error: err, + }); + return cb(err); + } + const parsedEntry = ObjectQueueEntry.createFromBlob(blob.Body); + if (parsedEntry.error) { + log.error('error parsing metadata blob', { + error: parsedEntry.error, + method: 'ReplicateObject._refreshSourceEntry', + }); + return cb(errors.InternalError. + customizeDescription('error parsing metadata blob')); + } + const refreshedEntry = new ObjectQueueEntry(sourceEntry.getBucket(), + sourceEntry.getObjectVersionedKey(), parsedEntry.result); + return cb(null, refreshedEntry); + }); + } + + _checkSourceReplication(sourceEntry, log, cb) { + this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { + if (err) { + return cb(err); + } + const status = refreshedEntry.getReplicationSiteStatus(this.site); + if (status === 'COMPLETED') { + return cb(errorAlreadyCompleted); + } + return cb(); + }); + } + _getAndPutData(sourceEntry, destEntry, log, cb) { log.debug('replicating data', { entry: sourceEntry.getLogInfo() }); if (sourceEntry.getLocation().some(part => { @@ -700,6 +747,14 @@ class ReplicateObject extends BackbeatTask { (sourceRole, targetRole, next) => { this._setTargetAccountMd(destEntry, targetRole, log, next); }, + next => { + if (!mdOnly && + sourceEntry.getContentLength() / 1000000 >= + this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) { + return this._checkSourceReplication(sourceEntry, log, next); + } + return next(); + }, // Get data from source bucket and put it on the target bucket next => { if (!mdOnly) { @@ -768,6 +823,12 @@ class ReplicateObject extends BackbeatTask { }); return done(); } + if (err === errorAlreadyCompleted) { + log.warn('replication skipped: ' + + 'source object version already COMPLETED', + { entry: sourceEntry.getLogInfo() }); + return done(); + } if (err.ObjNotFound || err.code === 'ObjNotFound') { if (err.origin === 'source') { log.info('replication skipped: ' + diff --git a/tests/functional/replication/queueProcessor.js b/tests/functional/replication/queueProcessor.js index 38b7c26c3..1c6245d96 100644 --- a/tests/functional/replication/queueProcessor.js +++ b/tests/functional/replication/queueProcessor.js @@ -674,7 +674,6 @@ class S3Mock extends TestConfigurator { } _getMetadataSource(req, url, query, res) { - assert(query.versionId); res.writeHead(200); res.end(JSON.stringify({ Body: JSON.parse(this.getParam('kafkaEntry.value')).value, @@ -797,6 +796,7 @@ describe('queue processor functional tests with mocking', () => { }, groupId: 'backbeat-func-test-group-id', mpuPartsConcurrency: 10, + sourceCheckIfSizeGreaterThanMB: 10, }, }, { host: '127.0.0.1', @@ -984,6 +984,62 @@ describe('queue processor functional tests with mocking', () => { }), ], done); }); + + it('should check object MD if size is bigger than sourceCheckIfSizeGreaterThanMB', done => { + s3mock.setParam('contentLength', 100000000); + let checkMdCalled = false; + s3mock.setParam('routes.source.s3.getMetadata.handler', + (req, url, query, res) => { + checkMdCalled = true; + s3mock.resetParam('routes.source.s3.getMetadata.handler'); + s3mock._getMetadataSource(req, url, query, res); + }, { _static: true }); + + async.parallel([ + done => { + s3mock.onPutSourceMd = done; + }, + done => queueProcessorSF.processReplicationEntry( + s3mock.getParam('kafkaEntry'), err => { + assert.ifError(err); + assert.strictEqual(s3mock.hasPutTargetData, true); + assert(s3mock.hasPutTargetMd); + assert(checkMdCalled); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); + + it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => { + s3mock.setParam('contentLength', 1); + let checkMdCalled = false; + s3mock.setParam('routes.source.s3.getMetadata.handler', + (req, url, query, res) => { + checkMdCalled = true; + s3mock.resetParam('routes.source.s3.getMetadata.handler'); + s3mock._getMetadataSource(req, url, query, res); + }, { _static: true }); + + async.parallel([ + done => { + s3mock.onPutSourceMd = done; + }, + done => queueProcessorSF.processReplicationEntry( + s3mock.getParam('kafkaEntry'), err => { + assert.ifError(err); + assert.strictEqual(s3mock.hasPutTargetData, true); + assert(s3mock.hasPutTargetMd); + assert.strictEqual(checkMdCalled, false); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); }); describe('error paths', function errorPaths() { @@ -1447,5 +1503,24 @@ describe('queue processor functional tests with mocking', () => { ], done); }); }); + + it('should fail a replication if unable to get metadata', done => { + s3mock.setParam('contentLength', 100000000); + s3mock.installBackbeatErrorResponder('source.s3.getMetadata', + errors.ObjNotFound, + { once: true }); + async.parallel([ + done => queueProcessorSF.processReplicationEntry( + s3mock.getParam('kafkaEntry'), err => { + assert.ifError(err); + assert(!s3mock.hasPutTargetData); + assert(!s3mock.hasPutTargetMd); + done(); + }), + ], () => { + s3mock.resetParam('contentLength'); + done(); + }); + }); }); });