From 5a2ee1277d235f4ca2a62102b4cbb8095601c688 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Mon, 10 Jun 2024 10:52:40 -0400 Subject: [PATCH 1/2] Refactor TraceLinker class for improved testability --- src/trace_link/trace_linker.py | 112 ++++++++++++++++++++++++--------- 1 file changed, 81 insertions(+), 31 deletions(-) diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py index cbc0cac6..9d513718 100644 --- a/src/trace_link/trace_linker.py +++ b/src/trace_link/trace_linker.py @@ -99,23 +99,29 @@ def load_traces(self) -> None: """ Loads both PyTorch Execution Traces and Kineto Traces. """ - self.load_pytorch_et() - self.load_kineto_trace() + self.pytorch_ops = self.load_pytorch_et() + kineto_data = self.load_kineto_trace() + self.update_kineto_data(kineto_data) - def load_pytorch_et(self) -> None: + def load_pytorch_et(self) -> List[PyTorchOperator]: """ Loads and processes the PyTorch Execution Trace. This method handles multiple iterations in the trace and extracts the nodes, considering the specified annotation for segmenting the iterations. + + Returns: + List[PyTorchOperator]: List of PyTorch operators. """ self.logger.info("Starting to load PyTorch Execution Trace.") pytorch_et = load_execution_trace_file(self.pytorch_et_file) root_node = pytorch_et.get_nodes()[1] # Root node is usually 1-based - self.pytorch_ops = self.extract_pytorch_ops(root_node) - self.logger.info(f"Original ops in PyTorch ET: {len(self.pytorch_ops)}") + pytorch_ops = self.extract_pytorch_ops(root_node) + self.logger.info(f"Original ops in PyTorch ET: {len(pytorch_ops)}") self.logger.info("PyTorch Execution Trace loaded successfully.") + return pytorch_ops + def extract_pytorch_ops(self, node: PyTorchOperator) -> List[PyTorchOperator]: """ Extracts and sorts nodes from the PyTorch execution trace recursively. @@ -139,11 +145,14 @@ def traverse(node: PyTorchOperator): traverse(node) return sorted(nodes, key=lambda x: x.id) - def load_kineto_trace(self) -> None: + def load_kineto_trace(self) -> Dict: """ Loads and processes the Kineto Trace. This method parses the Kineto trace file, creating KinetoOperator instances for each operator in the trace. It then categorizes and segments these operators for further processing and linking with PyTorch operators. + + Returns: + Dict: Dictionary containing various data structures needed for linking traces. """ self.logger.info("Starting to load Kineto Trace.") kineto_trace_data = read_dictionary_from_json_file(self.kineto_file) @@ -152,20 +161,21 @@ def load_kineto_trace(self) -> None: key=lambda op: op.timestamp, ) - self.construct_kineto_data_structures(sorted_kineto_ops) - self.calculate_exclusive_dur() + kineto_data = self.construct_kineto_data_structures(sorted_kineto_ops) + self.calculate_exclusive_dur(kineto_data["kineto_tid_cpu_ops_map"]) - self.sorted_kineto_cpu_ops = sorted(self.kineto_cpu_ops, key=lambda op: op.timestamp) - self.sorted_kineto_cpu_op_ts = [op.timestamp for op in self.sorted_kineto_cpu_ops] + kineto_data["sorted_kineto_cpu_ops"] = sorted(kineto_data["kineto_cpu_ops"], key=lambda op: op.timestamp) + kineto_data["sorted_kineto_cpu_op_ts"] = [op.timestamp for op in kineto_data["sorted_kineto_cpu_ops"]] self.logger.info( - f"Processed Kineto trace with {len(self.kineto_cpu_ops)} CPU ops, " - f"{len(self.kineto_id_cuda_launch_op_map)} CPU launcher ops, " - f"and {len(self.kineto_gpu_ops)} GPU ops." + f"Processed Kineto trace with {len(kineto_data['kineto_cpu_ops'])} CPU ops, " + f"{len(kineto_data['kineto_id_cuda_launch_op_map'])} CPU launcher ops, " + f"and {len(kineto_data['kineto_gpu_ops'])} GPU ops." ) self.logger.info("Kineto Trace loaded successfully.") + return kineto_data - def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> None: + def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> Dict: """ Constructs necessary data structures required for trace linking from the provided Kineto operators. This method identifies process start time, end time, thread start time, and end time, and also categorizes operators into @@ -174,31 +184,38 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> Args: kineto_ops (List[KinetoOperator]): List of Kineto operators to categorize. - Raises: - ValueError: If duplicate correlation IDs are found in 'cuda_runtime' category operators. + Returns: + Dict: Dictionary containing categorized operators and timing boundaries. """ self.logger.info("Categorizing Kineto operators and calculating timing boundaries.") process_start_time = sys.maxsize process_end_time = 0 thread_info = {} + kineto_cpu_ops = [] + kineto_tid_cpu_ops_map = {} + kineto_correlation_cuda_runtime_map = {} + kineto_gpu_ops = [] + kineto_id_arrow_op_map = {} + kineto_id_cuda_launch_op_map = {} + for op in kineto_ops: if op.is_cpu_op(): - self.kineto_cpu_ops.append(op) - self.kineto_tid_cpu_ops_map.setdefault(op.tid, []).append(op) + kineto_cpu_ops.append(op) + kineto_tid_cpu_ops_map.setdefault(op.tid, []).append(op) self.logger.debug(f"Added CPU or user annotation op: {op.name}") elif op.is_cuda_launch_op(): - self.kineto_id_cuda_launch_op_map[op.external_id] = op - if op.correlation in self.kineto_correlation_cuda_runtime_map: + kineto_id_cuda_launch_op_map[op.external_id] = op + if op.correlation in kineto_correlation_cuda_runtime_map: raise ValueError( f"Duplicate correlation ID {op.correlation} found in self.kineto_id_cuda_launch_op_map." ) - self.kineto_correlation_cuda_runtime_map[op.correlation] = op + kineto_correlation_cuda_runtime_map[op.correlation] = op self.logger.debug(f"Added CPU launcher op: {op.name}") elif op.is_gpu_op(): - self.kineto_gpu_ops.append(op) + kineto_gpu_ops.append(op) self.logger.debug(f"Added GPU op: {op.name}") elif op.is_arrow_op(): @@ -211,7 +228,8 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> ) self.logger.error(error_msg) raise KeyError(error_msg) - self.kineto_id_arrow_op_map[op.id] = op + + kineto_id_arrow_op_map[op.id] = op # Update timing boundaries if op.tid is not None: @@ -221,19 +239,30 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> thread_start_end[0] = min(thread_start_end[0], op.timestamp) thread_start_end[1] = max(thread_start_end[1], op.timestamp + op.inclusive_dur) - self.kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in self.kineto_cpu_ops if op.rf_id is not None} + kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in kineto_cpu_ops if op.rf_id is not None} - # Apply collected timing info - self.kineto_process_start_time = process_start_time - self.kineto_process_end_time = process_end_time - self.kineto_thread_info = thread_info - self.logger.info("Kineto operators categorized and timing boundaries calculated.") + return { + "kineto_cpu_ops": kineto_cpu_ops, + "kineto_tid_cpu_ops_map": kineto_tid_cpu_ops_map, + "kineto_correlation_cuda_runtime_map": kineto_correlation_cuda_runtime_map, + "kineto_gpu_ops": kineto_gpu_ops, + "kineto_id_arrow_op_map": kineto_id_arrow_op_map, + "kineto_id_cuda_launch_op_map": kineto_id_cuda_launch_op_map, + "kineto_process_start_time": process_start_time, + "kineto_process_end_time": process_end_time, + "kineto_thread_info": thread_info, + "kineto_rf_id_to_kineto_op_map": kineto_rf_id_to_kineto_op_map, + } - def calculate_exclusive_dur(self) -> None: + def calculate_exclusive_dur(self, kineto_tid_cpu_ops_map: Dict[int, List[KinetoOperator]]) -> None: """ Calculates the exclusive duration of each operator in the Kineto traces in parallel. The exclusive duration is defined as the total duration of the operator minus any time spent in child operators, effectively representing the time spent exclusively in that operator. + + Args: + kineto_tid_cpu_ops_map (Dict[int, List[KinetoOperator]]): Map of thread IDs to their corresponding Kineto + operators. """ self.logger.info("Calculating exclusive durations for Kineto operators in parallel.") @@ -277,7 +306,7 @@ def process_ops_for_thread(ops: List[KinetoOperator]) -> None: ) with ThreadPoolExecutor() as executor: - futures = [executor.submit(process_ops_for_thread, ops) for ops in self.kineto_tid_cpu_ops_map.values()] + futures = [executor.submit(process_ops_for_thread, ops) for ops in kineto_tid_cpu_ops_map.values()] for future in as_completed(futures): future.result() # Wait for all threads to complete and handle any exceptions @@ -313,6 +342,27 @@ def merge_overlapping_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[ return merged + def update_kineto_data(self, kineto_data: Dict) -> None: + """ + Updates the instance variables of the TraceLinker class using the data structures from the kineto_data + dictionary. + + Args: + kineto_data (Dict): Dictionary containing categorized operators and timing boundaries. + """ + self.kineto_cpu_ops = kineto_data["kineto_cpu_ops"] + self.kineto_tid_cpu_ops_map = kineto_data["kineto_tid_cpu_ops_map"] + self.kineto_correlation_cuda_runtime_map = kineto_data["kineto_correlation_cuda_runtime_map"] + self.kineto_gpu_ops = kineto_data["kineto_gpu_ops"] + self.kineto_id_arrow_op_map = kineto_data["kineto_id_arrow_op_map"] + self.kineto_id_cuda_launch_op_map = kineto_data["kineto_id_cuda_launch_op_map"] + self.kineto_process_start_time = kineto_data["kineto_process_start_time"] + self.kineto_process_end_time = kineto_data["kineto_process_end_time"] + self.kineto_thread_info = kineto_data["kineto_thread_info"] + self.kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in self.kineto_cpu_ops if op.rf_id is not None} + self.sorted_kineto_cpu_ops = kineto_data["sorted_kineto_cpu_ops"] + self.sorted_kineto_cpu_op_ts = kineto_data["sorted_kineto_cpu_op_ts"] + def enforce_inter_thread_order(self, threshold: int = 1000) -> None: """ Enforces order between groups of operators in different threads. In Kineto traces with multiple threads, From fe6651923e1eb6cd0cfc6d4e03693a84a20c862f Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Fri, 7 Jun 2024 10:39:48 -0400 Subject: [PATCH 2/2] Add unit tests for TraceLinker methods --- tests/trace_link/test_trace_linker.py | 257 ++++++++++++++++++++++++++ 1 file changed, 257 insertions(+) create mode 100644 tests/trace_link/test_trace_linker.py diff --git a/tests/trace_link/test_trace_linker.py b/tests/trace_link/test_trace_linker.py new file mode 100644 index 00000000..2d75c25c --- /dev/null +++ b/tests/trace_link/test_trace_linker.py @@ -0,0 +1,257 @@ +from unittest.mock import MagicMock, patch + +import pytest +from chakra.src.trace_link.kineto_operator import KinetoOperator +from chakra.src.trace_link.trace_linker import TraceLinker +from chakra.src.trace_link.unique_id_assigner import UniqueIdAssigner +from param_bench.train.compute.python.tools.execution_trace import ( + Node as PyTorchOperator, +) + + +@pytest.fixture +def trace_linker(): + return TraceLinker(pytorch_et_file="path/to/pytorch_et.json", kineto_file="path/to/kineto.json") + + +def test_initialization(trace_linker): + assert trace_linker.pytorch_et_file == "path/to/pytorch_et.json" + assert trace_linker.kineto_file == "path/to/kineto.json" + assert isinstance(trace_linker.id_assigner, UniqueIdAssigner) + assert trace_linker.logger.name == "chakra.src.trace_link.trace_linker" + + +@patch("chakra.src.trace_link.trace_linker.TraceLinker.load_pytorch_et") +@patch("chakra.src.trace_link.trace_linker.TraceLinker.load_kineto_trace") +@patch("chakra.src.trace_link.trace_linker.TraceLinker.update_kineto_data") +def test_load_traces(mock_update_kineto_data, mock_load_kineto_trace, mock_load_pytorch_et, trace_linker): + mock_load_kineto_trace.return_value = {"sample_data": "data"} + trace_linker.load_traces() + mock_load_pytorch_et.assert_called_once() + mock_load_kineto_trace.assert_called_once() + mock_update_kineto_data.assert_called_once_with({"sample_data": "data"}) + + +def test_construct_kineto_data_structures(trace_linker): + mock_kineto_op1 = MagicMock(spec=KinetoOperator) + mock_kineto_op1.is_cpu_op.return_value = True + mock_kineto_op1.timestamp = 100 + mock_kineto_op1.inclusive_dur = 50 + mock_kineto_op1.tid = 1 + mock_kineto_op1.name = "op1" + mock_kineto_op1.rf_id = 1 + + mock_kineto_op2 = MagicMock(spec=KinetoOperator) + mock_kineto_op2.is_cpu_op.return_value = True + mock_kineto_op2.timestamp = 200 + mock_kineto_op2.inclusive_dur = 50 + mock_kineto_op2.tid = 1 + mock_kineto_op2.name = "op2" + mock_kineto_op2.rf_id = 2 + + kineto_data = trace_linker.construct_kineto_data_structures([mock_kineto_op1, mock_kineto_op2]) + assert kineto_data["kineto_tid_cpu_ops_map"][1] == [mock_kineto_op1, mock_kineto_op2] + + +@pytest.mark.parametrize( + "intervals, expected_result", + [ + ([(1, 3), (2, 6), (8, 10), (15, 18)], [(1, 6), (8, 10), (15, 18)]), + ([(1, 4), (4, 5)], [(1, 5)]), + ([], []), + ([(1, 2), (2, 3), (3, 4)], [(1, 4)]), + ([(1, 5), (2, 6), (6, 8), (7, 9)], [(1, 9)]), + ], +) +def test_merge_overlapping_intervals(intervals, expected_result): + result = TraceLinker.merge_overlapping_intervals(intervals) + assert result == expected_result + + +@pytest.mark.parametrize( + "ops_by_tid, exclude_tid, timestamp, expected_result", + [ + ( + { + 1: [MagicMock(spec=KinetoOperator, timestamp=100, category="cpu_op", rf_id=1)], + 2: [ + MagicMock(spec=KinetoOperator, timestamp=150, category="cpu_op", rf_id=2), + MagicMock(spec=KinetoOperator, timestamp=200, category="cpu_op", rf_id=3), + ], + }, + 1, + 175, + 2, + ), + ( + { + 1: [MagicMock(spec=KinetoOperator, timestamp=100, category="cpu_op", rf_id=1)], + 2: [ + MagicMock(spec=KinetoOperator, timestamp=150, category="cpu_op", rf_id=2), + MagicMock(spec=KinetoOperator, timestamp=200, category="cpu_op", rf_id=3), + ], + }, + 2, + 125, + 1, + ), + ( + { + 1: [MagicMock(spec=KinetoOperator, timestamp=100, category="cpu_op", rf_id=1)], + 2: [ + MagicMock(spec=KinetoOperator, timestamp=150, category="cpu_op", rf_id=2), + MagicMock(spec=KinetoOperator, timestamp=200, category="cpu_op", rf_id=3), + ], + }, + 2, + 50, + None, + ), + ( + { + 1: [MagicMock(spec=KinetoOperator, timestamp=100, category="cpu_op", rf_id=1)], + 2: [ + MagicMock(spec=KinetoOperator, timestamp=150, category="cpu_op", rf_id=2), + MagicMock(spec=KinetoOperator, timestamp=200, category="cpu_op", rf_id=3), + ], + }, + 1, + 50, + None, + ), + ], +) +def test_find_last_cpu_node_before_timestamp(ops_by_tid, exclude_tid, timestamp, expected_result, trace_linker): + result = trace_linker.find_last_cpu_node_before_timestamp(ops_by_tid, exclude_tid, timestamp) + assert result == expected_result + + +def test_link_gpu_ops(trace_linker): + # Create a mock PyTorch operator + pytorch_op = MagicMock(spec=PyTorchOperator) + pytorch_op.id = 123 + + # Create mock Kineto GPU operators + kineto_gpu_op1 = MagicMock(spec=KinetoOperator) + kineto_gpu_op2 = MagicMock(spec=KinetoOperator) + kineto_gpu_ops = [kineto_gpu_op1, kineto_gpu_op2] + + # Call the method + trace_linker.link_gpu_ops(pytorch_op, kineto_gpu_ops) + + # Assert that the parent_pytorch_op_id is set correctly + for gpu_op in kineto_gpu_ops: + assert gpu_op.parent_pytorch_op_id == pytorch_op.id + + +@pytest.mark.parametrize( + "orig_op_id, cpu_op, kineto_gpu_ops, expected_ids, expected_fields", + [ + ( + 1, + { + "id": 1, + "inputs": ["input1", "input2"], + "outputs": ["output1"], + }, + [ + { + "timestamp": 200, + "category": "gpu_op", + "name": "gpu_op1", + "phase": "X", + "inclusive_dur": 100, + "exclusive_dur": 80, + "stream": 1, + }, + { + "timestamp": 300, + "category": "gpu_op", + "name": "gpu_op2", + "phase": "X", + "inclusive_dur": 120, + "exclusive_dur": 100, + "stream": 2, + }, + ], + [100, 101], + [ + { + "ctrl_deps": 1, + "inputs": ["input1", "input2"], + "outputs": ["output1"], + }, + { + "ctrl_deps": 1, + "inputs": ["input1", "input2"], + "outputs": ["output1"], + }, + ], + ), + ], +) +def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_ops, expected_ids, expected_fields): + # Create mock dependent GPU operators + kineto_gpu_op_objects = [] + for gpu_op_data in kineto_gpu_ops: + gpu_op = MagicMock(spec=KinetoOperator) + gpu_op.timestamp = gpu_op_data["timestamp"] + gpu_op.category = gpu_op_data["category"] + gpu_op.name = gpu_op_data["name"] + gpu_op.phase = gpu_op_data["phase"] + gpu_op.inclusive_dur = gpu_op_data["inclusive_dur"] + gpu_op.exclusive_dur = gpu_op_data["exclusive_dur"] + gpu_op.stream = gpu_op_data["stream"] + kineto_gpu_op_objects.append(gpu_op) + + trace_linker.pytorch_op_id_to_kineto_ops_map[orig_op_id] = kineto_gpu_op_objects + + # Override the generate_new_id method to return the expected IDs + original_generate_new_id = trace_linker.id_assigner.generate_new_id + trace_linker.id_assigner.generate_new_id = MagicMock(side_effect=expected_ids) + + # Call the method + updated_gpu_ops = trace_linker.process_dependent_gpu_ops(cpu_op, orig_op_id) + + # Restore the original generate_new_id method + trace_linker.id_assigner.generate_new_id = original_generate_new_id + + # Assert the new GPU operators have the updated IDs and fields + assert len(updated_gpu_ops) == len(kineto_gpu_ops) + for i, updated_gpu_op in enumerate(updated_gpu_ops): + assert updated_gpu_op["id"] == expected_ids[i] + assert updated_gpu_op["ctrl_deps"] == expected_fields[i]["ctrl_deps"] + assert updated_gpu_op["inputs"] == expected_fields[i]["inputs"] + assert updated_gpu_op["outputs"] == expected_fields[i]["outputs"] + assert updated_gpu_op["cat"] == kineto_gpu_op_objects[i].category + assert updated_gpu_op["name"] == kineto_gpu_op_objects[i].name + assert updated_gpu_op["ph"] == kineto_gpu_op_objects[i].phase + assert updated_gpu_op["inclusive_dur"] == kineto_gpu_op_objects[i].inclusive_dur + assert updated_gpu_op["exclusive_dur"] == kineto_gpu_op_objects[i].exclusive_dur + assert updated_gpu_op["ts"] == kineto_gpu_op_objects[i].timestamp + assert updated_gpu_op["stream"] == kineto_gpu_op_objects[i].stream + + +@patch("chakra.src.trace_link.trace_linker.TraceLinker.process_op_and_dependents") +@patch("builtins.open", new_callable=MagicMock) +@patch("json.load") +def test_construct_et_plus_data(mock_json_load, mock_open, mock_process_op_and_dependents, trace_linker): + mock_json_load.return_value = {"nodes": [{"id": 1}, {"id": 2}]} + mock_process_op_and_dependents.side_effect = lambda x: [{"id": x["id"] + 2}] + + trace_linker.construct_et_plus_data() + assert trace_linker.pytorch_et_plus_data["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}] + + +@patch("builtins.open", new_callable=MagicMock) +@patch("json.dump") +def test_dump_pytorch_execution_trace_plus(mock_json_dump, mock_open, trace_linker): + trace_linker.pytorch_et_plus_data = {"nodes": [{"id": 1}, {"id": 2}]} + trace_linker.dump_pytorch_execution_trace_plus("output.json") + + mock_open.assert_called_once_with("output.json", "w") + mock_open.return_value.__enter__.assert_called_once() + mock_open.return_value.__exit__.assert_called_once() + mock_json_dump.assert_called_once_with( + {"nodes": [{"id": 1}, {"id": 2}]}, mock_open.return_value.__enter__(), indent=4 + )