From e8a34fd4626e30b4a3ba277dcde4c426c1acd705 Mon Sep 17 00:00:00 2001
From: Stephen Lincoln
Date: Fri, 20 Sep 2024 15:48:28 -0400
Subject: [PATCH] Added writer, fixed data structure to asim table util

---
 utils/get_sentinel_asim_schema_tables.py | 121 ++++++++++++++++++++-----
 1 file changed, 98 insertions(+), 23 deletions(-)

diff --git a/utils/get_sentinel_asim_schema_tables.py b/utils/get_sentinel_asim_schema_tables.py
index 43db94b..78aad7b 100644
--- a/utils/get_sentinel_asim_schema_tables.py
+++ b/utils/get_sentinel_asim_schema_tables.py
@@ -1,14 +1,33 @@
-from typing import Dict, List
+import re
+from datetime import datetime, timezone
+from typing import Dict, List, Optional, Tuple
 
 import requests
 from bs4 import BeautifulSoup
 
 BASE_URL = "https://learn.microsoft.com/en-us/azure/sentinel"
+OUTPUT_FILE = "sigma/pipelines/sentinelasim/tables.py"
 
 # TODO: Add a function to get the common fields from the ASIM schemas
 # TODO: Add a function to write the table data to a file
 
 
+def get_request(url: str) -> requests.Response:
+    """
+    Sends a GET request to the specified URL and returns the response.
+
+    :param url: The URL to send the GET request to.
+    :return: The response from the GET request.
+    :raises requests.HTTPError: If the server answers with a 4xx or 5xx status.
+    :raises requests.Timeout: If connecting to or reading from the server stalls for over 30 seconds.
+    """
+    # Without an explicit timeout, requests waits indefinitely on a stalled connection.
+    response = requests.get(url, timeout=30)
+    response.raise_for_status()
+
+    return response
+
+
 def extract_asim_schema_hrefs(items: List[dict]) -> List[str]:
     """Extracts hrefs for ASIM schemas from the JSON data."""
     for item in items:
@@ -52,16 +71,38 @@ def extract_table_name_and_fields(url: str) -> Dict[str, List[Dict[str, str]]]:
     :param url: Full URL of the schema page.
     :return: A dictionary with the table name and a list of field schemas.
     """
-    response = requests.get(url)
-    response.raise_for_status()
+    response = get_request(url)
     soup = BeautifulSoup(response.content, "html.parser")
 
-    # Extract the table name (e.g. 'imAuditEvent')
-    table_name = soup.find("code", class_="lang-kql").text.strip().split()[0]
-    # Extract the field schema details under "Schema details"
+    table_name = extract_table_name(soup)
+    if table_name is None:
+        print(f"No ASIM table found for {url}. Skipping...")
+        return None
+
     field_data = extract_field_data(soup)
 
-    return {"table_name": table_name, "fields": field_data}
+    return {table_name: field_data}
+
+
+def extract_table_name(soup: BeautifulSoup) -> Optional[str]:
+    """
+    Extracts the table name from the BeautifulSoup object.
+
+    The name is the first word of the first ``lang-kql`` code element. If the page has no usable
+    one, the page text is searched for "im" (in any letter case) followed by word characters.
+
+    :param soup: BeautifulSoup object of the schema page.
+    :return: The extracted table name or None if not found.
+    """
+    try:
+        return soup.find("code", class_="lang-kql").text.strip().split()[0]
+    except (AttributeError, IndexError):
+        # AttributeError: no such code element; IndexError: the element is empty.
+        whole_text = soup.get_text()
+        # The capture group has to be mandatory and greedy. The former lazy optional group
+        # "(\w+)??" always matched empty, so every fallback hit yielded the name "imNone".
+        match = re.search(r"(?i)im(\w+)", whole_text)
+        return f"im{match.group(1)}" if match else None
 
 
 def extract_field_data(soup: BeautifulSoup) -> List[Dict[str, str]]:
@@ -72,7 +113,7 @@ def extract_field_data(soup: BeautifulSoup) -> List[Dict[str, str]]:
     :return: A list of dictionaries with the field name and type.
     """
     schema_details_section = soup.find(id="schema-details")
