-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add string sets to helpers #55
Changes from 1 commit
277b125
84e7d20
e22229c
a9ded8b
106ca06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,9 @@ | ||
"""Utility functions provided to policies and rules during execution.""" | ||
from ipaddress import ip_network | ||
from typing import Any, Dict | ||
import time | ||
from typing import Any, Dict, Union, Sequence, Set | ||
|
||
import boto3 | ||
from boto3.dynamodb.conditions import Attr | ||
import botocore | ||
|
||
_RESOURCE_TABLE = None # boto3.Table resource, lazily constructed | ||
|
||
|
@@ -88,8 +87,15 @@ def is_dmz_tags(resource): | |
return resource['Tags'].get(DMZ_TAG_KEY) == DMZ_TAG_VALUE | ||
|
||
|
||
# Helper functions for accessing Dynamo key-value store. | ||
# | ||
# Keys can be any string specified by rules and policies, | ||
# values are integer counters and/or string sets. | ||
# | ||
# Use kv_table() if you want to interact with the table directly. | ||
_KV_TABLE = None | ||
_COUNT_COL = 'intCount' # name of the count column | ||
_COUNT_COL = 'intCount' | ||
_STRING_SET_COL = 'stringSet' | ||
|
||
|
||
def kv_table() -> boto3.resource: | ||
|
@@ -103,7 +109,7 @@ def kv_table() -> boto3.resource: | |
|
||
|
||
def get_counter(key: str) -> int: | ||
"""Get a counter's current value (defaulting to 0 if key does not exit).""" | ||
"""Get a counter's current value (defaulting to 0 if key does not exist).""" | ||
response = kv_table().get_item( | ||
Key={'key': key}, | ||
ProjectionExpression=_COUNT_COL, | ||
|
@@ -121,35 +127,25 @@ def increment_counter(key: str, val: int = 1) -> int: | |
Returns: | ||
The new value of the count | ||
""" | ||
table = kv_table() | ||
try: | ||
response = table.update_item( | ||
Key={'key': key}, | ||
ReturnValues='UPDATED_NEW', | ||
# You can only increment attributes which already exist | ||
ConditionExpression=Attr(_COUNT_COL).exists(), | ||
UpdateExpression='SET #col = intCount + :incr', | ||
ExpressionAttributeNames={'#col': _COUNT_COL}, | ||
ExpressionAttributeValues={':incr': val}) | ||
|
||
# Numeric values are returned as decimal.Decimal | ||
return response['Attributes'][_COUNT_COL].to_integral_value() | ||
except botocore.exceptions.ClientError as ex: | ||
if ex.response['Error']['Code'] != 'ConditionalCheckFailedException': | ||
raise | ||
|
||
# The conditional check failed, meaning this item doesn't exist yet. Add it! | ||
table.put_item(Item={'key': key, _COUNT_COL: val}) | ||
return val | ||
response = kv_table().update_item( | ||
Key={'key': key}, | ||
ReturnValues='UPDATED_NEW', | ||
UpdateExpression='ADD #col :incr', | ||
ExpressionAttributeNames={'#col': _COUNT_COL}, | ||
ExpressionAttributeValues={':incr': val}, | ||
) | ||
|
||
# Numeric values are returned as decimal.Decimal | ||
return response['Attributes'][_COUNT_COL].to_integral_value() | ||
|
||
|
||
def reset_counter(key: str) -> None: | ||
"""Reset a counter to 0.""" | ||
kv_table().put_item(Item={'key': key, _COUNT_COL: 0}) | ||
|
||
|
||
def set_counter_expiration(key: str, epoch_seconds: int) -> None: | ||
"""Configure the counter to automatically expire at the given time. | ||
def set_key_expiration(key: str, epoch_seconds: int) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Works for both string sets and counters (which can coexist under the same key), renaming for generality |
||
"""Configure the key to automatically expire at the given time. | ||
|
||
DynamoDB typically deletes expired items within 48 hours of expiration. | ||
|
||
|
@@ -160,3 +156,151 @@ def set_counter_expiration(key: str, epoch_seconds: int) -> None: | |
kv_table().update_item(Key={'key': key}, | ||
UpdateExpression='SET expiresAt = :time', | ||
ExpressionAttributeValues={':time': epoch_seconds}) | ||
|
||
|
||
def get_string_set(key: str) -> Set[str]: | ||
"""Get a string set's current value (defaulting to empty set if key does not exit).""" | ||
response = kv_table().get_item( | ||
Key={'key': key}, | ||
ProjectionExpression=_STRING_SET_COL, | ||
) | ||
return response.get('Item', {}).get(_STRING_SET_COL, set()) | ||
|
||
|
||
def put_string_set(key: str, val: Sequence[str]) -> None: | ||
"""Overwrite a string set under the given key. | ||
|
||
This is faster than (reset_string_set + add_string_set) if you know exactly what the contents | ||
of the set should be. | ||
|
||
Args: | ||
key: The name of the string set | ||
val: A list/set/tuple of strings to store | ||
""" | ||
if len(val) == 0: | ||
# Can't put an empty string set - remove it instead | ||
reset_string_set(key) | ||
else: | ||
kv_table().put_item(Item={'key': key, _STRING_SET_COL: set(val)}) | ||
|
||
|
||
def add_to_string_set(key: str, val: Union[str, Sequence[str]]) -> Set[str]: | ||
"""Add one or more strings to a set. | ||
|
||
Args: | ||
key: The name of the string set | ||
val: Either a single string or a list/tuple/set of strings to add | ||
|
||
Returns: | ||
The new value of the string set | ||
""" | ||
if isinstance(val, str): | ||
item_value = {val} | ||
else: | ||
item_value = set(val) | ||
if len(item_value) == 0: | ||
# We can't add empty sets, just return the existing value instead | ||
return get_string_set(key) | ||
|
||
response = kv_table().update_item( | ||
Key={'key': key}, | ||
ReturnValues='UPDATED_NEW', | ||
UpdateExpression='ADD #col :ss', | ||
ExpressionAttributeNames={'#col': _STRING_SET_COL}, | ||
ExpressionAttributeValues={':ss': item_value}, | ||
) | ||
return response['Attributes'][_STRING_SET_COL] | ||
|
||
|
||
def remove_from_string_set(key: str, val: Union[str, | ||
Sequence[str]]) -> Set[str]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
"""Remove one or more strings from a set. | ||
|
||
Args: | ||
key: The name of the string set | ||
val: Either a single string or a list/tuple/set of strings to remove | ||
|
||
Returns: | ||
The new value of the string set | ||
""" | ||
if isinstance(val, str): | ||
item_value = {val} | ||
else: | ||
item_value = set(val) | ||
if len(item_value) == 0: | ||
# We can't remove empty sets, just return the existing value instead | ||
return get_string_set(key) | ||
|
||
response = kv_table().update_item( | ||
Key={'key': key}, | ||
ReturnValues='UPDATED_NEW', | ||
UpdateExpression='DELETE #col :ss', | ||
ExpressionAttributeNames={'#col': _STRING_SET_COL}, | ||
ExpressionAttributeValues={':ss': item_value}, | ||
) | ||
return response['Attributes'][_STRING_SET_COL] | ||
|
||
|
||
def reset_string_set(key: str) -> None: | ||
"""Reset a string set to empty.""" | ||
kv_table().update_item( | ||
Key={'key': key}, | ||
UpdateExpression='REMOVE #col', | ||
ExpressionAttributeNames={'#col': _STRING_SET_COL}, | ||
) | ||
|
||
|
||
def _test_kv_store(): | ||
"""Integration tests which validate the functions which interact with the key-value store. | ||
|
||
Deploy Panther and then simply run "python3 panther.py" to test. | ||
""" | ||
assert increment_counter('panther', 1) == 1 | ||
assert increment_counter('labs', 3) == 3 | ||
assert increment_counter('panther', -2) == -1 | ||
assert increment_counter('panther', 0) == -1 | ||
assert increment_counter('panther', 11) == 10 | ||
|
||
assert get_counter('panther') == 10 | ||
assert get_counter('labs') == 3 | ||
assert get_counter('nonexistent') == 0 | ||
|
||
reset_counter('panther') | ||
reset_counter('labs') | ||
assert get_counter('panther') == 0 | ||
assert get_counter('labs') == 0 | ||
|
||
set_key_expiration('panther', int(time.time())) | ||
|
||
# Add elements in a list, tuple, set, or as singleton strings | ||
# The same key can be used to store int counts and string sets | ||
assert add_to_string_set('panther', ['a', 'b']) == {'a', 'b'} | ||
assert add_to_string_set('panther', ['b', 'a']) == {'a', 'b'} | ||
assert add_to_string_set('panther', 'c') == {'a', 'b', 'c'} | ||
assert add_to_string_set('panther', set()) == {'a', 'b', 'c'} | ||
assert add_to_string_set('panther', {'b', 'c', 'd'}) == {'a', 'b', 'c', 'd'} | ||
assert add_to_string_set('panther', ('d', 'e')) == {'a', 'b', 'c', 'd', 'e'} | ||
|
||
# Empty strings are allowed | ||
assert add_to_string_set('panther', '') == {'a', 'b', 'c', 'd', 'e', ''} | ||
|
||
assert get_string_set('labs') == set() | ||
assert get_string_set('panther') == {'a', 'b', 'c', 'd', 'e', ''} | ||
|
||
assert remove_from_string_set('panther', ['b', 'c', 'd']) == {'a', 'e', ''} | ||
assert remove_from_string_set('panther', '') == {'a', 'e'} | ||
assert remove_from_string_set('panther', '') == {'a', 'e'} | ||
|
||
# Overwrite contents completely | ||
put_string_set('panther', ['go', 'python']) | ||
assert get_string_set('panther') == {'go', 'python'} | ||
put_string_set('labs', []) | ||
assert get_string_set('labs') == set() | ||
|
||
reset_string_set('panther') | ||
reset_string_set('nonexistent') # no error | ||
assert get_string_set('panther') == set() | ||
|
||
|
||
if __name__ == '__main__': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we include this and the tests in a separate file? IIRC this file is shown in the UI cc @nhakmiller There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After my most recent change this is no longer shown in the UI, it is saved as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm happy to move this somewhere else in the future, I just wanted to quickly save it somewhere. I wasn't sure how best to do an "integration test" in this repo |
||
_test_kv_store() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What check is
B101
? Could you add a comment explaining why/what is being skipped, or use a long form name if that's possible (like in pylint)?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long-form isn't possible with bandit, but I can add a comment. It's just warning about the "assert" statements, which are ignored when compiling Python into bytecode