Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.4.2 Update - New endpoints, Missing UpdateDeviceTags method #73

Merged
merged 8 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,430 changes: 2,157 additions & 273 deletions src/falconpy/_endpoint.py

Large diffs are not rendered by default.

24 changes: 11 additions & 13 deletions src/falconpy/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def generate_b64cred(client_id: str, client_secret: str) -> str:
return encoded


def service_request(caller: object = None, **kwargs): # May return dict or object datatypes
def service_request(caller: object = None, **kwargs) -> object: # May return dict or object datatypes
""" Checks for token expiration, refreshing if possible and then performs the request. """
if caller:
try:
Expand All @@ -115,8 +115,7 @@ def perform_request(method: str = "", endpoint: str = "", headers: dict = None,
params: dict = None, body: dict = None, verify: bool = True,
data=None, files: list = [],
params_validator: dict = None, params_required: dict = None,
body_validator: dict = None, body_required: dict = None): # May return dict or object datatypes
# Additional parameters MUST be propagated to service_request function
body_validator: dict = None, body_required: dict = None) -> object: # May return dict or object datatypes
"""
Leverages the requests library to perform the requested CrowdStrike OAuth2 API operation.

Expand Down Expand Up @@ -152,24 +151,22 @@ def perform_request(method: str = "", endpoint: str = "", headers: dict = None,
if params_validator:
try:
validate_payload(params_validator, params, params_required)

except ValueError as e:
returned = Result()(500, {}, {"errors": [{"message": f"{str(e)}"}], "resources": ""})
returned = generate_error_result(message=f"{str(e)}")
PERFORM = False
except TypeError as e:
returned = Result()(500, {}, {"errors": [{"message": f"{str(e)}"}], "resources": ""})
returned = generate_error_result(message=f"{str(e)}")
PERFORM = False

# Validate body payload
if body_validator:
try:
validate_payload(body_validator, body, body_required)

except ValueError as e:
returned = Result()(500, {}, {"errors": [{"message": f"{str(e)}"}], "resources": ""})
returned = generate_error_result(message=f"{str(e)}")
PERFORM = False
except TypeError as e:
returned = Result()(500, {}, {"errors": [{"message": f"{str(e)}"}], "resources": ""})
returned = generate_error_result(message=f"{str(e)}")
PERFORM = False

# Perform the request
Expand All @@ -179,16 +176,17 @@ def perform_request(method: str = "", endpoint: str = "", headers: dict = None,
response = requests.request(METHOD, endpoint, params=params, headers=headers,
json=body, data=data, files=files, verify=verify)
if response.headers.get('content-type') == "application/json":
returned = dict(Result()(response.status_code, response.headers, response.json()))
returned = Result()(response.status_code, response.headers, response.json())
else:
returned = response.content
except Exception as e:
returned = Result()(500, {}, {"errors": [{"message": f"{str(e)}"}], "resources": ""})
returned = generate_error_result(message=f"{str(e)}")
else:
returned = Result()(405, {}, {"errors": [{"message": "Invalid API service method."}], "resources": ""})
returned = generate_error_result(message="Invalid API operation specified.", code=405)

return returned


def generate_error_result(message: str = "An error has occurred. Check your payloads and try again.", code: int = 500) -> dict:
return Result()(status_code=code, headers={}, body={"errors": [{"message": f"{message}"}], "resources": ""})
""" Normalized error messaging handler. """
return Result()(status_code=code, headers={}, body={"errors": [{"message": f"{message}"}], "resources": []})
2 changes: 1 addition & 1 deletion src/falconpy/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

For more information, please refer to <https://unlicense.org>
"""
_version = '0.4.1'
_version = '0.4.2'
_maintainer = 'Joshua Hiller'
_author = 'CrowdStrike'
_author_email = 'falconpy@crowdstrike.com'
Expand Down
25 changes: 10 additions & 15 deletions src/falconpy/api_complete.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@
For more information, please refer to <https://unlicense.org>
"""
import time
from ._util import perform_request, parse_id_list, generate_b64cred
from ._util import perform_request, parse_id_list, generate_b64cred, _ALLOWED_METHODS, generate_error_result
from ._endpoint import api_endpoints
from ._result import Result


class APIHarness:
Expand Down Expand Up @@ -120,7 +119,6 @@ def command(self: object, action: str = "", parameters: dict = {}, body: dict =
CMD = [a for a in self.commands if a[0] == action]
if CMD:
FULL_URL = self.base_url+"{}".format(CMD[0][2])
# Consider calculating ? vs & character replacement
if ids:
ID_LIST = str(parse_id_list(ids)).replace(",", "&ids=")
FULL_URL = FULL_URL.format(ID_LIST)
Expand All @@ -142,18 +140,15 @@ def command(self: object, action: str = "", parameters: dict = {}, body: dict =
PARAMS = parameters
FILES = files
if self.authenticated:
returned = perform_request(method=CMD[0][1].upper(), endpoint=FULL_URL, body=BODY, data=DATA,
params=PARAMS, headers=HEADERS, files=FILES, verify=self.ssl_verify)
METHOD = CMD[0][1].upper()
if METHOD in _ALLOWED_METHODS:
returned = perform_request(method=METHOD, endpoint=FULL_URL, body=BODY, data=DATA,
params=PARAMS, headers=HEADERS, files=FILES, verify=self.ssl_verify)
else:
returned = generate_error_result(message="Invalid HTTP method specified.", code=405)
else:
returned = Result()(
status_code=401,
headers={},
body={"errors": [{"message": "Failed to issue token."}], "resources": ""}
)
returned = generate_error_result(message="Failed to issue token.", code=401)
else:
returned = Result()(
status_code=418, # Send a teapot to devs who don't specify a method
headers={},
body={"errors": [{"message": "Invalid API service method."}], "resources": ""}
)
returned = generate_error_result(message="Invalid API operation specified.", code=418)

return returned
4 changes: 2 additions & 2 deletions src/falconpy/event_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Event_Streams(ServiceClass):
""" The only requirement to instantiate an instance of this class
is a valid token provided by the Falcon API SDK OAuth2 class.
"""
def refreshActiveStreamSession(self, parameters, partition=0):
def refreshActiveStreamSession(self: object, parameters: dict, partition: int = 0) -> dict:
""" Refresh an active event stream. Use the URL shown in a GET /sensors/entities/datafeed/v2 response. """
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/event-streams/refreshActiveStreamSession
FULL_URL = self.base_url+'/sensors/entities/datafeed-actions/v1/{}'.format(str(partition))
Expand All @@ -60,7 +60,7 @@ def refreshActiveStreamSession(self, parameters, partition=0):
)
return returned

def listAvailableStreamsOAuth2(self, parameters):
def listAvailableStreamsOAuth2(self: object, parameters: dict) -> dict:
""" Discover all event streams in your environment. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/event-streams/listAvailableStreamsOAuth2
FULL_URL = self.base_url+'/sensors/entities/datafeed/v2'
Expand Down
6 changes: 3 additions & 3 deletions src/falconpy/falconx_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class FalconX_Sandbox(ServiceClass):
""" The only requirement to instantiate an instance of this class
is a valid token provided by the Falcon API SDK OAuth2 class.
"""
def GetArtifacts(self: object, parameters: dict) -> dict:
def GetArtifacts(self: object, parameters: dict) -> object:
""" Download IOC packs, PCAP files, and other analysis artifacts. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/GetArtifacts
FULL_URL = self.base_url+'/falconx/entities/artifacts/v1'
Expand Down Expand Up @@ -159,7 +159,7 @@ def UploadSampleV2(self: object, parameters: dict, body: dict) -> dict:
)
return returned

def GetReports(self: object, ids) -> dict:
def GetReports(self: object, ids) -> object:
""" Retrieves a full sandbox report. """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/GetReports
ID_LIST = str(parse_id_list(ids)).replace(",", "&ids=")
Expand Down Expand Up @@ -189,7 +189,7 @@ def DeleteReport(self: object, ids) -> dict:
)
return returned

def GetSampleV2(self: object, ids, password_protect: bool = False) -> dict:
def GetSampleV2(self: object, ids, password_protect: bool = False) -> object:
""" Retrieves the file associated with the given ID (SHA256). """
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/falconx-sandbox/GetSampleV2
ID_LIST = str(parse_id_list(ids)).replace(",", "&ids=")
Expand Down
38 changes: 38 additions & 0 deletions src/falconpy/hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,44 @@ def PerformActionV2(self: object, parameters: dict, body: dict, action_name: str

return returned

def UpdateDeviceTags(self: object, action_name: str, ids: list or str, tags: list or str) -> dict:
"""
allows for tagging hosts. If the tags are empty
"""
ALLOWED_ACTIONS = ["add", "remove"]
# validate action is allowed AND tags is "something"
if action_name.lower() in ALLOWED_ACTIONS and tags is not None:
FULL_URL = self.base_url + '/devices/entities/devices/tags/v1'
HEADERS = self.headers
# convert ids/tags to be a list object if not already
if isinstance(ids, str):
ids = ids.split(",")
if isinstance(tags, str):
tags = tags.split(",")
# tags must start with FalconGroupingTags, users probably won't know this so add it for them
patch_tag = []
for tag in tags:
if tag.startswith("FalconGroupingTags/"):
patch_tag.append(tag)
else:
tag_name = "FalconGroupingTags/" + tag
patch_tag.append(tag_name)
BODY = {
"action": action_name,
"device_ids": ids,
"tags": patch_tag
}
returned = service_request(caller=self,
method="PATCH",
endpoint=FULL_URL,
body=BODY,
headers=HEADERS,
verify=self.ssl_verify
)
else:
returned = generate_error_result("Invalid value specified for action_name parameter.")
return returned

def GetDeviceDetails(self: object, ids) -> dict:
""" Get details on one or more hosts by providing agent IDs (AID).
You can get a host's agent IDs (AIDs) from the /devices/queries/devices/v1 endpoint,
Expand Down
48 changes: 47 additions & 1 deletion tests/test_hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
auth = Authorization.TestAuthorization()
auth.serviceAuth()
falcon = FalconHosts.Hosts(access_token=auth.token)
AllowedResponses = [200, 429] # Adding rate-limiting as an allowed response for now
AllowedResponses = [200, 202, 429] # Adding rate-limiting as an allowed response for now


