This repository has been archived by the owner on Mar 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
db.py
191 lines (166 loc) · 5.48 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import yaml
import pymysql
def load_creds():
"""
Load database credentials from a file.
"""
with open('etc/db.yaml', 'r') as f:
creds = yaml.load(f)
return creds
def connect():
"""
Connect to the MySQL database.
Returns:
Connection: A connection to the database
"""
creds = load_creds()
connection = pymysql.connect(host=creds['host'],
user=creds['user'], password=creds['password'])
return connection
def get(table, columns, where=None, orderby=None, args=None):
"""
Execute a SELECT statement on the database and return the matching data.
Arguments:
table (str): Table to SELECT data from
columns (List(str)): Columns to get
where (str): Optional, MySQL WHERE statement
orderby (str): Optional, MySQL ORDER BY statement
args (Tuple(str)): Optional, arguments for a prepared statement
Returns:
List(List(object)): List of rows which match the SELECT statement
"""
# Build command
columns = ','.join(columns)
cmd = 'SELECT %s FROM %s' % (columns, table)
if where is not None:
cmd += ' WHERE ' + where
if orderby is not None:
cmd += ' ORDER BY ' + orderby
# Execute command
connection = connect()
with connection.cursor() as cursor:
cursor.execute('USE scoring')
cursor.execute(cmd, args)
rows = cursor.fetchall()
connection.close()
return rows
def getall(table, orderby=None):
"""
Get all rows from the given table.
Arguments:
table (str): The table to get rows from
Returns:
List(List(object)): List of all rows in the table
"""
rows = get(table, ['*'], orderby=orderby)
return rows
def execute(cmd, args=None):
"""
Execute a MySQL command on the database.
Arguments:
cmd (str): MySQL command to execute
args (Tuple(str)): Optional, arguments for a prepared statement
Returns:
int: The ID of the last row created or modified by the command
"""
connection = connect()
with connection.cursor() as cursor:
cursor.execute('USE scoring')
cursor.execute(cmd, args)
lid = cursor.lastrowid
connection.commit()
connection.close()
return lid
def reset_table(table):
"""
Delete all rows from the given table.
Arguments:
table (str): Table to delete all data from
"""
try:
execute('TRUNCATE TABLE %s' % table)
except:
execute('DELETE FROM %s' % table)
def reset_all_tables():
"""
Delete all data from all tables in the database.
"""
reset_table('settings')
reset_table('vapp')
reset_table('system')
reset_table('team')
reset_table('users')
reset_table('domain')
reset_table('service_check')
reset_table('check_io')
reset_table('credential')
reset_table('result')
reset_table('pcr')
reset_table('default_creds_log')
reset_table('revert_log')
reset_table('check_log')
def insert(table, columns, args):
"""
Insert data into the given table.
Arguments:
table (str): The table to insert data into
columns (List(str)): List of columns identifying the data
args (List(str)): List of pieces of data corresponding to the columns
Returns:
int: The ID of the inserted row
"""
columns = ','.join(columns)
vals = ', '.join(['%s']*len(args))
cmd = 'INSERT INTO %s (%s)' % (table, columns)
cmd += ' VALUES (%s)' % vals
id = execute(cmd, args)
return id
def modify(table, set, args, where=None):
"""
Modify the given data in the given table matching the given criteria.
Arguments:
table (str): The table to modify
set (str): The fields to modify (field1=%s, field2=%s)
args (List(str)): List of pieces of data corresponding to the columns
where (str): The matching criteria
"""
cmd = 'UPDATE %s SET %s' % (table, set)
if where is not None:
cmd += ' WHERE %s' % where
execute(cmd, args)
def delete(table, args, where=None):
"""
Delete the given rows from the given table matching the given criteria.
Arguments:
table (str): The table to modify
args (List(str)): List of pieces of data corresponding to the columns
where (str): The matching criteria
"""
cmd = 'DELETE FROM %s' % table
if where is not None:
cmd += ' WHERE %s' % where
execute(cmd, args)
def set_credential_password(username, password, team_id, check_id=None, domain=None):
"""
Set the password for the credentials matching the given criteria.
Arguments:
username (str): The username of the credentials
password (str): The password to change to
team_id (str): The ID of the team of the credentials
check_id (str): Optional, the ID of the check of the credentials. check_id and domain cannot both be None
domain (str): Optional, the fqdn of the domain of the credentials. check_id and domain cannot both be None
"""
if check_id is None and domain is None:
raise Exception('check_id and domain cannot both be None.')
cmd = 'UPDATE credential SET password=%s, is_default=0 WHERE team_id=%s'
args = [password, team_id]
if username.lower() != 'all':
cmd += ' AND username=%s'
args.append(username)
if check_id is not None:
cmd += ' AND check_id=%s'
args.append(check_id)
else:
cmd += ' AND domain=%s'
args.append(domain)
execute(cmd, args)