-
Notifications
You must be signed in to change notification settings - Fork 7
/
alerts.py
189 lines (159 loc) · 5.72 KB
/
alerts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# | * IBM Confidential
# | * OCO Source Materials
# | * 5737-M66
# | * © Copyright IBM Corp. 2020
# | * The source code for this program is not published or otherwise divested of its
# | * trade secrets, irrespective of what has been deposited with the U.S.
# | * Copyright Office.
# python libraries
import json
import logging
from jsonschema import validate
# iotfunctions modules
from iotfunctions.db import (Database)
# mam-sdk modules
from .utils import *
from .parseinput import *
from .apiclient import (APIClient)
from iotfunctions.util import setup_logging
# Logging is configured as a side effect of importing this module: the
# iotfunctions setup_logging helper is called with both the requested level
# and the root logger level set to DEBUG.
# NOTE(review): this presumably affects logging for the whole process, not
# just this module - confirm this is intended for library code.
setup_logging(as_log_level=logging.DEBUG, root_log_level=logging.DEBUG)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# JSON schema for the payload accepted by get_alerts(); it is handed to
# jsonschema.validate() before the query is sent to the API.
# No key is marked as required and additional keys are not forbidden, so the
# schema only constrains the types of the keys that are present.
query_alerts_schema = {
    "type": "object",
    "properties": {
        # list of entity types to query, each optionally naming data items
        "entityTypesFilter": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "dataItem": {"type": "array"},
                },
            },
        },
        # bounds of the queried period, given as strings
        "start_ts": {"type": "string"},
        "end_ts": {"type": "string"},
        # names of the fields to group the result by
        "groupBy": {
            "type": "array",
            "items": {"type": "string"},
        },
        "time_grain": {"type": "string"},
    },
}
def get_alerts(json_payload, credentials=None):
    """
    Query alert data with a json payload.

    Uses the following API:
        POST /api/asengine/v1/<tenant_id>/queryalertdata

    :param json_payload: the query; it is passed through validateJSON and the
        result is validated against query_alerts_schema. Example:
        ```
        {
            "entityTypesFilter": [
                {
                    "name": "sample_entity_type_name",
                    "dataItem": [
                        "alert_data_item_name"
                    ]
                }
            ],
            "start_ts": "valid datetime",
            "end_ts": "valid datetime",
            "groupBy": [
                "severity",
                "status",
                "entity_id"
            ],
            "time_grain": "hour"
        }
        ```
    :param credentials: dict analytics-service dev credentials
    :return: json object: list of dict with alert information (the decoded
        json body of the API response)
    :raises Exception: when the API response status code is not 200
    """
    # 1. INPUT CHECKING
    logger.debug('Performing Input Checking')
    query = validateJSON(json_payload)  # input is valid json
    validate(instance=query, schema=query_alerts_schema)  # input has valid schema

    # 2. API CONNECTION: POST the query to the alert data endpoint
    logger.debug('Connecting to API')
    encoded_query = json.dumps(query).encode('utf-8')
    # the environment is stored on the APIClient class before a client is built
    APIClient.environment_info = generate_api_environment(credentials)

    client = APIClient(
        api_suffix="asengine",
        http_method_name="POST",
        endpoint_suffix="/{orgId}/queryalertdata",
        body=encoded_query,
    )
    response = client.call_api()

    if response.status_code == 200:
        return response.json()
    raise Exception('API Client call failed when getting alerts')
def set_alert():
    """
    Placeholder for setting alerts; not implemented yet.

    Open question kept from the original note: do it one at a time?

    :return: never returns normally
    :raises NotImplementedError: always
    """
    not_available = NotImplementedError("In future release")
    raise not_available
# Get and set alert severity, state and action url:
# - get: use the post query data API - this will need a json payload
# - set: use the API functions - the high value or low value etc.
# lower-case spellings of the statuses accepted by update_alert_status
ALLOWED_STATUS_VALUES = ["new", "acknowledged", "resolved", "dismissed"]


def update_alert_status(alert_id, new_status, credentials=None):
    """
    Update the status of a single alert, identified by its alert id.

    Sends a PUT request to the "alerts" API with a one-element list holding
    the alert id and the new status.

    :param alert_id: str can get alert id using get_alerts
    :param new_status: str, matched case-insensitively
        allowed values for new_status: New, Acknowledged, Resolved, Dismissed
    :param credentials: dict analytics-service dev credentials
    :return: None
    :raises ValueError: when new_status is not one of the allowed values
        (ValueError is a subclass of Exception, so callers catching Exception
        keep working)
    :raises Exception: when the API response status code is not 200
    """
    # 1. INPUT CHECKING
    logger.debug('Performing Input Checking')
    normalized_status = new_status.lower()
    if normalized_status not in ALLOWED_STATUS_VALUES:
        raise ValueError(f'Invalid value {new_status} for new_status argument')

    # 2. API CONNECTION: PUT the new status for the alert
    logger.debug('Connecting to API')
    payload = [{
        "alertId": alert_id,
        # the value is sent with only its first letter upper-cased
        "domainStatus": normalized_status.capitalize(),
    }]
    body_arguments = json.dumps(payload).encode('utf-8')
    # the environment is stored on the APIClient class before a client is built
    APIClient.environment_info = generate_api_environment(credentials)

    # call api to update the alert
    response = APIClient(
        api_suffix="alerts",
        http_method_name="PUT",
        endpoint_suffix="/{orgId}",
        body=body_arguments,
    ).call_api()
    if response.status_code != 200:
        raise Exception(
            'API Client call failed when updating alert status '
            f'(HTTP status {response.status_code})'
        )
# lower-case spellings of the severities accepted by update_alert_severity
ALLOWED_SEVERITY_VALUES = ["low", "medium", "medium-high", "high"]


def update_alert_severity(alert_id, new_severity, credentials=None):
    """
    Update the severity of a single alert, identified by its alert id.

    Sends a PUT request to the "alerts" API with a one-element list holding
    the alert id and the new severity.

    :param alert_id: str can get alert id using get_alerts
    :param new_severity: str, matched case-insensitively
        allowed values for new_severity: Low, Medium, High, Medium-High
    :param credentials: dict analytics-service dev credentials
    :return: None
    :raises ValueError: when new_severity is not one of the allowed values
        (ValueError is a subclass of Exception, so callers catching Exception
        keep working)
    :raises Exception: when the API response status code is not 200
    """
    # 1. INPUT CHECKING
    logger.debug('Performing Input Checking')
    normalized_severity = new_severity.lower()
    if normalized_severity not in ALLOWED_SEVERITY_VALUES:
        raise ValueError(f'Invalid value {new_severity} for new_severity argument')

    # 2. API CONNECTION: PUT the new severity for the alert
    logger.debug('Connecting to API')
    payload = [{
        "alertId": alert_id,
        # NOTE(review): str.capitalize() only upper-cases the first letter, so
        # "medium-high" is sent as "Medium-high" while the docstring lists
        # "Medium-High" - confirm which spelling the API expects.
        "severity": normalized_severity.capitalize(),
    }]
    body_arguments = json.dumps(payload).encode('utf-8')
    # the environment is stored on the APIClient class before a client is built
    APIClient.environment_info = generate_api_environment(credentials)

    # call api to update the alert
    response = APIClient(
        api_suffix="alerts",
        http_method_name="PUT",
        endpoint_suffix="/{orgId}",
        body=body_arguments,
    ).call_api()
    if response.status_code != 200:
        raise Exception(
            'API Client call failed when updating alert severity '
            f'(HTTP status {response.status_code})'
        )