-
Notifications
You must be signed in to change notification settings - Fork 1
/
extconfig.py
83 lines (68 loc) · 2.8 KB
/
extconfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import logging
import os
import shutil
from pathlib import Path
from subprocess import check_output, CalledProcessError
from config import FLASH_LABEL, PLAYLIST_FILENAME
# path to the interfaces.d dir in on the source partition
UDISKS_CMD = '/usr/bin/udisks'
BLKID_CMD = '/sbin/blkid'
SRC_IFS_DIR = "interfaces.d"
class ExtConfigError(Exception):
pass
class ExtConfig():
def __init__(self):
self.__partition = None
self.__partition = self.__get_radio_partition(FLASH_LABEL)
# print("Partition je " + self.__partition)
self.__mountpoint = self.__mount(self.__partition)
self.__playlistPath = None
def close(self):
if self.__partition is not None:
self.__unmount(self.__partition)
def getPlaylistPath(self) -> str:
if self.__playlistPath is None:
self.__playlistPath = self.__get_path(PLAYLIST_FILENAME)
return self.__playlistPath
def __copydir(self, source, dest):
"""Copy a directory structure overwriting existing files"""
for root, dirs, files in os.walk(source):
if not os.path.isdir(root):
os.makedirs(root)
for each_file in files:
rel_path = root.replace(source, '').lstrip(os.sep)
src_path = os.path.join(root, each_file)
dest_path = os.path.join(dest, rel_path, each_file)
shutil.copyfile(src_path, dest_path)
logging.debug("copied " + src_path + " to " + dest_path)
def __get_radio_partition(self, label: str) -> str:
# blkid -L LABEL_NAME -> /dev/sdb1
try:
devices = check_output([BLKID_CMD, '-L', label]).splitlines()
for device in devices:
return device.decode("utf-8")
except CalledProcessError:
raise ExtConfigError("No " + label + " partition found")
def __mount(self, partition: str) -> str:
try:
check_output([UDISKS_CMD, '--mount-options', 'ro', '--mount', partition])
except Exception as e:
raise ExtConfigError("Unable to mount " + partition + ": " + str(e))
with open('/proc/mounts', 'r') as f:
for line in f.readlines():
parts = line.split()
if parts[0] == partition:
return parts[1]
def __unmount(self, partition: str):
try:
check_output([UDISKS_CMD, '--unmount', partition])
except Exception as e:
logging.exception(e)
raise ExtConfigError("Unable to unmount " + partition + ": " + str(e))
def __get_path(self, name: str) -> str:
path = os.path.join(self.__mountpoint, name)
file = Path(path)
if file.exists():
return path
else:
raise ExtConfigError(path + " not found")