-
Notifications
You must be signed in to change notification settings - Fork 0
/
installer
executable file
·328 lines (246 loc) · 9.47 KB
/
installer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
#!/usr/bin/env python3
import sysconfig
import shutil, os, subprocess
import re
import pwd
import argparse
import readline
#taken from colorama
sRED = '\033[31m'
sGREEN = '\033[32m'
sRESET = '\033[0m'
app_name = 'diskwatcher'
service_name = 'diskwatcher_fetcher'
service_user_name = 'diskwatcher'
file_chmod = \
{
'/opt/diskwatcher/diskwatcher': 0o555,
'/opt/diskwatcher/diskwatcher_fetcher': 0o500
}
def main():
parser = argparse.ArgumentParser(description=f"Installer for {app_name}")
parser.add_argument("--nopackages", action="store_true", help="No package management!")
args = parser.parse_args()
_show_header()
_check_requirements()
_get_username()
if not args.nopackages:
ret = _install_packages()
if not ret:
print(f'{sRED}Use --nopackages option if you have issues with package management!{sRESET}')
_copy_files('_/')
_register_service()
_final_notes()
def _show_header():
print('')
print(f'###===> INSTALLER of {app_name}', end='')
print('\n')
#sys.stdout.flush()
#time.sleep(0.5)
def _check_requirements():
sysreq_functions = [func for func in globals().values() if callable(func) and func.__name__.startswith("_syscheck_")]
print("Checking system...\n")
for func in sysreq_functions:
ret = func()
fname = func.__name__.split('_')[-1]
print(f'{fname}: ', end='')
# if ret is not True:
if isinstance(ret, str):
print(f'{sRED}{ret}{sRESET}')
exit(1)
print(f'{sGREEN}OK{sRESET}')
print('')
def _get_username():
pass
def _install_packages():
def _is_python_externally_managed():
p3_path = sysconfig.get_path("stdlib", sysconfig.get_default_scheme())
return os.path.exists(os.path.join(p3_path, 'EXTERNALLY-MANAGED'))
def _has_apt():
res = cmd('which apt-get')
if res.returncode > 0:
return False
bin_apt = res.stdout.strip()
res = cmd_safe(bin_apt, ['--version'])
return not res.returncode > 0
def _has_pip():
res = cmd('pip --version')
return not res.returncode > 0
def get_requirements(file='requirements.txt')->bool|dict:
"""
Use only for basic package only requirements.txt parsing!!!
Example file contents:
#apt: sqlite3
#apt_pyext: python3-psutil python3-dbus
dbus_python
psutil
"""
if not os.path.exists(file):
return False
file_content = open(file).read()
result = \
{
#System packages to be installed by apt
"apt": [],
#Externally managed python3 packages to be installed by apt
# (should be the same as python packages, but debianized)
"apt_pyext": [],
#Python packages to install if python packages are not externally managed
"python": []
}
#todo: filter package names with case sens re=[a-z0-9-+._]
#filter_name = lambda x: bool(re.match(r'^[a-z0-9-+._]+$', str(x)))
#filter_empty = lambda x: not (x.isspace() or not x)
#filtered_list = [x for x in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] if filter_name(x) and filter_empty(x)]
for line in file_content.strip().split('\n'):
line = line.strip()
if line.startswith('#apt:'):
result['apt'].extend(filter(None, line.replace('#apt:','',1).strip().split()))
elif line.startswith('#apt_pyext:'):
result['apt_pyext'].extend(filter(None, line.replace('#apt_pyext:','',1).strip().split()))
elif not line.startswith('#') and line:
result['python'].append(line)
#No packages
if all(not key for key in result.keys()):
return False
return result
packages = get_requirements()
# Nothing to do
if not packages:
return True
# System packages
if packages['apt']:
if not _has_apt():
print(f'{sRED}APT not found!{sRESET}')
print(f'{sRED}Manually install required system packages: {package_list(packages["apt"])}{sRESET}')
else:
cmd_safe('apt-get', '-y install'.split(" ") + packages['apt'])
if not packages['python'] and not packages['apt_pyext']:
return False
# Packages are managed by the package manager, so use APT to install required python packages
if _is_python_externally_managed():
if not _has_apt():
print(f'{sRED}APT not found!{sRESET}')
print(f'{sRED}Manually install required python packages ({package_list(packages["apt_pyext"])})!{sRESET}')
return False
res = cmd_safe('apt-get','-y install'.split(" ") + packages['apt_pyext'])
print(res.stdout, res.stderr)
return not res.returncode
else:
if not _has_pip():
print(f'{sRED}Pip package manager not found!')
return False
return cmd('pip install -r requirements.txt').returncode == 0
def _copy_files(fromdir:str='_/'):
print("Copying files...\n")
def get_files_recursively(directory):
file_paths = []
for root, dirs, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
file_paths.append(file_path)
return file_paths
def _copy_file(src, dst):
try:
shutil.copy(src, dst)
except IOError as e:
print(f"Error copying file: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
files = get_files_recursively(fromdir)
for file in files:
path, filename = os.path.split(file)
filename_dest = '/' + os.path.relpath(file, fromdir)
path_dest = os.path.dirname(filename_dest)
# file = 0 str(40) "_/etc/systemd/system/mouzwheeler.service"
# path_dest = 1 str(19) "/etc/systemd/system"
# filename = 2 str(19) "mouzwheeler.service"
# filename_dest = 3 str(39) "/etc/systemd/system/mouzwheeler.service"
print(f'{file} => {path_dest}/')
_create_dir(path_dest)
_copy_file(file, os.path.join(path_dest, filename))
if filename_dest in file_chmod:
os.chmod(filename_dest, file_chmod[filename_dest])
print('')
def _register_service():
def get_non_root_users():
return [user.pw_name for user in pwd.getpwall() if user.pw_uid >= 1000 and user.pw_name != 'nobody']
def get_sudo_user():
return os.environ.get('SUDO_USER')
def get_users():
users = get_non_root_users()
sudouser = get_sudo_user()
return ' '.join(f"{user}*" if user == sudouser else user for user in users)
def service_template_variables(filename, name, uid):
with open(filename, 'r') as file:
content = file.read()
content = content.replace('%NAME%', name).replace('%UID%', str(uid))
with open(filename, 'w') as file:
file.write(content)
def input_with_default(prompt, default):
readline.set_startup_hook(lambda: readline.insert_text(default))
try:
return input(prompt)
finally:
readline.set_startup_hook()
print('')
user_default = get_sudo_user() or next(iter(get_non_root_users()), '')
userok = False
while userok == False:
input_user = input_with_default(f"User to run {app_name} service under [{get_users()}]: ", user_default)
try:
userdata = pwd.getpwnam(input_user)
userok = True
except KeyError:
print(f'{sRED}User {input_user} not found!{sRESET}')
user_default = ''
continue
service_template_variables('/etc/systemd/system/diskwatcher.service', userdata.pw_name, userdata.pw_uid)
print("Registering service(s)...\n")
cmdList = [
['Reloading systemd config files','systemctl daemon-reload'],
[f'Enabling service: {service_name}', f'systemctl enable {service_name}'],
[f'Starting service: {service_name}',f'systemctl start {service_name}'],
[f'Enabling service: {service_user_name}', f'systemctl enable {service_user_name}'],
[f'Starting service: {service_user_name}', f'systemctl start {service_user_name}']
]
for command in cmdList:
print(command[0])
result = subprocess.run(command[1], shell=True, capture_output=True, text=True)
if result.returncode > 0:
print('error!')
print('\nDONE!')
def _final_notes():
return None
def package_list(packages:list, sep=' '):
return sep.join(packages)
def cmd_safe(command:str, args:list):
arglist = args
arglist.insert(0, command)
res = subprocess.run(arglist, shell=False, capture_output=True, text=True)
return res
def cmd(command:str):
"""
Quote arguments with shlex!
"""
res = subprocess.run(command, shell=True, capture_output=True, text=True)
return res
def _create_dir(dir:str, mode:int=0o777):
try:
os.makedirs(dir, mode, exist_ok=True)
except OSError as e:
if e.errno == 13:
print(f"Permission denied: {e.strerror}")
else:
print(f"An error occurred: {e.strerror}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
####
#### System checks
####
def _syscheck_root():
return os.geteuid() == 0 or 'Execute as root!'
def _syscheck_systemd():
return 'systemd' in os.readlink('/sbin/init') or 'Only works on a SystemD enabled system'
if __name__ == "__main__":
main()