diff --git a/tests/trace_link/test_trace_linker.py b/tests/trace_link/test_trace_linker.py index 24552631..2ef03e25 100644 --- a/tests/trace_link/test_trace_linker.py +++ b/tests/trace_link/test_trace_linker.py @@ -4,6 +4,10 @@ from chakra.src.trace_link.kineto_operator import KinetoOperator from chakra.src.trace_link.trace_linker import TraceLinker from chakra.src.trace_link.unique_id_assigner import UniqueIdAssigner +from param_bench.train.compute.python.tools.execution_trace import ( + EXECUTION_TRACE_PROCESS_ANNOTATION, + EXECUTION_TRACE_THREAD_ANNOTATION, +) from param_bench.train.compute.python.tools.execution_trace import ( Node as PyTorchOperator, ) @@ -239,16 +243,16 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_ @patch("json.load") def test_construct_et_plus_data(mock_json_load, mock_open, mock_process_op_and_dependents, trace_linker): mock_json_load.return_value = {"nodes": [{"id": 1}, {"id": 2}]} - mock_process_op_and_dependents.side_effect = lambda op, *args: [{"id": op["id"] + 2}] + mock_process_op_and_dependents.side_effect = lambda x, *args: [{"id": x["id"] + 2}] - pytorch_op_id_to_kineto_ops_map = {} - pytorch_op_id_to_inclusive_dur_map = {} - pytorch_op_id_to_exclusive_dur_map = {} - pytorch_op_id_to_timestamp_map = {} - pytorch_op_id_to_inter_thread_dep_map = {} + pytorch_op_id_to_kineto_ops_map = {1: [], 2: []} + pytorch_op_id_to_inclusive_dur_map = {1: 100, 2: 200} + pytorch_op_id_to_exclusive_dur_map = {1: 50, 2: 150} + pytorch_op_id_to_timestamp_map = {1: 1000, 2: 2000} + pytorch_op_id_to_inter_thread_dep_map = {1: None, 2: None} - result = trace_linker.construct_et_plus_data( - "path/to/pytorch_et.json", + trace_linker.pytorch_et_plus_data = trace_linker.construct_et_plus_data( + trace_linker.pytorch_et_file, pytorch_op_id_to_kineto_ops_map, pytorch_op_id_to_inclusive_dur_map, pytorch_op_id_to_exclusive_dur_map, @@ -256,7 +260,7 @@ def test_construct_et_plus_data(mock_json_load, mock_open, mock_process_op_and_d pytorch_op_id_to_inter_thread_dep_map, ) - assert result["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}] + assert trace_linker.pytorch_et_plus_data["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}] @patch("builtins.open", new_callable=MagicMock) @@ -271,3 +275,96 @@ def test_dump_pytorch_execution_trace_plus(mock_json_dump, mock_open, trace_link mock_json_dump.assert_called_once_with( {"nodes": [{"id": 1}, {"id": 2}]}, mock_open.return_value.__enter__(), indent=4 ) + + +def test_add_thread_and_process_annotations(trace_linker): + kineto_cpu_ops = [] + sorted_kineto_cpu_ops = [] + sorted_kineto_cpu_op_ts = [] + kineto_thread_info = {1: (100, 200), 2: (150, 250)} + kineto_process_start_time = 50 + kineto_process_end_time = 300 + + kineto_cpu_ops, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts = trace_linker.add_thread_and_process_annotations( + kineto_cpu_ops, + sorted_kineto_cpu_ops, + sorted_kineto_cpu_op_ts, + kineto_thread_info, + kineto_process_start_time, + kineto_process_end_time, + ) + + assert len(kineto_cpu_ops) == 3 + assert kineto_cpu_ops[0].name == EXECUTION_TRACE_PROCESS_ANNOTATION + assert kineto_cpu_ops[1].name == EXECUTION_TRACE_THREAD_ANNOTATION + assert kineto_cpu_ops[2].name == EXECUTION_TRACE_THREAD_ANNOTATION + + assert len(sorted_kineto_cpu_ops) == 3 + assert sorted_kineto_cpu_ops[0].timestamp == kineto_cpu_ops[0].timestamp + assert sorted_kineto_cpu_ops[1].timestamp == kineto_cpu_ops[1].timestamp + assert sorted_kineto_cpu_ops[2].timestamp == kineto_cpu_ops[2].timestamp + + assert len(sorted_kineto_cpu_op_ts) == 3 + assert sorted_kineto_cpu_op_ts[0] == kineto_cpu_ops[0].timestamp + assert sorted_kineto_cpu_op_ts[1] == kineto_cpu_ops[1].timestamp + assert sorted_kineto_cpu_op_ts[2] == kineto_cpu_ops[2].timestamp + + +@patch("chakra.src.trace_link.trace_linker.TraceLinker.find_closest_op") +def test_find_parent_cpu_op(mock_find_closest_op, trace_linker): + kineto_gpu_op = MagicMock(spec=KinetoOperator) + kineto_gpu_op.correlation = 123 + kineto_gpu_op.name = "gpu_op" + + kineto_runtime_op = MagicMock(spec=KinetoOperator) + kineto_runtime_op.timestamp = 100 + kineto_runtime_op.tid = 1 + kineto_runtime_op.name = "runtime_op" + + trace_linker.kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op} + + mock_find_closest_op.return_value = kineto_runtime_op + + result = trace_linker.find_parent_cpu_op(kineto_gpu_op, trace_linker.kineto_correlation_cuda_runtime_map) + + assert result == kineto_runtime_op + mock_find_closest_op.assert_called_once_with( + kineto_gpu_op, trace_linker.sorted_kineto_cpu_ops, kineto_runtime_op.timestamp + ) + + +def test_group_gpu_ops_by_cpu_launchers(trace_linker): + kineto_gpu_op1 = MagicMock(spec=KinetoOperator) + kineto_gpu_op1.correlation = 123 + kineto_gpu_op1.name = "gpu_op1" + kineto_gpu_op1.timestamp = 150 + kineto_gpu_op1.tid = 1 + + kineto_gpu_op2 = MagicMock(spec=KinetoOperator) + kineto_gpu_op2.correlation = 456 + kineto_gpu_op2.name = "gpu_op2" + kineto_gpu_op2.timestamp = 250 + kineto_gpu_op2.tid = 2 + + kineto_runtime_op1 = MagicMock(spec=KinetoOperator) + kineto_runtime_op1.ev_idx = "cpu_op1" + kineto_runtime_op1.timestamp = 100 + kineto_runtime_op1.tid = 1 + kineto_runtime_op1.name = "runtime_op1" + kineto_runtime_op1.correlation = 123 + + kineto_runtime_op2 = MagicMock(spec=KinetoOperator) + kineto_runtime_op2.ev_idx = "cpu_op2" + kineto_runtime_op2.timestamp = 200 + kineto_runtime_op2.tid = 2 + kineto_runtime_op2.name = "runtime_op2" + kineto_runtime_op2.correlation = 456 + + trace_linker.kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op1, 456: kineto_runtime_op2} + + with patch.object(trace_linker, "find_parent_cpu_op", side_effect=[kineto_runtime_op1, kineto_runtime_op2]): + result = trace_linker.group_gpu_ops_by_cpu_launchers( + [kineto_gpu_op1, kineto_gpu_op2], trace_linker.kineto_correlation_cuda_runtime_map + ) + + assert result == {"cpu_op1": [kineto_gpu_op1], "cpu_op2": [kineto_gpu_op2]}