class TestHosts:
Expand Down Expand Up @@ -45,6 +45,47 @@ def serviceHosts_QueryDevicesByFilter(self):
# else:
# return False

def serviceHosts_addTag(self):
id_list = []
id_list.append(
falcon.GetDeviceDetails(ids=falcon.QueryDevicesByFilter(parameters={"limit":1})["body"]["resources"][0])["body"]["resources"][0]["device_id"]
)
# test basic, id is a list, single valid tag w/o manipulation
if not falcon.UpdateDeviceTags(action_name="add", ids=id_list, tags=["FalconGroupingTags/testtag"])["status_code"] in AllowedResponses:
return False
if not falcon.UpdateDeviceTags(action_name="remove", ids=id_list, tags=["FalconGroupingTags/testtag"])["status_code"] in AllowedResponses:
return False
# id is a list, multiple tags needing manipulation
if not falcon.UpdateDeviceTags(action_name="add", ids=id_list, tags=["testtag", "tagtest", "anothertag"])["status_code"] in AllowedResponses:
return False
if not falcon.UpdateDeviceTags(action_name="remove", ids=id_list, tags=["testtag", "tagtest", "anothertag"])["status_code"] in AllowedResponses:
return False
# id is a list, mutliple tags some need manipulation
if not falcon.UpdateDeviceTags(action_name="add", ids=id_list, tags=["FalconGroupingTags/testtag", "manipulate", "FalconGroupingTags/anothertag"])["status_code"] in AllowedResponses:
return False
if not falcon.UpdateDeviceTags(action_name="remove", ids=id_list, tags=["FalconGroupingTags/testtag", "manipulate", "FalconGroupingTags/anothertag"])["status_code"] in AllowedResponses:
return False
# id is single string, single valid tag w/o manipulation
if not falcon.UpdateDeviceTags(action_name="add", ids=id_list[0], tags=["FalconGroupingTags/testtag"])["status_code"] in AllowedResponses:
return False
if not falcon.UpdateDeviceTags(action_name="remove", ids=id_list[0], tags=["FalconGroupingTags/testtag"])["status_code"] in AllowedResponses:
return False
# Force the unit test down line 84
if not falcon.UpdateDeviceTags(action_name="add", ids=id_list, tags="FalconGroupingTags/testtag")["status_code"] in AllowedResponses:
return False

