Skip to content

Commit

Permalink
[lambda][output] output processor modular updates
Browse files Browse the repository at this point in the history
[cli] restricting colon characters along with spaces in user input for service descriptors

[cli] migrating some code from the runner to the new outputs file for the cli

[rules][testing] updating rules to conform to new outputs configuration style. also updating tests to go with this change

[lambda][alert] remove encrypted_credentials folder

[tf] fix bug in streamalert.secrets iam policy

[lambda][rule] send a single alert to SNS without wrapping in a list

[lambda] bug fixes: make it work

* load the outputs.json config
* package in the conf directory
* load the conf
* read s3 buckets from the conf
* fix bugs in request helper
* more

[lambda][output] masking slack url during input and restricting the use of colons (:) in unmasked input

[lambda][output][cli] fixing various nits
  • Loading branch information
ryandeivert committed Apr 10, 2017
1 parent f95a0b8 commit 3afa054
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 186 deletions.
12 changes: 12 additions & 0 deletions conf/outputs.json
Original file line number Diff line number Diff line change
@@ -1,2 +1,14 @@
{
"aws-s3": {
"sample.bucket": "sample_bucket_arn"
},
"pagerduty": [
"sample_integration"
],
"phantom": [
"sample_integration"
],
"slack": [
"sample_channel"
]
}
20 changes: 10 additions & 10 deletions rules/sample_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# osquery invalid user
@rule(logs=['osquery'],
matchers=[],
outputs=['s3'],
outputs=['s3:sample.bucket'],
req_subkeys={'columns': ['user']})
def invalid_user(rec):
"""Catch unauthorized user logins"""
Expand All @@ -34,7 +34,7 @@ def invalid_user(rec):

@rule(logs=['osquery'],
matchers=[],
outputs=['slack'],
outputs=['slack:sample_channel'],
req_subkeys={'columns': ['host']})
def invalid_subnet(rec):
"""Catch logins from unauthorized subnets"""
Expand All @@ -56,21 +56,21 @@ def sample_json_rule(rec):
@disable
@rule(logs=['syslog_log'],
matchers=[],
outputs=['pagerduty'])
outputs=['pagerduty:sample_integration'])
def sample_syslog_rule(rec):
    """Alert when a syslog record's application is sudo."""
    application = rec['application']
    return application == 'sudo'


@rule(logs=['csv_log'],
matchers=[],
outputs=['s3'])
outputs=['s3:sample.bucket'])
def sample_csv_rule(rec):
    """Alert on csv records originating from the flagged test host."""
    host = rec['host']
    return host == 'test-host-2'


@rule(logs=['kv_log'],
matchers=[],
outputs=['s3'])
outputs=['s3:sample.bucket'])
def sample_kv_rule(rec):
return (
rec['msg'] == 'fatal' and
Expand All @@ -80,7 +80,7 @@ def sample_kv_rule(rec):

@rule(logs=['kv_log'],
matchers=[],
outputs=['s3'])
outputs=['s3:sample.bucket'])
def sample_kv_rule_last_hour(rec):
return (
rec['type'] == 'start' and
Expand All @@ -91,7 +91,7 @@ def sample_kv_rule_last_hour(rec):

@rule(logs=['cloudtrail:v1.05'],
matchers=[],
outputs=['slack'])
outputs=['slack:sample_channel'])
def sample_cloudtrail_rule(rec):
"""Non Lambda/Kinesis service AssumedRole"""
whitelist_services = {
Expand All @@ -108,23 +108,23 @@ def sample_cloudtrail_rule(rec):

@rule(logs=['cloudwatch:ec2_event'],
matchers=[],
outputs=['s3'])
outputs=['s3:sample.bucket'])
def sample_cloudwatch_events_rule(rec):
    """Any activity on EC2"""
    event_source = rec['source']
    return event_source == 'aws.ec2'


@rule(logs=['cloudwatch:cloudtrail'],
matchers=[],
outputs=['s3'])
outputs=['s3:sample.bucket'])
def sample_cloudwatch_cloudtrail_rule(rec):
    """IAM Key Decrypt operation"""
    detail = rec['detail']
    return detail['eventName'] == 'Decrypt'


