Adding new output for PagerDuty Incidents
javier_marcos committed Nov 10, 2017
1 parent e75d559 commit 0842ddf
Showing 8 changed files with 340 additions and 8 deletions.
8 changes: 8 additions & 0 deletions docs/source/rules.rst
@@ -133,6 +133,14 @@ req_subkeys

This feature should be avoided, but it is useful if you have defined a loose schema that trades safety for flexibility; see `Schemas <conf-schemas.html#json-example-osquery>`_.

context
~~~~~~~

``context`` is an optional rule argument that attaches an extra field of information to the
alert record without affecting schemas. It is particularly helpful for passing data to an
output and using it within the output's processing code.


Examples:

.. code-block:: python
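The example block above is truncated in this view; a minimal sketch of a rule passing context
through to the new output, assuming the standard ``@rule`` decorator and a hypothetical
``sample_integration`` descriptor (the context values mirror the unit-test fixtures further down):

.. code-block:: python

    @rule(logs=['osquery:differential'],
          outputs=['pagerduty-incident:sample_integration'],
          context={'pagerduty-incident': {'assigned_user': 'valid_user',
                                          'assigned_policy': 'valid_policy'}})
    def sample_pagerduty_incident_rule(rec):  # hypothetical rule
        return rec['action'] == 'added'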
3 changes: 2 additions & 1 deletion manage.py
@@ -102,7 +102,8 @@ def _add_output_subparser(subparsers):
    # Output service options
    output_parser.add_argument(
        '--service',
-       choices=['aws-lambda', 'aws-s3', 'pagerduty', 'pagerduty-v2', 'phantom', 'slack'],
+       choices=['aws-lambda', 'aws-s3', 'pagerduty', 'pagerduty-v2',
+                'pagerduty-incident', 'phantom', 'slack'],
        required=True,
        help=ARGPARSE_SUPPRESS
    )
8 changes: 7 additions & 1 deletion stream_alert/alert_processor/helpers.py
@@ -38,7 +38,8 @@ def validate_alert(alert):
        'log_source',
        'outputs',
        'source_service',
-       'source_entity'
+       'source_entity',
+       'context'
    }
    if not set(alert.keys()) == alert_keys:
        LOGGER.error('The alert object must contain the following keys: %s',
@@ -53,6 +54,11 @@
            LOGGER.error('The alert record must be a map (dict)')
            return False

        elif key == 'context':
            if not isinstance(alert['context'], dict):
                LOGGER.error('The alert context must be a map (dict)')
                return False

        elif key == 'outputs':
            if not isinstance(alert[key], list):
                LOGGER.error(
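Note in passing: because the key check above uses strict set equality, every alert must now
include a 'context' entry (an empty dict is fine) to pass validation. A minimal sketch, reusing
the get_alert fixture shown near the end of this diff and assuming it satisfies the remaining
checks:

    alert = get_alert()
    assert validate_alert(alert)        # fixture carries a 'context' dict

    del alert['context']
    assert not validate_alert(alert)    # missing key fails the set-equality check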
166 changes: 166 additions & 0 deletions stream_alert/alert_processor/outputs.py
@@ -13,6 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
# pylint: disable=too-many-lines
from abc import abstractmethod
import cgi
from collections import OrderedDict
@@ -198,6 +199,171 @@ def dispatch(self, **kwargs):

        return self._log_status(success)

@output
class PagerDutyIncidentOutput(StreamOutputBase):
    """PagerDutyIncidentOutput handles all alert dispatching for the PagerDuty Incidents API v2"""
    __service__ = 'pagerduty-incident'
    INCIDENTS_ENDPOINT = 'incidents'
    USERS_ENDPOINT = 'users'
    POLICIES_ENDPOINT = 'escalation_policies'
    SERVICES_ENDPOINT = 'services'

    @classmethod
    def _get_default_properties(cls):
        """Get the standard url used for the PagerDuty Incidents API v2. This value is the same
        for everyone, so it is hard-coded here and does not need to be configured by the user
        Returns:
            dict: Contains various default items for this output (ie: url)
        """
        return {
            'api': 'https://api.pagerduty.com'
        }

    def get_user_defined_properties(self):
        """Get properties that must be assigned by the user when configuring a new PagerDuty
        incident output. This should be sensitive or unique information for this use-case that
        needs to come from the user.
        Every output should return a dict that contains a 'descriptor' with a description of the
        integration being configured.
        PagerDuty also requires a token and a service_key that represent this integration. These
        values should be masked during input and are credential requirements.
        Returns:
            OrderedDict: Contains various OutputProperty items
        """
        return OrderedDict([
            ('descriptor',
             OutputProperty(description='a short and unique descriptor for this '
                                        'PagerDuty integration')),
            ('token',
             OutputProperty(description='the token for this PagerDuty integration',
                            mask_input=True,
                            cred_requirement=True)),
            ('service_key',
             OutputProperty(description='the service key for this PagerDuty integration',
                            mask_input=True,
                            cred_requirement=True)),
            ('escalation_policy',
             OutputProperty(description='the name of the default escalation policy'))
        ])

    def _check_exists_get_id(self, filter_str, target_url, headers, target_key):
        """Generic method to run a search in the PagerDuty REST API and return the id
        of the first occurrence in the results.
        Args:
            filter_str (str): The query filter to search for in the API
            target_url (str): The url to send the requests to in the API
            headers (dict): A dictionary containing header parameters
            target_key (str): The key to extract from the returned results
        Returns:
            str: ID of the first element that matches the provided filter, or
                False if no matching element exists.
        """
        params = {
            'query': '"{}"'.format(filter_str)
        }
        resp = self._get_request(target_url, params, headers, False)
        if not self._check_http_response(resp):
            return False

        response = resp.json()
        results = response.get(target_key) if response else None

        # If there are results, return the id of the first occurrence; guard against
        # an empty result list instead of indexing into it blindly
        return results[0]['id'] if results else False
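For reference, a sketch of the abridged response shape this helper assumes when querying, for
example, the users endpoint; the ids and pagination fields below are illustrative, not taken
from a real account:

    # GET https://api.pagerduty.com/users?query="valid_user" (abridged, illustrative)
    response = {
        'users': [
            {'id': 'PXPGF42', 'type': 'user', 'name': 'valid_user'},
            {'id': 'PAM4FGS', 'type': 'user', 'name': 'another_user'}
        ],
        'limit': 25,
        'more': False
    }
    # target_key mirrors the endpoint name ('users'), so the method returns 'PXPGF42'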

    def dispatch(self, **kwargs):
        """Send an incident to the PagerDuty Incidents API v2
        Args:
            **kwargs: consists of any combination of the following items:
                descriptor (str): Service descriptor (ie: slack channel, pd integration)
                rule_name (str): Name of the triggered rule
                alert (dict): Alert relevant to the triggered rule
        """
        creds = self._load_creds(kwargs['descriptor'])
        if not creds:
            return self._log_status(False)

        # Extract the context data used to assign the incident; fall back to an empty
        # dict for rules that provide no context for this output
        rule_context = kwargs['alert'].get('context', {}).get(self.__service__, {})

        headers = {
            'Authorization': 'Token token={}'.format(creds['token']),
            'Accept': 'application/vnd.pagerduty+json;version=2'
        }
        # Check if a user to assign the incident is provided
        user_to_assign = rule_context.get('assigned_user')

        # Incident assignment goes in this order:
        # Provided user -> provided policy -> default policy
        if user_to_assign:
            users_url = os.path.join(creds['api'], self.USERS_ENDPOINT)
            user_id = self._check_exists_get_id(user_to_assign,
                                                users_url, headers, self.USERS_ENDPOINT)
            if user_id:
                assigned_key = 'assignments'
                assigned_value = [{
                    'assignee': {
                        'id': user_id,
                        'type': 'user_reference'}
                }]
            # If the user retrieval did not succeed, default to policies
            else:
                user_to_assign = False

        # Only resolve an escalation policy when no user could be assigned; otherwise
        # the policy lookup would overwrite the user assignment above
        if not user_to_assign:
            if rule_context.get('assigned_policy'):
                policy_to_assign = rule_context.get('assigned_policy')
            else:
                policy_to_assign = creds['escalation_policy']

            policies_url = os.path.join(creds['api'], self.POLICIES_ENDPOINT)
            policy_id = self._check_exists_get_id(policy_to_assign,
                                                  policies_url, headers, self.POLICIES_ENDPOINT)
            assigned_key = 'escalation_policy'
            assigned_value = {
                'id': policy_id,
                'type': 'escalation_policy_reference'
            }

        # Start preparing the incident JSON blob to be sent to the API
        incident_title = 'StreamAlert Incident - Rule triggered: {}'.format(kwargs['rule_name'])
        incident_body = {
            'type': 'incident_body',
            'details': ''
        }
        # We need to get the service id from the API
        services_url = os.path.join(creds['api'], self.SERVICES_ENDPOINT)
        service_id = self._check_exists_get_id(creds['service_key'],
                                               services_url, headers, self.SERVICES_ENDPOINT)
        incident_service = {
            'id': service_id,
            'type': 'service_reference'
        }
        incident_priority = {
            'id': '',
            'type': 'priority_reference'
        }
        # The assignment (user or escalation policy) belongs inside the incident object
        incident = {
            'incident': {
                'type': 'incident',
                'title': incident_title,
                'service': incident_service,
                'priority': incident_priority,
                'body': incident_body,
                assigned_key: assigned_value
            }
        }
        incidents_url = os.path.join(creds['api'], self.INCIDENTS_ENDPOINT)
        resp = self._post_request(incidents_url, incident, headers, True)
        success = self._check_http_response(resp)

        return self._log_status(success)
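And, for reference, a sketch of the JSON blob dispatch would POST to the incidents endpoint when
assignment falls through to an escalation policy; the ids are placeholders, not real PagerDuty
resources:

    incident = {
        'incident': {
            'type': 'incident',
            'title': 'StreamAlert Incident - Rule triggered: sample_rule',
            'service': {'id': 'PSRV123', 'type': 'service_reference'},
            'priority': {'id': '', 'type': 'priority_reference'},
            'body': {'type': 'incident_body', 'details': ''},
            'escalation_policy': {'id': 'PPOL456', 'type': 'escalation_policy_reference'}
        }
    }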

@output
class PhantomOutput(StreamOutputBase):
"""PhantomOutput handles all alert dispatching for Phantom"""
10 changes: 7 additions & 3 deletions stream_alert/rule_processor/rules_engine.py
@@ -28,7 +28,8 @@
        'datatypes',
        'logs',
        'outputs',
-       'req_subkeys'])
+       'req_subkeys',
+       'context'])


class StreamRules(object):
@@ -70,6 +71,7 @@ def decorator(rule):
            matchers = opts.get('matchers')
            datatypes = opts.get('datatypes')
            req_subkeys = opts.get('req_subkeys')
            context = opts.get('context', {})

            if not (logs or datatypes):
                LOGGER.error(
@@ -92,7 +94,8 @@ def decorator(rule):
                datatypes,
                logs,
                outputs,
-               req_subkeys)
+               req_subkeys,
+               context)
            return rule
        return decorator

@@ -387,7 +390,8 @@ def process(cls, input_payload):
                    'log_type': payload.type,
                    'outputs': rule.outputs,
                    'source_service': payload.service(),
-                   'source_entity': payload.entity}
+                   'source_entity': payload.entity,
+                   'context': rule.context}
                alerts.append(alert)

        return alerts
6 changes: 6 additions & 0 deletions tests/unit/stream_alert_alert_processor/helpers.py
@@ -90,6 +90,12 @@ def get_alert(index=0):
        'outputs': [
            'slack:unit_test_channel'
        ],
        'context': {
            'pagerduty-incident': {
                'assigned_user': 'valid_user',
                'assigned_policy': 'valid_policy'
            }
        },
        'source_service': 's3',
        'source_entity': 'corp-prefix.prod.cb.region',
        'log_type': 'json',