Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency improvements #48

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions arbor-monitor/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from ipaddress import IPv4Address, IPv4Network, IPv6Network, ip_network, ip_address
from dis_client_sdk import DisClient
from pathlib import Path
from typing import List, Union
from typing import List, Union, Callable, Dict
from functools import partial

#disable Warning for SSL.
requests.packages.urllib3.disable_warnings()
Expand Down Expand Up @@ -72,21 +73,22 @@ async def process_sightline_webhook_notification():
logger.debug(f"Attack ID {attack_id}: Impact BPS: {impact_bps}")
logger.debug(f"Attack ID {attack_id}: Impact PPS: {impact_pps}")

response = get_src_traffic_report(attack_id)
response: requests.Response = await run_as_async(get_src_traffic_report, attack_id)
if response.status_code != 200:
msg=f"Error retrieving the source traffic report for attack {attack_id}: (HTTP Status: {response.status_code} ({response.reason})) ({response.content}))"
logger.warning(msg)
# Returning a 404 so Netscout so we can try to retrieve the report again
return jsonify({"error": msg}), 404, {'Content-Type': 'application/json'}

src_traffic_report = response.json()
src_traffic_report: Dict = response.json()

if args.dry_run:
logger.info(f"Attack ID {attack_id}: Running in DRY RUN mode - not posting/saving attack report")
else:
warn_msg = None
try:
source_ip_list = send_report_to_dis_server(attack_id, payload, src_traffic_report)
source_ip_list: List = await run_as_async(send_report_to_dis_server, attack_id, payload,
src_traffic_report)
total_reports_sent += 1
total_source_ips_reported += len(source_ip_list)
# TODO: These stats may reflect queuing - need to revisit
Expand Down Expand Up @@ -164,32 +166,35 @@ def send_report_to_dis_server(attack_id, attack_payload, src_traffic_report):
stop_timestamp = int(dateutil.parser.isoparse(stop_time).timestamp())
logger.debug(f"Attack ID {attack_id}: Start/stop timestamp: {start_timestamp}/{stop_timestamp}")

local_dis_client = DisClient(api_uri=args.report_consumer_api_uri, api_key=args.report_consumer_api_key,
staged_limit=args.max_queued_reports, http_proxy=args.http_proxy)
source_ip_list = []
try:
dis_event = dis_client.add_attack_event(start_timestamp=start_timestamp,
dis_event = local_dis_client.add_attack_event(start_timestamp=start_timestamp,
end_timestamp=stop_timestamp,
attack_type=attack_subobjects.get("misuse_types"))

# Add attributes to the attack event
dis_client.add_attribute_to_event(event_uuid=dis_event,
local_dis_client.add_attribute_to_event(event_uuid=dis_event,
name="impact_bps", enum="BPS", value=impact_bps)
dis_client.add_attribute_to_event(event_uuid=dis_event,
local_dis_client.add_attribute_to_event(event_uuid=dis_event,
name="impact_pps", enum="PPS", value=impact_pps)
dis_client.add_attribute_to_event(event_uuid=dis_event,
local_dis_client.add_attribute_to_event(event_uuid=dis_event,
name="local_attack_id", enum="BIGINT", value=attack_id)

# Add the source address info from the report to the event
source_ip_list = add_source_ips_v2(dis_client, dis_event, attack_id, src_traffic_report)
source_ip_list = add_source_ips_v2(local_dis_client, dis_event, attack_id, src_traffic_report)
logger.info(f"Attack ID {attack_id}: Found {len(source_ip_list)} source IPs")
logger.info(f"Attack ID {attack_id}: First 50 source IPs: {source_ip_list[0:50]}")
except Exception as ex:
logger.warning(f"Caught an exception adding attack report for attack ID {attack_id} ({ex})")

try:
staged_event_ids = dis_client.get_staged_event_ids()
staged_event_ids = local_dis_client.get_staged_event_ids()
logger.info(f"Attack ID {attack_id}: Staged event IDs: {staged_event_ids}")
# TODO: Add accessor for the DIS client base URL so we can log it
logger.info(f"Attack ID {attack_id}: Sending report to DIS server")
msg = dis_client.send()
msg = local_dis_client.send()
logger.info(f"Attack ID {attack_id}: Report sent/queued to DIS server ({msg})")
except Exception as ex:
logger.warning(f"Caught an exception uploading attack(s) ({ex})")
Expand All @@ -213,7 +218,7 @@ def send_event(event_object, post_url):
logger.debug("POST response: " + r.text)


def add_source_ips_v1(dis_event, attack_id):
def add_source_ips_v1(dis_client, dis_event, attack_id):
"""
Makes a request to Arbor instance for the source IP that match the Attack ID:

Expand Down Expand Up @@ -382,6 +387,18 @@ async def perform_periodic_status_reports(report_interval_mins):

# MAIN

async def run_as_async(function: Callable, *fn_args, **kwargs):
"""
Run blocking code in a separate thread and return results.
First argument is function name, all following arguments are passed as parameters to `function`

:param function: Callable
:param fn_args: Positional arguments
:param kwargs: keyword arguments
"""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, partial(function, *fn_args, **kwargs))
return result

def get_list_of_networks(data: str) -> List[Union[IPv4Network, IPv6Network]]:
"""
Expand Down Expand Up @@ -550,10 +567,10 @@ def get_list_of_networks(data: str) -> List[Union[IPv4Network, IPv6Network]]:
if args.dry_run:
logger.info("RUNNING IN DRY-RUN MODE (not connecting/reporting to the DIS server)")
else:
dis_client = DisClient(api_uri=args.report_consumer_api_uri, api_key=args.report_consumer_api_key,
global_dis_client = DisClient(api_uri=args.report_consumer_api_uri, api_key=args.report_consumer_api_key,
staged_limit=args.max_queued_reports, http_proxy=args.http_proxy)
try:
dis_client_info = dis_client.get_info()
dis_client_info = global_dis_client.get_info()
logger.info(f"DIS client name: {dis_client_info.get('name')}")
org = dis_client_info.get("organization")
logger.info(f"DIS client organization: {org.get('name') if org else 'Unknown'}")
Expand Down