diff --git a/extensions/lifecycle/tasks/LifecycleTask.js b/extensions/lifecycle/tasks/LifecycleTask.js index e0222c3dd..ef84860f2 100644 --- a/extensions/lifecycle/tasks/LifecycleTask.js +++ b/extensions/lifecycle/tasks/LifecycleTask.js @@ -231,11 +231,14 @@ class LifecycleTask extends BackbeatTask { const ncvHeapObject = this.ncvHeap.get(bucketName).get(version.Key); const nncvSize = parseInt(rule[ncve][nncv], 10); - if (!ncvHeapObject.get(rule.Id)) { - ncvHeapObject.set(rule.Id, new MinHeap(nncvSize, noncurrentVersionCompare)); + + const ruleId = rule[ncve].ID; + + if (!ncvHeapObject.get(ruleId)) { + ncvHeapObject.set(ruleId, new MinHeap(nncvSize, noncurrentVersionCompare)); } - const heap = ncvHeapObject.get(rule.Id); + const heap = ncvHeapObject.get(ruleId); if (heap.size < nncvSize) { heap.add(version); diff --git a/extensions/lifecycle/tasks/LifecycleTaskV2.js b/extensions/lifecycle/tasks/LifecycleTaskV2.js index 7132038d9..adc62514d 100644 --- a/extensions/lifecycle/tasks/LifecycleTaskV2.js +++ b/extensions/lifecycle/tasks/LifecycleTaskV2.js @@ -176,6 +176,18 @@ class LifecycleTaskV2 extends LifecycleTask { return done(err); } + // create Set of unique keys not matching the next marker to + // indicate the object level entries to be cleared at the end + // of the processing step + const uniqueObjectKeysNotNextMarker = new Set(); + if (markerInfo.keyMarker) { + contents.forEach(v => { + if (v.Key !== markerInfo.keyMarker) { + uniqueObjectKeysNotNextMarker.add(v.Key); + } + }); + } + // re-queue truncated listing only once. if (isTruncated && nbRetries === 0) { const entry = Object.assign({}, bucketData, { @@ -198,7 +210,25 @@ class LifecycleTaskV2 extends LifecycleTask { }); } return this._compareRulesToList(bucketData, bucketLCRules, - contents, log, done); + contents, log, err => { + if (err) { + return done(err); + } + + if (!isTruncated) { + // end of bucket listing + // clear bucket level entry and all object entries + this._ncvHeapBucketClear(bucketData.target.bucket); + } else { + // clear object level entries that have been processed + this._ncvHeapObjectsClear( + bucketData.target.bucket, + uniqueObjectKeysNotNextMarker + ); + } + + return done(); + }); }); } diff --git a/package.json b/package.json index 0d8d7e4d3..0a1dabb75 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "backbeat", - "version": "7.70.4", + "version": "7.70.5", "description": "Asynchronous queue and job manager", "main": "index.js", "scripts": { diff --git a/tests/functional/lifecycle/LifecycleTaskV2-versioned.js b/tests/functional/lifecycle/LifecycleTaskV2-versioned.js index 1fb1a2100..3ac8d7181 100644 --- a/tests/functional/lifecycle/LifecycleTaskV2-versioned.js +++ b/tests/functional/lifecycle/LifecycleTaskV2-versioned.js @@ -74,6 +74,7 @@ describe('LifecycleTaskV2 with bucket versioned', () => { kafkaBacklogMetrics: { snapshotTopicOffsets: () => {} }, pausedLocations: new Set(), log, + ncvHeap: new Map(), }), }; lifecycleTask = new LifecycleTaskV2(lp); @@ -752,4 +753,144 @@ describe('LifecycleTaskV2 with bucket versioned', () => { return done(); }); }); + + it('should not publish any entry because version is retain by NewerNoncurrentVersions', done => { + const nonCurrentExpirationRule = [{ + NoncurrentVersionExpiration: { + NoncurrentDays: 2, + NewerNoncurrentVersions: 1, + }, + ID: '123', + Prefix: '', + Status: 'Enabled', + }]; + + const keyName = 'key1'; + const versionId = 'versionid1'; + const key = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 1 }); + const contents = [key]; + backbeatMetadataProxy.listLifecycleResponse = { contents, isTruncated: false, markerInfo: {} }; + + const nbRetries = 0; + return lifecycleTask.processBucketEntry(nonCurrentExpirationRule, bucketData, s3, + backbeatMetadataProxy, nbRetries, err => { + assert.ifError(err); + // test that the non-current listing is triggered + assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent'); + + // test parameters used to list lifecycle keys + const { listLifecycleParams } = backbeatMetadataProxy; + expectNominalListingParams(bucketName, listLifecycleParams); + + // test that the entry is valid and pushed to kafka topic + assert.strictEqual(kafkaEntries.length, 0); + return done(); + }); + }); + + it('should retain the first non current version but publish the second one', done => { + const nonCurrentExpirationRule = [{ + NoncurrentVersionExpiration: { + NoncurrentDays: 2, + NewerNoncurrentVersions: 1, + }, + ID: '123', + Prefix: '', + Status: 'Enabled', + }]; + + const keyName = 'key1'; + const versionId = 'versionid1'; + const version1 = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 2 }); + const versionId2 = 'versionid2'; + const version2 = keyMock.nonCurrent({ keyName, versionId: versionId2, daysEarlier: 1 }); + const contents = [version1, version2]; + backbeatMetadataProxy.listLifecycleResponse = { contents, isTruncated: false, markerInfo: {} }; + + const nbRetries = 0; + return lifecycleTask.processBucketEntry(nonCurrentExpirationRule, bucketData, s3, + backbeatMetadataProxy, nbRetries, err => { + assert.ifError(err); + // test that the non-current listing is triggered + assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent'); + + // test parameters used to list lifecycle keys + const { listLifecycleParams } = backbeatMetadataProxy; + expectNominalListingParams(bucketName, listLifecycleParams); + + // test that the entry is valid and pushed to kafka topic + assert.strictEqual(kafkaEntries.length, 1); + const firstEntry = kafkaEntries[0]; + testKafkaEntry.expectObjectExpirationEntry(firstEntry, { keyName, versionId }); + return done(); + }); + }); + + it('should retain the correct number of versions when the rule changes', done => { + const nonCurrentExpirationRule = [{ + NoncurrentVersionExpiration: { + NoncurrentDays: 2, + NewerNoncurrentVersions: 2, + }, + ID: '123', + Prefix: '', + Status: 'Enabled', + }]; + + const keyName = 'key1'; + const versionId = 'versionid1'; + const version1 = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 2 }); + const versionId2 = 'versionid2'; + const version2 = keyMock.nonCurrent({ keyName, versionId: versionId2, daysEarlier: 1 }); + const contents = [version1, version2]; + backbeatMetadataProxy.listLifecycleResponse = { + contents, + isTruncated: true, + markerInfo: { keyMarker: 'key1', versionIdMarker: 'versionid2' }, + }; + + const nbRetries = 0; + return lifecycleTask.processBucketEntry(nonCurrentExpirationRule, bucketData, s3, + backbeatMetadataProxy, nbRetries, err => { + assert.ifError(err); + + kafkaEntries = []; + const nonCurrentExpirationRule2 = [{ + NoncurrentVersionExpiration: { + NoncurrentDays: 2, + NewerNoncurrentVersions: 1, + }, + ID: '456', + Prefix: '', + Status: 'Enabled', + }]; + + return lifecycleTask.processBucketEntry(nonCurrentExpirationRule2, bucketData, s3, + backbeatMetadataProxy, nbRetries, err => { + assert.ifError(err); + // test that the non-current listing is triggered + assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent'); + + // test parameters used to list lifecycle keys + const { listLifecycleParams } = backbeatMetadataProxy; + expectNominalListingParams(bucketName, listLifecycleParams); + + // test that the entry is valid and pushed to kafka topic + assert.strictEqual(kafkaEntries.length, 2); + + const firstEntry = kafkaEntries[0]; + testKafkaEntry.expectBucketEntry(firstEntry, { + hasBeforeDate: true, + keyMarker: keyName, + versionIdMarker: versionId2, + prefix: '', + listType: 'noncurrent', + }); + + const secondEntry = kafkaEntries[1]; + testKafkaEntry.expectObjectExpirationEntry(secondEntry, { keyName, versionId }); + return done(); + }); + }); + }); }); diff --git a/tests/unit/lifecycle/LifecycleTask.spec.js b/tests/unit/lifecycle/LifecycleTask.spec.js index f580f86db..0f30ed059 100644 --- a/tests/unit/lifecycle/LifecycleTask.spec.js +++ b/tests/unit/lifecycle/LifecycleTask.spec.js @@ -1729,9 +1729,10 @@ describe('lifecycle task helper methods', () => { }); it('should clear the ncvHeap object of the listed bucket/keys', () => { + const ruleId = 'rule_name'; const rules = { - Id: 'rule_name', NoncurrentVersionExpiration: { + ID: ruleId, NoncurrentDays: 1, NewerNoncurrentVersions: 10, }, @@ -1772,16 +1773,16 @@ describe('lifecycle task helper methods', () => { assert(lct2.ncvHeap.has(b1)); assert(!lct2.ncvHeap.get(b1).has(version1.Key)); assert(!lct2.ncvHeap.get(b1).has(version2.Key)); - assert(lct2.ncvHeap.get(b1).get(version3.Key).has(rules.Id)); - assert.strictEqual(lct2.ncvHeap.get(b1).get(version3.Key).get(rules.Id).size, 1); + assert(lct2.ncvHeap.get(b1).get(version3.Key).has(ruleId)); + assert.strictEqual(lct2.ncvHeap.get(b1).get(version3.Key).get(ruleId).size, 1); assert(lct2.ncvHeap.has(b2)); assert(lct2.ncvHeap.get(b2).has(version1.Key)); - assert(lct2.ncvHeap.get(b2).get(version1.Key).has(rules.Id)); - assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(rules.Id).size, 1); + assert(lct2.ncvHeap.get(b2).get(version1.Key).has(ruleId)); + assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(ruleId).size, 1); assert(lct2.ncvHeap.get(b2).has(version2.Key)); - assert(lct2.ncvHeap.get(b2).get(version2.Key).has(rules.Id)); - assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(rules.Id).size, 1); + assert(lct2.ncvHeap.get(b2).get(version2.Key).has(ruleId)); + assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(ruleId).size, 1); }); }); @@ -1812,9 +1813,10 @@ describe('lifecycle task helper methods', () => { }); it('should clear the ncvHeap object of the listed bucket/keys', () => { + const ruleId = 'rule_name'; const rules = { - Id: 'rule_name', NoncurrentVersionExpiration: { + ID: ruleId, NoncurrentDays: 1, NewerNoncurrentVersions: 10, }, @@ -1853,11 +1855,11 @@ describe('lifecycle task helper methods', () => { assert(!lct2.ncvHeap.has(b1)); assert(lct2.ncvHeap.has(b2)); assert(lct2.ncvHeap.get(b2).has(version1.Key)); - assert(lct2.ncvHeap.get(b2).get(version1.Key).has(rules.Id)); - assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(rules.Id).size, 1); + assert(lct2.ncvHeap.get(b2).get(version1.Key).has(ruleId)); + assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(ruleId).size, 1); assert(lct2.ncvHeap.get(b2).has(version2.Key)); - assert(lct2.ncvHeap.get(b2).get(version2.Key).has(rules.Id)); - assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(rules.Id).size, 1); + assert(lct2.ncvHeap.get(b2).get(version2.Key).has(ruleId)); + assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(ruleId).size, 1); }); }); @@ -2048,4 +2050,3 @@ describe('lifecycle task helper methods', () => { }); }); }); -