-
Notifications
You must be signed in to change notification settings - Fork 29
/
kwp2000.py
executable file
·275 lines (213 loc) · 9.39 KB
/
kwp2000.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env python3
import struct
from enum import IntEnum
from typing import Optional

from panda import Panda # type: ignore
from tp20 import TP20Transport
class NegativeResponseError(Exception):
    """Raised when the ECU answers a request with a negative response.

    Attributes:
        message: human readable description; also what ``str(exc)`` returns.
        service_id: service id echoed in the negative response.
        error_code: response code reported in the negative response.
    """

    def __init__(self, message, service_id, error_code):
        # Forward all constructor arguments to Exception so that ``args`` is
        # populated: repr() then shows the details, and copy/pickle can rebuild
        # the exception (they re-invoke the class with ``args``).
        super().__init__(message, service_id, error_code)
        self.message = message
        self.service_id = service_id
        self.error_code = error_code

    def __str__(self):
        # Only the description is shown; the codes stay available as attributes.
        return self.message
class InvalidServiceIdError(Exception):
    """Raised when a response's service id is not the request's service id + 0x40."""
class InvalidSubFunctionError(Exception):
    """Raised when a response does not echo the sub-function that was requested."""
class SERVICE_TYPE(IntEnum):
    """Service identifiers: the first byte of every request built by the client.

    A positive response carries the request's service id + 0x40.
    """

    DIAGNOSTIC_SESSION_CONTROL = 0x10
    ECU_RESET = 0x11
    READ_FREEZE_FRAME_DATA = 0x12
    READ_DIAGNOSTIC_TROUBLE_CODES = 0x13
    CLEAR_DIAGNOSTIC_INFORMATION = 0x14
    READ_STATUS_OF_DIAGNOSTIC_TROUBLE_CODES = 0x17
    # The misspelled name is kept first so it stays the canonical member (its
    # ``.name`` is unchanged); the correctly spelled name below is an alias.
    READ_DIAGNOSITC_TROUBE_CODES_BY_STATUS = 0x18
    READ_DIAGNOSTIC_TROUBLE_CODES_BY_STATUS = 0x18
    READ_ECU_IDENTIFICATION = 0x1A
    STOP_DIAGNOSTIC_SESSION = 0x20
    READ_DATA_BY_LOCAL_IDENTIFIER = 0x21
    READ_DATA_BY_COMMON_IDENTIFIER = 0x22
    READ_MEMORY_BY_ADDRESS = 0x23
    SET_DATA_RATES = 0x26
    SECURITY_ACCESS = 0x27
    DYNAMICALLY_DEFINE_LOCAL_IDENTIFIER = 0x2C
    WRITE_DATA_BY_COMMON_IDENTIFIER = 0x2E
    INPUT_OUTPUT_CONTROL_BY_COMMON_IDENTIFIER = 0x2F
    INPUT_OUTPUT_CONTROL_BY_LOCAL_IDENTIFIER = 0x30
    START_ROUTINE_BY_LOCAL_IDENTIFIER = 0x31
    STOP_ROUTINE_BY_LOCAL_IDENTIFIER = 0x32
    REQUEST_ROUTINE_RESULTS_BY_LOCAL_IDENTIFIER = 0x33
    REQUEST_DOWNLOAD = 0x34
    REQUEST_UPLOAD = 0x35
    TRANSFER_DATA = 0x36
    REQUEST_TRANSFER_EXIT = 0x37
    START_ROUTINE_BY_ADDRESS = 0x38
    STOP_ROUTINE_BY_ADDRESS = 0x39
    REQUEST_ROUTINE_RESULTS_BY_ADDRESS = 0x3A
    WRITE_DATA_BY_LOCAL_IDENTIFIER = 0x3B
    WRITE_MEMORY_BY_ADDRESS = 0x3D
    TESTER_PRESENT = 0x3E
    ESC_CODE = 0x80
    STOP_COMMUNICATION = 0x82
class ROUTINE_CONTROL_TYPE(IntEnum):
    """Routine ids sent as the sub-function of the *_ROUTINE_*_BY_LOCAL_IDENTIFIER services."""

    ERASE_FLASH = 0xC4
    CALCULATE_FLASH_CHECKSUM = 0xC5
