Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute Actions on runTrigger exceptions for Bucket-Level Monitor #157

Merged
merged 1 commit into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,17 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
val triggerResult = triggerService.runBucketLevelTrigger(monitor, trigger, triggerCtx)
triggerResults[trigger.id] = triggerResult.getCombinedTriggerRunResult(triggerResults[trigger.id])

/*
* If an error was encountered when running the trigger, it means that something went wrong when parsing the input results
* for the filtered buckets returned from the pipeline bucket selector injected into the input query.
*
* In this case, the returned aggregation result buckets are empty, so the categorization of the Alerts that happens below
* must be skipped: comparing the current Alerts to an empty result would lead the execution to believe that all Alerts
* have been COMPLETED. If the categorization were not skipped, the error could not be propagated into the existing Alerts
* in a way the user can easily view, since they would all have been moved to the history index.
*/
if (triggerResults[trigger.id]?.error != null) continue

// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
Expand Down Expand Up @@ -418,9 +429,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
} while (monitorResult.inputResults.afterKeysPresent())

// The completed Alerts are whatever are left in the currentAlerts
// The completed Alerts are whatever are left in the currentAlerts.
// However, this is only done when there was no trigger error; on a trigger error the nextAlerts were not collected
// and the currentAlerts are used as-is instead.
currentAlerts.forEach { (trigger, keysToAlertsMap) ->
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap))
if (triggerResults[trigger.id]?.error == null)
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap))
}

for (trigger in monitor.triggers) {
Expand All @@ -432,16 +446,20 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
val monitorOrTriggerError = monitorResult.error ?: triggerResult.error
for (action in trigger.actions) {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT) {
// If the monitor or triggerResult has an error, then default to PER_EXECUTION to communicate the error.
// Typically, given the actions taken by runBucketLevelTrigger, an exception during the operation could mean
// either there were incompatible trigger conditions or there was a parsing error on the results.
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && monitorOrTriggerError == null) {
val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionFrequency.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
if (isBucketLevelTriggerActionThrottled(action, alert)) continue

val actionCtx = getActionContextForAlertCategory(
alertCategory, alert, triggerCtx, monitorResult.error ?: triggerResult.error
alertCategory, alert, triggerCtx, monitorOrTriggerError
)
// AggregationResultBucket should not be null here
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
Expand All @@ -454,9 +472,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
alertsToUpdate.add(alert)
}
}
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action
if (dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || monitorOrTriggerError != null) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action.
// If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified.
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue

val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
Expand All @@ -465,8 +484,13 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
error = monitorResult.error ?: triggerResult.error
)
val actionResult = runAction(action, actionCtx, dryrun)
// If there was an error during trigger execution then the Alerts to be updated are the current Alerts since the state
// was not changed. Otherwise, the Alerts to be updated are the sum of the deduped, new and completed Alerts.
val alertsToIterate = if (monitorOrTriggerError == null) {
(dedupedAlerts + newAlerts + completedAlerts)
} else currentAlerts[trigger]?.map { it.value } ?: listOf()
// Save the Action run result for every Alert
for (alert in (dedupedAlerts + newAlerts + completedAlerts)) {
for (alert in alertsToIterate) {
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
if (!triggerResult.actionResultsMap.containsKey(alertBucketKeysHash)) {
triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ class TriggerService(val scriptService: ScriptService) {
}
BucketLevelTriggerRunResult(trigger.name, null, selectedBuckets)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// TODO empty map here with error should be treated in the same way as QueryLevelTrigger with error running script
logger.info("Error running trigger [${trigger.id}] for monitor [${monitor.id}]", e)
BucketLevelTriggerRunResult(trigger.name, e, emptyMap())
}
}
Expand Down