forked from sec-report/SecAutoBan
chaitin_waf_ce.py
import jwt
import time
import datetime
import requests
from SecAutoBan import SecAutoBan
# Silence urllib3's warnings for the whole process. Every request in this file
# passes verify=False, so each call would otherwise emit a warning.
# NOTE(review): with certificate verification off the WAF endpoint is not
# authenticated, so the credentials and bearer tokens sent below are readable
# by anyone able to intercept the connection. Where the WAF's CA certificate is
# available, pass its bundle path as `verify` instead of False.
requests.packages.urllib3.disable_warnings()
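def password_login() -> str:
    """Log in to the WAF with username/password and return the issued JWT.

    Fetches a CSRF token from ``/api/open/auth/csrf`` and posts it, together
    with the configured username and password, to ``/api/open/auth/login``.

    Returns:
        The ``data.jwt`` value of the login response, or ``""`` when either
        request answers with a status other than HTTP 200.

    Raises:
        requests.exceptions.RequestException: Connection failure or timeout.
        KeyError, ValueError: The response body is not the expected JSON.
    """
    base_url = chaitin_waf_config["url"]
    # Bound both calls: without a timeout an unresponsive WAF would block the
    # caller (the polling loop) indefinitely.
    timeout_seconds = 10

    # NOTE(review): verify=False means the server certificate is not checked,
    # so the password below travels over an unauthenticated TLS session.
    csrf_response = requests.get(
        base_url + "/api/open/auth/csrf",
        verify=False,
        timeout=timeout_seconds,
    )
    if csrf_response.status_code != 200:
        return ""

    post_data = {
        "csrf_token": csrf_response.json()["data"]["csrf_token"],
        "username": chaitin_waf_config["username"],
        "password": chaitin_waf_config["password"],
    }
    login_response = requests.post(
        base_url + "/api/open/auth/login",
        json=post_data,
        verify=False,
        timeout=timeout_seconds,
    )
    if login_response.status_code != 200:
        return ""
    return login_response.json()["data"]["jwt"]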
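def get_header() -> dict:
    """Build the ``Authorization`` header for WAF API requests.

    The credential source is chosen from ``chaitin_waf_config`` in this order:

    1. ``jwt-secret`` is non-empty: sign an HS256 token locally.
    2. ``username`` is non-empty: log in through ``password_login()``.
    3. Otherwise: use the static ``bearer`` value.

    Returns:
        A dict with a single ``Authorization: Bearer <token>`` entry. When
        ``password_login()`` fails the token part is empty.
    """
    header = {}
    if chaitin_waf_config["jwt-secret"]:
        # "iat" is the issuance time and "exp" lies 7 days after it; the two
        # claims must not share the future expiry timestamp, because a
        # validator that checks "iat" rejects tokens issued in the future.
        issued_at = datetime.datetime.now()
        expires_at = issued_at + datetime.timedelta(days=7)
        # NOTE(review): anyone holding jwt-secret can mint a token for uid 1
        # (presumably the built-in administrator; confirm). A fresh 7-day
        # token is created on every call, so a leaked header stays usable for
        # a week; a shorter lifetime would narrow that window.
        jwt_payload = {
            "uid": 1,
            "pwd": True,
            "tfa": False,
            "ver": 1,
            "iss": "chaitin",
            "exp": int(expires_at.timestamp()),
            "iat": int(issued_at.timestamp()),
        }
        token = jwt.encode(jwt_payload, chaitin_waf_config["jwt-secret"], algorithm='HS256')
        header["Authorization"] = "Bearer " + token
        return header
    if chaitin_waf_config["username"]:
        # NOTE(review): this performs a full password login on every call;
        # the caller polls every 5 seconds.
        header["Authorization"] = "Bearer " + password_login()
        return header
    header["Authorization"] = "Bearer " + chaitin_waf_config["bearer"]
    return header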
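def alarm_analysis(ws_client):
    """Poll the WAF record API forever and forward new source IPs as alarms.

    Every 5 seconds the first page (20 entries) of ``/api/open/records`` is
    requested with the query filter ``action=1``. For each record whose event
    id and source IP have not been reported yet, ``ws_client.send_alarm`` is
    called with the source IP and a message built from the record's ``host``
    and ``reason`` fields. The last 1000 reported event ids and IPs are kept
    for de-duplication.

    Args:
        ws_client: Object exposing ``send_alarm(ip, message)``.
    """
    history_limit = 1000
    seen_event_ids = []
    seen_ips = []
    while True:
        time.sleep(5)
        try:
            r = requests.get(
                chaitin_waf_config["url"] + "/api/open/records?page=1&page_size=20&ip=&url=&port=&host=&attack_type=&action=1",
                headers=get_header(),
                verify=False,
                # Without a timeout a stalled connection would stop polling.
                timeout=10,
            )
        except Exception as e:
            sec_auto_ban.print("[-] WAF连接失败, Error: " + str(e))
            continue
        if r.status_code != 200:
            if r.status_code == 401:
                sec_auto_ban.print("[-] WAF登录失败")
            else:
                sec_auto_ban.print("[-] WAF连接失败")
            continue
        # A malformed body must not end the polling loop, so parse defensively.
        try:
            records = r.json()["data"]["data"] or []
        except (ValueError, KeyError, TypeError) as e:
            sec_auto_ban.print("[-] WAF响应解析失败, Error: " + str(e))
            continue
        for record in records:
            try:
                event_id = record["event_id"]
                src_ip = record["src_ip"]
                # NOTE(review): host and reason come from the WAF record and
                # may reflect attacker-supplied request data; consumers of the
                # alarm text should treat it as untrusted.
                message = "攻击资产:" + record["host"] + " " + record["reason"]
            except (KeyError, TypeError) as e:
                sec_auto_ban.print("[-] WAF响应解析失败, Error: " + str(e))
                continue
            if event_id in seen_event_ids or src_ip in seen_ips:
                continue
            # NOTE(review): src_ip feeds an automatic ban decision; confirm
            # the WAF derives it from a trusted source (not a client-set
            # forwarding header) so that spoofed addresses are not banned.
            ws_client.send_alarm(src_ip, message)
            # NOTE(review): the listing's indentation was lost; the history
            # bookkeeping is assumed to belong to the "alarm sent" branch.
            # Confirm against the upstream file.
            seen_event_ids.append(event_id)
            if len(seen_event_ids) > history_limit:
                seen_event_ids.pop(0)
            seen_ips.append(src_ip)
            if len(seen_ips) > history_limit:
                seen_ips.pop(0)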
if __name__ == "__main__":
    # Connection settings read by password_login(), get_header() and
    # alarm_analysis(). get_header() picks the credential in this order:
    # non-empty "jwt-secret", then non-empty "username" (with "password"),
    # otherwise the static "bearer" token.
    # The values below are placeholders; real secrets are better supplied
    # from outside the source file (environment, secret store) so they do
    # not end up in version control.
    chaitin_waf_config = {
        "url": "https://xxx.xxx.xxx.xxx:9443",
        "jwt-secret": "",
        "username": "",
        "password": "",
        "bearer": "xxx.xxx.xxx",
    }

    # Register this script as an "alarm" client; alarm_analysis is handed to
    # SecAutoBan as the callback that produces the alarms.
    sec_auto_ban = SecAutoBan(
        server_ip="127.0.0.1",
        server_port=8000,
        sk="sk-*****",
        client_type="alarm",
        alarm_analysis=alarm_analysis,
    )
    sec_auto_ban.run()