class ECU_IDENTIFICATION_TYPE(IntEnum):
    """Identification options sent as the sub-function of READ_ECU_IDENTIFICATION."""

    ECU_IDENT = 0x9B
    STATUS_FLASH = 0x9C
class SESSION_TYPE(IntEnum):
    """Session ids sent as the sub-function of DIAGNOSTIC_SESSION_CONTROL."""

    PROGRAMMING = 0x85
    ENGINEERING_MODE = 0x86
    DIAGNOSTIC = 0x89
class ACCESS_TYPE(IntEnum):
    """Sub-functions of SECURITY_ACCESS.

    Odd values request a seed; even values send the key computed from it.
    """

    # seed/key pair for programming
    PROGRAMMING_REQUEST_SEED = 1
    PROGRAMMING_SEND_KEY = 2
    # generic seed/key pair
    REQUEST_SEED = 3
    SEND_KEY = 4
class COMPRESSION_TYPE(IntEnum):
    """Compression method; request_download places it in the high nibble of the format byte."""

    UNCOMPRESSED = 0x0
class ENCRYPTION_TYPE(IntEnum):
    """Encryption method; request_download places it in the low nibble of the format byte."""

    UNENCRYPTED = 0x0
# Human readable names for the response code carried in the third byte of a
# negative (0x7F) response. KWP2000Client._kwp uses this table to build the
# NegativeResponseError message; codes missing here fall back to a hex dump.
_negative_response_codes = {
    # general
    0x10: "generalReject",
    0x11: "serviceNotSupported",
    0x12: "subFunctionNotSupported-invalidFormat",
    0x21: "busy-RepeatRequest",
    0x22: "conditionsNotCorrect or requestSequenceError",
    0x23: "routineNotComplete",
    0x31: "requestOutOfRange",
    # security access
    0x33: "securityAccessDenied",
    0x35: "invalidKey",
    0x36: "exceedNumberOfAttempts",
    0x37: "requiredTimeDelayNotExpired",
    # download
    0x40: "downloadNotAccepted",
    0x41: "improperDownloadType",
    0x42: "cantDownloadToSpecifiedAddress",
    0x43: "cantDownloadNumberOfBytesRequested",
    # upload
    0x50: "uploadNotAccepted",
    0x51: "improperUploadType",
    0x52: "cantUploadFromSpecifiedAddress",
    0x53: "cantUploadNumberOfBytesRequested",
    # block transfer
    0x71: "transferSuspended",
    0x72: "transferAborted",
    0x74: "illegalAddressInBlockTransfer",
    0x75: "illegalByteCountInBlockTransfer",
    0x76: "illegalBlockTransferType",
    0x77: "blockTransferDataChecksumError",
    0x78: "reqCorrectlyRcvd-RspPending(requestCorrectlyReceived-ResponsePending)",
    0x79: "incorrectByteCountDuringBlockTransfer",
}
class KWP2000Client:
    """KWP2000 diagnostic client that talks to an ECU through a transport object.

    Every service call builds a request (service id, optional sub-function,
    optional payload), sends it with ``transport.send`` and validates the reply
    obtained from ``transport.recv`` before returning its payload.
    """

    def __init__(self, transport: TP20Transport, debug: bool = False):
        """Store the transport and the debug flag.

        Args:
            transport: object used to exchange raw request/response bytes.
            debug: when true, every request and response is printed as hex.
        """
        self.transport = transport
        self.debug = debug

    @staticmethod
    def _pack_uint24(name: str, value: int) -> bytes:
        """Return *value* as 3 big-endian bytes; raise ValueError naming *name* if out of range."""
        if not 0 <= value <= 0xFFFFFF:
            raise ValueError(f"invalid {name} {value}")
        # pack as 32 bit and drop the most significant byte (zero after the check)
        return struct.pack(">L", value)[1:]

    def _kwp(
        self,
        service_type: SERVICE_TYPE,
        subfunction: Optional[int] = None,
        data: Optional[bytes] = None,
    ) -> bytes:
        """Send one request and return the payload of its positive response.

        Args:
            service_type: service id placed in the first request byte.
            subfunction: optional second request byte; when given, the response
                must echo it.
            data: optional payload appended after the sub-function.

        Returns:
            The response without the service id byte (and without the
            sub-function byte when a sub-function was sent).

        Raises:
            NegativeResponseError: the response starts with 0x7F.
            InvalidServiceIdError: the response service id is not
                ``service_type + 0x40`` (this includes an empty response).
            InvalidSubFunctionError: the response does not echo ``subfunction``.
        """
        req = bytes([service_type])
        if subfunction is not None:
            req += bytes([subfunction])
        if data is not None:
            req += data

        if self.debug:
            print(f"KWP TX: {req.hex()}")
        self.transport.send(req)
        resp = self.transport.recv()
        if self.debug:
            print(f"KWP RX: {resp.hex()}")

        resp_sid = resp[0] if len(resp) > 0 else None

        # negative response: 0x7F, rejected service id, response code
        if resp_sid == 0x7F:
            service_id = resp[1] if len(resp) > 1 else -1
            try:
                service_desc = SERVICE_TYPE(service_id).name
            except ValueError:
                # not a member of SERVICE_TYPE (or missing from a short reply)
                service_desc = "NON_STANDARD_SERVICE"

            error_code = resp[2] if len(resp) > 2 else -1
            try:
                error_desc = _negative_response_codes[error_code]
            except KeyError:
                # unknown code: show whatever follows the code byte instead
                error_desc = resp[3:].hex()
            # NOTE(review): code 0x78 (response pending) is raised like any
            # other negative response; no follow-up response is awaited here.
            raise NegativeResponseError("{} - {}".format(service_desc, error_desc), service_id, error_code)

        # positive response: service id is echoed with 0x40 added
        if service_type + 0x40 != resp_sid:
            resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
            raise InvalidServiceIdError("invalid response service id: {}".format(resp_sid_hex))

        # check subfunction
        if subfunction is not None:
            resp_sfn = resp[1] if len(resp) > 1 else None
            if subfunction != resp_sfn:
                # resp_sfn_hex is already a hex string (or None), so it must be
                # formatted without an integer format spec.
                resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
                raise InvalidSubFunctionError(f"invalid response subfunction: {resp_sfn_hex}")

        # return data (exclude service id and sub-function id)
        payload_start = 1 if subfunction is None else 2
        return resp[payload_start:]

    def diagnostic_session_control(self, session_type: SESSION_TYPE) -> None:
        """Request the given diagnostic session; the response payload is discarded."""
        self._kwp(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)

    def security_access(self, access_type: ACCESS_TYPE, security_key: bytes = b"") -> bytes:
        """Run one step of the security access exchange.

        Odd ``access_type`` values request a seed and must not carry a key;
        even values send a key and require one.

        Returns:
            The response payload following the echoed access type.

        Raises:
            ValueError: a key was supplied for a seed request, or is missing
                for a send-key request.
        """
        request_seed = access_type % 2 != 0
        if request_seed and len(security_key) != 0:
            raise ValueError("security_key not allowed")
        if not request_seed and len(security_key) == 0:
            raise ValueError("security_key is missing")
        return self._kwp(SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)

    def read_ecu_identification(self, data_identifier_type: ECU_IDENTIFICATION_TYPE) -> bytes:
        """Read one identification record and return its raw payload."""
        return self._kwp(SERVICE_TYPE.READ_ECU_IDENTIFICATION, data_identifier_type)

    # Original (misspelled) method name, kept so existing callers keep working.
    read_ecu_identifcation = read_ecu_identification

    def request_download(
        self,
        memory_address: int,
        uncompressed_size: int,
        compression_type: COMPRESSION_TYPE = COMPRESSION_TYPE.UNCOMPRESSED,
        encryption_type: ENCRYPTION_TYPE = ENCRYPTION_TYPE.UNENCRYPTED,
    ) -> int:
        """Announce a download of ``uncompressed_size`` bytes to ``memory_address``.

        The request payload is: 3-byte address, one format byte (compression in
        the high nibble, encryption in the low nibble), 3-byte size.

        Returns:
            The 1- or 2-byte response payload decoded as a big-endian unsigned
            integer (presumably the maximum block length for transfer_data --
            TODO confirm against the ECU documentation).

        Raises:
            ValueError: address or size does not fit in 24 bits, or the
                response payload is not 1 or 2 bytes long.
        """
        addr = self._pack_uint24("memory_address", memory_address)
        size = self._pack_uint24("uncompressed_size", uncompressed_size)
        data = addr + bytes([(compression_type << 4) | encryption_type]) + size

        ret = self._kwp(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
        if len(ret) not in (1, 2):
            raise ValueError(f"Invalid response {ret.hex()}")
        return int.from_bytes(ret, "big")

    def start_routine_by_local_identifier(self, routine_control: ROUTINE_CONTROL_TYPE, data: bytes) -> bytes:
        """Start the routine ``routine_control`` with ``data`` as its arguments."""
        return self._kwp(SERVICE_TYPE.START_ROUTINE_BY_LOCAL_IDENTIFIER, routine_control, data)

    def request_routine_results_by_local_identifier(self, routine_control: ROUTINE_CONTROL_TYPE) -> bytes:
        """Fetch the results of the routine ``routine_control``."""
        return self._kwp(SERVICE_TYPE.REQUEST_ROUTINE_RESULTS_BY_LOCAL_IDENTIFIER, routine_control)

    def erase_flash(self, start_address: int, end_address: int) -> bytes:
        """Start the ERASE_FLASH routine for the given 24-bit address range.

        Raises:
            ValueError: an address does not fit in 24 bits.
        """
        start = self._pack_uint24("start_address", start_address)
        end = self._pack_uint24("end_address", end_address)
        return self.start_routine_by_local_identifier(ROUTINE_CONTROL_TYPE.ERASE_FLASH, start + end)

    def calculate_flash_checksum(self, start_address: int, end_address: int, checksum: int) -> bytes:
        """Start the CALCULATE_FLASH_CHECKSUM routine.

        The routine arguments are the 3-byte start address, the 3-byte end
        address and the 2-byte big-endian ``checksum``.

        Raises:
            ValueError: an address does not fit in 24 bits or ``checksum``
                does not fit in 16 bits.
        """
        start = self._pack_uint24("start_address", start_address)
        end = self._pack_uint24("end_address", end_address)
        if not 0 <= checksum <= 0xFFFF:
            raise ValueError(f"invalid checksum {checksum}")
        chk = struct.pack(">H", checksum)
        return self.start_routine_by_local_identifier(ROUTINE_CONTROL_TYPE.CALCULATE_FLASH_CHECKSUM, start + end + chk)

    def transfer_data(self, data: bytes) -> bytes:
        """Send one block of ``data`` with the TRANSFER_DATA service."""
        return self._kwp(SERVICE_TYPE.TRANSFER_DATA, data=data)

    def request_transfer_exit(self) -> bytes:
        """Send REQUEST_TRANSFER_EXIT and return the response payload."""
        return self._kwp(SERVICE_TYPE.REQUEST_TRANSFER_EXIT)

    def stop_communication(self) -> bytes:
        """Send STOP_COMMUNICATION and return the response payload."""
        return self._kwp(SERVICE_TYPE.STOP_COMMUNICATION)
if __name__ == "__main__":
    # Demo: open a diagnostic session and print the ECU identification records.
    panda = Panda()
    panda.can_clear(0xFFFF)
    panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)

    # NOTE(review): 0x9 is passed to TP20Transport as its second argument;
    # presumably the target module address -- confirm against tp20.py.
    transport = TP20Transport(panda, 0x9, debug=False)
    client = KWP2000Client(transport, debug=True)

    client.diagnostic_session_control(SESSION_TYPE.DIAGNOSTIC)

    part_number = client.read_ecu_identifcation(ECU_IDENTIFICATION_TYPE.ECU_IDENT)
    print(f"Part Number {part_number[:10]}")

    flash_status = client.read_ecu_identifcation(ECU_IDENTIFICATION_TYPE.STATUS_FLASH)
    print("Flash status", flash_status)