Skip to content

Commit

Permalink
BB-598 Validate Kafka messages against all bucket notification rules
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas2bert committed Aug 2, 2024
1 parent 2ba28c0 commit 0a92417
Show file tree
Hide file tree
Showing 5 changed files with 734 additions and 32 deletions.
3 changes: 2 additions & 1 deletion extensions/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
key,
eventType,
});
if (configUtil.validateEntry(config, ent)) {
const { isValid } = configUtil.validateEntry(config, ent);
if (isValid) {
const message
= messageUtil.addLogAttributes(value, ent);
this.log.info('publishing message', {
Expand Down
12 changes: 7 additions & 5 deletions extensions/notification/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ class QueueProcessor extends EventEmitter {
const config = this.bnConfigManager.getConfig(bucket);
if (config && Object.keys(config).length > 0) {
const notifConfig = config.notificationConfiguration;
const destBnConf = notifConfig.queueConfig.find(
const destBnConf = notifConfig.queueConfig.filter(
c => c.queueArn.split(':').pop()
=== this.destinationId);
if (!destBnConf) {
if (!destBnConf.length) {
// skip, if there is no config for the current
// destination resource
return done();
Expand All @@ -212,7 +212,7 @@ class QueueProcessor extends EventEmitter {
const bnConfig = {
bucket,
notificationConfiguration: {
queueConfig: [destBnConf],
queueConfig: destBnConf,
},
};
this.logger.debug('validating entry', {
Expand All @@ -222,9 +222,10 @@ class QueueProcessor extends EventEmitter {
eventType,
destination: this.destinationId,
});
if (configUtil.validateEntry(bnConfig, sourceEntry)) {
const { isValid, matchingConfig } = configUtil.validateEntry(bnConfig, sourceEntry);
if (isValid) {
// add notification configuration id to the message
sourceEntry.configurationId = destBnConf.id;
sourceEntry.configurationId = matchingConfig.id;
const message
= messageUtil.transformToSpec(sourceEntry);
const msg = {
Expand All @@ -242,6 +243,7 @@ class QueueProcessor extends EventEmitter {
eventType: eventRecord.eventName,
eventTime: eventRecord.eventTime,
destination: this.destinationId,
matchingConfig,
});
return this._destination.send([msg], done);
}
Expand Down
44 changes: 21 additions & 23 deletions extensions/notification/utils/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,37 @@ function filterConfigsByEvent(bnConfigs, event) {
/**
* Validates an entry from log against bucket notification configurations to see
* if the entry has to be published. Validations include, bucket specific
* configuration check, event type check, object name specific filter checks
* @param {Object} bnConfig - Bucket notification configuration
* @param {Object} entry - An entry from the log
* @return {boolean} true if event qualifies for notification
* configuration check, event type check, object name specific filter checks.
* @param {Object} bnConfig - Bucket notification configuration.
* @param {Object} entry - An entry from the log.
* @return {Object} Result with validity boolean and matching configuration rule.
*/
function validateEntry(bnConfig, entry) {
const { bucket, eventType } = entry;
/**
* if the event type is unavailable, it is an entry that is a
* placeholder for deletion or cleanup, these entries should be ignored and
* not be processed.
*/

if (!eventType) {
return false;
return { isValid: false, matchingConfig: null };
}
const notifConf = bnConfig.notificationConfiguration;
// check if the entry belongs to the bucket in the configuration

if (bucket !== bnConfig.bucket) {
return false;
return { isValid: false, matchingConfig: null };
}
// check if the event type matches

const notifConf = bnConfig.notificationConfiguration;
const qConfigs = filterConfigsByEvent(notifConf.queueConfig, eventType);

if (qConfigs.length > 0) {
const qConfigWithFilters
= qConfigs.filter(c => c.filterRules && c.filterRules.length > 0);
// if there are configs without filters, make the entry valid
if (qConfigs.length > qConfigWithFilters.length) {
return true;
}
return qConfigWithFilters.some(
c => validateEntryWithFilter(c.filterRules, entry));
const matchingConfig = qConfigs.find(c => {
if (!c.filterRules || c.filterRules.length === 0) {
return true;
}
return validateEntryWithFilter(c.filterRules, entry);
});

return { isValid: !!matchingConfig, matchingConfig };
}
return false;

return { isValid: false, matchingConfig: null };
}

module.exports = {
Expand Down
Loading

0 comments on commit 0a92417

Please sign in to comment.