-
Notifications
You must be signed in to change notification settings - Fork 28
/
uModBusTCP.py
133 lines (89 loc) · 5.33 KB
/
uModBusTCP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#Source: https://github.com/pycom/pycom-modbus/tree/master/uModbus (2018-07-16)
#This file has been modified and differs from its source version.
import uModBusFunctions as functions
import uModBusConst as Const
import struct
import socket
import random
class uModBusTCP:
    """Modbus TCP master (client) that talks to one slave over a socket.

    Each public ``read_*`` / ``write_*`` method builds a request PDU with the
    ``functions`` module, wraps it in an MBAP header, sends it and validates
    the reply. Protocol-level problems are reported as ``ValueError``.
    """

    def __init__(self, slave_ip, slave_port=502, timeout=5):
        """Connect to the slave.

        slave_ip   -- host name or IP address passed to ``getaddrinfo``
        slave_port -- TCP port (default 502)
        timeout    -- socket timeout applied once the connection is up
        """
        self._sock = socket.socket()
        try:
            address = socket.getaddrinfo(slave_ip, slave_port)[0][-1]
            self._sock.connect(address)
            self._sock.settimeout(timeout)
        except Exception:
            # Do not leak the socket when resolving/connecting fails.
            self._sock.close()
            raise

    def __enter__(self):
        """Allow ``with uModBusTCP(...) as bus:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket when leaving a ``with`` block."""
        self.close()
        return False

    def _create_mbap_hdr(self, slave_id, modbus_pdu):
        """Return ``(mbap_header_bytes, transaction_id)`` for a request PDU.

        The transaction id is random so the reply can be matched to the
        request; the length field counts the unit id byte plus the PDU.
        """
        trans_id = random.randint(0, 65535)
        mbap_hdr = struct.pack('>HHHB', trans_id, 0, len(modbus_pdu) + 1, slave_id)
        return mbap_hdr, trans_id

    def _bytes_to_bool(self, byte_list):
        """Expand bytes into a flat list of bools, least-significant bit first.

        Every input byte yields exactly eight entries.
        """
        return [bool(byte & (1 << bit)) for byte in byte_list for bit in range(8)]

    def _to_short(self, byte_array, signed=True):
        """Unpack big-endian 16-bit values from ``byte_array`` into a tuple.

        ``signed`` selects signed ('h') or unsigned ('H') interpretation.
        """
        response_quantity = len(byte_array) // 2
        fmt = '>' + (('h' if signed else 'H') * response_quantity)
        return struct.unpack(fmt, byte_array)

    def _validate_resp_hdr(self, response, trans_id, slave_id, function_code, count=False):
        """Check the MBAP header of ``response`` and return the payload.

        Raises ``ValueError`` when the frame is too short, when the
        transaction id, protocol id or unit id do not match the request, or
        when the slave answered with an exception response. With
        ``count=True`` one extra byte after the function code (the byte count
        of read responses) is skipped as well.
        """
        fc_end = Const.MBAP_HDR_LENGTH + 1
        if len(response) < fc_end:
            raise ValueError('response too short')
        rec_tid, rec_pid, rec_len, rec_uid, rec_fc = struct.unpack('>HHHBB', response[:fc_end])
        if trans_id != rec_tid:
            raise ValueError('wrong transaction Id')
        if rec_pid != 0:
            raise ValueError('invalid protocol Id')
        if slave_id != rec_uid:
            raise ValueError('wrong slave Id')
        if rec_fc == (function_code + Const.ERROR_BIAS):
            # The exception code is the byte that follows the (biased)
            # function code; report that rather than the function code.
            if len(response) <= fc_end:
                raise ValueError('slave returned exception response without exception code')
            raise ValueError('slave returned exception code: {:d}'.format(response[fc_end]))
        hdr_length = (fc_end + 1) if count else fc_end
        return response[hdr_length:]

    def _recv_exact(self, size):
        """Read exactly ``size`` bytes from the socket.

        ``recv`` may hand back fewer bytes than requested, so keep reading
        until the amount is complete. Raises ``ValueError`` if the peer
        closes the connection first.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self._sock.recv(remaining)
            if not chunk:
                raise ValueError('connection closed before full response was received')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _send_receive(self, slave_id, modbus_pdu, count):
        """Send one request and return the validated response payload.

        The reply is framed using the MBAP length field instead of a single
        fixed-size ``recv``, so segmented replies and full-size frames are
        read completely.
        """
        mbap_hdr, trans_id = self._create_mbap_hdr(slave_id, modbus_pdu)
        # sendall: a plain send() may transmit only part of the buffer.
        self._sock.sendall(mbap_hdr + modbus_pdu)
        # Transaction id, protocol id and length: three 16-bit fields.
        prefix = self._recv_exact(6)
        remaining = struct.unpack('>H', prefix[4:6])[0]
        # The length covers the unit id plus the PDU: at least unit id and
        # function code, at most unit id and a 253-byte PDU.
        if not 2 <= remaining <= 254:
            raise ValueError('invalid response length')
        response = prefix + self._recv_exact(remaining)
        return self._validate_resp_hdr(response, trans_id, slave_id, modbus_pdu[0], count)

    def read_coils(self, slave_addr, starting_addr, coil_qty):
        """Read coils and return them as a list of bools.

        The list holds eight entries per returned data byte and is not
        trimmed to ``coil_qty``.
        """
        modbus_pdu = functions.read_coils(starting_addr, coil_qty)
        response = self._send_receive(slave_addr, modbus_pdu, True)
        return self._bytes_to_bool(response)

    def read_discrete_inputs(self, slave_addr, starting_addr, input_qty):
        """Read discrete inputs and return them as a list of bools.

        The list holds eight entries per returned data byte and is not
        trimmed to ``input_qty``.
        """
        modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty)
        response = self._send_receive(slave_addr, modbus_pdu, True)
        return self._bytes_to_bool(response)

    def read_holding_registers(self, slave_addr, starting_addr, register_qty, signed=True):
        """Read holding registers and return their values as a tuple of ints."""
        modbus_pdu = functions.read_holding_registers(starting_addr, register_qty)
        response = self._send_receive(slave_addr, modbus_pdu, True)
        return self._to_short(response, signed)

    def read_input_registers(self, slave_addr, starting_address, register_quantity, signed=True):
        """Read input registers and return their values as a tuple of ints."""
        modbus_pdu = functions.read_input_registers(starting_address, register_quantity)
        response = self._send_receive(slave_addr, modbus_pdu, True)
        return self._to_short(response, signed)

    def write_single_coil(self, slave_addr, output_address, output_value):
        """Write one coil; return the result of ``functions.validate_resp_data``.

        NOTE(review): presumably truthy when the slave echoed the request --
        confirm against ``uModBusFunctions``.
        """
        modbus_pdu = functions.write_single_coil(output_address, output_value)
        response = self._send_receive(slave_addr, modbus_pdu, False)
        return functions.validate_resp_data(response, Const.WRITE_SINGLE_COIL,
                                            output_address, value=output_value, signed=False)

    def write_single_register(self, slave_addr, register_address, register_value, signed=True):
        """Write one register; return the result of ``functions.validate_resp_data``.

        NOTE(review): presumably truthy when the slave echoed the request --
        confirm against ``uModBusFunctions``.
        """
        modbus_pdu = functions.write_single_register(register_address, register_value, signed)
        response = self._send_receive(slave_addr, modbus_pdu, False)
        return functions.validate_resp_data(response, Const.WRITE_SINGLE_REGISTER,
                                            register_address, value=register_value, signed=signed)

    def write_multiple_coils(self, slave_addr, starting_address, output_values):
        """Write several coils; return the result of ``functions.validate_resp_data``.

        The response is validated against the start address and
        ``len(output_values)``.
        """
        modbus_pdu = functions.write_multiple_coils(starting_address, output_values)
        response = self._send_receive(slave_addr, modbus_pdu, False)
        return functions.validate_resp_data(response, Const.WRITE_MULTIPLE_COILS,
                                            starting_address, quantity=len(output_values))

    def write_multiple_registers(self, slave_addr, starting_address, register_values, signed=True):
        """Write several registers; return the result of ``functions.validate_resp_data``.

        The response is validated against the start address and
        ``len(register_values)``.
        """
        modbus_pdu = functions.write_multiple_registers(starting_address, register_values, signed)
        response = self._send_receive(slave_addr, modbus_pdu, False)
        return functions.validate_resp_data(response, Const.WRITE_MULTIPLE_REGISTERS,
                                            starting_address, quantity=len(register_values))

    def close(self):
        """Close the underlying socket."""
        self._sock.close()