forked from mhulsman/cluster_manager
-
Notifications
You must be signed in to change notification settings - Fork 2
/
ipengine_chief
executable file
·189 lines (161 loc) · 5.73 KB
/
ipengine_chief
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python
import subprocess, sys, getopt, time, shlex
from subprocess import Popen
import xmlrpclib
import httplib
import socket
import cluster_storage
import os,sys
import signal
import psutil
import shutil
import random
from dns import resolver, reversename
# Timing knobs -- all values are in seconds.
PING_INTERVAL = 120              # pause between two status polls of the controller
PING_TIMEOUT = 1200              # leave the poll loop once the controller was unreachable this long
REQUEST_TIMEOUT = 30             # timeout for network requests (address probe and XML-RPC calls)
# Registration retry policy.
REGISTRATION_ATTEMPTS = 5        # register() tries before giving up
REGISTRATION_SLEEP = 30          # minimum back-off between two tries (seconds)
REGISTRATION_SLEEP_RANDOM = 180  # extra random back-off added to the minimum (seconds)
# Defaults for the command-line options -t, -a, -l and -p.  The port stays a
# string because it is concatenated into the controller URL.
mytype, address, loadenv, port = "LOCAL", "localhost", False, "30024"
class TimeoutHTTPConnection(httplib.HTTPConnection):
    """``httplib.HTTPConnection`` constructed with an explicit socket timeout.

    Every instance also switches on httplib debug tracing at level 99.
    NOTE(review): any level above 0 makes httplib print request/response
    traffic to stdout on every XML-RPC call; this looks like leftover
    debugging -- confirm it is still wanted.
    """

    # Level handed to set_debuglevel() for every new connection.
    _DEBUG_LEVEL = 99

    def __init__(self, host, timeout=10):
        base = httplib.HTTPConnection
        base.__init__(self, host, timeout=timeout)
        self.set_debuglevel(self._DEBUG_LEVEL)
class TimeoutTransport(xmlrpclib.Transport):
    """XML-RPC transport that talks through ``TimeoutHTTPConnection`` objects.

    :param timeout: socket timeout applied to every connection it opens.
    Remaining positional/keyword arguments are forwarded to
    ``xmlrpclib.Transport``.
    """

    def __init__(self, timeout=10, *args, **kwargs):
        xmlrpclib.Transport.__init__(self, *args, **kwargs)
        self.timeout = timeout

    def make_connection(self, host):
        """Return a brand-new connection to *host*; nothing is cached here."""
        return TimeoutHTTPConnection(host, self.timeout)
class TimeoutServerProxy(xmlrpclib.ServerProxy):
    """``xmlrpclib.ServerProxy`` whose requests honour a socket timeout.

    Any ``transport`` keyword given by the caller is replaced by a
    ``TimeoutTransport``.  ``use_datetime`` (default 0) is forwarded to that
    transport and is also still passed on to ``ServerProxy`` itself.
    """

    def __init__(self, uri, timeout=10, *args, **kwargs):
        transport = TimeoutTransport(timeout=timeout,
                                     use_datetime=kwargs.get('use_datetime', 0))
        kwargs['transport'] = transport
        xmlrpclib.ServerProxy.__init__(self, uri, *args, **kwargs)
# Parse the command line.  -t/--type, -a/--address, -l/--loadenv and
# -p/--port override the defaults above; -e/--env is accepted by the option
# spec, but its value is not used anywhere in this script.
opts, args = getopt.getopt(sys.argv[1:], "e:t:a:lp:",
                           ["env=", "type=", "address=", "loadenv", "port="])
for flag, value in opts:
    if flag in ('-l', '--loadenv'):
        loadenv = True
    elif flag in ('-t', '--type'):
        mytype = value
    elif flag in ('-a', '--address'):
        address = value
    elif flag in ('-p', '--port'):
        port = value
# Work out the address this node registers under.  connect() on a UDP socket
# transmits nothing: it only makes the kernel choose the route, and hence the
# local address, used to reach that host.  Port 80 instead of 0 because some
# platforms reject a connect() to port 0.
probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
    probe.settimeout(REQUEST_TIMEOUT)
    probe.connect(('google.com', 80))
    myip = probe.getsockname()[0]
finally:
    probe.close()  # only needed to learn the local address
# Prefer the host name from the reverse-DNS (PTR) record when there is one.
# NOTE(review): dnspython >= 2.0 deprecates resolver.query in favour of
# resolver.resolve -- switch once the deployed dnspython version allows it.
try:
    addr = reversename.from_address(myip)
    # The PTR target is rendered as an absolute name ending in '.'; drop it.
    myip = str(resolver.query(addr, "PTR")[0]).rstrip('.')
