Skip to content

Commit

Permalink
Execute Actions on runTrigger exceptions for Bucket-Level Monitor (#157)
Browse files Browse the repository at this point in the history
Signed-off-by: Mohammad Qureshi <qreshi@amazon.com>
  • Loading branch information
qreshi authored Aug 27, 2021
1 parent 0eed799 commit 2d60ede
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
40 changes: 32 additions & 8 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,17 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
val triggerResult = triggerService.runBucketLevelTrigger(monitor, trigger, triggerCtx)
triggerResults[trigger.id] = triggerResult.getCombinedTriggerRunResult(triggerResults[trigger.id])

/*
* If an error was encountered when running the trigger, it means that something went wrong when parsing the input results
* for the filtered buckets returned from the pipeline bucket selector injected into the input query.
*
* In this case, the returned aggregation result buckets are empty, so the categorization of the Alerts that happens below
* must be skipped: comparing the current Alerts to an empty result would lead the execution to believe that all Alerts
* have been COMPLETED. If the categorization were not skipped, the error could not be propagated into the existing Alerts
* in a way the user can easily view, because they would all have been moved to the history index.
*/
if (triggerResults[trigger.id]?.error != null) continue

// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
Expand Down Expand Up @@ -418,9 +429,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
} while (monitorResult.inputResults.afterKeysPresent())

// The completed Alerts are whatever are left in the currentAlerts
// The completed Alerts are whatever are left in the currentAlerts.
// However, this is only done when there was no trigger error; otherwise the nextAlerts were not collected and the
// currentAlerts are used as-is.
currentAlerts.forEach { (trigger, keysToAlertsMap) ->
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap))
if (triggerResults[trigger.id]?.error == null)
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap))
}

for (trigger in monitor.triggers) {
Expand All @@ -432,16 +446,20 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
val monitorOrTriggerError = monitorResult.error ?: triggerResult.error
for (action in trigger.actions) {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT) {
// If the monitor or triggerResult has an error, then default to PER_EXECUTION to communicate the error.
// Typically, given the actions taken by runBucketLevelTrigger, an exception during the operation could mean
// either there were incompatible trigger conditions or there was a parsing error on the results.
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && monitorOrTriggerError == null) {
val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionFrequency.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
if (isBucketLevelTriggerActionThrottled(action, alert)) continue

val actionCtx = getActionContextForAlertCategory(
alertCategory, alert, triggerCtx, monitorResult.error ?: triggerResult.error
alertCategory, alert, triggerCtx, monitorOrTriggerError
)
// AggregationResultBucket should not be null here
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
Expand All @@ -454,9 +472,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
alertsToUpdate.add(alert)
}
}
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action
if (dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || monitorOrTriggerError != null) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action.
// If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified.
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) continue

val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
Expand All @@ -465,8 +484,13 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
error = monitorResult.error ?: triggerResult.error
)
val actionResult = runAction(action, actionCtx, dryrun)
// If there was an error during trigger execution then the Alerts to be updated are the current Alerts since the state
// was not changed. Otherwise, the Alerts to be updated are the sum of the deduped, new and completed Alerts.
val alertsToIterate = if (monitorOrTriggerError == null) {
(dedupedAlerts + newAlerts + completedAlerts)
} else currentAlerts[trigger]?.map { it.value } ?: listOf()
// Save the Action run result for every Alert
for (alert in (dedupedAlerts + newAlerts + completedAlerts)) {
for (alert in alertsToIterate) {
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
if (!triggerResult.actionResultsMap.containsKey(alertBucketKeysHash)) {
triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ class TriggerService(val scriptService: ScriptService) {
}
BucketLevelTriggerRunResult(trigger.name, null, selectedBuckets)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// TODO empty map here with error should be treated in the same way as QueryLevelTrigger with error running script
logger.info("Error running trigger [${trigger.id}] for monitor [${monitor.id}]", e)
BucketLevelTriggerRunResult(trigger.name, e, emptyMap())
}
}
Expand Down

0 comments on commit 2d60ede

Please sign in to comment.