diff --git a/burp_probe/models.py b/burp_probe/models.py
index 4f69efe..ebac88c 100644
--- a/burp_probe/models.py
+++ b/burp_probe/models.py
@@ -1,7 +1,7 @@
 from burp_probe import db, bcrypt
 from burp_probe.constants import ScanStates
-from burp_probe.services.burp import BurpProApi
-from burp_probe.utilities import get_guid, get_current_utc_time, get_local_from_utc, BurpScanParser
+from burp_probe.services.burp import BurpProApi, BurpScanParser
+from burp_probe.utilities import get_guid, get_current_utc_time
 from sqlalchemy.orm import Mapped, mapped_column, relationship
 import binascii
 import json
diff --git a/burp_probe/schemas.py b/burp_probe/schemas.py
index c1f279e..e6906a4 100644
--- a/burp_probe/schemas.py
+++ b/burp_probe/schemas.py
@@ -1,5 +1,6 @@
 from burp_probe.models import Node, Scan
-from marshmallow import Schema, fields, pre_load, validate, validates, validates_schema, ValidationError
+from marshmallow import Schema, fields, validate, validates, validates_schema, ValidationError
+import re
 
 
 # region form validation schemas
@@ -39,13 +40,19 @@ def name_is_unique(self, data, **kwargs):
 class ScanFormSchema(Schema):
     name = fields.Str(required=True)
     description = fields.Str()
-    credentials = fields.Str(validate=validate.Regexp(r'[^:]:[^:]'))
+    credentials = fields.Str()
     configurations = fields.Str()
     targets = fields.Str(required=True)
     scope_includes = fields.Str(required=True)
     scope_excludes = fields.Str()
     node = fields.Str(required=True)
 
+    @validates('credentials')
+    def credentials_are_valid(self, value):
+        for line in value.split('\n'):
+            if not re.match(r'^[^:]+:[^:]+$', line):
+                raise ValidationError('String does not match expected pattern.')
+
     @validates('name')
     def name_is_unique(self, value):
         if Scan.query.filter_by(name=value).first():
diff --git a/burp_probe/services/burp.py b/burp_probe/services/burp.py
index dcc62fe..26c6fe9 100644
--- a/burp_probe/services/burp.py
+++ b/burp_probe/services/burp.py
@@ -1,3 +1,8 @@
+from burp_probe.utilities import get_local_from_utc
+from datetime import timedelta
+import base64
+import html
+import humanize
 import json
 import logging
 import requests
@@ -68,3 +73,314 @@ def is_alive(self):
         except requests.exceptions.RequestException as e:
             self.logger.debug(f"Burp Node Test Failure: {e}")
             return False
