Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and support NewerNoncurrentVersions #2443

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,14 @@ class LifecycleTask extends BackbeatTask {
const ncvHeapObject = this.ncvHeap.get(bucketName).get(version.Key);

const nncvSize = parseInt(rule[ncve][nncv], 10);
if (!ncvHeapObject.get(rule.Id)) {
ncvHeapObject.set(rule.Id, new MinHeap(nncvSize, noncurrentVersionCompare));

const ruleId = rule[ncve].ID;

if (!ncvHeapObject.get(ruleId)) {
ncvHeapObject.set(ruleId, new MinHeap(nncvSize, noncurrentVersionCompare));
}

const heap = ncvHeapObject.get(rule.Id);
const heap = ncvHeapObject.get(ruleId);

if (heap.size < nncvSize) {
heap.add(version);
Expand Down
32 changes: 31 additions & 1 deletion extensions/lifecycle/tasks/LifecycleTaskV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ class LifecycleTaskV2 extends LifecycleTask {
return done(err);
}

// create Set of unique keys not matching the next marker to
// indicate the object level entries to be cleared at the end
// of the processing step
const uniqueObjectKeysNotNextMarker = new Set();
if (markerInfo.keyMarker) {
contents.forEach(v => {
if (v.Key !== markerInfo.keyMarker) {
uniqueObjectKeysNotNextMarker.add(v.Key);
}
});
}

// re-queue truncated listing only once.
if (isTruncated && nbRetries === 0) {
const entry = Object.assign({}, bucketData, {
Expand All @@ -198,7 +210,25 @@ class LifecycleTaskV2 extends LifecycleTask {
});
}
return this._compareRulesToList(bucketData, bucketLCRules,
contents, log, done);
contents, log, err => {
if (err) {
return done(err);
}

if (!isTruncated) {
// end of bucket listing
// clear bucket level entry and all object entries
this._ncvHeapBucketClear(bucketData.target.bucket);
} else {
// clear object level entries that have been processed
this._ncvHeapObjectsClear(
bucketData.target.bucket,
uniqueObjectKeysNotNextMarker
);
}

return done();
});
});
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "backbeat",
"version": "7.70.4",
"version": "7.70.5",
"description": "Asynchronous queue and job manager",
"main": "index.js",
"scripts": {
Expand Down
141 changes: 141 additions & 0 deletions tests/functional/lifecycle/LifecycleTaskV2-versioned.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ describe('LifecycleTaskV2 with bucket versioned', () => {
kafkaBacklogMetrics: { snapshotTopicOffsets: () => {} },
pausedLocations: new Set(),
log,
ncvHeap: new Map(),
}),
};
lifecycleTask = new LifecycleTaskV2(lp);
Expand Down Expand Up @@ -752,4 +753,144 @@ describe('LifecycleTaskV2 with bucket versioned', () => {
return done();
});
});

it('should not publish any entry because version is retain by NewerNoncurrentVersions', done => {
const nonCurrentExpirationRule = [{
NoncurrentVersionExpiration: {
NoncurrentDays: 2,
NewerNoncurrentVersions: 1,
},
ID: '123',
Prefix: '',
Status: 'Enabled',
}];

const keyName = 'key1';
const versionId = 'versionid1';
const key = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 1 });
const contents = [key];
backbeatMetadataProxy.listLifecycleResponse = { contents, isTruncated: false, markerInfo: {} };

const nbRetries = 0;
return lifecycleTask.processBucketEntry(nonCurrentExpirationRule, bucketData, s3,
backbeatMetadataProxy, nbRetries, err => {
assert.ifError(err);
// test that the non-current listing is triggered
assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent');

// test parameters used to list lifecycle keys
const { listLifecycleParams } = backbeatMetadataProxy;
expectNominalListingParams(bucketName, listLifecycleParams);

// test that the entry is valid and pushed to kafka topic
assert.strictEqual(kafkaEntries.length, 0);
return done();
});
});

it('should retain the first non current version but publish the second one', done => {
const nonCurrentExpirationRule = [{
NoncurrentVersionExpiration: {
NoncurrentDays: 2,
NewerNoncurrentVersions: 1,
},
ID: '123',
Prefix: '',
Status: 'Enabled',
}];

const keyName = 'key1';
const versionId = 'versionid1';
const version1 = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 2 });
const versionId2 = 'versionid2';
const version2 = keyMock.nonCurrent({ keyName, versionId: versionId2, daysEarlier: 1 });
const contents = [version1, version2];
backbeatMetadataProxy.listLifecycleResponse = { contents, isTruncated: false, markerInfo: {} };

