-
Notifications
You must be signed in to change notification settings - Fork 4
/
demo.py
223 lines (186 loc) · 6.86 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""Demo."""
import asyncio
import cmd
import logging
import aiopulse
import functools
from typing import (
Any,
Callable,
Optional,
)
# Install the default handler on the root logger at import time so that
# log records emitted by the library are printed.
logging.basicConfig()
# Logger whose level is changed by the prompt's "log" command.
_LOGGER = logging.getLogger('aiopulse.hub')
async def discover(prompt):
    """Discover hubs and register every new one with the prompt.

    prompt: object exposing a ``hubs`` mapping keyed by hub id and an
        ``add_hub`` method; hubs whose id is already a key are skipped.
    """
    print("Starting hub discovery")
    async for found in aiopulse.Hub.discover():
        # Guard clause: a hub that is already registered is left untouched.
        if found.id in prompt.hubs:
            continue
        prompt.add_hub(found)
class HubPrompt(cmd.Cmd):
    """Prompt command line class based on cmd.

    Each ``do_*`` method is a prompt command. Commands that talk to a hub or
    roller do not run the operation themselves; they hand it to the asyncio
    event loop supplied at construction via ``add_job``.
    """

    # Names accepted by the "log" command, mapped to logging levels.
    _LOG_LEVELS = {
        "critical": logging.CRITICAL,
        "error": logging.ERROR,
        "warning": logging.WARNING,
        "info": logging.INFO,
        "debug": logging.DEBUG,
    }

    def __init__(self, event_loop):
        """Init command interface.

        event_loop: asyncio event loop on which jobs are scheduled.
        """
        self.hubs = {}
        self.event_loop = event_loop
        self.running = True
        super().__init__()

    def add_job(self, target: Callable[..., Any], *args: Any) -> None:
        """Add job to the executor pool.

        The job is handed to the event loop with ``call_soon_threadsafe``,
        so this may be called from a thread other than the loop's own.

        target: target to call.
        args: parameters for method to call.

        Raises ValueError if target is None.
        """
        if target is None:
            raise ValueError("Don't call add_job with None")
        self.event_loop.call_soon_threadsafe(self.async_add_job, target, *args)

    def async_add_job(
        self, target: Callable[..., Any], *args: Any
    ) -> Optional[asyncio.Future]:
        """Add a job from within the event loop.

        This method must be run in the event loop.

        target: target to call.
        args: parameters for method to call.

        Returns the task or future created for the job.
        """
        task = None

        # Check for partials to properly determine if coroutine function
        check_target = target
        while isinstance(check_target, functools.partial):
            check_target = check_target.func

        if asyncio.iscoroutine(check_target):
            # Already a coroutine object: schedule it as is (args unused).
            task = self.event_loop.create_task(target)  # type: ignore
        elif asyncio.iscoroutinefunction(check_target):
            task = self.event_loop.create_task(target(*args))
        else:
            # Plain callable: run it in the loop's default executor.
            task = self.event_loop.run_in_executor(  # type: ignore
                None, target, *args
            )

        return task

    def add_hub(self, hub):
        """Add a hub to the prompt.

        The hub is stored under ``hub.id`` and ``hub_update_callback`` is
        subscribed to it.
        """
        self.hubs[hub.id] = hub
        hub.callback_subscribe(self.hub_update_callback)
        print("Hub added to prompt")

    async def hub_update_callback(self, update_type):
        """Called when a hub reports that its information is updated."""
        print(f"Hub {update_type.name} updated")

    def _get_roller(self, args):
        """Return roller based on string argument.

        args: sequence of strings; ``args[0]`` is the hub number and
            ``args[1]`` the roller number, both counted from 1 in the same
            order the ``list`` command prints them.

        Returns the selected roller, or None (after printing a message) when
        an argument is missing, is not an integer, or is out of range.
        """
        try:
            hub_index = int(args[0]) - 1
            roller_index = int(args[1]) - 1
            # A negative index would silently wrap around to the end of the
            # list (e.g. "0" selecting the last hub), so reject it.
            if hub_index < 0 or roller_index < 0:
                raise IndexError("hub and roller numbers start at 1")
            hub = list(self.hubs.values())[hub_index]
            return list(hub.rollers.values())[roller_index]
        except (ValueError, IndexError):
            print("Invalid arguments {}".format(args))
            return None

    def do_discover(self, args):
        """Command to discover all hubs on the local network."""
        self.add_job(discover, self)

    def do_update(self, args):
        """Command to ask all hubs to send their information."""
        for hub in self.hubs.values():
            print("Sending update command to hub {}".format(hub.id))
            self.add_job(hub.update)

    def do_list(self, args):
        """Command to list all hubs, rollers, rooms, and scenes."""
        print("Listing hubs...")
        # Numbering starts at 1 and matches what _get_roller expects.
        for hub_id, hub in enumerate(self.hubs.values(), start=1):
            print(f"Hub {hub_id}: {hub}")
            for roller_id, roller in enumerate(hub.rollers.values(), start=1):
                print(f"Roller {roller_id}: {roller}")
            for room_id, room in enumerate(hub.rooms.values(), start=1):
                print(f"Room {room_id}: {room}")
            for scene_id, scene in enumerate(hub.scenes.values(), start=1):
                print(f"Scene {scene_id}: {scene}")
            for timer_id, timer in enumerate(hub.timers.values(), start=1):
                print(f"Timer {timer_id}: {timer}")

    def do_moveto(self, sargs):
        """Command to tell a roller to move a % closed.

        Usage: moveto <hub number> <roller number> <position>
        """
        print("Sending move to")
        args = sargs.split()
        roller = self._get_roller(args)
        if not roller:
            return
        try:
            position = int(args[2])
        except (IndexError, ValueError):
            # An uncaught exception would propagate out of cmdloop and end
            # the prompt, so report the bad input instead.
            print("Invalid arguments {}".format(args))
            return
        print("Sending blind move to {}".format(roller.name))
        self.add_job(roller.move_to, position)

    def do_close(self, sargs):
        """Command to close a roller.

        Usage: close <hub number> <roller number>
        """
        args = sargs.split()
        roller = self._get_roller(args)
        if roller:
            print("Sending blind down to {}".format(roller.name))
            self.add_job(roller.move_down)

    def do_open(self, sargs):
        """Command to open a roller.

        Usage: open <hub number> <roller number>
        """
        args = sargs.split()
        roller = self._get_roller(args)
        if roller:
            print("Sending blind up to {}".format(roller.name))
            self.add_job(roller.move_up)

    def do_stop(self, sargs):
        """Command to stop a moving roller.

        Usage: stop <hub number> <roller number>
        """
        args = sargs.split()
        roller = self._get_roller(args)
        if roller:
            print("Sending blind stop to {}".format(roller.name))
            self.add_job(roller.move_stop)

    def do_health(self, sargs):
        """Command to get health of a roller.

        Usage: health <hub number> <roller number>
        """
        args = sargs.split()
        roller = self._get_roller(args)
        if roller:
            print("Sending get health to {}".format(roller.name))
            self.add_job(roller.get_health)

    def do_connect(self, sargs):
        """Command to connect all hubs."""
        for hub in self.hubs.values():
            self.add_job(hub.run)

    def do_disconnect(self, sargs):
        """Command to disconnect all connected hubs."""
        for hub in self.hubs.values():
            self.add_job(hub.stop)

    def do_log(self, sargs):
        """Change logging level.

        Usage: log critical|error|warning|info|debug
        """
        # Accept any letter case; the lower-case names behave as before.
        level_name = sargs.strip().lower()
        level = self._LOG_LEVELS.get(level_name)
        if level is None:
            print("Valid log levels are critical, error, warning, info, and debug.")
            return
        _LOGGER.setLevel(level)
        print(f"Log level set to {level_name}")

    def do_exit(self, arg):
        """Command to exit."""
        print("Exiting")
        self.running = False
        # A true return value tells cmd.Cmd to leave its command loop.
        return True
async def main():
    """Run the interactive prompt until its command loop returns.

    The blocking ``cmdloop`` call is run in the event loop's default
    executor, and this coroutine waits for it to finish.
    """
    loop = asyncio.get_running_loop()

    shell = HubPrompt(loop)
    shell.prompt = "> "

    cmdloop_future = loop.run_in_executor(None, shell.cmdloop)
    await asyncio.wait([cmdloop_future])
if __name__ == "__main__":
    # logging.basicConfig() does nothing once the root logger has a handler,
    # and this module already calls it at import time, so the level passed
    # here would be ignored. Set the root level explicitly so INFO applies.
    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)
    asyncio.run(main())