Skip to content

Commit

Permalink
feat: alerts REST client (#416)
Browse files Browse the repository at this point in the history
**Issue number:** ADDON-77073

### PR Type

**What kind of change does this PR introduce?**
* [x] Feature
* [ ] Bug Fix
* [ ] Refactoring (no functional or API changes)
* [ ] Documentation Update
* [ ] Maintenance (dependency updates, CI, etc.)

## Summary

### Changes

REST client used to create, read, update and delete alerts.

### User experience

No changes to the existing code. New class added.

## Checklist

If an item doesn't apply to your changes, leave it unchecked.

* [x] I have performed a self-review of this change according to the
[development
guidelines](https://splunk.github.io/addonfactory-ucc-generator/contributing/#development-guidelines)
* [x] Tests have been added/modified to cover the changes [(testing
doc)](https://splunk.github.io/addonfactory-ucc-generator/contributing/#build-and-test)
* [x] PR title and description follows the [contributing
principles](https://splunk.github.io/addonfactory-ucc-generator/contributing/#pull-requests)

---------

Co-authored-by: sgoral <sgoral@splunk.com>
Co-authored-by: sgoral-splunk <138458044+sgoral-splunk@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 16, 2024
1 parent a0d0e03 commit 2c927a4
Show file tree
Hide file tree
Showing 4 changed files with 477 additions and 6 deletions.
1 change: 0 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

258 changes: 258 additions & 0 deletions solnlib/alerts_rest_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
#
# Copyright 2024 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from enum import Enum
from typing import Tuple, Union, Optional

from solnlib import splunk_rest_client as rest_client


class AlertType(Enum):
CUSTOM = "custom"
NUMBER_OF_EVENTS = "number of events"
NUMBER_OF_HOSTS = "number of hosts"
NUMBER_OF_SOURCES = "number of sources"


class AlertSeverity(Enum):
DEBUG = 1
INFO = 2
WARN = 3
ERROR = 4
SEVERE = 5
FATAL = 6


class AlertComparator(Enum):
GREATER_THAN = "greater than"
LESS_THAN = "less than"
EQUAL_TO = "equal to"
RISES_BY = "rises by"
DROPS_BY = "drops by"
RISES_BY_PERC = "rises by perc"
DROPS_BY_PERC = "drops by perc"


class AlertsRestClient:
"""REST client for handling alerts."""

ENDPOINT = "/servicesNS/{owner}/{app}/saved/searches"
headers = [("Content-Type", "application/json")]

def __init__(
self,
session_key: str,
app: str,
owner: str = "nobody",
**context: dict,
):
"""Initializes AlertsRestClient.
Arguments:
session_key: Splunk access token.
app: App name of namespace.
context: Other configurations for Splunk rest client.
"""
self.session_key = session_key
self.app = app

self._rest_client = rest_client.SplunkRestClient(
self.session_key,
app=self.app,
owner=owner,
**context,
)

self.endpoint = self.ENDPOINT.format(owner=owner, app=app)

def create_search_alert(
self,
name: str,
search: str,
*,
disabled: bool = True,
description: str = "",
alert_type: AlertType = AlertType.NUMBER_OF_EVENTS,
alert_condition: str = "",
alert_comparator: AlertComparator = AlertComparator.GREATER_THAN,
alert_threshold: Union[int, float, str] = 0,
time_window: Tuple[str, str] = ("-15m", "now"),
alert_severity: AlertSeverity = AlertSeverity.WARN,
cron_schedule: str = "* * * * *",
expires: Union[int, str] = "24h",
**kwargs,
):
"""Creates a search alert in Splunk.
Arguments:
name: Name of the alert.
search: Search query for the alert.
disabled: Whether the alert is disabled. Default is True.
description: Description of the alert.
alert_type: Type of the alert (see AlertType). If it equals to CUSTOM, Splunk executes a check in
alert_condition. Otherwise, alert_comparator and alert_threshold are used.
alert_condition: Condition for the alert.
alert_comparator: Comparator for the alert. Default is GREATER_THAN.
alert_threshold: Threshold for the alert. Default is 0.
time_window: Time window for the alert. Tuple of earliest and latest time. Default is ("-15m", "now").
alert_severity: Severity level of the alert. Default is WARN.
cron_schedule: Cron schedule for the alert. Default is "* * * * *".
expires: Expiration time for the alert (i.e. how long you can access the result of triggered alert).
Default is "24h".
kwargs: Additional parameters for the alert. See Splunk documentation for more details.
"""
params = {
"output_mode": "json",
"name": name,
"search": search,
"description": description,
"alert_type": alert_type.value,
"alert_condition": alert_condition,
"alert_comparator": alert_comparator.value,
"alert_threshold": alert_threshold,
"alert.severity": str(alert_severity.value),
"is_scheduled": "1",
"cron_schedule": cron_schedule,
"dispatch.earliest_time": time_window[0],
"dispatch.latest_time": time_window[1],
"alert.digest_mode": "1",
"alert.expires": str(expires),
"disabled": "1" if disabled else "0",
"realtime_schedule": "1",
}

params.update(kwargs)

self._rest_client.post(self.endpoint, body=params, headers=self.headers)

def delete_search_alert(self, name: str):
"""Deletes a search alert in Splunk.
Arguments:
name: Name of the alert to delete.
"""
self._rest_client.delete(f"{self.endpoint}/{name}")

def get_search_alert(self, name: str):
"""Retrieves a specific search alert from Splunk.
Arguments:
name: Name of the alert to retrieve.
Returns:
A dictionary containing the alert details.
"""
response = (
self._rest_client.get(f"{self.endpoint}/{name}", output_mode="json")
.body.read()
.decode("utf-8")
)

return json.loads(response)

def get_all_search_alerts(self):
"""Retrieves all search alerts from Splunk.
Returns:
A dictionary containing all search alerts.
"""
response = (
self._rest_client.get(self.endpoint, output_mode="json")
.body.read()
.decode("utf-8")
)

return json.loads(response)

def update_search_alert(
self,
name: str,
*,
search: Optional[str] = None,
disabled: Optional[bool] = None,
description: Optional[str] = None,
alert_type: Optional[AlertType] = None,
alert_condition: Optional[str] = None,
alert_comparator: Optional[AlertComparator] = None,
alert_threshold: Optional[Union[int, float, str]] = None,
time_window: Optional[Tuple[str, str]] = None,
alert_severity: Optional[AlertSeverity] = None,
cron_schedule: Optional[str] = None,
expires: Optional[Union[int, str]] = None,
**kwargs,
):
"""Updates a search alert in Splunk.
Arguments:
name: Name of the alert to update.
search: Search query for the alert.
disabled: Whether the alert is disabled.
description: Description of the alert.
alert_type: Type of the alert (see AlertType). If it equals to CUSTOM, Splunk executes a check in
alert_condition. Otherwise, alert_comparator and alert_threshold are used.
alert_condition: Condition for the alert.
alert_comparator: Comparator for the alert.
alert_threshold: Threshold for the alert.
time_window: Time window for the alert. Tuple of earliest and latest time.
alert_severity: Severity level of the alert.
cron_schedule: Cron schedule for the alert.
expires: Expiration time for the alert.
kwargs: Additional parameters for the alert. See Splunk documentation for more details.
"""
params = {
"output_mode": "json",
}

if search:
params["search"] = search

if disabled is not None:
params["disabled"] = "1" if disabled else "0"

if description:
params["description"] = description

if alert_type:
params["alert_type"] = alert_type.value

if alert_condition:
params["alert_condition"] = alert_condition

if alert_comparator:
params["alert_comparator"] = alert_comparator.value

if alert_threshold:
params["alert_threshold"] = str(alert_threshold)

if time_window:
params["dispatch.earliest_time"] = time_window[0]
params["dispatch.latest_time"] = time_window[1]

if alert_severity:
params["alert.severity"] = str(alert_severity.value)

if cron_schedule:
params["is_scheduled"] = "1"
params["cron_schedule"] = cron_schedule

if expires:
params["alert.expires"] = str(expires)

params.update(kwargs)

self._rest_client.post(
f"{self.endpoint}/{name}", body=params, headers=self.headers
)
25 changes: 20 additions & 5 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
import os
import sys

# path manipulation get the 'splunk' library for the imports while running on GH Actions
sys.path.append(
os.path.sep.join([os.environ["SPLUNK_HOME"], "lib", "python3.7", "site-packages"])
)
# TODO: 'python3.7' needs to be updated as and when Splunk has new folder for Python.
import pytest

import context


@pytest.fixture(autouse=True, scope="session")
def setup_env():
# path manipulation get the 'splunk' library for the imports while running on GH Actions
if "SPLUNK_HOME" in os.environ:
sys.path.append(
os.path.sep.join(
[os.environ["SPLUNK_HOME"], "lib", "python3.7", "site-packages"]
)
)
# TODO: 'python3.7' needs to be updated as and when Splunk has new folder for Python.


@pytest.fixture(scope="session")
def session_key():
return context.get_session_key()
Loading

0 comments on commit 2c927a4

Please sign in to comment.