Commit 3cb637b

Add tests for TraceLinker methods to improve coverage
TaekyungHeo committed Jun 11, 2024
1 parent 22fc788 commit 3cb637b
Showing 1 changed file with 106 additions and 9 deletions.
tests/trace_link/test_trace_linker.py (115 changes: 106 additions & 9 deletions)
@@ -4,6 +4,10 @@
 from chakra.src.trace_link.kineto_operator import KinetoOperator
 from chakra.src.trace_link.trace_linker import TraceLinker
 from chakra.src.trace_link.unique_id_assigner import UniqueIdAssigner
+from param_bench.train.compute.python.tools.execution_trace import (
+    EXECUTION_TRACE_PROCESS_ANNOTATION,
+    EXECUTION_TRACE_THREAD_ANNOTATION,
+)
 from param_bench.train.compute.python.tools.execution_trace import (
     Node as PyTorchOperator,
 )
@@ -239,24 +243,24 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
 @patch("json.load")
 def test_construct_et_plus_data(mock_json_load, mock_open, mock_process_op_and_dependents, trace_linker):
     mock_json_load.return_value = {"nodes": [{"id": 1}, {"id": 2}]}
-    mock_process_op_and_dependents.side_effect = lambda op, *args: [{"id": op["id"] + 2}]
+    mock_process_op_and_dependents.side_effect = lambda x, *args: [{"id": x["id"] + 2}]

-    pytorch_op_id_to_kineto_ops_map = {}
-    pytorch_op_id_to_inclusive_dur_map = {}
-    pytorch_op_id_to_exclusive_dur_map = {}
-    pytorch_op_id_to_timestamp_map = {}
-    pytorch_op_id_to_inter_thread_dep_map = {}
+    pytorch_op_id_to_kineto_ops_map = {1: [], 2: []}
+    pytorch_op_id_to_inclusive_dur_map = {1: 100, 2: 200}
+    pytorch_op_id_to_exclusive_dur_map = {1: 50, 2: 150}
+    pytorch_op_id_to_timestamp_map = {1: 1000, 2: 2000}
+    pytorch_op_id_to_inter_thread_dep_map = {1: None, 2: None}

-    result = trace_linker.construct_et_plus_data(
-        "path/to/pytorch_et.json",
+    trace_linker.pytorch_et_plus_data = trace_linker.construct_et_plus_data(
+        trace_linker.pytorch_et_file,
         pytorch_op_id_to_kineto_ops_map,
         pytorch_op_id_to_inclusive_dur_map,
         pytorch_op_id_to_exclusive_dur_map,
         pytorch_op_id_to_timestamp_map,
         pytorch_op_id_to_inter_thread_dep_map,
     )

-    assert result["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]
+    assert trace_linker.pytorch_et_plus_data["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]


 @patch("builtins.open", new_callable=MagicMock)
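
A standalone sketch (not part of this commit) of the mocking pattern the rewritten test above relies on: a MagicMock whose side_effect lambda fabricates one dependent node per input node, with its id shifted by 2, which is why the augmented node list is expected to end up containing ids 1 through 4.

# Illustrative only; mock_process stands in for the patched
# process_op_and_dependents seen in the test above.
from unittest.mock import MagicMock

mock_process = MagicMock()
mock_process.side_effect = lambda x, *args: [{"id": x["id"] + 2}]

nodes = [{"id": 1}, {"id": 2}]
plus_nodes = list(nodes)
for node in nodes:
    # Each call runs the side_effect and appends the fabricated dependent node.
    plus_nodes.extend(mock_process(node))

assert plus_nodes == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]
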
@@ -271,3 +275,96 @@ def test_dump_pytorch_execution_trace_plus(mock_json_dump, mock_open, trace_link
     mock_json_dump.assert_called_once_with(
         {"nodes": [{"id": 1}, {"id": 2}]}, mock_open.return_value.__enter__(), indent=4
     )
+
+
+def test_add_thread_and_process_annotations(trace_linker):
+    kineto_cpu_ops = []
+    sorted_kineto_cpu_ops = []
+    sorted_kineto_cpu_op_ts = []
+    kineto_thread_info = {1: (100, 200), 2: (150, 250)}
+    kineto_process_start_time = 50
+    kineto_process_end_time = 300
+
+    kineto_cpu_ops, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts = trace_linker.add_thread_and_process_annotations(
+        kineto_cpu_ops,
+        sorted_kineto_cpu_ops,
+        sorted_kineto_cpu_op_ts,
+        kineto_thread_info,
+        kineto_process_start_time,
+        kineto_process_end_time,
+    )
+
+    assert len(kineto_cpu_ops) == 3
+    assert kineto_cpu_ops[0].name == EXECUTION_TRACE_PROCESS_ANNOTATION
+    assert kineto_cpu_ops[1].name == EXECUTION_TRACE_THREAD_ANNOTATION
+    assert kineto_cpu_ops[2].name == EXECUTION_TRACE_THREAD_ANNOTATION
+
+    assert len(sorted_kineto_cpu_ops) == 3
+    assert sorted_kineto_cpu_ops[0].timestamp == kineto_cpu_ops[0].timestamp
+    assert sorted_kineto_cpu_ops[1].timestamp == kineto_cpu_ops[1].timestamp
+    assert sorted_kineto_cpu_ops[2].timestamp == kineto_cpu_ops[2].timestamp
+
+    assert len(sorted_kineto_cpu_op_ts) == 3
+    assert sorted_kineto_cpu_op_ts[0] == kineto_cpu_ops[0].timestamp
+    assert sorted_kineto_cpu_op_ts[1] == kineto_cpu_ops[1].timestamp
+    assert sorted_kineto_cpu_op_ts[2] == kineto_cpu_ops[2].timestamp
+
+
+@patch("chakra.src.trace_link.trace_linker.TraceLinker.find_closest_op")
+def test_find_parent_cpu_op(mock_find_closest_op, trace_linker):
+    kineto_gpu_op = MagicMock(spec=KinetoOperator)
+    kineto_gpu_op.correlation = 123
+    kineto_gpu_op.name = "gpu_op"
+
+    kineto_runtime_op = MagicMock(spec=KinetoOperator)
+    kineto_runtime_op.timestamp = 100
+    kineto_runtime_op.tid = 1
+    kineto_runtime_op.name = "runtime_op"
+
+    trace_linker.kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op}
+
+    mock_find_closest_op.return_value = kineto_runtime_op
+
+    result = trace_linker.find_parent_cpu_op(kineto_gpu_op, trace_linker.kineto_correlation_cuda_runtime_map)
+
+    assert result == kineto_runtime_op
+    mock_find_closest_op.assert_called_once_with(
+        kineto_gpu_op, trace_linker.sorted_kineto_cpu_ops, kineto_runtime_op.timestamp
+    )
+
+
+def test_group_gpu_ops_by_cpu_launchers(trace_linker):
+    kineto_gpu_op1 = MagicMock(spec=KinetoOperator)
+    kineto_gpu_op1.correlation = 123
+    kineto_gpu_op1.name = "gpu_op1"
+    kineto_gpu_op1.timestamp = 150
+    kineto_gpu_op1.tid = 1
+
+    kineto_gpu_op2 = MagicMock(spec=KinetoOperator)
+    kineto_gpu_op2.correlation = 456
+    kineto_gpu_op2.name = "gpu_op2"
+    kineto_gpu_op2.timestamp = 250
+    kineto_gpu_op2.tid = 2
+
+    kineto_runtime_op1 = MagicMock(spec=KinetoOperator)
+    kineto_runtime_op1.ev_idx = "cpu_op1"
+    kineto_runtime_op1.timestamp = 100
+    kineto_runtime_op1.tid = 1
+    kineto_runtime_op1.name = "runtime_op1"
+    kineto_runtime_op1.correlation = 123
+
+    kineto_runtime_op2 = MagicMock(spec=KinetoOperator)
+    kineto_runtime_op2.ev_idx = "cpu_op2"
+    kineto_runtime_op2.timestamp = 200
+    kineto_runtime_op2.tid = 2
+    kineto_runtime_op2.name = "runtime_op2"
+    kineto_runtime_op2.correlation = 456
+
+    trace_linker.kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op1, 456: kineto_runtime_op2}
+
+    with patch.object(trace_linker, "find_parent_cpu_op", side_effect=[kineto_runtime_op1, kineto_runtime_op2]):
+        result = trace_linker.group_gpu_ops_by_cpu_launchers(
+            [kineto_gpu_op1, kineto_gpu_op2], trace_linker.kineto_correlation_cuda_runtime_map
+        )
+
+    assert result == {"cpu_op1": [kineto_gpu_op1], "cpu_op2": [kineto_gpu_op2]}

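For readers skimming the diff, test_group_gpu_ops_by_cpu_launchers pins down the grouping contract: each GPU op's correlation id resolves, via kineto_correlation_cuda_runtime_map, to the CPU-side CUDA runtime op that launched it, and GPU ops are grouped under that launcher's ev_idx. A simplified, hypothetical stand-in for that logic (Op and group_by_launcher are illustrative names only; the real TraceLinker implementation is not shown in this commit):

# Hypothetical illustration, not the library's implementation.
from dataclasses import dataclass

@dataclass
class Op:
    name: str
    correlation: int = 0
    ev_idx: str = ""

def group_by_launcher(gpu_ops, correlation_to_runtime):
    # Look up each GPU op's launching CUDA runtime op by correlation id,
    # then bucket the GPU op under that launcher's ev_idx.
    groups = {}
    for gpu_op in gpu_ops:
        launcher = correlation_to_runtime[gpu_op.correlation]
        groups.setdefault(launcher.ev_idx, []).append(gpu_op)
    return groups

runtime1 = Op("runtime_op1", 123, "cpu_op1")
runtime2 = Op("runtime_op2", 456, "cpu_op2")
gpu1 = Op("gpu_op1", 123)
gpu2 = Op("gpu_op2", 456)

assert group_by_launcher([gpu1, gpu2], {123: runtime1, 456: runtime2}) == {
    "cpu_op1": [gpu1],
    "cpu_op2": [gpu2],
}

Assuming a standard pytest setup with the chakra and param_bench packages importable, the new tests can be run in isolation with "pytest tests/trace_link/test_trace_linker.py".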