-
Notifications
You must be signed in to change notification settings - Fork 11
/
CAPECInserter.py
166 lines (138 loc) · 6.13 KB
/
CAPECInserter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import os
import fnmatch
from neo4j import exceptions
class CAPECInserter:
    """Load the split MITRE CAPEC JSON datasets into Neo4j.

    For every dataset kind (reference, attack pattern, category, view) the
    matching ``CAPECs_<kind>.cypher`` template is read from ``import_path``,
    its file-name placeholder is replaced with the quoted dataset path, and
    the resulting query is run through ``driver``.
    """

    # Dataset directory, relative to ``import_path``.  The same relative
    # form is substituted into the Cypher templates, so it is kept as a
    # plain forward-slash string rather than an OS-specific path.
    _SPLIT_DIR = "mitre_capec/splitted/"
    _DATASET_PATTERN = "*.json"

    def __init__(self, driver, import_path):
        """Store the collaborators used by every insertion step.

        Args:
            driver: Object exposing ``session()`` that returns a context
                manager whose value has a ``run(query)`` method
                (presumably a ``neo4j.Driver`` -- confirm against caller).
            import_path: Directory holding the ``CAPECs_*.cypher`` templates
                and the ``mitre_capec/splitted/`` dataset directory.  A
                trailing path separator is optional.
        """
        self.driver = driver
        self.import_path = import_path

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _error_label(error):
        """Return the message prefix used when reporting ``error``.

        The exception classes are looked up with ``getattr`` because their
        names differ between driver generations (``CypherError`` in 1.x,
        ``Neo4jError`` from 4.0 on; ``DriverError`` only from 4.0 on).
        Naming a missing attribute directly in an ``except`` clause would
        raise ``AttributeError`` while the original error is being handled.
        """
        cypher_types = tuple(
            cls
            for cls in (
                getattr(exceptions, name, None)
                for name in ("CypherError", "Neo4jError")
            )
            if cls is not None
        )
        if isinstance(error, cypher_types):
            return "CypherError"
        driver_type = getattr(exceptions, "DriverError", None)
        if driver_type is not None and isinstance(error, driver_type):
            return "DriverError"
        return "An error occurred"

    def _run_cypher_script(self, script_name, placeholder, file):
        """Fill ``placeholder`` in a Cypher template with ``file`` and run it.

        Args:
            script_name: File name of the template inside ``import_path``.
            placeholder: Token in the template replaced by the single-quoted
                ``file`` value.
            file: Dataset path to import.

        Raises:
            OSError: If the template cannot be read.  Errors raised while
                running the query are reported on stdout and not re-raised.
        """
        script_path = os.path.join(self.import_path, script_name)
        # ``with`` guarantees the template handle is closed after reading.
        with open(script_path, "r", encoding="utf-8") as script:
            query = script.read()
        query = query.replace(placeholder, f"'{file}'")

        try:
            with self.driver.session() as session:
                session.run(query)
        except Exception as e:
            # Best-effort import: report the failure and let the caller
            # continue with the next file.
            print(f"{self._error_label(e)}: {e}")
        else:
            # Only claim completion when the query did not fail.
            print("\nCAPEC Files: " + file + " insertion completed. \n----------")

    def _list_split_files(self, prefix):
        """Return the sorted ``*.json`` dataset paths whose name starts with ``prefix``.

        Paths are returned relative to ``import_path``
        (``mitre_capec/splitted/<name>``).  Sorting makes the insertion
        order deterministic; ``os.listdir`` order is arbitrary.
        """
        entries = os.listdir(os.path.join(self.import_path, self._SPLIT_DIR))
        return sorted(
            self._SPLIT_DIR + entry
            for entry in entries
            if fnmatch.fnmatch(entry, self._DATASET_PATTERN)
            and entry.startswith(prefix)
        )

    # ------------------------------------------------------------------
    # Query runners
    # ------------------------------------------------------------------
    def query_capec_reference_script(self, file):
        """Run ``CAPECs_reference.cypher`` for the reference dataset ``file``."""
        self._run_cypher_script(
            "CAPECs_reference.cypher", "capecReferenceFilesToImport", file
        )

    def query_capec_attack_script(self, file):
        """Run ``CAPECs_attack.cypher`` for the attack-pattern dataset ``file``."""
        self._run_cypher_script(
            "CAPECs_attack.cypher", "capecAttackFilesToImport", file
        )

    def query_capec_category_script(self, file):
        """Run ``CAPECs_category.cypher`` for the category dataset ``file``."""
        self._run_cypher_script(
            "CAPECs_category.cypher", "capecCategoryFilesToImport", file
        )

    def query_capec_view_script(self, file):
        """Run ``CAPECs_view.cypher`` for the view dataset ``file``."""
        self._run_cypher_script(
            "CAPECs_view.cypher", "capecViewFilesToImport", file
        )

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def capec_insertion(self):
        """Insert every CAPEC dataset: references, attacks, categories, views.

        Each kind is listed immediately before it is inserted, in the order
        given above.
        """
        print("\nInserting CAPEC Files to Database...")
        stages = (
            (self.files_to_insert_capec_reference, self.query_capec_reference_script),
            (self.files_to_insert_capec_attack, self.query_capec_attack_script),
            (self.files_to_insert_capec_category, self.query_capec_category_script),
            (self.files_to_insert_capec_view, self.query_capec_view_script),
        )
        for list_files, run_script in stages:
            for f in list_files():
                print('Inserting ' + f)
                run_script(f)

    # ------------------------------------------------------------------
    # Dataset discovery
    # ------------------------------------------------------------------
    def files_to_insert_capec_reference(self):
        """Return the ``capec_reference*.json`` dataset paths to import."""
        return self._list_split_files("capec_reference")

    def files_to_insert_capec_attack(self):
        """Return the ``capec_attack_pattern*.json`` dataset paths to import."""
        return self._list_split_files("capec_attack_pattern")

    def files_to_insert_capec_category(self):
        """Return the ``capec_category*.json`` dataset paths to import."""
        return self._list_split_files("capec_category")

    def files_to_insert_capec_view(self):
        """Return the ``capec_view*.json`` dataset paths to import."""
        return self._list_split_files("capec_view")