Skip to content

Commit

Permalink
Make the mock_device usable as an executable (#16)
Browse files Browse the repository at this point in the history
This executable can be used with socat for creating a mock serial device
on a Linux system.

There are only a handful of basic request/reply strings available in the
mock device. It is not a complete representation of the way a RAVEn
device behaves, but is still useful for smoke testing.
  • Loading branch information
cottsay authored Oct 2, 2023
1 parent f23056b commit 8018b66
Showing 1 changed file with 158 additions and 24 deletions.
182 changes: 158 additions & 24 deletions test/mock_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,172 @@

import asyncio
from contextlib import asynccontextmanager
import sys

import serial_asyncio


class _NoPopList(list):

def pop(self, index):
return self[index]


DEFAULT_RESPONSES = {
b'<Command>'
b'<Name>get_current_price</Name>'
b'<MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b'</Command>': _NoPopList((
b'<PriceCluster>'
b' <DeviceMacId>0x0123456789ABCDEF</DeviceMacId>'
b' <MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b' <TimeStamp>0x29bd58a7</TimeStamp>'
b' <Price>0xc7</Price>'
b' <Currency>0x348</Currency>'
b' <TrailingDigits>0x03</TrailingDigits>'
b' <Tier>0x08</Tier>'
b' <TierLabel>Set by User</TierLabel>'
b' <RateLabel>Set by User</RateLabel>'
b'</PriceCluster>',
)),
b'<Command>'
b'<Name>get_current_summation_delivered</Name>'
b'<MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b'</Command>': _NoPopList((
b'<CurrentSummationDelivered>'
b' <DeviceMacId>0x0123456789ABCDEF</DeviceMacId>'
b' <MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b' <TimeStamp>0x29bd58a7</TimeStamp>'
b' <SummationDelivered>0x00000010</SummationDelivered>'
b' <SummationReceived>0x00000008</SummationReceived>'
b' <Multiplier>0x00000004</Multiplier>'
b' <Divisor>0x00000002</Divisor>'
b' <DigitsRight>0x02</DigitsRight>'
b' <DigitsLeft>0x04</DigitsLeft>'
b' <SuppressLeadingZero>N</SuppressLeadingZero>'
b'</CurrentSummationDelivered>',
)),
b'<Command>'
b'<Name>get_device_info</Name>'
b'</Command>': _NoPopList((
b'<DeviceInfo>'
b' <DeviceMacId>0x0123456789ABCDEF</DeviceMacId>'
b' <InstallCode>0xABCDEF0123456789</InstallCode>'
b' <LinkKey>0xABCDEF0123456789ABCDEF0123456789</LinkKey>'
b' <FWVersion>1.21g</FWVersion>'
b' <HWVersion>5.55 rev 2</HWVersion>'
b' <ImageType>Mocked</ImageType>'
b' <Manufacturer>aioraven</Manufacturer>'
b' <ModelId>mock device</ModelId>'
b' <DateCode>20220101a0000042</DateCode>'
b'</DeviceInfo>',
)),
b'<Command>'
b'<Name>get_instantaneous_demand</Name>'
b'<MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b'</Command>': _NoPopList((
b'<InstantaneousDemand>'
b' <DeviceMacId>0x0123456789ABCDEF</DeviceMacId>'
b' <MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b' <TimeStamp>0x29bd58a7</TimeStamp>'
b' <Demand>0x00000010</Demand>'
b' <Multiplier>0x00000004</Multiplier>'
b' <Divisor>0x00000002</Divisor>'
b' <DigitsRight>0x02</DigitsRight>'
b' <DigitsLeft>0x04</DigitsLeft>'
b' <SuppressLeadingZero>Y</SuppressLeadingZero>'
b'</InstantaneousDemand>',
)),
b'<Command>'
b'<Name>get_meter_info</Name>'
b'<MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b'</Command>': _NoPopList((
b'<MeterInfo>'
b' <DeviceMacId>0x0123456789ABCDEF</DeviceMacId>'
b' <MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b' <MeterType>electric</MeterType>'
b' <NickName>House</NickName>'
b' <Account>8675309</Account>'
b' <Auth>p@ssw0rd</Auth>'
b' <Host>Example, Inc.</Host>'
b' <Enabled>Y</Enabled>'
b'</MeterInfo>',
)),
b'<Command><Name>get_meter_list</Name></Command>': _NoPopList((
b'<MeterList>'
b' <DeviceMacId>0x0123456789ABCDEF</DeviceMacId>'
b' <MeterMacId>0xFEDCBA9876543210</MeterMacId>'
b'</MeterList>',
)),
b'<Command><Name>get_network_info</Name></Command>': _NoPopList((
b'<NetworkInfo>'
b' <DeviceMacId>0x0123456789ABCDEF</DeviceMacId>'
b' <CoordMacId>0xFEDCBA9876543210</CoordMacId>'
b' <Status>Connected</Status>'
b' <Description>Network is operational</Description>'
b' <StatusCode>0x42</StatusCode>'
b' <ExtPanId>0x9876543210ABCDEF</ExtPanId>'
b' <Channel>24</Channel>'
b' <ShortAddr>0x5678</ShortAddr>'
b' <LinkStrength>0x24</LinkStrength>'
b'</NetworkInfo>',
)),
}


async def _device_loop(reader, writer, responses):
buffer = b''
while True:
try:
value = await reader.read(1)
if not value:
break
buffer += value
buffer = buffer.lstrip()
for k, v in responses.items():
if buffer.startswith(k):
if isinstance(v, list):
first = v.pop(0)
if first is not None:
writer.write(first)
if not v:
del responses[k]
else:
if v is not None:
writer.write(v)
del responses[k]
buffer = buffer[len(k):]
break
except asyncio.CancelledError:
if not writer.can_write_eof():
break
writer.write_eof()
await writer.drain()
writer.close()


@asynccontextmanager
async def mock_device(responses=None):
async def mock_device(responses=None, initial_buffer=None):
"""
Create a mock device at a TCP endpoint.
This function creates a context-managed TCP endpoint which echoes verbatim
responses given verbatim requests.
:param dict responses: A mapping of request strings to responses.
:param bytes initial_buffer: Content to initialize the response buffer.
:returns: A tuple including the host and port of the TCP endpoint.
"""
if responses is None:
responses = {}

async def client_connected_impl(reader, writer):
buffer = b''
while True:
try:
value = await reader.read(1)
if not value:
break
buffer += value
buffer = buffer.lstrip()
for k, v in responses.items():
if buffer.startswith(k):
if v:
writer.write(v)
buffer = buffer[len(k):]
del responses[k]
break
except asyncio.CancelledError:
writer.write_eof()
await writer.drain()
writer.close()
responses = DEFAULT_RESPONSES

connections = []

def client_connected(reader, writer):
task = asyncio.create_task(client_connected_impl(reader, writer))
if initial_buffer is not None:
writer.write(initial_buffer)
task = asyncio.create_task(_device_loop(reader, writer, responses))
connections.append(task)
return asyncio.wait_for(task, None)

Expand All @@ -60,3 +184,13 @@ def client_connected(reader, writer):
task.cancel()
if connections:
await asyncio.wait(connections)


async def main(argv=sys.argv):
assert len(argv) == 2
reader, writer = await serial_asyncio.open_serial_connection(url=argv[1])
await _device_loop(reader, writer, DEFAULT_RESPONSES)


if __name__ == '__main__':
sys.exit(asyncio.run(main()))

0 comments on commit 8018b66

Please sign in to comment.