Skip to content

Commit

Permalink
Added writer and fixed data structure in ASIM table util
Browse files Browse the repository at this point in the history
  • Loading branch information
slincoln-aiq committed Sep 20, 2024
1 parent 849250e commit e8a34fd
Showing 1 changed file with 84 additions and 23 deletions.
107 changes: 84 additions & 23 deletions utils/get_sentinel_asim_schema_tables.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,34 @@
import re
import subprocess  # NOTE(review): unused after removing the debug `which python3` call below — consider dropping

from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

import requests
from bs4 import BeautifulSoup

# Removed leftover debug statement: subprocess.run(["which", "python3"])
# — it ran a shell lookup as an import-time side effect and served no purpose.

# Root of the Microsoft Sentinel documentation; schema-page hrefs are joined onto this.
BASE_URL = "https://learn.microsoft.com/en-us/azure/sentinel"
# Generated module that receives the scraped table/field definitions.
OUTPUT_FILE = "sigma/pipelines/sentinelasim/tables.py"

# NOTE(review): both TODOs below appear to be completed by get_common_field_data()
# and write_schema() — confirm and remove.
# TODO: Add a function to get the common fields from the ASIM schemas
# TODO: Add a function to write the table data to a file


def get_request(url: str) -> requests.Response:
    """
    Fetch *url* with an HTTP GET and return the response.

    :param url: The URL to request.
    :return: The response object for a successful request.
    :raises requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    resp = requests.get(url)
    resp.raise_for_status()
    return resp


def extract_asim_schema_hrefs(items: List[dict]) -> List[str]:
"""Extracts hrefs for ASIM schemas from the JSON data."""
for item in items:
Expand Down Expand Up @@ -52,16 +72,32 @@ def extract_table_name_and_fields(url: str) -> Dict[str, List[Dict[str, str]]]:
:param url: Full URL of the schema page.
:return: A dictionary with the table name and a list of field schemas.
"""
response = requests.get(url)
response.raise_for_status()
response = get_request(url)
soup = BeautifulSoup(response.content, "html.parser")

# Extract the table name (e.g. 'imAuditEvent')
table_name = soup.find("code", class_="lang-kql").text.strip().split()[0]
# Extract the field schema details under "Schema details"
table_name = extract_table_name(soup)
if table_name is None:
print(f"No ASIM table found for {url}. Skipping...")
return None

field_data = extract_field_data(soup)

return {"table_name": table_name, "fields": field_data}
return {table_name: field_data}


def extract_table_name(soup: BeautifulSoup) -> Optional[str]:
    """
    Extract the ASIM table name (e.g. ``imAuditEvent``) from a schema page.

    Prefers the first token of the page's first KQL code sample; if no such
    sample exists, falls back to scanning the page text for an ``im...``
    parser-name pattern.

    :param soup: Parsed schema page.
    :return: The table name, or ``None`` if neither strategy finds one.
    """
    kql_sample = soup.find("code", class_="lang-kql")
    if kql_sample is not None:
        # First whitespace-delimited token of the KQL snippet is the table name.
        return kql_sample.text.strip().split()[0]

    # Fallback: look for an "im<Vendor><Product>"-style mention in the raw text.
    page_text = soup.get_text()
    found = re.search(r"(?i)im(\w+)<?vendor>?<?Product>?", page_text)
    return f"im{found.group(1)}" if found else None


def extract_field_data(soup: BeautifulSoup) -> List[Dict[str, str]]:
Expand All @@ -72,7 +108,7 @@ def extract_field_data(soup: BeautifulSoup) -> List[Dict[str, str]]:
:return: A list of dictionaries with the field name and type.
"""
schema_details_section = soup.find(id="schema-details")
field_data = []
field_data = {}

if schema_details_section:
# Loop through all tables in the section and its subsections
Expand All @@ -85,7 +121,7 @@ def extract_field_data(soup: BeautifulSoup) -> List[Dict[str, str]]:
for row in table.find_all("tr")[1:]: # Skip header row
cols = [td.text.strip() for td in row.find_all("td")]
if len(cols) == 4: # Ensure we have all four columns
field_data.append({"Field": cols[0], "Class": cols[1], "Type": cols[2], "Description": cols[3]})
field_data[cols[0]] = {"class": cols[1], "data_type": cols[2], "description": cols[3]}
return field_data


Expand All @@ -96,28 +132,53 @@ def get_common_field_data() -> List[Dict[str, str]]:
:return: A list of dictionaries with the field name and type.
"""
full_url = f"{BASE_URL}/normalization-common-fields"
common_field_info = extract_table_name_and_fields(full_url)


def process_asim_schemas() -> List[Dict[str, List[Dict[str, str]]]]:
response = get_request(full_url)
soup = BeautifulSoup(response.content, "html.parser")
common_field_info = extract_field_data(soup)

return common_field_info


def write_schema(output_file: str, schema_tables: Dict[str, dict], common_field_data: Dict[str, dict]):
    """
    Write the scraped ASIM schemas to a generated Python module.

    The module defines two dicts: ``SENTINEL_ASIM_TABLES`` (one entry per
    table, keyed by table name) and ``SENTINEL_ASIM_COMMON_FIELDS`` (the
    common fields under the single key ``"COMMON"``).

    :param output_file: Path of the Python file to overwrite.
    :param schema_tables: Mapping of table name -> {field name -> field info},
        where field info has "class", "data_type" and "description" keys.
    :param common_field_data: Mapping of common field name -> field info.
    """
    with open(output_file, "w", encoding="utf-8") as f:
        f.write("# This file is auto-generated. Do not edit manually.\n")
        f.write(f"# Last updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n")
        f.write("SENTINEL_ASIM_TABLES = {\n")
        for table, fields in schema_tables.items():
            f.write(f'    "{table}": {{\n')
            _write_field_entries(f, fields)
            f.write("    },\n")
        f.write("}\n")
        f.write("SENTINEL_ASIM_COMMON_FIELDS = {\n")
        # Plain string (was a needless f-string with no placeholders).
        f.write('    "COMMON": {\n')
        _write_field_entries(f, common_field_data)
        f.write("    },\n")
        f.write("}\n")


def _write_field_entries(f, fields: Dict[str, dict]) -> None:
    """Write one '"name": {...}' line per field; backticks from the docs markup are stripped."""
    for field, info in fields.items():
        f.write(
            f'        "{field.strip("`")}": {{"data_type": "{info["data_type"].strip("`")}", '
            f'"description": {repr(info["description"])}, "class": "{info["class"].strip("`")}"}},\n'
        )


def process_asim_schemas() -> Tuple[Dict[str, dict], Dict[str, dict]]:
    """
    Walk every ASIM schema page and collect table names with their field schemas.

    :return: A pair ``(schema_data, common_field_data)`` — the per-table
        field mappings and the common-field mapping.
    """
    asim_hrefs = get_sentinel_asim_schema_tables()
    schema_data: Dict[str, dict] = {}
    common_field_data = get_common_field_data()

    for href in asim_hrefs:
        page_url = f"{BASE_URL}/{href}"
        print(f"Processing {page_url}...")
        # Pages without a recognizable ASIM table return None and are skipped.
        table_info = extract_table_name_and_fields(page_url)
        if table_info:
            schema_data.update(table_info)

    return schema_data, common_field_data


if __name__ == "__main__":
    # Scrape all ASIM schema pages, then emit the generated tables module.
    tables, common_fields = process_asim_schemas()
    write_schema(OUTPUT_FILE, tables, common_fields)
    print(f"Schema written to {OUTPUT_FILE}")

0 comments on commit e8a34fd

Please sign in to comment.