return True

def serviceHosts_GenerateTagError(self):
id_list = []
id_list.append(
falcon.GetDeviceDetails(ids=falcon.QueryDevicesByFilter(parameters={"limit":1})["body"]["resources"][0])["body"]["resources"][0]["device_id"]
)
# Generate an error by sending garbage as the action_name
if not falcon.UpdateDeviceTags(action_name="KaBOOM!", ids=id_list, tags=["FalconGroupingTags/testtag"])["status_code"] == 500:
return False
return True

def serviceHosts_PerformActionV2(self):
id_list = []
id_list.append(
Expand Down Expand Up @@ -87,7 +128,12 @@ def test_QueryDevicesByFilterScroll(self):

def test_QueryDevicesByFilter(self):
assert self.serviceHosts_QueryDevicesByFilter() == True

def test_tagging(self):
assert self.serviceHosts_addTag() == True

def test_GenerateTagError(self):
assert self.serviceHosts_GenerateTagError() == True
# def test_GetDeviceDetails(self):
# assert self.serviceHosts_GetDeviceDetails() == True

Expand Down
11 changes: 11 additions & 0 deletions tests/test_uber_api_complete.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
sys.path.append(os.path.abspath('src'))
# Classes to test - manually imported from our sibling folder
from falconpy import api_complete as FalconSDK
# Import perform_request from _util so we can test generating 405's directly
from falconpy._util import perform_request

AllowedResponses = [200, 400, 415, 429, 500]

Expand Down Expand Up @@ -163,6 +165,12 @@ def uberCCHosts_GenerateActionNameError(self):
else:
return False

def uberCCAWS_GenerateInvalidOperationIDError(self):
if perform_request(method="FETCH", endpoint="/somewhere/interesting")["status_code"] == 405:
return True
else:
return False

def uberCCAWS_GenerateTokenError(self):
hold_token = falcon.token
falcon.token = "I am a bad token!"
Expand Down Expand Up @@ -236,6 +244,9 @@ def test_OverrideAndHeader(self):
def test_GenerateActionNameError(self):
assert self.uberCCHosts_GenerateActionNameError() == True

def test_GenerateInvalidOperationIDError(self):
assert self.uberCCAWS_GenerateInvalidOperationIDError() == True

def test_BadMethod(self):
assert self.uberCCAWS_BadMethod() == True

Expand Down
8 changes: 8 additions & 0 deletions util/lint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash
if ! [[ -z "$1" ]]
then
TARGET=$1
else
TARGET="."
fi
flake8 $TARGET --count --exit-zero --max-complexity=15 --max-line-length=127 --statistics
3 changes: 3 additions & 0 deletions util/run-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
coverage run --source src/falconpy -m pytest -s -v
coverage report