From ab6f6314666d46b14a0beb849753ce2b15e89e2d Mon Sep 17 00:00:00 2001 From: Nils Weiss Date: Wed, 24 Feb 2021 15:59:14 +0100 Subject: [PATCH] Improve stability of gmlan utils tests --- test/contrib/automotive/gm/gmlanutils.uts | 157 +++++++++++++++++++--- 1 file changed, 135 insertions(+), 22 deletions(-) diff --git a/test/contrib/automotive/gm/gmlanutils.uts b/test/contrib/automotive/gm/gmlanutils.uts index 7a9ff4746bd..8ee65090aa8 100644 --- a/test/contrib/automotive/gm/gmlanutils.uts +++ b/test/contrib/automotive/gm/gmlanutils.uts @@ -25,8 +25,10 @@ load_contrib("automotive.gm.gmlanutils", globals_dict=globals()) + GMLAN_RequestDownload Tests ############################################################################## = Positive, immediate positive response -ecusimSuccessfullyExecuted = True +drain_bus(iface0) +ecusimSuccessfullyExecuted = False started = threading.Event() + def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True @@ -35,6 +37,8 @@ def ecusim(): pkt = GMLAN()/GMLAN_RD(memorySize=4) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False + else: + ecusimSuccessfullyExecuted = True ack = b"\x74" isotpsock2.send(ack) @@ -44,11 +48,13 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=1) == True thread.join(timeout=5) - assert res +assert res assert ecusimSuccessfullyExecuted == True = Negative, immediate negative response + +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -65,11 +71,14 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = Negative, timeout + +drain_bus(iface0) with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, basecls=GMLAN) as isotpsock: assert GMLAN_RequestDownload(isotpsock, 4, timeout=1) == False ############################ Response pending = Positive, after response pending +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -84,13 +93,12 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base thread.start() started.wait(timeout=5) res = GMLAN_RequestDownload(isotpsock, 4, timeout=2) == True - join = thread.join(timeout=5) - print("JOIN", join) - print("Result", repr(res)) + thread.join(timeout=5) assert res = Positive, hold response pending for several messages -tout = 0.8 +drain_bus(iface0) +tout = 1 repeats = 4 started = threading.Event() def ecusim(): @@ -109,10 +117,8 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base started.wait(timeout=5) starttime = time.time() # may be inaccurate -> on some systems only seconds precision result = GMLAN_RequestDownload(isotpsock, 4, timeout=repeats*tout+0.5) - endtime = time.time() - join = thread.join(timeout=5) - print("Result", repr(result)) - print("Join", join) + endtime = time.time() + 1 + thread.join(timeout=5) assert result print(endtime - starttime) print(tout * (repeats - 1)) @@ -120,6 +126,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = Negative, negative response after response pending +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -138,8 +145,8 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res - = Negative, timeout after response pending +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -158,6 +165,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = Positive, pending message from different service interferes while pending +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -165,8 +173,11 @@ def ecusim(): pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) isotpsock2.send(pending) wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x78) + time.sleep(0.1) isotpsock2.send(wrongservice) + time.sleep(0.1) isotpsock2.send(pending) + time.sleep(0.1) ack = b"\x74" isotpsock2.send(ack) @@ -179,15 +190,19 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = Positive, negative response from different service interferes while pending +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) pending = GMLAN()/GMLAN_NR(requestServiceId=0x34, returnCode=0x78) isotpsock2.send(pending) + time.sleep(0.1) wrongservice = GMLAN()/GMLAN_NR(requestServiceId=0x36, returnCode=0x22) isotpsock2.send(wrongservice) + time.sleep(0.1) isotpsock2.send(pending) + time.sleep(0.1) ack = b"\x74" isotpsock2.send(ack) @@ -201,6 +216,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base ################### RETRY = Positive, first: immediate negative response, retry: Positive +drain_bus(iface0) started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted @@ -237,6 +253,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base + GMLAN_TransferData Tests ############################################################################## = Positive, short payload, scheme = 4 +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True @@ -263,6 +280,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert ecusimSuccessfullyExecuted == True = Positive, short payload, scheme = 3 +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 3 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True @@ -289,6 +307,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert ecusimSuccessfullyExecuted == True = Positive, short payload, scheme = 2 +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 2 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True @@ -315,6 +334,7 @@ with new_can_socket0() as isocan2, ISOTPSocket(isocan2, sid=0x242, did=0x642, ba assert ecusimSuccessfullyExecuted == True = Negative, short payload +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True @@ -323,7 +343,7 @@ def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) + isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) nr = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x22) isotpsock2.send(nr) @@ -335,11 +355,14 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base thread.join(timeout=5) assert res +drain_bus(iface0) = Negative, timeout with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, basecls=GMLAN) as isotpsock: assert GMLAN_TransferData(isotpsock, 0x4000, payload, timeout=1) == False +drain_bus(iface0) + = Positive, long payload conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" @@ -375,21 +398,22 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base # +drain_bus(iface0) = Positive, first part of payload succeeds, second pending, then fails, retry succeeds conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) + isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x76" # second package with inscreased address - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) + isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pending = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x78) isotpsock2.send(pending) time.sleep(0.1) nr = GMLAN() / GMLAN_NR(requestServiceId=0x36, returnCode=0x22) - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) + isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) ack = b"\x76" isotpsock2.send(ack) @@ -402,19 +426,24 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res ############ +drain_bus(iface0) = Positive, maxmsglen length check -> message is split automatically * TODO: This test causes an error in ISOTPSoftSockets exit_if_no_isotp_module() + conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True sim_started = threading.Event() -started = threading.Event() def ecusim(): global ecusimSuccessfullyExecuted ecusimSuccessfullyExecuted= True - with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: + if ISOTP_KERNEL_MODULE_AVAILABLE: + isosock = lambda sock: ISOTPSocket(sock, sid=0x642, did=0x242, basecls=GMLAN) + else: + isosock = lambda sock: ISOTPSocket(sock, sid=0x642, did=0x242, basecls=GMLAN, rx_separation_time_min=2) + with new_can_socket0() as isocan, isosock(isocan) as isotpsock2: requ = isotpsock2.sniff(count=1, timeout=3, started_callback=sim_started.set) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, dataRecord=payload*511+payload[:1]) @@ -423,6 +452,7 @@ def ecusim(): return ack = b"\x76" # second package with inscreased address + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000FF9, dataRecord=payload[1:]) @@ -430,6 +460,7 @@ def ecusim(): ecusimSuccessfullyExecuted = False return ack = b"\x76" + time.sleep(0.1) isotpsock2.send(ack) thread = threading.Thread(target=ecusim) @@ -444,13 +475,14 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert ecusimSuccessfullyExecuted == True ############ Address boundary checks +drain_bus(iface0) = Positive, highest possible address for scheme conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) + isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) @@ -463,12 +495,13 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = Negative, invalid address (too large for addressing scheme) +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) + isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) @@ -481,12 +514,13 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = Positive, address zero +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) + isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) @@ -499,12 +533,13 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = Negative, negative address +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) + isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x76" isotpsock2.send(ack) @@ -520,6 +555,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base + GMLAN_TransferPayload Tests ############################################ = Positive, short payload +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True @@ -533,12 +569,14 @@ def ecusim(): if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x74" + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_TD(startingAddress=0x40000000, dataRecord=payload) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x76" + time.sleep(0.1) isotpsock2.send(ack) thread = threading.Thread(target=ecusim) @@ -558,6 +596,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base keyfunc = lambda seed : seed - 0x1FBE = Positive scenario, level 1, tests if keyfunction applied properly +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -571,8 +610,10 @@ def ecusim(): ecusimSuccessfullyExecuted = False seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) + time.sleep(0.1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) @@ -591,6 +632,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert ecusimSuccessfullyExecuted == True = Positive scenario, level 3 +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -603,9 +645,11 @@ def ecusim(): if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False seedmsg = GMLAN()/GMLAN_SAPR(subfunction=3, securitySeed=0xdead) + time.sleep(0.1) # wait for key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=4, securityKey=0xbeef) + time.sleep(0.1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) @@ -625,6 +669,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = Negative scenario, invalid password +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -638,8 +683,10 @@ def ecusim(): ecusimSuccessfullyExecuted = False seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbabe) + time.sleep(0.1) if bytes(requ[0]) != bytes(pkt): nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x35) isotpsock2.send(nr) @@ -658,16 +705,19 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert ecusimSuccessfullyExecuted == True = invalid level (not an odd number) +drain_bus(iface0) with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, basecls=GMLAN) as isotpsock: assert GMLAN_GetSecurityAccess(isotpsock, keyfunc, level=2, timeout=1) == False = zero seed +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: # wait for request - requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) + isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0x0000) + time.sleep(0.1) isotpsock2.send(seedmsg) thread = threading.Thread(target=ecusim) @@ -680,6 +730,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base ############### retry = Positive scenario, request timeout, retry works +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -689,9 +740,11 @@ def ecusim(): requ = isotpsock2.sniff(count=1, timeout=3) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) pr = GMLAN()/GMLAN_SAPR(subfunction=2) + time.sleep(0.1) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) @@ -703,6 +756,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = Positive scenario, keysend timeout, retry works +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -710,13 +764,17 @@ def ecusim(): requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # timeout + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) # retry from start + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=3) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) # wait for key + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pr = GMLAN()/GMLAN_SAPR(subfunction=2) + time.sleep(0.1) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) @@ -729,19 +787,23 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = Positive scenario, request error, retry works +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) nr = GMLAN() / GMLAN_NR(requestServiceId=0x27, returnCode=0x37) + time.sleep(0.1) # wait for request requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(nr)) seedmsg = GMLAN()/GMLAN_SAPR(subfunction=1, securitySeed=0xdead) + time.sleep(0.1) # wait for key requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(seedmsg)) pkt = GMLAN()/GMLAN_SA(subfunction=2, securityKey=0xbeef) pr = GMLAN()/GMLAN_SAPR(subfunction=2) + time.sleep(0.1) isotpsock2.send(pr) thread = threading.Thread(target=ecusim) @@ -757,6 +819,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base + GMLAN_InitDiagnostics Tests ############################################################################## = sequence of the correct messages +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -769,18 +832,21 @@ def ecusim(): if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x68" + time.sleep(0.1) # ReportProgrammedState requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN(b"\xa2") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) + time.sleep(0.1) # ProgrammingMode requestProgramming requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN(b"\xe5") + time.sleep(0.1) # InitiateProgramming enableProgramming requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x3) @@ -798,6 +864,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert ecusimSuccessfullyExecuted == True = sequence of the correct messages, disablenormalcommunication as broadcast +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -817,12 +884,14 @@ def ecusim(): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) print("ProgrammingMode requestProgramming") + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x1) if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN(b"\xe5") print("InitiateProgramming enableProgramming") + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=3, started_callback=lambda: isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x3) if bytes(requ[0]) != bytes(pkt): @@ -841,6 +910,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base ######## timeout = timeout DisableNormalCommunication +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -864,6 +934,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = timeout ReportProgrammedState +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -876,12 +947,14 @@ def ecusim(): if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x68" + time.sleep(0.1) # ReportProgrammedState requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN(b"\xa2") if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) + time.sleep(0.1) isotpsock2.send(ack) thread = threading.Thread(target=ecusim) @@ -896,6 +969,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = timeout ProgrammingMode requestProgramming +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -908,6 +982,7 @@ def ecusim(): if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = b"\x68" + time.sleep(0.1) # ReportProgrammedState requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN(b"\xa2") @@ -915,6 +990,7 @@ def ecusim(): ecusimSuccessfullyExecuted = False ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # ProgrammingMode requestProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) pkt = GMLAN() / GMLAN_PM(subfunction=0x1) if bytes(requ[0]) != bytes(pkt): @@ -932,6 +1008,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base ###### negative respone = timeout DisableNormalCommunication +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -944,6 +1021,7 @@ def ecusim(): if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) + time.sleep(0.1) isotpsock2.send(ack) thread = threading.Thread(target=ecusim) @@ -958,6 +1036,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base ###### retry tests = sequence of the correct messages, retry set +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -965,12 +1044,15 @@ def ecusim(): requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x68" # ReportProgrammedState + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # ProgrammingMode requestProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN(b"\xe5") # InitiateProgramming enableProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) thread = threading.Thread(target=ecusim) @@ -984,6 +1066,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = negative response, make sure no retries are made +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -992,6 +1075,7 @@ def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) if len(requ) != 0: ecusimSuccessfullyExecuted = False @@ -1008,21 +1092,26 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = first fail at DisableNormalCommunication, then sequence of the correct messages +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) # DisableNormalCommunication + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = b"\x68" # ReportProgrammedState + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # ProgrammingMode requestProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN(b"\xe5") # InitiateProgramming enableProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) thread = threading.Thread(target=ecusim) @@ -1034,6 +1123,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = first fail at ReportProgrammedState, then sequence of the correct messages +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -1041,18 +1131,23 @@ def ecusim(): requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x68" # Fail + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN() / GMLAN_NR(requestServiceId=0xA2, returnCode=0x12) # DisableNormalCommunication + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = b"\x68" # ReportProgrammedState + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # ProgrammingMode requestProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN(b"\xe5") # InitiateProgramming enableProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) thread = threading.Thread(target=ecusim) @@ -1064,6 +1159,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = first fail at ProgrammingMode requestProgramming, then sequence of the correct messages +drain_bus(iface0) started = threading.Event() def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: @@ -1071,20 +1167,26 @@ def ecusim(): requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = b"\x68" # ReportProgrammedState + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # Fail + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN() / GMLAN_NR(requestServiceId=0xA5, returnCode=0x12) # DisableNormalCommunication + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = b"\x68" # ReportProgrammedState + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN()/GMLAN_RPSPR(programmedState=0) # ProgrammingMode requestProgramming + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN(b"\xe5") + time.sleep(0.1) isotpsock2.send(ack) # InitiateProgramming enableProgramming requ = isotpsock2.sniff(count=1, timeout=1) @@ -1098,6 +1200,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = fail twice +drain_bus(iface0) ecusimSuccessfullyExecuted = True started = threading.Event() def ecusim(): @@ -1106,8 +1209,10 @@ def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN() / GMLAN_NR(requestServiceId=0x28, returnCode=0x12) + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) if len(requ) != 0: ecusimSuccessfullyExecuted = False @@ -1125,6 +1230,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base + GMLAN_ReadMemoryByAddress Tests ############################################################################## = Positive, short length, scheme = 4 +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" ecusimSuccessfullyExecuted = True @@ -1138,6 +1244,7 @@ def ecusim(): if bytes(requ[0]) != bytes(pkt): ecusimSuccessfullyExecuted = False ack = GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload) + time.sleep(0.1) isotpsock2.send(ack) thread = threading.Thread(target=ecusim) @@ -1151,6 +1258,7 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base = Negative, negative response +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() @@ -1158,6 +1266,7 @@ def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31) + time.sleep(0.1) isotpsock2.send(ack) thread = threading.Thread(target=ecusim) @@ -1169,11 +1278,13 @@ with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, base assert res = Negative, timeout +drain_bus(iface0) with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x242, did=0x642, basecls=GMLAN) as isotpsock: assert GMLAN_ReadMemoryByAddress(isotpsock, 0x0, 0x8, timeout=1) is None ###### RETRY = Positive, negative response, retry succeeds +drain_bus(iface0) conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 payload = b"\x00\x11\x22\x33\x44\x55\x66\x77" started = threading.Event() @@ -1181,8 +1292,10 @@ def ecusim(): with new_can_socket0() as isocan, ISOTPSocket(isocan, sid=0x642, did=0x242, basecls=GMLAN) as isotpsock2: requ = isotpsock2.sniff(count=1, timeout=1, started_callback=started.set) ack = GMLAN() / GMLAN_NR(requestServiceId=0x23, returnCode=0x31) + time.sleep(0.1) requ = isotpsock2.sniff(count=1, timeout=1, started_callback=lambda:isotpsock2.send(ack)) ack = GMLAN() / GMLAN_RMBAPR(memoryAddress=0x0, dataRecord=payload) + time.sleep(0.1) isotpsock2.send(ack) thread = threading.Thread(target=ecusim)