-    field_data = []
+    field_data = {}
 
     if schema_details_section:
         # Loop through all tables in the section and its subsections
@@ -85,7 +126,7 @@ def extract_field_data(soup: BeautifulSoup) -> List[Dict[str, str]]:
                 for row in table.find_all("tr")[1:]:  # Skip header row
                     cols = [td.text.strip() for td in row.find_all("td")]
                     if len(cols) == 4:  # Ensure we have all four columns
-                        field_data.append({"Field": cols[0], "Class": cols[1], "Type": cols[2], "Description": cols[3]})
+                        field_data[cols[0]] = {"class": cols[1], "data_type": cols[2], "description": cols[3]}
 
     return field_data
 
@@ -96,28 +137,62 @@ def get_common_field_data() -> List[Dict[str, str]]:
     :return: A list of dictionaries with the field name and type.
     """
     full_url = f"{BASE_URL}/normalization-common-fields"
-    common_field_info = extract_table_name_and_fields(full_url)
-
-
-def process_asim_schemas() -> List[Dict[str, List[Dict[str, str]]]]:
+    response = get_request(full_url)
+    soup = BeautifulSoup(response.content, "html.parser")
+    common_field_info = extract_field_data(soup)
+
+    return common_field_info
+
+
+def write_schema(output_file: str, schema_tables: Dict[str, dict], common_field_data: Dict[str, dict]) -> None:
+    """
+    Write the schema tables to a Python file.
+
+    :param output_file: Path of the Python file to create or overwrite.
+    :param schema_tables: Maps each table name to its fields; every field maps to a dict with the keys
+        "data_type", "description" and "class".
+    :param common_field_data: Maps each common field name to a dict with those same keys.
+    """
+    # The scraped descriptions may contain non-ASCII text, so do not depend on the platform's
+    # default encoding when writing them out.
+    with open(output_file, "w", encoding="utf-8") as f:
+        f.write("# This file is auto-generated. Do not edit manually.\n")
+        f.write(f"# Last updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n")
+        f.write("SENTINEL_ASIM_TABLES = {\n")
+        for table, fields in schema_tables.items():
+            f.write(f' "{table}": {{\n')
+            for field, info in fields.items():
+                f.write(
+                    f' "{field.strip("`")}": {{"data_type": "{info["data_type"].strip("`")}", "description": {repr(info["description"])}, "class": "{info["class"].strip("`")}"}},\n'
+                )
+            f.write(" },\n")
+        f.write("}\n")
+        f.write("SENTINEL_ASIM_COMMON_FIELDS = {\n")
+        f.write(' "COMMON": {\n')
+        for field, info in common_field_data.items():
+            f.write(
+                f' "{field.strip("`")}": {{"data_type": "{info["data_type"].strip("`")}", "description": {repr(info["description"])}, "class": "{info["class"].strip("`")}"}},\n'
+            )
+        f.write(" },\n")
+        f.write("}\n")
+
+
+def process_asim_schemas() -> Tuple[Dict[str, dict], Dict[str, dict]]:
     """Processes all ASIM schemas and extracts table names and field schemas."""
     tables = get_sentinel_asim_schema_tables()
-    schema_data = []
+    schema_data = {}
     common_field_data = get_common_field_data()
 
     for href in tables:
         full_url = f"{BASE_URL}/{href}"
         print(f"Processing {full_url}...")
-        schema_info = extract_table_name_and_fields(full_url)
-        schema_data.append(schema_info)
+        if schema_info := extract_table_name_and_fields(full_url):
+            schema_data.update(schema_info)
 
-    return schema_data
+    return schema_data, common_field_data
 
 
 if __name__ == "__main__":
-    asim_schema_data = process_asim_schemas()
-    for schema in asim_schema_data:
-        print(f"Table Name: {schema['table_name']}")
-        print("Fields:")
-        for field in schema["fields"]:
-            print(f" - {field}")
+    schema_data, common_field_data = process_asim_schemas()
+    write_schema(OUTPUT_FILE, schema_data, common_field_data)
+    print(f"Schema written to {OUTPUT_FILE}")