-
Notifications
You must be signed in to change notification settings - Fork 334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
config consolidation/removal of tech debt #707
Changes from 1 commit
75479ec
e60dc26
98c653e
cba9029
a82ff25
03c6fc6
2771ad2
5a02931
dff31a9
bac158a
14a2def
b67d305
7c25589
b29a6eb
f2e1419
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
""" | ||
Copyright 2017-present, Airbnb Inc. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
""" | ||
from collections import defaultdict, OrderedDict | ||
import json | ||
import os | ||
|
||
|
||
class ConfigError(Exception): | ||
"""Exception class for config file errors""" | ||
|
||
|
||
def parse_lambda_arn(function_arn): | ||
"""Extract info on the current environment from the lambda function ARN | ||
|
||
Parses the invoked_function_arn from the given context object to get | ||
the name of the currently running alias (either production or development) | ||
and the name of the function. | ||
|
||
Example: | ||
arn:aws:lambda:aws-region:acct-id:function:stream_alert:production | ||
|
||
Arguments: | ||
function_arn (str): The AWS Lambda function ARN | ||
|
||
Returns: | ||
dict: | ||
{ | ||
'region': 'region_name', | ||
'account_id': 'account_id', | ||
'function_name': 'function_name', | ||
'qualifier': 'qualifier' | ||
} | ||
""" | ||
split_arn = function_arn.split(':') | ||
return { | ||
'region': split_arn[3], | ||
'account_id': split_arn[4], | ||
'function_name': split_arn[6], | ||
'qualifier': split_arn[7] if len(split_arn) == 8 else None # optional qualifier | ||
} | ||
|
||
|
||
def load_config(conf_dir='conf/', **kwargs): | ||
"""Load the configuration for StreamAlert. | ||
|
||
All configuration files live in the `conf` directory in JSON format. | ||
`sources` define a colleciton of AWS services (S3, Kinesis) supported as | ||
inputs to StreamAlert, specific entities (S3 buckets, Kinesis streams), | ||
and log types emitted from them. | ||
|
||
`logs` declare the schema for the listed log types in `sources`. Each | ||
key denotes the name of the log type, and includes 'keys' used to match | ||
rules to log fields. | ||
|
||
Arguments: | ||
conf_dir (str): [optional] Path from which to load the config | ||
|
||
Keyword Arguemnts: | ||
exclude (set): Names of config files or folders that should not be loaded | ||
include (set): Names of specific config files to only load | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of curiosity, what was the motivation for selectively loading parts of the config? Performance? A potential downside to this approach is having to reason about which sections of the config are available to different Lambda functions instead of being able to reason about a single config object. A potential future iteration of this (once we have a Config class) would be to lazy-load attributes. That way, callers don't need to explicitly say which config they need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose the motivation was largely to allow for supporting what we currently do. For instance, the alert processor only loads the After this initial iteration I'm open to building it out more (and anticipate doing so). I like the idea of lazy loading and think that we can benefit a lot from that approach! |
||
validate (bool): Validate aspects of the config to check for user error | ||
|
||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there are only 3 keyword arguments, can we list them explicitly instead of using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no real reason not to - I can make the change :) |
||
default_files = {file for file in os.listdir(conf_dir) if file.endswith('.json')} | ||
conf_files = kwargs.get('include', default_files).copy() | ||
include_clusters = 'clusters' in conf_files | ||
|
||
conf_files.intersection_update(default_files) | ||
if not (conf_files or include_clusters): | ||
raise ConfigError('No config files to load') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we add more context into this error? |
||
|
||
exclusions = kwargs.get('exclude', set()) | ||
conf_files = conf_files.difference(exclusions) | ||
|
||
config = defaultdict(dict) | ||
for name in conf_files: | ||
path = os.path.join(conf_dir, name) | ||
# we use object_pairs_hook=OrderdDict to preserve schema order for CSV/KV log types | ||
config[os.path.splitext(name)[0]] = _load_json_file(path, True) | ||
|
||
# Load the configs for clusters if it is not excluded | ||
if 'clusters' not in exclusions and not kwargs.get('include') or include_clusters: | ||
clusters = {file for file in os.listdir(os.path.join(conf_dir, 'clusters')) | ||
if file.endswith('.json')} | ||
for cluster in clusters: | ||
cluster_path = os.path.join(conf_dir, 'clusters', cluster) | ||
config['clusters'][os.path.splitext(cluster)[0]] = _load_json_file(cluster_path) | ||
|
||
if kwargs.get('validate'): | ||
_validate_config(config) | ||
|
||
return config | ||
|
||
def _load_json_file(path, ordered=False): | ||
"""Helper to return the loaded json from a given path""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: add kwargs/return/raise values in the docstring There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. didn't we say no more 'nits'? 🤣 |
||
kwargs = {'object_pairs_hook': OrderedDict if ordered else None} | ||
with open(path) as data: | ||
try: | ||
return json.load(data, **kwargs) | ||
except ValueError: | ||
raise ConfigError('Invalid JSON format for {}'.format(path)) | ||
|
||
|
||
def _validate_config(config): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably refactor this at some point, since 'logs' and 'sources's shouldn't ever be optional. It would be great to check a set of required/optional keys to ensure other arbitrary values aren't added as well into the conf. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above
|
||
"""Validate the StreamAlert configuration contains a valid structure. | ||
|
||
Checks for `logs.json`: | ||
- each log has a schema and parser declared | ||
Checks for `sources.json` | ||
- the sources contains either kinesis or s3 keys | ||
- each sources has a list of logs declared | ||
""" | ||
# Check the log declarations | ||
if 'logs' in config: | ||
|
||
for log, attrs in config['logs'].iteritems(): | ||
if 'schema' not in attrs: | ||
raise ConfigError('The \'schema\' is missing for {}'.format(log)) | ||
|
||
if 'parser' not in attrs: | ||
raise ConfigError('The \'parser\' is missing for {}'.format(log)) | ||
|
||
# Check if the defined sources are supported and report any invalid entries | ||
if 'sources' in config: | ||
supported_sources = {'kinesis', 's3', 'sns', 'stream_alert_app'} | ||
if not set(config['sources']).issubset(supported_sources): | ||
missing_sources = supported_sources - set(config['sources']) | ||
raise ConfigError( | ||
'The \'sources.json\' file contains invalid source entries: {}. ' | ||
'The following sources are supported: {}'.format( | ||
', '.join('\'{}\''.format(source) for source in missing_sources), | ||
', '.join('\'{}\''.format(source) for source in supported_sources) | ||
) | ||
) | ||
|
||
# Iterate over each defined source and make sure the required subkeys exist | ||
for attrs in config['sources'].values(): | ||
for entity, entity_attrs in attrs.iteritems(): | ||
if 'logs' not in entity_attrs: | ||
raise ConfigError('Missing \'logs\' key for entity: {}'.format(entity)) | ||
|
||
if not entity_attrs['logs']: | ||
raise ConfigError('List of \'logs\' is empty for entity: {}'.format(entity)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optionally, we could also load
function_name
andqualifier
from other attributes of thecontext
:From: https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html