-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_async.py
135 lines (123 loc) · 4.18 KB
/
client_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
"""Pymodbus Aynchronous Client Example.
An example of a single threaded synchronous client.
usage: client_async.py [-h] [--comm {tcp,udp,serial,tls}]
[--framer {ascii,binary,rtu,socket,tls}]
[--log {critical,error,warning,info,debug}]
[--port PORT]
options:
-h, --help show this help message and exit
--comm {tcp,udp,serial,tls}
"serial", "tcp", "udp" or "tls"
--framer {ascii,binary,rtu,socket,tls}
"ascii", "binary", "rtu", "socket" or "tls"
--log {critical,error,warning,info,debug}
"critical", "error", "warning", "info" or "debug"
--port PORT the port to use
The corresponding server must be started before e.g. as:
python3 server_sync.py
"""
import asyncio
import logging
import os
# --------------------------------------------------------------------------- #
# import the various client implementations
# --------------------------------------------------------------------------- #
from examples.helper import get_commandline
from pymodbus.client import (
AsyncModbusSerialClient,
AsyncModbusTcpClient,
AsyncModbusTlsClient,
AsyncModbusUdpClient,
)
_logger = logging.getLogger()
def setup_async_client(args):
"""Run client setup."""
_logger.info("### Create client object")
if args.comm == "tcp":
client = AsyncModbusTcpClient(
"127.0.0.1",
port=args.port, # on which port
# Common optional paramers:
framer=args.framer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# TCP setup parameters
# source_address=("localhost", 0),
)
elif args.comm == "udp":
client = AsyncModbusUdpClient(
"127.0.0.1",
port=args.port,
# Common optional paramers:
framer=args.framer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# UDP setup parameters
# source_address=None,
)
elif args.comm == "serial":
client = AsyncModbusSerialClient(
args.port,
# Common optional paramers:
# framer=ModbusRtuFramer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# Serial setup parameters
# baudrate=9600,
# bytesize=8,
# parity="N",
# stopbits=1,
# handle_local_echo=False,
)
elif args.comm == "tls":
cwd = os.getcwd().split("/")[-1]
if cwd == "examples":
path = "."
elif cwd == "test":
path = "../examples"
else:
path = "examples"
client = AsyncModbusTlsClient(
"127.0.0.1",
port=args.port,
# Common optional paramers:
framer=args.framer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# TLS setup parameters
# sslctx=sslctx,
certfile=f"{path}/certificates/pymodbus.crt",
keyfile=f"{path}/certificates/pymodbus.key",
# password="none",
server_hostname="localhost",
)
return client
async def run_async_client(client, modbus_calls=None):
"""Run sync client."""
_logger.info("### Client starting")
await client.connect()
assert client.protocol
if modbus_calls:
await modbus_calls(client)
await client.close()
_logger.info("### End of Program")
if __name__ == "__main__":
cmd_args = get_commandline(
server=False,
description="Run asynchronous client.",
)
testclient = setup_async_client(cmd_args)
asyncio.run(run_async_client(testclient), debug=True)