-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.py
executable file
·233 lines (196 loc) · 8.28 KB
/
service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python3
from argparse import ArgumentParser, Namespace
import logging
import os
import re
import signal
from subprocess import getstatusoutput
import sys
import time
from types import FrameType
try:
import setproctitle
setproctitle.setproctitle('wacom_ff')
except ImportError:
pass
LOG_PATH = os.path.expanduser('~/.local/share/wacom_ff/service.log')
if not os.path.isdir(os.path.dirname(LOG_PATH)):
os.makedirs(os.path.dirname(LOG_PATH))
logging.basicConfig(filename=LOG_PATH, filemode='a', style='{',
format=('[{levelname:^8}] {asctime} | {module}:{lineno} in {funcName!r}: '
'{message}'))
logger = logging.getLogger('wacom_ff')
SIGNALS = {getattr(signal, a): a for a in dir(signal) if bool(re.match('SIG[^_]', a))}
XRANDR_MONITORS_LINE_RE = re.compile(r'\s*(?P<id_num>\d+): (?:\+|.)(?P<name_1>.+) (?P<width_px>\d+)'
r'/(?P<width_phys>\d+)x(?P<height_px>\d+)/(?P<height_phys>\d+)'
r'\+(?P<x_offset>\d+)\+(?P<y_offset>\d+)\s+(?P<name>.+)')
def xdotool_get_cursor_position():
"""Get the position of the mouse."""
s = time.time()
r, output = getstatusoutput('xdotool getmouselocation')
logger.debug('ran "xdotool getmouselocation" in %.3f s', time.time() - s)
if r:
raise ValueError(f'xdotool exited with code {r}', output)
m = re.fullmatch(r'x:(?P<x>\d+) y:(?P<y>\d+) screen:(?P<screen>\d+)'
r' window:(?P<window_id>\d+)', output.strip())
if not m:
raise ValueError(f'xdotool returned invalid data: {output!r}')
d = m.groupdict()
return int(d['x']), int(d['y'])
class PseudoRange:
"""Implement pseudo ranges."""
def __init__(self, start: int, stop: int, step: int = 1):
self.start, self.stop, self.step = start, stop, step
def __contains__(self, value: int):
return (isinstance(value, int)
and value >= self.start <= value < self.stop
and (value - self.start) % self.step == 0)
def __repr__(self):
return f'range({self.start}, {self.stop}, {self.step})'
class MonitorDummy:
"""Implement a monitor dummy structure."""
def __init__(self, id_num: int, name: str, name_1: str,
width_px: int, height_px: int,
x_offset: int = 0, y_offset: int = 0,
width_phys: float = None, height_phys: float = None):
self.name, self.name_1, self.id_num = name, name_1, id_num
self.width_px, self.height_px = width_px, height_px
self.width_phys, self.height_phys = width_phys, height_phys
self.x_offset, self.y_offset = x_offset, y_offset
def __str__(self):
return (f'<Monitor name={self.name} w={self.width_px}px={self.width_phys}phys '
f'h={self.height_px}px={self.height_phys}phys xoffset={self.x_offset}px '
f'yoffset={self.y_offset}px />')
@property
def x_range(self):
"Return horizontal pixel range."
return PseudoRange(self.x_offset, self.x_offset + self.width_px)
@property
def y_range(self):
"Return vertical pixel range."
return PseudoRange(self.y_offset, self.y_offset + self.height_px)
def __contains__(self, location: tuple):
x, y = location
return x in self.x_range and y in self.y_range
class MonitorConfiguration:
def __init__(self, *monitors):
self.monitors = list(monitors)
def __len__(self):
return len(self.monitors)
def __str__(self):
return ('<MonitorConfiguration>\n '
+ '\n '.join(map(str, self.monitors))
+ '\n</MonitorConfiguration>')
def get_monitor_from_position(self, position: tuple = None):
"""Return the monitor on which the mouse currently is."""
if position is None:
position = xdotool_get_cursor_position()
x, y = position
for m in self.monitors:
if (x, y) in m:
return m
return None
def get_xrandr_monitor_data():
intify = {'id_num', 'width_px', 'height_px', 'x_offset', 'y_offset'}
floatify = {'width_phys', 'height_phys'}
s = time.time()
r, output = getstatusoutput('xrandr --listactivemonitors')
logger.debug('ran "xrandr --listactivemonitors" in %.3f s', time.time() - s)
if r:
raise ValueError(f'xrandr failed with code {r}', output)
lines = list(map(str.strip, output.split('\n')))
out = {}
for l in lines:
m = XRANDR_MONITORS_LINE_RE.fullmatch(l)
if not m:
continue
d = m.groupdict()
for k in intify:
d[k] = int(d[k])
for k in floatify:
d[k] = float(d[k])
out[d['id_num']] = MonitorDummy(**d)
return MonitorConfiguration(*out.values())
class WacomService:
"""Implement a focus following service."""
def __init__(self, args: Namespace):
try:
self.monitor_config = get_xrandr_monitor_data()
except Exception as e:
logger.critical('Failed to get monitor config: %r', e)
sys.exit(1)
self.options = args
self.every = args.every
self.device = args.device
self.always_poll = args.always_poll
def start(self):
"""Start the service."""
logger.info('Starting...')
signal.signal(signal.SIGPOLL, self.poll)
signal.signal(signal.SIGALRM, self.poll)
signal.signal(signal.SIGUSR1, lambda *a: self.stop())
if self.every > 0:
signal.alarm(self.every)
return self
def stop(self):
"""Stop the service."""
logger.info('Stopping...')
signal.signal(signal.SIGPOLL, signal.SIG_DFL)
signal.signal(signal.SIGALRM, signal.SIG_DFL)
signal.signal(signal.SIGUSR1, lambda *a: self.start())
if self.every > 0:
signal.alarm(0)
def poll(self, sig: int = None, _: FrameType = None):
"""Set the drawing area."""
s = time.time()
logger.debug('Caught signal %s', SIGNALS.get(sig, sig))
if sig == signal.SIGPOLL or self.always_poll:
logger.info('Reloading monitor config')
if not self.reload_monitor_config():
logger.warning('Discarding poll event because the monitor config could not be '
'reloaded!')
return
if len(self.monitor_config) < 2:
logger.warning('Only one monitor found! Ignoring event!')
return
m = self.monitor_config.get_monitor_from_position()
if m is None:
logger.error('Could not determine the focussed monitor! Reloading monitor config.')
self.reload_monitor_config()
return
cmd = 'xsetwacom set %r MapToOutput %r' % (self.device, m.name)
logger.debug('Running %s', cmd)
r, o = getstatusoutput(cmd)
if r:
logger.error('xsetwacom exited with code %r: %s', r, o)
if self.every > 0:
signal.alarm(self.every)
logger.debug('Poll handling took %.3f s', time.time() - s)
def reload_monitor_config(self) -> bool:
try:
self.monitor_config = get_xrandr_monitor_data()
logger.info('Monitor config reloaded:\n%s', self.monitor_config)
return True
except Exception:
logger.error('Failed to load monitor config!')
return False
ap = ArgumentParser(prog='wacom_ff', conflict_handler='resolve')
ap.add_argument('--every', type=int, default=0, metavar='N',
help='Automatically update ever N seconds')
ap.add_argument('--log-level', type=int, default=20,
help='Set log level (high values mean quiet operation)')
ap.add_argument('--device', required=True, help='Which wacom device to handle')
ap.add_argument('--always-poll', action='store_true',
help='Wether to always reload the monitor configuration (slower)')
if __name__ == '__main__':
args = ap.parse_args()
logger.setLevel(args.log_level)
s = WacomService(args).start()
def quit_service(sig: int, _: FrameType = None):
"""Stop the service and quit."""
logger.info('Caught signal %s. Quitting now.', SIGNALS.get(sig, sig))
s.stop()
sys.exit(0)
signal.signal(signal.SIGINT, quit_service)
while True:
time.sleep(60)