-
Notifications
You must be signed in to change notification settings - Fork 9
/
cov-client-shell.py
203 lines (164 loc) · 5.5 KB
/
cov-client-shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
Simple example that creates a change-of-value context and dumps out the
property identifier and its value from the notifications it receives.
The device address and monitored object identifier are the only required
parameters to `change_of_value()`.
"""
# from __future__ import annotations
import asyncio
from typing import Callable
from bacpypes3.debugging import bacpypes_debugging, ModuleLogger
from bacpypes3.argparse import SimpleArgumentParser
from bacpypes3.app import Application
from bacpypes3.cmd import Cmd
from bacpypes3.comm import bind
from bacpypes3.console import Console
from bacpypes3.pdu import Address
from bacpypes3.primitivedata import ObjectIdentifier
from bacpypes3.basetypes import PropertyReference
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
assigned_ids = 0
running_processes = {}
@bacpypes_debugging
class Snork:
def __init__(
self,
device_address: Address,
object_identifier: ObjectIdentifier,
property_identifier: PropertyReference,
):
global assigned_ids
# give this a name
assigned_ids += 1
self.process_identifier = assigned_ids
if _debug:
Snork._debug("__init__(%s)", self.process_identifier)
# keep track of the args
self.device_address = device_address
self.object_identifier = object_identifier
self.property_identifier = property_identifier
# create a fini event
self.fini = asyncio.Event()
self.task = None
async def run(self):
if _debug:
Snork._debug("run(%s)", self.process_identifier)
try:
# use a SubscriptionContextManager
async with app.change_of_value(
self.device_address,
self.object_identifier,
self.process_identifier,
True, # args.confirmed,
30, # args.lifetime,
) as scm:
if _debug:
_log.debug(" - scm: %r", scm)
while not self.fini.is_set():
incoming: asyncio.Future = asyncio.ensure_future(scm.get_value())
done, pending = await asyncio.wait(
[incoming, self.fini.wait()],
return_when=asyncio.FIRST_COMPLETED,
)
# cancel pending tasks to avoid leaking them
for task in pending:
task.cancel()
# send incoming messages up the stack
if incoming in done:
property_identifier, property_value = incoming.result()
print(
f"{self.process_identifier} {property_identifier} {property_value}"
)
if _debug:
_log.debug(f"{self.process_identifier} exited context")
except Exception as err:
if _debug:
_log.debug(" - exception: %r", err)
def stop(self):
if _debug:
Snork._debug("stop(%s)", self.process_identifier)
# set the finished event
self.fini.set()
@bacpypes_debugging
class SnorkCmd(Cmd):
"""
Snork Cmd
"""
_debug: Callable[..., None]
async def do_start(
self,
device_address: Address,
object_identifier: ObjectIdentifier,
property_reference: PropertyReference,
) -> None:
"""
usage: start address objid prop[indx]
"""
if _debug:
SnorkCmd._debug(
"do_start %r %r %r",
device_address,
object_identifier,
property_reference,
)
global running_processes
# create an instance
snork = Snork(
device_address, object_identifier, property_reference.propertyIdentifier
)
if _debug:
SnorkCmd._debug(" - snork: %r", snork)
# save a reference
running_processes[snork.process_identifier] = snork
# stuff the task into the snork
snork.task = asyncio.create_task(snork.run())
if _debug:
SnorkCmd._debug(" - task: %r", snork.task)
async def do_stop(
self,
process_identifier: int,
) -> None:
"""
usage: stop procid
"""
if _debug:
SnorkCmd._debug("do_stop %r", process_identifier)
if process_identifier not in running_processes:
await self.response("object identifier expected")
return
# find the snork
snork = running_processes.pop(process_identifier)
if _debug:
SnorkCmd._debug(" - snork: %r", snork)
# tell it to stop and wait for it to complete
snork.stop()
await snork.task
async def main() -> None:
global app
app = None
try:
parser = SimpleArgumentParser()
args = parser.parse_args()
if _debug:
_log.debug("args: %r", args)
# build a very small stack
console = Console()
cmd = SnorkCmd()
bind(console, cmd)
# build an application
app = Application.from_args(args)
if _debug:
_log.debug("app: %r", app)
# wait until the user is done
await console.fini.wait()
except KeyboardInterrupt:
if _debug:
_log.debug("keyboard interrupt")
finally:
if app:
app.close()
if __name__ == "__main__":
asyncio.run(main())