Skip to content

Commit

Permalink
BB-451: Add fallback timestamp to delete notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
rachedbenmustapha committed Oct 13, 2023
1 parent 1875e1a commit 2578db9
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 3 deletions.
12 changes: 9 additions & 3 deletions extensions/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
* @param {String} key - object key
* @param {Object} value - log entry object
* @param {String} type - entry type
* @param {String} commitTimestamp - when the entry was written, used as a fallback
* if no last-modified MD attribute available
* @return {undefined}
*/
_processObjectEntry(bucket, key, value, type) {
_processObjectEntry(bucket, key, value, type, commitTimestamp) {
const versionId = value.versionId || null;
if (!isMasterKey(key)) {
return undefined;
Expand All @@ -139,14 +141,18 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
= notifConstants;
let eventType
= value[eventMessageProperty.eventType];
let dateTime
= value[eventMessageProperty.dateTime];
if (eventType === undefined && type === 'del') {
eventType = notifConstants.deleteEvent;
dateTime = commitTimestamp;
}
const ent = {
bucket,
key,
eventType,
versionId,
dateTime,
};
this.log.debug('validating entry', {
method: 'NotificationQueuePopulator._processObjectEntry',
Expand Down Expand Up @@ -181,7 +187,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
* @return {undefined}
*/
filter(entry) {
const { bucket, key, type } = entry;
const { bucket, key, type, timestamp } = entry;
const value = entry.value || '{}';
const { error, result } = safeJsonParse(value);
// ignore if entry's value is not valid
Expand All @@ -200,7 +206,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
}
// object entry processing - filter and publish
if (key && result) {
return this._processObjectEntry(bucket, key, result, type);
return this._processObjectEntry(bucket, key, result, type, timestamp);
}
return undefined;
}
Expand Down
1 change: 1 addition & 0 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ class LogReader {
key: _transformKey(entry.key),
value: entry.value,
logReader: this,
timestamp: record.timestamp,
};
return _executeFilter(ext, entryToFilter, next);
}, cb);
Expand Down
104 changes: 104 additions & 0 deletions tests/unit/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
const assert = require('assert');
const werelogs = require('werelogs');

const NotificationQueuePopulator = require('../../../extensions/notification/NotificationQueuePopulator');

const log = new werelogs.Logger('NotificationQueuePopulator:test');
werelogs.configure({
level: 'warn',
dump: 'error',
});

const bucket = 'bucket1';
const objectKey = 'object1';
const value = {
bucket,
};
const fallbackCommitTimestamp = '2022-10-12T00:01:02.003Z';
const mdTimestamp = '2023-10-12T00:01:02.003Z';
const configTopic = 'topic1';

describe('NotificationQueuePopulator', () => {
describe('_processObjectEntry', () => {
it('should use the fallback event timestamp for deletes', () => {
const type = 'del';
let published = false;

const qp = new NotificationQueuePopulator({
logger: log,
config: {
topic: configTopic,
},
bnConfigManager: {
getConfig: () => ({
bucket,
notificationConfiguration: {
queueConfig: [
{
events: ['*'],
},
],
},
}),
},
});

qp.publish = (topic, key, data) => {
const parsed = JSON.parse(data);

assert.deepStrictEqual(topic, configTopic);
assert.deepStrictEqual(key, `${bucket}/${objectKey}`);
assert.deepStrictEqual(parsed.dateTime, fallbackCommitTimestamp);

published = true;
};

qp._processObjectEntry(bucket, objectKey, value, type, fallbackCommitTimestamp);

assert(published);
});

it('should use the event timestamp from MD if available', () => {
let published = false;

const qp = new NotificationQueuePopulator({
logger: log,
config: {
topic: configTopic,
},
bnConfigManager: {
getConfig: () => ({
bucket,
notificationConfiguration: {
queueConfig: [
{
events: ['*'],
},
],
},
}),
},
});

qp.publish = (topic, key, data) => {
const parsed = JSON.parse(data);

assert.deepStrictEqual(topic, configTopic);
assert.deepStrictEqual(key, `${bucket}/${objectKey}`);
assert.deepStrictEqual(parsed.dateTime, mdTimestamp);

published = true;
};

const valueWithMD = {
'originOp': 's3:ObjectCreated:Put',
'last-modified': mdTimestamp,
...value,
};

qp._processObjectEntry(bucket, objectKey, valueWithMD, null, fallbackCommitTimestamp);

assert(published);
});
});
});

0 comments on commit 2578db9

Please sign in to comment.