const nbRetries = 0;
return lifecycleTask.processBucketEntry(nonCurrentExpirationRule, bucketData, s3,
backbeatMetadataProxy, nbRetries, err => {
assert.ifError(err);
// test that the non-current listing is triggered
assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent');

// test parameters used to list lifecycle keys
const { listLifecycleParams } = backbeatMetadataProxy;
expectNominalListingParams(bucketName, listLifecycleParams);

// test that the entry is valid and pushed to kafka topic
assert.strictEqual(kafkaEntries.length, 1);
const firstEntry = kafkaEntries[0];
testKafkaEntry.expectObjectExpirationEntry(firstEntry, { keyName, versionId });
return done();
});
});

it('should retain the correct number of versions when the rule changes', done => {
const nonCurrentExpirationRule = [{
NoncurrentVersionExpiration: {
NoncurrentDays: 2,
NewerNoncurrentVersions: 2,
},
ID: '123',
Prefix: '',
Status: 'Enabled',
}];

const keyName = 'key1';
const versionId = 'versionid1';
const version1 = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 2 });
const versionId2 = 'versionid2';
const version2 = keyMock.nonCurrent({ keyName, versionId: versionId2, daysEarlier: 1 });
const contents = [version1, version2];
backbeatMetadataProxy.listLifecycleResponse = {
contents,
isTruncated: true,
markerInfo: { keyMarker: 'key1', versionIdMarker: 'versionid2' },
};

const nbRetries = 0;
return lifecycleTask.processBucketEntry(nonCurrentExpirationRule, bucketData, s3,
backbeatMetadataProxy, nbRetries, err => {
assert.ifError(err);

kafkaEntries = [];
const nonCurrentExpirationRule2 = [{
NoncurrentVersionExpiration: {
NoncurrentDays: 2,
NewerNoncurrentVersions: 1,
},
ID: '456',
Prefix: '',
Status: 'Enabled',
}];

return lifecycleTask.processBucketEntry(nonCurrentExpirationRule2, bucketData, s3,
backbeatMetadataProxy, nbRetries, err => {
assert.ifError(err);
// test that the non-current listing is triggered
assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent');

// test parameters used to list lifecycle keys
const { listLifecycleParams } = backbeatMetadataProxy;
expectNominalListingParams(bucketName, listLifecycleParams);

// test that the entry is valid and pushed to kafka topic
assert.strictEqual(kafkaEntries.length, 2);

const firstEntry = kafkaEntries[0];
testKafkaEntry.expectBucketEntry(firstEntry, {
hasBeforeDate: true,
keyMarker: keyName,
versionIdMarker: versionId2,
prefix: '',
listType: 'noncurrent',
});

const secondEntry = kafkaEntries[1];
testKafkaEntry.expectObjectExpirationEntry(secondEntry, { keyName, versionId });
return done();
});
});
});
});
27 changes: 14 additions & 13 deletions tests/unit/lifecycle/LifecycleTask.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1729,9 +1729,10 @@ describe('lifecycle task helper methods', () => {
});

it('should clear the ncvHeap object of the listed bucket/keys', () => {
const ruleId = 'rule_name';
const rules = {
Id: 'rule_name',
NoncurrentVersionExpiration: {
ID: ruleId,
NoncurrentDays: 1,
NewerNoncurrentVersions: 10,
},
Expand Down Expand Up @@ -1772,16 +1773,16 @@ describe('lifecycle task helper methods', () => {
assert(lct2.ncvHeap.has(b1));
assert(!lct2.ncvHeap.get(b1).has(version1.Key));
assert(!lct2.ncvHeap.get(b1).has(version2.Key));
assert(lct2.ncvHeap.get(b1).get(version3.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b1).get(version3.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b1).get(version3.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b1).get(version3.Key).get(ruleId).size, 1);

assert(lct2.ncvHeap.has(b2));
assert(lct2.ncvHeap.get(b2).has(version1.Key));
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(ruleId).size, 1);
assert(lct2.ncvHeap.get(b2).has(version2.Key));
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(ruleId).size, 1);
});
});

Expand Down Expand Up @@ -1812,9 +1813,10 @@ describe('lifecycle task helper methods', () => {
});

it('should clear the ncvHeap object of the listed bucket/keys', () => {
const ruleId = 'rule_name';
const rules = {
Id: 'rule_name',
NoncurrentVersionExpiration: {
ID: ruleId,
NoncurrentDays: 1,
NewerNoncurrentVersions: 10,
},
Expand Down Expand Up @@ -1853,11 +1855,11 @@ describe('lifecycle task helper methods', () => {
assert(!lct2.ncvHeap.has(b1));
assert(lct2.ncvHeap.has(b2));
assert(lct2.ncvHeap.get(b2).has(version1.Key));
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(ruleId).size, 1);
assert(lct2.ncvHeap.get(b2).has(version2.Key));
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(ruleId).size, 1);
});
});

Expand Down Expand Up @@ -2048,4 +2050,3 @@ describe('lifecycle task helper methods', () => {
});
});
});