Skip to content

Commit

Permalink
data_transfer: implement data transfer test
Browse files Browse the repository at this point in the history
  • Loading branch information
SilverBzH committed Feb 6, 2024
1 parent 0f6356f commit b047253
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 14 deletions.
190 changes: 177 additions & 13 deletions tests/data_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import asyncio
import argparse
import logging
from pica import Host
from pica.packets import uci
from .helper import init
Expand All @@ -24,7 +25,7 @@
MAX_DATA_PACKET_PAYLOAD_SIZE = 1024


async def data_message_send(host: Host, peer: Host, file: Path):
async def controller(host: Host, peer: Host, file: Path):
await init(host)

host.send_control(
Expand Down Expand Up @@ -92,6 +93,167 @@ async def data_message_send(host: Host, peer: Host, file: Path):

await data_transfer(host, peer.mac_address, file, 0)

# START SESSION CMD
host.send_control(uci.SessionStartCmd(session_id=0))

await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))

await host.expect_control(
uci.SessionStatusNtf(
session_token=0,
session_state=uci.SessionState.SESSION_STATE_ACTIVE,
reason_code=0,
)
)

await host.expect_control(
uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
)

event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
event.show()

event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
event.show()

# STOP SESSION
host.send_control(uci.SessionStopCmd(session_id=0))

await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))

await host.expect_control(
uci.SessionStatusNtf(
session_token=0,
session_state=uci.SessionState.SESSION_STATE_IDLE,
reason_code=0,
)
)

await host.expect_control(
uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)

# DEINIT
host.send_control(uci.SessionDeinitCmd(session_token=0))

await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))


async def controlee(host: Host, peer: Host, file: Path):
await init(host)

host.send_control(
uci.SessionInitCmd(
session_id=0,
session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION,
)
)

await host.expect_control(uci.SessionInitRsp(status=uci.StatusCode.UCI_STATUS_OK))

await host.expect_control(
uci.SessionStatusNtf(
session_token=0,
session_state=uci.SessionState.SESSION_STATE_INIT,
reason_code=0,
)
)

mac_address_mode = 0x0
ranging_duration = int(1000).to_bytes(4, byteorder="little")
device_role_responder = bytes([1])
device_type_controllee = bytes([0])
host.send_control(
uci.SessionSetAppConfigCmd(
session_token=0,
tlvs=[
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.DEVICE_ROLE, v=device_role_responder
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.DEVICE_TYPE, v=device_type_controllee
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.DEVICE_MAC_ADDRESS, v=host.mac_address
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE,
v=bytes([mac_address_mode]),
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.RANGING_DURATION, v=ranging_duration
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.NO_OF_CONTROLEE, v=bytes([1])
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS, v=peer.mac_address
),
],
)
)

await host.expect_control(
uci.SessionSetAppConfigRsp(status=uci.StatusCode.UCI_STATUS_OK, cfg_status=[])
)

await host.expect_control(
uci.SessionStatusNtf(
session_token=0,
session_state=uci.SessionState.SESSION_STATE_IDLE,
reason_code=0,
)
)

host.send_control(uci.SessionStartCmd(session_id=0))

await host.expect_control(uci.SessionStartRsp(status=uci.StatusCode.UCI_STATUS_OK))

await host.expect_control(
uci.SessionStatusNtf(
session_token=0,
session_state=uci.SessionState.SESSION_STATE_ACTIVE,
reason_code=0,
)
)

await host.expect_control(
uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_ACTIVE)
)

with file.open("rb") as f:
application_data = list(bytearray(f.read()))
event = await host.expect_data(
uci.DataMessageRcv(
session_handle=0,
status=uci.StatusCode.UCI_STATUS_OK,
source_address=int.from_bytes(peer.mac_address, "big"),
data_sequence_number=0x01,
application_data=application_data,
),
timeout=2.0,
)
event.show()

event = await host.expect_control(uci.ShortMacTwoWaySessionInfoNtf, timeout=2.0)
event.show()

host.send_control(uci.SessionStopCmd(session_id=0))

await host.expect_control(uci.SessionStopRsp(status=uci.StatusCode.UCI_STATUS_OK))

await host.expect_control(
uci.SessionStatusNtf(
session_token=0,
session_state=uci.SessionState.SESSION_STATE_IDLE,
reason_code=0,
)
)

await host.expect_control(
uci.DeviceStatusNtf(device_state=uci.DeviceState.DEVICE_STATE_READY)
)

host.send_control(uci.SessionDeinitCmd(session_token=0))

await host.expect_control(uci.SessionDeinitRsp(status=uci.StatusCode.UCI_STATUS_OK))
Expand Down Expand Up @@ -163,22 +325,24 @@ async def data_transfer(

async def run(address: str, uci_port: int, file: Path):
try:
host0 = await Host.connect(address, uci_port, bytes([0, 1]))
host1 = await Host.connect(address, uci_port, bytes([0, 2]))
except Exception:
print(
host0 = await Host.connect(address, uci_port, bytes([0, 0]))
host1 = await Host.connect(address, uci_port, bytes([0, 1]))
except Exception as e:
raise Exception(
f"Failed to connect to Pica server at address {address}:{uci_port}\n"
+ "Make sure the server is running"
)
exit(1)

async with asyncio.TaskGroup() as tg:
tg.create_task(data_message_send(host0, host1, file))

host0.disconnect()
host1.disconnect()

print("Data transfer test completed")
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(controller(host0, host1, file))
tg.create_task(controlee(host1, host0, file))
except Exception as e:
raise e
finally:
host0.disconnect()
host1.disconnect()
logging.debug("Data transfer test completed")


def main():
Expand Down
1 change: 0 additions & 1 deletion tests/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
from asyncio.subprocess import Process
import pytest
import pytest_asyncio
import logging
Expand Down

0 comments on commit b047253

Please sign in to comment.