except Exception:
    # Best effort by design: no PTR record or no resolver -> keep the bare IP.
    pass
# Self-register with the controller.  Each failed attempt is followed by a
# randomized back-off of REGISTRATION_SLEEP plus up to REGISTRATION_SLEEP_RANDOM
# seconds; after REGISTRATION_ATTEMPTS failures the script exits with an error
# message instead of crashing on a missing id.
print("[i] Preparing to register on " + address + ":" + port)
s = TimeoutServerProxy('http://' + address + ":" + port, timeout = REQUEST_TIMEOUT, allow_none = True)
myid, mycodeid = None, None
# Host resources reported to the controller.  They do not change between
# attempts, so read them once; e_ncpu is also used by the poll loop below.
e_memtot = psutil.TOTAL_PHYMEM
e_ncpu = psutil.NUM_CPUS
attempt = REGISTRATION_ATTEMPTS
while attempt > 0:
    try:
        myid, mycodeid = s.register(myip, mytype, e_ncpu, e_memtot / 1024.0 ** 2)
    except (socket.error, httplib.HTTPException) as serror:
        print('[-] Registration request failed: %s' % (serror,))
    if myid is not None:
        break
    # A transport error and a reply without an id both consume an attempt,
    # so the loop can neither spin without delay nor retry forever.
    attempt -= 1
    if attempt == 0:
        break  # no attempt left: do not sleep before giving up
    sleep_time = random.randint(REGISTRATION_SLEEP, REGISTRATION_SLEEP + REGISTRATION_SLEEP_RANDOM)
    print('[-] Failed registration, %d attempts left, sleeping for %d seconds' % (attempt, sleep_time))
    time.sleep(sleep_time)
if myid is None:
    sys.exit("[-] Registration failed after %d attempts, giving up" % REGISTRATION_ATTEMPTS)
# %s so a non-string id from the controller cannot break the message.
print("[+] Registered with controller, got id: %s" % (myid,))
# Time the controller last answered; registration has just succeeded.
last_seen = time.time()
# Locations of the engine connection file ipcontroller-engine.json:
#   json_path       -- copy in the working directory (used with --loadenv)
#   local_json_path -- file under ./profile/security (used without --loadenv)
#   from_json_path  -- source that is copied to json_path, two levels up
cwd = os.getcwd()
json_path = "/".join((cwd, "ipcontroller-engine.json"))
local_json_path = "/".join((cwd, "profile", "security", "ipcontroller-engine.json"))
from_json_path = "/".join((cwd, "..", "..", "ipcontroller-engine.json"))
def start_engine(mycodeid):
    """Launch an ipengine subprocess and return its ``Popen`` handle.

    With ``--loadenv``, the steps are:
      1. Fetch the code archive identified by *mycodeid* via
         ``cluster_storage.receive_file``.
      2. Unpack it into the working directory with ``tar``.
      3. Copy the engine file from ``from_json_path`` to ``json_path``.
      4. Run ``../bin/ipengine`` with ``python``.

    Otherwise the ``ipengine`` found on PATH is started with
    ``local_json_path``.

    :param mycodeid: code-file identifier passed to ``cluster_storage``
        (only used with ``--loadenv``).
    :returns: the ``subprocess.Popen`` object of the started engine.
    :raises RuntimeError: if ``tar`` exits non-zero while unpacking the code.
    """
    if loadenv:
        print("[i] Retrieving code file from %s" % (mycodeid,))
        code_path = cluster_storage.receive_file(mycodeid)
        # Argument lists instead of shlex-split strings, so paths containing
        # spaces, quotes or backslashes reach the child process unchanged.
        if Popen(["tar", "-xzf", code_path]).wait():
            raise RuntimeError("Code unpacking failed")
        print("[i] Retrieving json file from " + from_json_path + " to " + json_path)
        shutil.copyfile(from_json_path, json_path)
        args = ["python", "../bin/ipengine", "--quiet", "--file=" + json_path]
    else:
        args = ["ipengine", "--quiet", "--file=" + local_json_path]
    print("[i] Starting local engine...")
    return Popen(args)
def _wait_for_exit(proc, timeout):
    """Poll *proc* for up to *timeout* seconds; return True once it has exited."""
    deadline = time.time() + timeout
    while proc.poll() is None:
        remaining = deadline - time.time()
        if remaining <= 0:
            return False
        time.sleep(min(0.1, remaining))
    return True