@rule(logs=['cloudwatch:flow_logs'],
matchers=[],
outputs=['slack'])
outputs=['slack:sample_channel'])
def sample_cloudwatch_flog_log_rule(rec):
"""Successful SSH connection"""
return (
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion stream_alert/alert_processor/encrypted_credentials/slack

This file was deleted.

67 changes: 49 additions & 18 deletions stream_alert/alert_processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
See the License for the specific language governing permissions and
limitations under the License.
'''

import json
import logging

Expand All @@ -23,7 +22,7 @@

logging.basicConfig()
LOGGER = logging.getLogger('StreamOutput')
LOGGER.setLevel(logging.INFO)
LOGGER.setLevel(logging.DEBUG)

def handler(event, context):
"""StreamAlert Alert Processor
Expand All @@ -40,18 +39,20 @@ def handler(event, context):
if not sns_payload:
continue

message = sns_payload['Message']
sns_message = sns_payload['Message']
try:
alert = json.loads(message)
loaded_sns_message = json.loads(sns_message)
except ValueError as err:
logging.error('an error occurred while decoding message to JSON: %s', err)
LOGGER.error('an error occurred while decoding message to JSON: %s', err)
return

if not 'default' in alert:
logging.info('malformed alert: %s', alert)
if not 'default' in loaded_sns_message:
LOGGER.error('Malformed SNS: %s', loaded_sns_message)
return

def run(message, context):
run(loaded_sns_message, context)

def run(loaded_sns_message, context):
"""Send an Alert to its described outputs.
Args:
Expand All @@ -77,31 +78,47 @@ def run(message, context):
}
}
"""
alert = message['default']
LOGGER.debug(loaded_sns_message)
alert = loaded_sns_message['default']
rule_name = alert['rule_name']

# strip out unnecessary keys and sort
alert = sort_dict(alert)

config = load_output_config()

outputs = alert['metadata']['outputs']
# Get the output configuration for this rule and send the alert to each
for output in set(outputs):
output_info = output.split(':')
service, descriptor = output_info[0], output_info[1] if len(output_info) > 1 else ""
try:
service, descriptor = output.split(':')
except ValueError:
LOGGER.error('outputs for rules must be declared with both a service and a '
'descriptor for the integration (ie: \'slack:my_channel\')')

if not service in config or not descriptor in config[service]:
LOGGER.error('The output %s does not exist!', output)
continue

region = context.invoked_function_arn.split(':')[3]
function = context.invoked_function_arn.split(':')[-1]
function_name = context.function_name

# Retrieve the proper class to handle dispatching the alerts of this service
output_dispatcher = get_output_dispatcher(service, region, function)
output_dispatcher = get_output_dispatcher(service, region, function_name, config)

if not output_dispatcher:
continue

try:
output_dispatcher.dispatch(descriptor, rule_name, alert)
except BaseException as err:
LOGGER.error('an error occurred while sending alert to %s: %s',
service, err)
# try:
LOGGER.debug('Sending alert to %s', output_dispatcher.__service__)
output_dispatcher.dispatch(
descriptor=descriptor,
rule_name=rule_name,
alert=alert
)
# except BaseException as err:
# LOGGER.error('an error occurred while sending alert to %s: %s',
# service, err)

def sort_dict(unordered_dict):
"""Recursively sort a dictionary
Expand All @@ -121,3 +138,17 @@ def sort_dict(unordered_dict):
result[key] = value

return result

def load_output_config(config_path='conf/outputs.json'):
    """Load the outputs configuration file from disk

    Args:
        config_path [string]: Path to the outputs JSON file. Defaults to
            the packaged 'conf/outputs.json'

    Returns:
        [dict] The output configuration settings. An empty dict is
            returned if the file does not contain valid JSON, so callers
            can treat every lookup as "output does not exist"
    """
    with open(config_path) as outputs:
        try:
            config = json.load(outputs)
        except ValueError:
            LOGGER.exception('the outputs.json file could not be loaded into json')
            # Without this fallback, the `return config` below would raise
            # an UnboundLocalError after the exception is logged
            config = {}

    return config
52 changes: 17 additions & 35 deletions stream_alert/alert_processor/output_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
See the License for the specific language governing permissions and
limitations under the License.
'''

import json
import logging
import os
Expand All @@ -25,6 +24,7 @@
from collections import namedtuple

import boto3

from botocore.exceptions import ClientError

logging.basicConfig()
Expand Down Expand Up @@ -59,11 +59,11 @@ class StreamOutputBase(object):
"""
__metaclass__ = ABCMeta
__service__ = NotImplemented
__config_service__ = __service__

def __init__(self, region, s3_prefix):
def __init__(self, region, function_name, config):
self.region = region
self.s3_prefix = self._format_prefix(s3_prefix)
self.secrets_bucket = self._get_secrets_bucket_name(function_name)
self.config = config

@staticmethod
def _local_temp_dir():
Expand All @@ -75,7 +75,7 @@ def _local_temp_dir():
temp_dir = os.path.join(tempfile.gettempdir(), "stream_alert_secrets")

# Check if this item exists as a file, and remove it if it does
if os.path.exists(temp_dir) and not os.path.isdir(temp_dir):
if os.path.isfile(temp_dir):
os.remove(temp_dir)

# Create the folder on disk to store the credentials temporarily
Expand All @@ -93,6 +93,7 @@ def _load_creds(self, descriptor):
Returns:
[dict] the loaded credential info needed for sending alerts to this service
or None if nothing gets loaded
"""
local_cred_location = os.path.join(self._local_temp_dir(),
self.output_cred_name(descriptor))
Expand All @@ -102,7 +103,8 @@ def _load_creds(self, descriptor):
if not self._get_creds_from_s3(local_cred_location, descriptor):
return

with open(local_cred_location, 'wb') as cred_file:
# Open encrypted credential file
with open(local_cred_location, 'rb') as cred_file:
enc_creds = cred_file.read()

# Get the decrypted credential json from kms and load into dict
Expand All @@ -120,16 +122,11 @@ def _load_creds(self, descriptor):

return creds_dict

def _format_s3_bucket(self, suffix):
"""Format the s3 bucket by combining the stored qualifier with a suffix
Args:
suffix [string]: Suffix for an s3 bucket
Returns:
[string] The combined prefix and suffix
"""
return '.'.join([self.s3_prefix, suffix])
@classmethod
def _get_secrets_bucket_name(cls, function_name):
    """Returns the streamalerts secrets s3 bucket name

    Args:
        function_name [string]: Name of the currently running Lambda
            function, used to derive the deployment prefix

    Returns:
        [string] Bucket name of the form '<prefix>.streamalert.secrets'
    """
    # The deployment prefix is the first underscore-delimited token of the
    # Lambda function name -- assumes '<prefix>_...' naming; verify against
    # the Terraform that creates the secrets bucket
    prefix = function_name.split('_')[0]
    return '.'.join([prefix, 'streamalert', 'secrets'])

def _get_creds_from_s3(self, cred_location, descriptor):
"""Pull the encrypted credential blob for this service and destination from s3
Expand All @@ -144,7 +141,7 @@ def _get_creds_from_s3(self, cred_location, descriptor):
try:
client = boto3.client('s3', region_name=self.region)
with open(cred_location, 'wb') as cred_output:
client.download_fileobj(self.get_secrets_bucket_name(),
client.download_fileobj(self.secrets_bucket,
self.output_cred_name(descriptor),
cred_output)

Expand Down Expand Up @@ -212,7 +209,7 @@ def _request_helper(url, data, headers=None, verify=True):
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
request = urllib2.Request(url, data=data, headers=headers)
request = urllib2.Request(url, data=data, headers=headers or {})
resp = urllib2.urlopen(request, context=context)
return resp
except urllib2.HTTPError as err:
Expand All @@ -238,14 +235,10 @@ def _get_default_properties(cls):
information is then sent to kms for encryption and s3 for storage.
Returns:
[OrderedDict] Contains various OutputProperty items
[dict] Contains various default items for this output (ie: url)
"""
pass

def get_secrets_bucket_name(self):
"""Returns the streamalerts secrets s3 bucket name"""
return self._format_s3_bucket('streamalert.secrets')

def output_cred_name(self, descriptor):
"""Formats the output name for this credential by combining the service
and the descriptor.
Expand All @@ -264,17 +257,6 @@ def output_cred_name(self, descriptor):

return cred_name

def get_config_service(self):
"""Get the string used for saving this service to the config. AWS services
are not named the same in the config as they are in the rules processor, so
having the ability to return a string like 'aws-s3' instead of 's3' is required
Returns:
[string] Service string used for looking up info in output configuration
"""
return (self.__config_service__,
self.__service__)[self.__config_service__ == NotImplemented]

def format_output_config(self, config, props):
"""Add this descriptor to the list of descriptor this service
If the service doesn't exist, a new entry is added to an empty list
Expand All @@ -285,7 +267,7 @@ def format_output_config(self, config, props):
Returns:
[list<string>] List of descriptors for this service
"""
return config.get(self.get_config_service(), []) + [props['descriptor'].value]
return config.get(self.__service__, []) + [props['descriptor'].value]

@abstractmethod
def get_user_defined_properties(self):
Expand Down
Loading

0 comments on commit 3afa054

Please sign in to comment.