+
+
+class BurpScanBuilder:
+
+    def __init__(self, callback_url, credentials, configurations, scope_includes, scope_excludes, target_urls):
+        self.raw_callback_url = callback_url
+        self.raw_credentials = credentials
+        self.raw_configurations = configurations
+        self.raw_scope_includes = scope_includes
+        self.raw_scope_excludes = scope_excludes
+        self.raw_target_urls = target_urls
+
+    @property
+    def callback_url(self):
+        callback_url = {
+            'url': self.raw_callback_url,
+        }
+        return callback_url
+
+    @property
+    def credentials(self):
+        credentials = []
+        for credential in self.raw_credentials.split('\n'):
+            username, password = [w.strip() for w in credential.split(':', 1)]
+            c = {
+                'password': password,
+                'type': 'UsernameAndPasswordLogin',
+                'username': username,
+            }
+            credentials.append(c)
+        return credentials
+
+    @property
+    def configurations(self):
+        configurations = []
+        for configuration in self.raw_configurations.split('\n'):
+            c = {
+                'name': configuration,
+                'type': 'NamedConfiguration'
+            }
+            configurations.append(c)
+        return configurations
+
+    @property
+    def scope_includes(self):
+        scope_includes = []
+        for scope_include in self.raw_scope_includes.split('\n'):
+            c = {
+                'rule': scope_include
+            }
+            scope_includes.append(c)
+        return scope_includes
+
+    @property
+    def scope_excludes(self):
+        scope_excludes = []
+        for scope_exclude in self.raw_scope_excludes.split('\n'):
+            c = {
+                'rule': scope_exclude
+            }
+            scope_excludes.append(c)
+        return scope_excludes
+
+    @property
+    def scope(self):
+        scope = {}
+        if self.raw_scope_includes:
+            scope['include'] = self.scope_includes
+        if self.raw_scope_excludes:
+            scope['exclude'] = self.scope_excludes
+        if scope:
+            scope['type'] = 'SimpleScope'
+        return scope
+
+    @property
+    def target_urls(self):
+        target_urls = []
+        for target_url in self.raw_target_urls.split('\n'):
+            target_urls.append(target_url)
+        return target_urls
+
+    @property
+    def config_as_json(self):
+        scan_config = {}
+        if self.raw_callback_url:
+            scan_config['scan_callback'] = self.callback_url
+        if self.raw_credentials:
+            scan_config['application_logins'] = self.credentials
+        if self.raw_configurations:
+            scan_config['scan_configurations'] = self.configurations
+        if self.raw_scope_includes or self.raw_scope_excludes:
+            scan_config['scope'] = self.scope
+        if self.raw_target_urls:
+            scan_config['urls'] = self.target_urls
+        return scan_config
+
+
+class BurpScanParser:
+
+    dtg_format = "%Y-%m-%d %H:%M:%S"
+    time_format = '%-I:%M %p'
+    severity_weight = {'high': 4, 'medium': 3, 'low': 2, 'info': 1}
+
+    def __init__(self, scan):
+        self.scan = scan
+        self.result = scan.result_as_json
+        self.config = scan.config_as_json
+
+    @property
+    def raw_start_time(self):
+        return get_local_from_utc(self.scan.created)
+
+    @property
+    def raw_end_time(self):
+        if not self.result:
+            return self.raw_start_time
+        elapsed = self.result['scan_metrics']['total_elapsed_time']
+        return get_local_from_utc(self.scan.created) + timedelta(seconds=elapsed)
+
+    @property
+    def raw_duration(self):
+        return self.raw_end_time - self.raw_start_time
+
+    @property
+    def start_time(self):
+        return f"{humanize.naturaldate(self.raw_start_time)}, at {self.raw_start_time.strftime(self.time_format)}"
+
+    @property
+    def end_time(self):
+        if not self.scan.is_dead:
+            return self.scan.status
+        return f"{humanize.naturaldate(self.raw_end_time)}, at {self.raw_end_time.strftime(self.time_format)}"
+
+    @property
+    def duration(self):
+        return humanize.precisedelta(self.raw_duration, minimum_unit="seconds")
+
+    @property
+    def issue_count(self):
+        if not self.result:
+            return 0
+        return self.result['scan_metrics']['issue_events']
+
+    @property
+    def issue_events(self):
+        if not self.result:
+            return []
+        return self.result['issue_events']
+
+    @property
+    def organized_issue_events(self):
+        sorted_issue_events = sorted(self.issue_events, key=lambda x: self.severity_weight[x['issue']['severity']], reverse=True)
+        return self.organize_issue_events_by_type(sorted_issue_events)
+
+    def organize_issue_events_by_type(self, issue_events):
+        organized_issue_events = []
+        for issue_event in issue_events:
+            organized_issue_event = next((s for s in organized_issue_events if s['type_index'] == issue_event['issue']['type_index']), None)
+            if not organized_issue_event:
+                organized_issue_event = {'type_index': issue_event['issue']['type_index'], 'issue_events': []}
+                organized_issue_events.append(organized_issue_event)
+            organized_issue_event['issue_events'].append(issue_event)
+        return organized_issue_events
+
+    def organize_issue_events_by_severity(self, issue_events):
+        organized_issue_events = []
+        for issue_event in issue_events:
+            organized_issue_event = next((s for s in organized_issue_events if s['severity'] == issue_event['issue']['severity']), None)
+            if not organized_issue_event:
+                organized_issue_event = {'severity': issue_event['issue']['severity'], 'issue_events': []}
+                organized_issue_events.append(organized_issue_event)
+            organized_issue_event['issue_events'].append(issue_event)
+        return organized_issue_events
+
+    @property
+    def issues_by_severity(self):
+        issues = {'high': [], 'medium': [], 'low': [], 'info': []}
+        for issue in self.issue_events:
+            issues[issue['issue']['severity']].append(issue)
+        return issues
+
+
+class BurpIssueParser:
+
+    def __init__(self, issue_event):
+        self.logger = logging.getLogger('burp_probe.burp_issue_parser')
+        self.issue_event = issue_event
+
+    @property
+    def exhibits(self):
+        exhibits = []
+        for evidence in self.issue_event['issue']['evidence']:
+            exhibits.extend(self.process_evidence(evidence))
+        return exhibits
+
+    def process_evidence(self, evidence):
+        func = getattr(self, f"process_{evidence['type']}", None)
+        if func:
+            return func(evidence)
+        self.logger.debug(f"Unidentified evidence type: {evidence['type']}")
+        return []
+
+    def process_FirstOrderEvidence(self, evidence):
+        return self.process_message(evidence['request_response'])
+
+    def process_DiffableEvidence(self, evidence):
+        exhibits = []
+        for instance in ['first_evidence', 'second_evidence']:
+            exhibits.extend(self.process_evidence(evidence[instance]))
+        return exhibits
+
+    def process_TimingBasedEvidence(self, evidence):
+        return self.process_evidence(evidence['evidence'])
+
+    def process_InformationListEvidence(self, evidence):
+        return self.process_message(evidence['request_response'])
+
+    def process_CollaboratorEvidence(self, evidence):
+        exhibits = []
+        exhibits.extend(self.process_message(evidence['request_response']))
+        if http_event := evidence.get('http_event'):
+            exhibits.append({'title': f"Collaborator HTTP Interaction", 'content': http_event['description']})
+            for exhibit in self.process_message(http_event['request_response']):
+                exhibit['title'] = f"{exhibit['title']} (Collaborator)"
+                exhibits.append(exhibit)
+        if dns_event := evidence.get('dns_event'):
+            exhibits.append({'title': f"Collaborator DNS Interaction", 'content': dns_event['description']})
+        # have never seen a `smtp` event, so this will likely need to be updated at some point to include more detail
+        if smtp_event := evidence.get('smtp_event'):
+            exhibits.append({'title': f"Collaborator SMTP Interaction", 'content': smtp_event['description']})
+        return exhibits
+
+    def process_StoredEvidence(self, evidence):
+        exhibits = []
+        for exhibit in self.process_message(evidence['originating_request_response']):
+            exhibit['title'] = f"{exhibit['title']} (Injection)"
+            exhibits.append(exhibit)
+        for exhibit in self.process_message(evidence['retrieval_request_response']):
+            exhibit['title'] = f"{exhibit['title']} (Retrieval)"
+            exhibits.append(exhibit)
+        return exhibits
+
+    def process_DynamicJavascriptAnalysisEvidence(self, evidence):
+        exhibits = []
+        exhibits.extend(self.process_evidence(evidence['composable_evidence']))
+        content = ''
+        if value := evidence.get('source_caption'):
+            content += f"<br><b>Source:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('sink_caption'):
+            content += f"<br><b>Sink:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('source_stack_trace'):
+            content += f"<br><b>Source Stack Trace:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('sink_stack_trace'):
+            content += f"<br><b>Sink Stack Trace:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_listener_stack_trace'):
+            content += f"<br><b>Event Listener Stack Trace:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('source_value'):
+            content += f"<br><b>Source Value:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('sink_value'):
+            content += f"<br><b>Sink Value:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('origin'):
+            content += f"<br><b>Origin:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('origin_checked'):
+            content += f"<br><b>Origin Checked:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_handler_data'):
+            content += f"<br><b>Event Handler Data:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_handler_data_type'):
+            content += f"<br><b>Event Handler Data_type:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_handler_modified_data'):
+            content += f"<br><b>Event Handler Modified_data:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('source_element_id'):
+            content += f"<br><b>Source Element ID:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('source_element_name'):
+            content += f"<br><b>Source Element Name:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_fired_event_name'):
+            content += f"<br><b>Event Fired Name:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_fired_element_id'):
+            content += f"<br><b>Event Fired Element ID:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_fired_element_name'):
+            content += f"<br><b>Event Fired Element Name:</b><br>\n{self.codify(html.escape(value))}"
+        if value := evidence.get('event_fired_outer_html'):
+            content += f"<br><b>Event Fired HTML:</b><br>\n{self.codify(html.escape(value))}"
+        exhibits.append({'title': f"Dynamic Analysis", 'content': content})
+        return exhibits
+
+    def process_message(self, message):
+        exhibits = []
+        request_segments = message['request']
+        exhibits.append({'title': f"Request", 'content': self.codify(self.process_segments(request_segments))})
+        response_segments = message['response']
+        exhibits.append({'title': f"Response", 'content': self.codify(self.process_segments(response_segments))})
+        return exhibits
+
+    def process_segments(self, segments):
+        content = ''
+        for segment in segments:
+            if segment['type'] == 'DataSegment':
+                content += self.decode_segment_data(segment['data'])
+            elif segment['type'] == 'HighlightSegment':
+                content += f"{self.decode_segment_data(segment['data'])}"
+            elif segment['type'] == 'SnipSegment':
+                content += '\n\n**snipped**\n\n'
+            else:
+                self.logger.debug(f"Unidentified request segment type: {segment['type']}")
+        return content
+
+    def codify(self, s):
+        return f"<code>{s}</code>"
+
+    def decode_segment_data(self, data):
+        return html.escape(base64.b64decode(data).decode())
diff --git a/burp_probe/utilities.py b/burp_probe/utilities.py
index 047a294..d30de5c 100644
--- a/burp_probe/utilities.py
+++ b/burp_probe/utilities.py
@@ -1,8 +1,4 @@
-from datetime import datetime, timezone, timedelta
-import base64
-import html
-import humanize
-import logging
+from datetime import datetime, timezone
 import uuid
 
 def get_current_utc_time():