def stop_engine(lengine, grace=10):
    """Stop the engine subprocess *lengine* if it is still running.

    First asks politely with ``terminate()`` (SIGTERM on POSIX).  If the
    process is still alive after *grace* seconds it is killed.  The wait ends
    as soon as the process is gone, instead of always lasting the full grace
    period.  Does nothing when the process has already exited.

    :param lengine: ``subprocess.Popen`` handle of the engine.
    :param grace: seconds to wait after ``terminate()`` before ``kill()``;
        also bounds the wait for the killed process to be reaped.
    """
    if lengine.poll() is not None:
        return
    lengine.terminate()
    # It seems that terminate does not always work, so escalate to kill().
    if not _wait_for_exit(lengine, grace):
        lengine.kill()
        # Reap the killed child so no zombie is left behind.  The wait is
        # bounded so a process that cannot die promptly does not hang us.
        _wait_for_exit(lengine, grace)
def handler(signum, frame):
    """SIGTERM handler: announce the signal, then raise ``SystemExit``.

    The ``SystemExit`` propagates into the main loop.  Its ``except`` clause
    stops the engine and unregisters before re-raising.  *signum* and *frame*
    are the standard signal-handler arguments and are not used.
    """
    print("[!] CATCH SYSTEM EXIT")
    raise SystemExit()
# Command codes the controller may return from poll().  Any other truthy
# value is treated as unknown and makes this chief stop the engine and exit.
DIE, RESTART_ENGINE = 1, 2
# Handle of the running ipengine subprocess (None while no engine is up).
lengine = None
# Turn SIGTERM into SystemExit so the main loop's cleanup path runs.
signal.signal(signal.SIGTERM, handler)
def _unregister_quietly():
    """Unregister ``myid`` from the controller; log instead of raising on failure.

    Shutdown often happens because the controller is unreachable.  A failing
    unregister must therefore neither replace the exception being propagated
    nor turn an orderly exit into a crash.
    """
    try:
        s.unregister(myid)
    except (socket.error, httplib.HTTPException, xmlrpclib.Error) as uerror:
        print("\t[!] Could not unregister from controller: %s" % (uerror,))


# Main loop.  It starts the engine and then, every PING_INTERVAL seconds:
#   - reports the engine's CPU/memory usage to the controller via poll();
#   - acts on the returned command (DIE, RESTART_ENGINE, or nothing).
# However the loop ends, the engine subprocess is not left running.
# Any exception also triggers a best-effort unregister before it is re-raised.
try:
    lengine = start_engine(mycodeid)
    lengine_monitor = psutil.Process(lengine.pid)
    while True:
        if lengine.poll() is not None:
            # The engine exited by itself: leave the cluster and stop.
            lengine = None
            _unregister_quietly()
            break
        e_cpu = lengine_monitor.get_cpu_percent()
        e_mem = lengine_monitor.get_memory_percent()
        e_memphys, e_memvirt = lengine_monitor.get_memory_info()
        # Only the RPC is guarded here.  Errors from stop/start_engine below
        # must reach the outer handler instead of being taken for network
        # trouble; otherwise a failed restart would leave lengine as None and
        # the next poll() would crash.
        try:
            cmd, param = s.poll(myid, e_cpu, e_mem * e_ncpu,
                                e_memphys / 1024.0 / 1024.0,
                                e_memvirt / 1024.0 / 1024.0)
        except (socket.error, httplib.HTTPException):
            print('\t[!] Caught a socket error!')
            if time.time() - last_seen > PING_TIMEOUT:
                print('\t[-] Timeout triggered!')
                # The controller has been unreachable for too long.  Shut the
                # engine down rather than orphaning it when this script exits.
                stop_engine(lengine)
                lengine = None
                break
            time.sleep(PING_INTERVAL)
            continue
        last_seen = time.time()
        print("\t[i] POLL %s" % (cmd,))
        if not cmd:
            time.sleep(PING_INTERVAL)
            continue
        if cmd == DIE:
            stop_engine(lengine)
            lengine = None
            _unregister_quietly()
            break
        if cmd == RESTART_ENGINE:
            stop_engine(lengine)
            # Cleared first so a failing start_engine does not make the
            # cleanup below try to stop an engine that is already gone.
            lengine = None
            lengine = start_engine(param)
            lengine_monitor = psutil.Process(lengine.pid)
            continue  # report on the new engine without waiting
        print("\t[-] UNKNOWN COMMAND RECEIVED, EXITING!")
        stop_engine(lengine)
        lengine = None
        break
except (Exception, KeyboardInterrupt, SystemExit):
    if lengine is not None:
        stop_engine(lengine)
        lengine = None
    _unregister_quietly()
    raise