Skip to content

Commit

Permalink
BB-381: do not send empty objects to tape for now
Browse files Browse the repository at this point in the history
  • Loading branch information
rachedbenmustapha committed Apr 12, 2023
1 parent 2b419fe commit 8af4bea
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
39 changes: 37 additions & 2 deletions extensions/replication/ReplicationAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ const ActionQueueEntry = require('../../lib/models/ActionQueueEntry');
const ReplicationMetrics = require('./ReplicationMetrics');

let { dataMoverTopic } = config.extensions.replication;
const { coldStorageArchiveTopicPrefix } = config.extensions.lifecycle;
const {
coldStorageArchiveTopicPrefix,
coldStorageStatusTopicPrefix,
} = config.extensions.lifecycle;
const { LifecycleMetrics } = require('../lifecycle/LifecycleMetrics');

const { emptyObjectArchiveId, emptyObjectArchiveVersion } = require('../../lib/constants');

class ReplicationAPI {
/**
* Create an action to copy an object's data to a new location.
Expand Down Expand Up @@ -84,9 +89,39 @@ class ReplicationAPI {
log.error(errorMsg, { method: 'ReplicationAPI.sendDataMoverAction' });
return cb(new Error(errorMsg));
}

const { reqId } = action.getContext();

if (locationConfig.isCold) {
// Temp special case: current tape implementation does not allow
// zero-byte objects yet. Bypass forwarder and send directly
// to status topic.
if (contentLength === 0 && locationConfig.type === 'dmf') {
topic = `${coldStorageStatusTopicPrefix}${toLocation}`;
const message = {
op: 'archive',
requestId: reqId,
date: new Date(),

accountId,
bucketName: bucket,
objectKey: key,
objectVersion: version,
eTag,

archiveInfo: {
archiveId: emptyObjectArchiveId,
archiveVersion: emptyObjectArchiveVersion,
},
};
kafkaEntry.message = JSON.stringify(message);
return producer.sendToTopic(topic, [kafkaEntry], err => {
LifecycleMetrics.onKafkaPublish(log, 'ColdStorageStatusTopic', 'bucket', err, 1);
return cb();
});
}

topic = `${coldStorageArchiveTopicPrefix}${toLocation}`;
const { reqId } = action.getContext();
const message = {
accountId,
bucketName: bucket,
Expand Down
2 changes: 2 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const constants = {
replicationStatusProcessor: 'ReplicationStatusProcessor',
},
compressionType: 'Zstd',
emptyObjectArchiveId: 'EMPTY_OBJECT_NO_ARCHIVE_ID',
emptyObjectArchiveVersion: 0,
};

module.exports = constants;
54 changes: 54 additions & 0 deletions tests/unit/replication/ReplicationAPI.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const ReplicationAPI =
const assert = require('assert');
const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');

const sinon = require('sinon');

const fakeLogger = require('../../utils/fakeLogger');
const bucketName = 'transition-to-dmf';
const owner = '4f5a1a4bd769fd6e4ebca87b96c86a621ebb9c8be0c012f291757410c55a36f7';
Expand All @@ -29,8 +31,15 @@ describe('ReplicationAPI', () => {
},
};

let clock;

beforeEach(() => {
messages = [];
clock = sinon.useFakeTimers();
});

afterEach(() => {
clock.restore();
});

describe('::sendDataMoverAction ', () => {
Expand Down Expand Up @@ -73,5 +82,50 @@ describe('ReplicationAPI', () => {
return;
});
});

it('should bypass archive topic for empty objects', done => {
const action = ActionQueueEntry.create('copyLocation');
action
.setAttribute('target', {
accountId,
owner,
bucket: bucketName,
key: objectKey,
version: versionId,
eTag,
lastModified,
})
.setAttribute('toLocation', toLocation)
.setAttribute('metrics', {
origin: originLabel,
fromLocation,
contentLength: 0,
})
.setResultsTopic(resultsTopic);
ReplicationAPI.sendDataMoverAction(mockProducer, action, fakeLogger, err => {
assert.ifError(err);
const expectedMessage = [
{
topic: 'cold-status-location-dmf-v1',
entry: {
op: 'archive',
accountId,
bucketName,
objectKey,
objectVersion: versionId,
eTag,
archiveInfo: {
archiveId: 'EMPTY_OBJECT_NO_ARCHIVE_ID',
archiveVersion: 0,
},
date: new Date().toJSON(),
},
},
];
assert.deepStrictEqual(messages, expectedMessage);
done();
return;
});
});
});
});

0 comments on commit 8af4bea

Please sign in to comment.