-
Notifications
You must be signed in to change notification settings - Fork 5
/
utils.py
141 lines (115 loc) · 4.16 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import glob
import json
import logging
import shutil
from git import Repo
from munch import Munch, munchify
import yaml
# Local directory that get_latest() clones the emulation-plan repository into
# and that get_plan_path() globs under when looking for a plan's YAML file.
PLANS_DIR = "./.plans"
def get_latest(refresh_repo):
    """Get the latest Emulation Plans from the MITRE Github repo.

    An existing local copy at `PLANS_DIR` is reused unless `refresh_repo` is
    set, in which case it is deleted and cloned again. If no local copy exists
    the repo is always cloned.

    Args:
        refresh_repo (bool): Flag indicating whether to refresh the local copy

    Returns:
        NA. Files created or updated as a side-effect

    Raises:
        RuntimeError: If the remote repo could not be cloned. The underlying
            error is attached as the exception's cause.
    """
    if glob.glob(PLANS_DIR) and not refresh_repo:
        # A local copy is already present and no refresh was requested.
        return

    # Clear any stale copy so the clone below starts from an empty location.
    try:
        shutil.rmtree(PLANS_DIR)
    except FileNotFoundError:
        # Nothing to remove (first run, or the directory vanished meanwhile).
        pass

    print("Requesting latest plans from Github ...")
    try:
        # NOTE(review): this URL uses the github.com host rather than
        # github.com -- confirm that this is the intended remote.
        Repo.clone_from("https://github.com/center-for-threat-informed-defense/adversary_emulation_library.git", PLANS_DIR)
    except Exception as err:
        # Only ordinary errors are translated; KeyboardInterrupt/SystemExit
        # propagate untouched so the user can still abort a slow clone.
        raise RuntimeError("Could not clone the remote repo and retrieve Emulation Plans from Github") from err
    print("Successfully retrieved lastest plans from Github")
def get_plan_path(search_string):
    """Locate the emulation plan yaml file for the supplied name

    Globs for `*.yaml` files inside the `Emulation_Plan` folder of the
    `search_string` directory under `PLANS_DIR`.

    Args:
        search_string (string): The name of an adversary, or plan, to map

    Returns:
        (string): The path of the first matching plan yaml file, or None when
        nothing matches
    """
    pattern = f"{PLANS_DIR}/{search_string}/Emulation_Plan/*.yaml"
    matches = glob.glob(pattern)
    return matches[0] if matches else None
def get_plan(plan_path):
    """Reads the plan and "munches" it into an object

    Args:
        plan_path (string): The path to the plan yaml file

    Returns:
        The munchified content of the yaml file, or None if the file could
        not be opened or parsed
    """
    try:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # locale default encoding.
        with open(plan_path, "r", encoding="utf-8") as f:
            return munchify(yaml.safe_load(f))
    except Exception:
        # Best-effort contract: any ordinary failure yields None. Only
        # Exception is caught so KeyboardInterrupt/SystemExit are no longer
        # silently swallowed. The cause is kept available at debug level.
        logging.getLogger(__name__).debug(
            "Could not load plan from %r", plan_path, exc_info=True
        )
        return None
def get_techniques(plan):
    """Collect the techniques referenced by a plan so they can be mapped.

    Only plan entries that contain an "id" key are treated as procedures;
    every other entry is skipped. Each result carries the procedure ID and
    procedure step for reference in layer tooltips.

    Args:
        plan (dict (munch)): A "Plan" object

    Returns:
        (list): List of (munched) "Technique" objects with the fields `id`,
        `procedure_id` and `procedure_step`
    """
    return [
        Munch(
            id=step.technique.attack_id,
            procedure_id=step.id,
            procedure_step=step.procedure_step,
        )
        for step in plan
        if "id" in step
    ]
def build_layer(plan_techniques, plan_name, layer_name):
    """Create a Navigator layer (.json) file on the local filesystem

    Add the required meta data and format per the MITRE layer spec
    https://github.com/mitre-attack/attack-navigator/blob/master/layers/LAYERFORMATv3.md

    Args:
        plan_techniques (list): The techniques which are in the plan. Each
            item must expose `id`, `procedure_id` and `procedure_step`
            attributes; the latter two are written to the technique's
            "metadata" field
        plan_name (str): Plan name used in the layer's name and description
        layer_name (str): Output file name

    Returns:
        (bool): True if successful, False if the file could not be written
    """
    data = {
        "domain": "mitre-enterprise",
        "name": "{plan_name} Adversary Emulation Heatmap".format(plan_name=plan_name),
        "description": "Techniques part of the {plan_name} Adversary Emulation plan".format(plan_name=plan_name),
        "version": "3.0",
        "techniques": [],
    }

    for technique in plan_techniques:
        data["techniques"].append({
            "color": "#3f2b96",
            "techniqueID": technique.id,
            "metadata": [
                {"name": "Procedure ID", "value": technique.procedure_id},
                {"name": "Procedure Step", "value": technique.procedure_step},
            ],
        })
        if "." in technique.id:
            # A dotted ID gets an extra entry for the part before the dot
            # with showSubtechniques set. One such entry is added per dotted
            # technique, so the same parent ID can appear more than once.
            data["techniques"].append({
                "techniqueID": technique.id.split(".")[0],
                "showSubtechniques": True,
            })

    try:
        # Indent makes it print pretty
        with open(layer_name, "w") as f:
            json.dump(data, f, indent=2)
    except Exception:
        # Report ordinary write/serialisation failures through the return
        # value. There is deliberately no `return` in a `finally` clause:
        # that would also swallow KeyboardInterrupt/SystemExit.
        logging.getLogger(__name__).debug(
            "Could not write layer to %r", layer_name, exc_info=True
        )
        return False

    print("Success. Layer created at {output}".format(output=layer_name))
    return True