-
Notifications
You must be signed in to change notification settings - Fork 334
/
pagerduty.py
674 lines (551 loc) · 25 KB
/
pagerduty.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
"""
Copyright 2017-present, Airbnb Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import OrderedDict
import os
import backoff
from stream_alert.alert_processor import LOGGER
from stream_alert.alert_processor.outputs.output_base import (
OutputDispatcher,
OutputProperty,
OutputRequestFailure,
StreamAlertOutput
)
from stream_alert.shared.backoff_handlers import (
backoff_handler,
success_handler,
giveup_handler
)
def events_v2_data(routing_key, **kwargs):
    """Build the JSON body for a 'trigger' event in the PagerDuty Events API v2 format

    Args:
        routing_key (str): Routing key for this PagerDuty integration

    Keyword Args:
        rule_name (str): Name of the triggered rule
        alert (dict): Alert relevant to the triggered rule. Its 'rule_description',
            'record' and 'log_source' entries are read. Any other keyword
            arguments are accepted and ignored.

    Returns:
        dict: Contains JSON blob to be used as event
    """
    # Keys are read in a fixed order (rule_name, then the alert fields) so a missing
    # key always surfaces as the same KeyError
    event_summary = 'StreamAlert Rule Triggered - {}'.format(kwargs['rule_name'])
    alert = kwargs['alert']
    custom_details = {
        'rule_description': alert['rule_description'],
        'record': alert['record']
    }

    return {
        'routing_key': routing_key,
        'payload': {
            'summary': event_summary,
            'source': alert['log_source'],
            'severity': 'critical',
            'custom_details': custom_details
        },
        'event_action': 'trigger',
        'client': 'StreamAlert'
    }
@StreamAlertOutput
class PagerDutyOutput(OutputDispatcher):
    """Output dispatcher that sends alerts to PagerDuty using the Events API v1"""
    __service__ = 'pagerduty'

    @classmethod
    def _get_default_properties(cls):
        """Return the properties that are identical for every PagerDuty v1 output

        The events url is hard-coded here, so users do not have to configure it.

        Returns:
            dict: Contains various default items for this output (ie: url)
        """
        defaults = {
            'url': 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'
        }
        return defaults

    @classmethod
    def get_user_defined_properties(cls):
        """Return the properties a user has to supply when configuring this output

        Two properties are defined:
            descriptor: short, unique name for the integration being configured
            service_key: key of the PagerDuty integration; flagged as masked
                input and as a credential requirement

        Returns:
            OrderedDict: Contains various OutputProperty items
        """
        properties = OrderedDict()
        properties['descriptor'] = OutputProperty(
            description='a short and unique descriptor for this '
                        'PagerDuty integration')
        properties['service_key'] = OutputProperty(
            description='the service key for this PagerDuty integration',
            mask_input=True,
            cred_requirement=True)
        return properties

    def dispatch(self, **kwargs):
        """Send alert to Pagerduty

        Args:
            **kwargs: consists of any combination of the following items:
                descriptor (str): Service descriptor (ie: slack channel, pd integration)
                rule_name (str): Name of the triggered rule
                alert (dict): Alert relevant to the triggered rule

        Returns:
            The value returned by self._log_status; it is called with False when
            the credentials cannot be loaded or the request raises
            OutputRequestFailure.
        """
        creds = self._load_creds(kwargs['descriptor'])
        if not creds:
            return self._log_status(False)

        description = 'StreamAlert Rule Triggered - {}'.format(kwargs['rule_name'])
        alert = kwargs['alert']
        event_details = {
            'rule_description': alert['rule_description'],
            'record': alert['record']
        }
        payload = {
            'service_key': creds['service_key'],
            'event_type': 'trigger',
            'description': description,
            'details': event_details,
            'client': 'StreamAlert'
        }

        try:
            sent = self._post_request_retry(creds['url'], payload, None, True)
        except OutputRequestFailure:
            # A failed request is reported as an unsuccessful dispatch
            sent = False

        return self._log_status(sent)
@StreamAlertOutput
class PagerDutyOutputV2(OutputDispatcher):
    """PagerDutyOutputV2 handles all alert dispatching for PagerDuty Events API v2"""
    __service__ = 'pagerduty-v2'

    @classmethod
    def _get_default_properties(cls):
        """Return the properties that are identical for every PagerDuty Events v2 output

        The enqueue url is hard-coded here, so users do not have to configure it.

        Returns:
            dict: Contains various default items for this output (ie: url)
        """
        defaults = {'url': 'https://events.pagerduty.com/v2/enqueue'}
        return defaults

    @classmethod
    def get_user_defined_properties(cls):
        """Return the properties a user has to supply when configuring this output

        Two properties are defined:
            descriptor: short, unique name for the integration being configured
            routing_key: routing key of the PagerDuty integration; flagged as
                masked input and as a credential requirement

        Returns:
            OrderedDict: Contains various OutputProperty items
        """
        properties = OrderedDict()
        properties['descriptor'] = OutputProperty(
            description='a short and unique descriptor for this '
                        'PagerDuty integration')
        properties['routing_key'] = OutputProperty(
            description='the routing key for this PagerDuty integration',
            mask_input=True,
            cred_requirement=True)
        return properties

    def dispatch(self, **kwargs):
        """Send alert to Pagerduty

        Args:
            **kwargs: consists of any combination of the following items:
                descriptor (str): Service descriptor (ie: slack channel, pd integration)
                rule_name (str): Name of the triggered rule
                alert (dict): Alert relevant to the triggered rule

        Returns:
            The value returned by self._log_status; it is called with False when
            the credentials cannot be loaded or the request raises
            OutputRequestFailure.
        """
        creds = self._load_creds(kwargs['descriptor'])
        if not creds:
            return self._log_status(False)

        # The event body is shared with the incident output, so it is built by the
        # module-level helper
        payload = events_v2_data(creds['routing_key'], **kwargs)

        try:
            sent = self._post_request_retry(creds['url'], payload, None, True)
        except OutputRequestFailure:
            # A failed request is reported as an unsuccessful dispatch
            sent = False

        return self._log_status(sent)
class PagerdutySearchDelay(Exception):
    """Raised when a PagerDuty incident lookup returns no incident yet

    It is the exception the incident lookup is retried on.
    """
@StreamAlertOutput
class PagerDutyIncidentOutput(OutputDispatcher):
    """PagerDutyIncidentOutput handles all alert dispatching for PagerDuty Incidents API v2"""
    __service__ = 'pagerduty-incident'

    # Endpoint names, relative to the base url of the REST API
    INCIDENTS_ENDPOINT = 'incidents'
    USERS_ENDPOINT = 'users'
    POLICIES_ENDPOINT = 'escalation_policies'
    SERVICES_ENDPOINT = 'services'
    PRIORITIES_ENDPOINT = 'priorities'

    # Retry settings for the incident lookup: maximum number of tries and the
    # constant interval passed to backoff between them
    BACKOFF_MAX = 3
    BACKOFF_TIME = 5

    def __init__(self, *args, **kwargs):
        OutputDispatcher.__init__(self, *args, **kwargs)
        # These are populated from the loaded credentials on every dispatch
        self._base_url = None
        self._headers = None
        self._escalation_policy = None

    @classmethod
    def _get_default_properties(cls):
        """Get the standard url used for PagerDuty Incidents API v2. This value the same for
        everyone, so is hard-coded here and does not need to be configured by the user

        Returns:
            dict: Contains various default items for this output (ie: api)
        """
        return {'api': 'https://api.pagerduty.com'}

    @classmethod
    def get_user_defined_properties(cls):
        """Get properties that must be assigned by the user when configuring a new PagerDuty
        incident output. This should be sensitive or unique information for this use-case
        that needs to come from the user.

        Every output should return a dict that contains a 'descriptor' with a description of the
        integration being configured.

        This output also defines a token (masked during input), the service name, the
        name of the default escalation policy, the email of a user of the account linked
        to the token, and an integration key. All of these are credential requirements.

        Returns:
            OrderedDict: Contains various OutputProperty items
        """
        return OrderedDict([
            ('descriptor',
             OutputProperty(description='a short and unique descriptor for this '
                                        'PagerDuty integration')),
            ('token',
             OutputProperty(description='the token for this PagerDuty integration',
                            mask_input=True,
                            cred_requirement=True)),
            ('service_name',
             OutputProperty(description='the service name for this PagerDuty integration',
                            cred_requirement=True)),
            ('escalation_policy',
             OutputProperty(description='the name of the default escalation policy',
                            input_restrictions={},
                            cred_requirement=True)),
            ('email_from',
             OutputProperty(description='valid user email from the PagerDuty '
                                        'account linked to the token',
                            cred_requirement=True)),
            ('integration_key',
             OutputProperty(description='the integration key for this PagerDuty integration',
                            cred_requirement=True))
        ])

    @staticmethod
    def _get_endpoint(base_url, endpoint):
        """Helper to get the full url for a PagerDuty Incidents endpoint.

        The two parts are joined with exactly one '/'. URLs are not filesystem paths,
        so os.path.join is avoided: it uses the platform separator and drops the base
        url when the endpoint starts with a '/'.

        Args:
            base_url (str): Base URL for the API
            endpoint (str): Endpoint that we want the full URL for

        Returns:
            str: Full URL of the provided endpoint
        """
        return '{}/{}'.format(base_url.rstrip('/'), endpoint.lstrip('/'))

    def _create_event(self, data):
        """Helper to create an event in the PagerDuty Events API v2

        Args:
            data (dict): JSON blob with the format of the PagerDuty Events API v2

        Returns:
            dict: Contains the HTTP response of the request to the API, or False if
                the request fails or the response body is empty
        """
        url = 'https://events.pagerduty.com/v2/enqueue'
        try:
            resp = self._post_request_retry(url, data, None, False)
        except OutputRequestFailure:
            return False

        response = resp.json()
        if not response:
            return False

        return response

    @backoff.on_exception(backoff.constant,
                          PagerdutySearchDelay,
                          max_tries=BACKOFF_MAX,
                          interval=BACKOFF_TIME,
                          on_backoff=backoff_handler,
                          on_success=success_handler,
                          on_giveup=giveup_handler)
    def _get_event_incident_id(self, incident_key):
        """Helper to lookup an incident using the incident_key and return the id

        The lookup is retried while no incident is found for the key.

        Args:
            incident_key (str): Incident key that uniquely identifies an incident

        Returns:
            str: ID of the incident after look up the incident_key

        Raises:
            PagerdutySearchDelay: If the lookup fails or returns no incident
        """
        params = {
            'incident_key': incident_key
        }
        incidents_url = self._get_endpoint(self._base_url, self.INCIDENTS_ENDPOINT)
        response = self._generic_api_get(incidents_url, params)

        # _generic_api_get returns False on failure; treat that like an empty result
        # so the lookup is retried instead of raising AttributeError
        incidents = response.get('incidents', []) if response else []
        if not incidents:
            raise PagerdutySearchDelay()

        return incidents[0].get('id')

    def _merge_incidents(self, url, to_be_merged_id):
        """Helper to merge incidents by id using the PagerDuty REST API v2

        Args:
            url (str): The url to send the requests to in the API
            to_be_merged_id (str): ID of the incident to merge with

        Returns:
            dict: Contains the HTTP response of the request to the API, or False if
                the request fails or the response body is empty
        """
        params = {
            'source_incidents': [
                {
                    'id': to_be_merged_id,
                    'type': 'incident_reference'
                }
            ]
        }
        try:
            resp = self._put_request_retry(url, params, self._headers, False)
        except OutputRequestFailure:
            return False

        response = resp.json()
        if not response:
            return False

        return response

    def _generic_api_get(self, url, params):
        """Helper to submit generic GET requests with parameters to the PagerDuty REST API v2

        Args:
            url (str): The url to send the requests to in the API
            params (dict): Query parameters to send with the request

        Returns:
            dict: Contains the HTTP response of the request to the API, or False if
                the request fails or the response body is empty
        """
        try:
            resp = self._get_request_retry(url, params, self._headers, False)
        except OutputRequestFailure:
            return False

        response = resp.json()
        if not response:
            return False

        return response

    def _check_exists(self, filter_str, url, target_key, get_id=True):
        """Generic method to run a search in the PagerDuty REST API and return the id
        of the first occurrence from the results.

        Args:
            filter_str (str): The query filter to search for in the API
            url (str): The url to send the requests to in the API
            target_key (str): The key to extract in the returned results
            get_id (boolean): Whether to return the id of the first result (True)
                or only whether any result exists (False)

        Returns:
            str: ID of the targeted element that matches the provided filter or
                 True/False whether a matching element exists or not.
        """
        params = {
            'query': filter_str
        }
        response = self._generic_api_get(url, params)
        if not response:
            return False

        # A response that carries no results under target_key means nothing matched
        # the filter. This must be False in both modes; checking only that the
        # response is non-empty would report items that do not exist as found.
        results = response.get(target_key)
        if not results:
            return False

        if not get_id:
            return True

        # There are results, get the first occurrence from the list
        return results[0].get('id', False)

    def _user_verify(self, user, get_id=True):
        """Method to verify the existence of a user with the API

        Args:
            user (str): User to query about in the API.
            get_id (boolean): Whether to generate a dict with result and reference

        Returns:
            dict or bool: JSON object be used in the API call, containing the user id
                and 'user_reference' as type; True if get_id is False and the user
                exists; False if the user is not found
        """
        return self._item_verify(user, self.USERS_ENDPOINT, 'user_reference', get_id)

    def _policy_verify(self, policy, default_policy):
        """Method to verify the existence of an escalation policy with the API

        Args:
            policy (str): Escalation policy to query about in the API
            default_policy (str): Escalation policy to use if the first one is not verified

        Returns:
            dict: JSON object be used in the API call, containing the policy id
                and 'escalation_policy_reference' as type, or False if neither
                policy can be verified
        """
        verified = self._item_verify(policy, self.POLICIES_ENDPOINT, 'escalation_policy_reference')

        # If the escalation policy provided is not verified in the API, use the default
        if verified:
            return verified

        return self._item_verify(default_policy, self.POLICIES_ENDPOINT,
                                 'escalation_policy_reference')

    def _service_verify(self, service):
        """Method to verify the existence of a service with the API

        Args:
            service (str): Service to query about in the API

        Returns:
            dict: JSON object be used in the API call, containing the service id
                and 'service_reference' as type, or False if it is not found
        """
        return self._item_verify(service, self.SERVICES_ENDPOINT, 'service_reference')

    def _item_verify(self, item_str, item_key, item_type, get_id=True):
        """Method to verify the existence of an item with the API

        Args:
            item_str (str): Item to query about in the API
            item_key (str): Endpoint/key to be extracted from search results
            item_type (str): Type of item reference to be returned
            get_id (boolean): Whether to generate a dict with result and reference

        Returns:
            dict: JSON object be used in the API call, containing the item id
                  and the item reference, True if it just exists or False if it fails
        """
        item_url = self._get_endpoint(self._base_url, item_key)
        item_id = self._check_exists(item_str, item_url, item_key, get_id)
        if not item_id:
            LOGGER.info('%s not found in %s, %s', item_str, item_key, self.__service__)
            return False

        if get_id:
            return {'id': item_id, 'type': item_type}

        return item_id

    def _priority_verify(self, context):
        """Method to verify the existence of an incident priority with the API

        Args:
            context (dict): Context provided in the alert record

        Returns:
            dict: JSON object be used in the API call, containing the priority id
                  and the priority reference, empty if it fails or it does not exist
        """
        if not context:
            return dict()

        priority_name = context.get('incident_priority', False)
        if not priority_name:
            return dict()

        priorities_url = self._get_endpoint(self._base_url, self.PRIORITIES_ENDPOINT)

        try:
            resp = self._get_request_retry(priorities_url, {}, self._headers, False)
        except OutputRequestFailure:
            return dict()

        response = resp.json()
        if not response:
            return dict()

        priorities = response.get('priorities', [])
        if not priorities:
            return dict()

        # If the requested priority is in the list, get the id. Entries without a
        # 'name' are skipped instead of raising KeyError.
        priority_id = next(
            (item for item in priorities if item.get('name') == priority_name), {}
        ).get('id', False)

        # If the priority id is found, compose the JSON
        if priority_id:
            return {'id': priority_id, 'type': 'priority_reference'}

        return dict()

    def _incident_assignment(self, context):
        """Method to determine if the incident gets assigned to a user or an escalation policy

        Args:
            context (dict): Context provided in the alert record

        Returns:
            tuple: assigned_key (str), assigned_value (dict to assign incident to an escalation
                   policy or array of dicts to assign incident to users)
        """
        # A missing (None) context behaves like an empty one
        context = context or {}

        # Check if a user to assign the incident is provided
        user_to_assign = context.get('assigned_user', False)

        # If provided, verify the user and get the id from API
        if user_to_assign:
            user_assignee = self._user_verify(user_to_assign)
            # User is verified, return tuple
            if user_assignee:
                return 'assignments', [{'assignee': user_assignee}]

        # If escalation policy was not provided, use default one
        policy_to_assign = context.get('assigned_policy', self._escalation_policy)

        # Verify escalation policy, return tuple
        return 'escalation_policy', self._policy_verify(policy_to_assign, self._escalation_policy)

    def _add_incident_note(self, incident_id, note):
        """Method to add a text note to the provided incident id

        Args:
            incident_id (str): ID of the incident to add the note to
            note (str): Text used as the content of the note

        Returns:
            str: ID of the note after being added to the incident or False if it fails
        """
        notes_path = '{}/{}/notes'.format(self.INCIDENTS_ENDPOINT, incident_id)
        incident_notes_url = self._get_endpoint(self._base_url, notes_path)
        data = {
            'note': {
                'content': note
            }
        }
        try:
            resp = self._post_request_retry(incident_notes_url, data, self._headers, True)
        except OutputRequestFailure:
            return False

        response = resp.json()
        if not response:
            return False

        note_rec = response.get('note', {})

        return note_rec.get('id', False)

    def dispatch(self, **kwargs):
        """Send incident to Pagerduty Incidents API v2

        An incident is created through the REST API, an event carrying the alert
        details is created through the Events API v2, and the incident generated by
        that event is then merged into the first one.

        Keyword Args:
            **kwargs: consists of any combination of the following items:
                descriptor (str): Service descriptor (ie: slack channel, pd integration)
                rule_name (str): Name of the triggered rule
                alert (dict): Alert relevant to the triggered rule
                alert['context'] (dict): Provides user or escalation policy

        Returns:
            The value returned by self._log_status; it is called with False as soon
            as any of the steps fails.
        """
        creds = self._load_creds(kwargs['descriptor'])
        if not creds:
            return self._log_status(False)

        # Cache base_url
        self._base_url = creds['api']

        # Preparing headers for API calls
        self._headers = {
            'Authorization': 'Token token={}'.format(creds['token']),
            'Accept': 'application/vnd.pagerduty+json;version=2'
        }

        # Get user email to be added as From header and verify
        user_email = creds['email_from']
        if not self._user_verify(user_email, False):
            LOGGER.error('Could not verify header From: %s, %s', user_email, self.__service__)
            return self._log_status(False)

        # Add From to the headers after verifying
        self._headers['From'] = user_email

        # Cache default escalation policy
        self._escalation_policy = creds['escalation_policy']

        # Extracting context data to assign the incident
        rule_context = kwargs['alert'].get('context', {})
        if rule_context:
            rule_context = rule_context.get(self.__service__, {})

        # Use the priority named in the context when it can be verified; otherwise
        # an empty priority is sent
        incident_priority = self._priority_verify(rule_context)

        # Incident assignment goes in this order:
        #  Provided user -> provided policy -> default policy
        assigned_key, assigned_value = self._incident_assignment(rule_context)

        # Start preparing the incident JSON blob to be sent to the API
        incident_title = 'StreamAlert Incident - Rule triggered: {}'.format(kwargs['rule_name'])
        incident_body = {
            'type': 'incident_body',
            'details': kwargs['alert']['rule_description']
        }

        # We need to get the service id from the API
        incident_service = self._service_verify(creds['service_name'])
        incident_data = {
            'incident': {
                'type': 'incident',
                'title': incident_title,
                'service': incident_service,
                'priority': incident_priority,
                'body': incident_body,
                assigned_key: assigned_value
            }
        }
        incidents_url = self._get_endpoint(self._base_url, self.INCIDENTS_ENDPOINT)

        try:
            incident = self._post_request_retry(incidents_url, incident_data, self._headers, True)
        except OutputRequestFailure:
            incident = False

        if not incident:
            LOGGER.error('Could not create main incident, %s', self.__service__)
            return self._log_status(False)

        # Extract the json blob from the response, returned by self._post_request_retry
        incident_json = incident.json()
        if not incident_json:
            return self._log_status(False)

        # Extract the incident id from the incident that was just created
        incident_id = incident_json.get('incident', {}).get('id')

        # Create alert to hold all the incident details
        event_data = events_v2_data(creds['integration_key'], **kwargs)
        event = self._create_event(event_data)
        if not event:
            LOGGER.error('Could not create incident event, %s', self.__service__)
            return self._log_status(False)

        # Lookup the incident_key returned as dedup_key to get the incident id
        incident_key = event.get('dedup_key')
        if not incident_key:
            LOGGER.error('Could not get incident key, %s', self.__service__)
            return self._log_status(False)

        # Keep that id to be merged later with the created incident. The lookup
        # re-raises PagerdutySearchDelay once its retries are exhausted; report that
        # as a failed dispatch instead of letting it escape.
        try:
            event_incident_id = self._get_event_incident_id(incident_key)
        except PagerdutySearchDelay:
            LOGGER.error('Could not get incident id, %s', self.__service__)
            return self._log_status(False)

        # Merge the incident with the event, so we can have a rich context incident
        # assigned to a specific person, which the PagerDuty REST API v2 does not allow
        merging_url = '{}/{}'.format(incidents_url, incident_id)
        merged = self._merge_incidents(merging_url, event_incident_id)

        return self._log_status(merged)