@@ -13,268 +9,3 @@ def get_local_from_utc(dtg):
 
 def get_guid():
     return str(uuid.uuid4())
-
-
-class BurpScanParser:
-
-    dtg_format = "%Y-%m-%d %H:%M:%S"
-    time_format = '%-I:%M %p'
-    severity_weight = {'high': 4, 'medium': 3, 'low': 2, 'info': 1}
-
-    def __init__(self, scan):
-        self.scan = scan
-        self.result = scan.result_as_json
-        self.config = scan.config_as_json
-
-    @property
-    def raw_start_time(self):
-        return get_local_from_utc(self.scan.created)
-
-    @property
-    def raw_end_time(self):
-        if not self.result:
-            return self.raw_start_time
-        elapsed = self.result['scan_metrics']['total_elapsed_time']
-        return get_local_from_utc(self.scan.created) + timedelta(seconds=elapsed)
-
-    @property
-    def raw_duration(self):
-        return self.raw_end_time - self.raw_start_time
-
-    @property
-    def start_time(self):
-        return f"{humanize.naturaldate(self.raw_start_time)}, at {self.raw_start_time.strftime(self.time_format)}"
-
-    @property
-    def end_time(self):
-        if not self.scan.is_dead:
-            return self.scan.status
-        return f"{humanize.naturaldate(self.raw_end_time)}, at {self.raw_end_time.strftime(self.time_format)}"
-
-    @property
-    def duration(self):
-        return humanize.precisedelta(self.raw_duration, minimum_unit="seconds")
-
-    @property
-    def issue_count(self):
-        if not self.result:
-            return 0
-        return self.result['scan_metrics']['issue_events']
-
-    @property
-    def issue_events(self):
-        if not self.result:
-            return []
-        return self.result['issue_events']
-
-    @property
-    def organized_issue_events(self):
-        sorted_issue_events = sorted(self.issue_events, key=lambda x: self.severity_weight[x['issue']['severity']], reverse=True)
-        return self.organize_issue_events_by_type(sorted_issue_events)
-
-    def organize_issue_events_by_type(self, issue_events):
-        organized_issue_events = []
-        for issue_event in issue_events:
-            organized_issue_event = next((s for s in organized_issue_events if s['type_index'] == issue_event['issue']['type_index']), None)
-            if not organized_issue_event:
-                organized_issue_event = {'type_index': issue_event['issue']['type_index'], 'issue_events': []}
-                organized_issue_events.append(organized_issue_event)
-            organized_issue_event['issue_events'].append(issue_event)
-        return organized_issue_events
-
-    def organize_issue_events_by_severity(self, issue_events):
-        organized_issue_events = []
-        for issue_event in issue_events:
-            organized_issue_event = next((s for s in organized_issue_events if s['severity'] == issue_event['issue']['severity']), None)
-            if not organized_issue_event:
-                organized_issue_event = {'severity': issue_event['issue']['severity'], 'issue_events': []}
-                organized_issue_events.append(organized_issue_event)
-            organized_issue_event['issue_events'].append(issue_event)
-        return organized_issue_events
-
-    @property
-    def issues_by_severity(self):
-        issues = {'high': [], 'medium': [], 'low': [], 'info': []}
-        for issue in self.issue_events:
-            issues[issue['issue']['severity']].append(issue)
-        return issues
-
-
-class BurpIssueParser:
-
-    def __init__(self, issue_event):
-        self.logger = logging.getLogger('burp_probe.burp_issue_parser')
-        self.issue_event = issue_event
-
-    @property
-    def exhibits(self):
-        exhibits = []
-        for evidence in self.issue_event['issue']['evidence']:
-            exhibits.extend(self.process_evidence(evidence))
-        return exhibits
-
-    def process_evidence(self, evidence):
-        func = getattr(self, f"process_{evidence['type']}", None)
-        if func:
-            return func(evidence)
-        self.logger.debug(f"Unidentified evidence type: {evidence['type']}")
-        return []
-
-    def process_FirstOrderEvidence(self, evidence):
-        return self.process_message(evidence['request_response'])
-
-    def process_DiffableEvidence(self, evidence):
-        exhibits = []
-        for instance in ['first_evidence', 'second_evidence']:
-            exhibits.extend(self.process_evidence(evidence[instance]))
-        return exhibits
-
-    def process_TimingBasedEvidence(self, evidence):
-        return self.process_evidence(evidence['evidence'])
-
-    def process_InformationListEvidence(self, evidence):
-        return self.process_message(evidence['request_response'])
-
-    def process_CollaboratorEvidence(self, evidence):
-        exhibits = []
-        exhibits.extend(self.process_message(evidence['request_response']))
-        if http_event := evidence.get('http_event'):
-            exhibits.append({'title': f"Collaborator HTTP Interaction", 'content': http_event['description']})
-            for exhibit in self.process_message(http_event['request_response']):
-                exhibit['title'] = f"{exhibit['title']} (Collaborator)"
-                exhibits.append(exhibit)
-        if dns_event := evidence.get('dns_event'):
-            exhibits.append({'title': f"Collaborator DNS Interaction", 'content': dns_event['description']})
-        # have never seen a `smtp` event, so this will likely need to be updated at some point to include more detail
-        if smtp_event := evidence.get('smtp_event'):
-            exhibits.append({'title': f"Collaborator SMTP Interaction", 'content': smtp_event['description']})
-        return exhibits
-
-    def process_StoredEvidence(self, evidence):
-        exhibits = []
-        for exhibit in self.process_message(evidence['originating_request_response']):
-            exhibit['title'] = f"{exhibit['title']} (Injection)"
-            exhibits.append(exhibit)
-        for exhibit in self.process_message(evidence['retrieval_request_response']):
-            exhibit['title'] = f"{exhibit['title']} (Retrieval)"
-            exhibits.append(exhibit)
-        return exhibits
-
-    def process_DynamicJavascriptAnalysisEvidence(self, evidence):
-        exhibits = []
-        exhibits.extend(self.process_evidence(evidence['composable_evidence']))
-        content = ''
-        if value := evidence.get('source_caption'):
-            content += f"<br><b>Source:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('sink_caption'):
-            content += f"<br><b>Sink:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('source_stack_trace'):
-            content += f"<br><b>Source Stack Trace:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('sink_stack_trace'):
-            content += f"<br><b>Sink Stack Trace:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_listener_stack_trace'):
-            content += f"<br><b>Event Listener Stack Trace:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('source_value'):
-            content += f"<br><b>Source Value:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('sink_value'):
-            content += f"<br><b>Sink Value:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('origin'):
-            content += f"<br><b>Origin:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('origin_checked'):
-            content += f"<br><b>Origin Checked:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_handler_data'):
-            content += f"<br><b>Event Handler Data:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_handler_data_type'):
-            content += f"<br><b>Event Handler Data_type:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_handler_modified_data'):
-            content += f"<br><b>Event Handler Modified_data:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('source_element_id'):
-            content += f"<br><b>Source Element ID:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('source_element_name'):
-            content += f"<br><b>Source Element Name:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_fired_event_name'):
-            content += f"<br><b>Event Fired Name:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_fired_element_id'):
-            content += f"<br><b>Event Fired Element ID:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_fired_element_name'):
-            content += f"<br><b>Event Fired Element Name:</b><br>\n{self.codify(html.escape(value))}"
-        if value := evidence.get('event_fired_outer_html'):
-            content += f"<br><b>Event Fired HTML:</b><br>\n{self.codify(html.escape(value))}"
-        exhibits.append({'title': f"Dynamic Analysis", 'content': content})
-        return exhibits
-
-    def process_message(self, message):
-        exhibits = []
-        request_segments = message['request']
-        exhibits.append({'title': f"Request", 'content': self.codify(self.process_segments(request_segments))})
-        response_segments = message['response']
-        exhibits.append({'title': f"Response", 'content': self.codify(self.process_segments(response_segments))})
-        return exhibits
-
-    def process_segments(self, segments):
-        parsed_request = ''
-        for segment in segments:
-            if segment['type'] == 'DataSegment':
-                parsed_request += self.decode_segment_data(segment['data'])
-            elif segment['type'] == 'HighlightSegment':
-                parsed_request += f"{self.decode_segment_data(segment['data'])}"
-            elif segment['type'] == 'SnipSegment':
-                parsed_request += '\n\n**snipped**\n\n'
-            else:
-                self.logger.debug(f"Unidentified request segment type: {segment['type']}")
-        return parsed_request
-
-    def codify(self, s):
-        return f"<code>{s}</code>"
-
-    def decode_segment_data(self, data):
-        return html.escape(base64.b64decode(data).decode())
-
-
-def burp_scan_builder(callback_url, credentials, configurations, scope_includes, scope_excludes, target_urls):
-    scan_config = {}
-    if callback_url:
-        scan_config['scan_callback'] = {
-            'url': callback_url,
-        }
-    if credentials:
-        scan_config['application_logins'] = []
-        for credential in credentials.split('\n'):
-            username, password = [w.strip() for w in credential.split(':', 1)]
-            c = {
-                'password': password,
-                'type': 'UsernameAndPasswordLogin',
-                'username': username,
-            }
-            scan_config['application_logins'].append(c)
-    if configurations:
-        scan_config['scan_configurations'] = []
-        for configuration in configurations.split('\n'):
-            c = {
-                'name': configuration,
-                'type': 'NamedConfiguration'
-            }
-            scan_config['scan_configurations'].append(c)
-    if scope_includes or scope_excludes:
-        scan_config['scope'] = {
-            'type': 'SimpleScope',
-        }
-        if scope_includes:
-            scan_config['scope']['include'] = []
-            for scope_include in scope_includes.split('\n'):
-                c = {
-                    'rule': scope_include
-                }
-                scan_config['scope']['include'].append(c)
-        if scope_excludes:
-            scan_config['scope']['exclude'] = []
-            for scope_exclude in scope_excludes.split('\n'):
-                c = {
-                    'rule': scope_exclude
-                }
-                scan_config['scope']['exclude'].append(c)
-    if target_urls:
-        scan_config['urls'] = []
-        for target_url in target_urls.split('\n'):
-            scan_config['urls'].append(target_url)
-    return scan_config
diff --git a/burp_probe/views/core.py b/burp_probe/views/core.py
index 19e70fb..919d84d 100644
--- a/burp_probe/views/core.py
+++ b/burp_probe/views/core.py
@@ -5,8 +5,7 @@
 from burp_probe.helpers import render_partial
 from burp_probe.middleware import load_user, strip_empty_params, modify_response
 from burp_probe.models import Node, Scan
-from burp_probe.services.burp import BurpProApi, BurpServiceException
-from burp_probe.utilities import burp_scan_builder, BurpIssueParser
+from burp_probe.services.burp import BurpProApi, BurpScanBuilder, BurpServiceException, BurpIssueParser
 from burp_probe.schemas import node_form_create_schema, node_form_update_schema, scan_form_schema
 import json
 import traceback
@@ -221,7 +220,8 @@ def scans_create():
         db.session.add(scan)
         db.session.flush()
         callback_url = None
-        scan_config = burp_scan_builder(callback_url, credentials, configurations, scope_includes, scope_excludes, target_urls)
+        scan_builder = BurpScanBuilder(callback_url, credentials, configurations, scope_includes, scope_excludes, target_urls)
+        scan_config = scan_builder.config_as_json
         current_app.logger.debug(f"Scanner Config:\n{json.dumps(scan_config, indent=4)}")
         burp = BurpProApi(
             protocol=node.protocol,