From 24b2f35abaa4e7c333f36333df1da4afd4605707 Mon Sep 17 00:00:00 2001
From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com>
Date: Wed, 13 Mar 2024 08:13:23 -0400
Subject: [PATCH 1/3] Allow multiple GPU children for a given CPU operator

---
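The pass reworked below cuts a CPU operator at each distinct launch timestamp of its GPU children and parents each GPU child to the nearest preceding CPU part. The following self-contained sketch reproduces only that split arithmetic: Op is a hypothetical stand-in for PyTorchNode, and the ids, timestamps, and durations are made up.

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Op:  # hypothetical stand-in for PyTorchNode
    id: int
    ts: int
    dur: int = 0
    parent: Optional[int] = None

def split_points(cpu: Op, gpu_children: List[Op]) -> List[int]:
    # Distinct GPU launch timestamps that fall inside the CPU op's window
    end = cpu.ts + cpu.dur
    return sorted({g.ts for g in gpu_children if g.ts < end})

cpu = Op(id=1, ts=100, dur=30)
gpus = [Op(id=10, ts=110), Op(id=11, ts=120)]

# CPU parts cover [100, 110), [110, 120), [120, 130)
bounds = [cpu.ts] + split_points(cpu, gpus) + [cpu.ts + cpu.dur]
parts = [Op(id=100 + i, ts=lo, dur=hi - lo)
         for i, (lo, hi) in enumerate(zip(bounds, bounds[1:]))]

# Each GPU child is parented to the nearest CPU part that starts before it
for g in gpus:
    g.parent = max((p for p in parts if p.ts < g.ts), key=lambda p: p.ts).id

print([(p.id, p.ts, p.dur) for p in parts])  # [(100, 100, 10), (101, 110, 10), (102, 120, 10)]
print([(g.id, g.parent) for g in gpus])      # [(10, 100), (11, 101)]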
 et_converter/pytorch2chakra_converter.py | 236 ++++++++++-------------
 et_converter/pytorch_node.py             |  14 +-
 2 files changed, 115 insertions(+), 135 deletions(-)

diff --git a/et_converter/pytorch2chakra_converter.py b/et_converter/pytorch2chakra_converter.py
index 43a80730..a31fd195 100644
--- a/et_converter/pytorch2chakra_converter.py
+++ b/et_converter/pytorch2chakra_converter.py
@@ -169,8 +169,7 @@ def convert(self) -> None:
             chakra_node = self.convert_to_chakra_node(pytorch_node)
             self.chakra_nodes[chakra_node.id] = chakra_node

-            if pytorch_node.child_gpu:
-                pytorch_gpu_node = pytorch_node.child_gpu
+            for pytorch_gpu_node in pytorch_node.gpu_children:
                 chakra_gpu_node = self.convert_to_chakra_node(pytorch_gpu_node)

                 if chakra_node.type == COMM_COLL_NODE:
@@ -273,7 +272,7 @@ def _establish_parent_child_relationships(
                 parent_node.add_child(pytorch_node)

                 if pytorch_node.is_gpu_op():
-                    parent_node.set_child_gpu(pytorch_node)
+                    parent_node.add_gpu_child(pytorch_node)

                 if pytorch_node.is_record_param_comms_op():
                     parent_node.record_param_comms_node = pytorch_node
@@ -341,136 +340,114 @@ def split_cpu_nodes_with_gpu_child(self) -> None:
         """
         self.logger.info("Decomposing CPU nodes with GPU child nodes.")
         updated_pytorch_nodes: Dict[int, PyTorchNode] = {}
-        for cpu_node in self.pytorch_nodes.values():
-            if cpu_node.child_gpu is None:
-                new_cpu_node_id = self.id_assigner.assign_unique_id(cpu_node.id)
-                cpu_node.id = new_cpu_node_id
-                for child_node in cpu_node.children:
-                    child_node.parent = cpu_node.id
-                updated_pytorch_nodes[new_cpu_node_id] = cpu_node
-            else:
-                if cpu_node.exclusive_dur > 1:
-                    gpu_node = cpu_node.child_gpu
-                    cpu_node_first, cpu_node_second, updated_gpu_node =\
-                        self._split_cpu_node(cpu_node, gpu_node, updated_pytorch_nodes)
-                    updated_pytorch_nodes[cpu_node_first.id] = copy.deepcopy(cpu_node_first)
-                    updated_pytorch_nodes[cpu_node_second.id] = copy.deepcopy(cpu_node_second)
-                    updated_pytorch_nodes[updated_gpu_node.id] = copy.deepcopy(updated_gpu_node)
+        for pytorch_node in self.pytorch_nodes.values():
+            if pytorch_node.is_cpu_op():
+                cpu_node = pytorch_node
+                gpu_children = cpu_node.gpu_children
+                if gpu_children:
+                    if cpu_node.exclusive_dur > 1:
+                        split_nodes = self._split_cpu_node(cpu_node, gpu_children)
+                        updated_pytorch_nodes.update(split_nodes)
+                    else:
+                        new_cpu_id = self.id_assigner.assign_unique_id(cpu_node.id)
+                        cpu_node.id = new_cpu_id
+                        updated_pytorch_nodes[new_cpu_id] = cpu_node
+                        for child in cpu_node.children:
+                            new_child_id = self.id_assigner.assign_unique_id(child.id)
+                            child.id = new_child_id
+                            child.parent = new_cpu_id
+                            updated_pytorch_nodes[new_child_id] = child
                 else:
-                    new_cpu_node_id = self.id_assigner.assign_unique_id(cpu_node.id)
-                    cpu_node.id = new_cpu_node_id
-                    for child_node in cpu_node.children:
-                        child_node.parent = cpu_node.id
-                    updated_pytorch_nodes[new_cpu_node_id] = cpu_node
-
-                    gpu_node = cpu_node.child_gpu
-                    gpu_node.parent = new_cpu_node_id
-                    new_gpu_node_id = self.id_assigner.assign_unique_id(gpu_node.id)
-                    updated_pytorch_nodes[new_gpu_node_id] = gpu_node
+                    new_id = self.id_assigner.assign_unique_id(cpu_node.id)
+                    cpu_node.id = new_id
+                    for child in cpu_node.children:
+                        child.parent = new_id
+                    updated_pytorch_nodes[new_id] = cpu_node
+            elif not pytorch_node.is_gpu_op():
+                new_id = self.id_assigner.assign_unique_id(pytorch_node.id)
+                pytorch_node.id = new_id
+                for child in pytorch_node.children:
+                    child.parent = new_id
+                updated_pytorch_nodes[new_id] = pytorch_node

         self.pytorch_nodes = updated_pytorch_nodes

     def _split_cpu_node(
-        self, cpu_node: PyTorchNode, gpu_node: PyTorchNode,
-        updated_pytorch_nodes: Dict[int, PyTorchNode]
-    ) -> Tuple[PyTorchNode, PyTorchNode, PyTorchNode]:
-        """
-        Splits a CPU node based on the GPU node's timestamp.
-
-        Args:
-            cpu_node (PyTorchNode): Original CPU node to be split.
-            gpu_node (PyTorchNode): GPU node dictating the split.
-            updated_pytorch_nodes (Dict[int, PyTorchNode]): Updated PyTorch nodes.
-
-        Returns:
-            Tuple[PyTorchNode, PyTorchNode, PyTorchNode]: Two split nodes and
-            the updated GPU node.
-
-        Raises:
-            ValueError: For inconsistencies in the timestamps of the nodes.
-        """
-        original_cpu_info = f"Original CPU Node ID {cpu_node.id} ({cpu_node.name}), " \
-                            f"Inclusive Duration: {cpu_node.inclusive_dur}, " \
-                            f"Exclusive Duration: {cpu_node.exclusive_dur}."
-        self.logger.debug(original_cpu_info)
-        self.logger.debug(f"GPU Node ID {gpu_node.id} ({gpu_node.name}), "
-                          f"Inclusive Duration: {gpu_node.inclusive_dur}, "
-                          f"Exclusive Duration: {gpu_node.exclusive_dur}.")
-
-        cpu_node_first = copy.deepcopy(cpu_node)
-        cpu_node_first.id = self.id_assigner.assign_unique_id(cpu_node.id)
-        cpu_node_first.ts = cpu_node.ts
-        cpu_node_first.exclusive_dur = int(cpu_node.exclusive_dur / 2)
-        cpu_node_first.set_child_gpu(gpu_node)
-        if cpu_node_first.ts >= gpu_node.ts or cpu_node_first.inclusive_dur <= 0:
-            err_msg = (f"Invalid timestamps for the first split CPU node derived from {original_cpu_info}\n"
-                       f"\tFirst Split CPU Node Timestamp: {cpu_node_first.ts}, \n"
-                       f"\tGPU Node Timestamp: {gpu_node.ts}, \n"
-                       f"\tFirst Split CPU Node Inclusive Duration: {cpu_node_first.inclusive_dur}, \n"
-                       f"\tFirst Split CPU Node Exclusive Duration: {cpu_node_first.exclusive_dur}.")
-            self.logger.error(err_msg)
-            raise ValueError(err_msg)
-
-        if cpu_node.parent in self.pytorch_nodes:
-            self._update_parent_node_children(self.pytorch_nodes, cpu_node, cpu_node_first)
-        elif cpu_node.parent in updated_pytorch_nodes:
-            self._update_parent_node_children(updated_pytorch_nodes, cpu_node, cpu_node_first)
-
-        self.logger.debug(f"First Split CPU Node ID {cpu_node_first.id} ({cpu_node_first.name}), "
-                          f"Inclusive Duration: {cpu_node_first.inclusive_dur}, "
-                          f"Exclusive Duration: {cpu_node_first.exclusive_dur}.")
-
-        gpu_node_id = self.id_assigner.assign_unique_id(gpu_node.id)
-        gpu_node.id = gpu_node_id
-        gpu_node.parent = cpu_node_first.id
-
-        cpu_node_second = copy.deepcopy(cpu_node)
-        cpu_node_second.id = self.id_assigner.assign_unique_id(cpu_node.id)
-        cpu_node_second.ts = gpu_node.ts
-        cpu_node_second.exclusive_dur = int(cpu_node.exclusive_dur / 2)
-        cpu_node_second.set_child_gpu(None)
-        cpu_node_second.parent = cpu_node_first.id
-        for child_node in cpu_node.children:
-            child_node.parent = cpu_node_second.id
-            cpu_node_second.add_child(child_node)
-        if cpu_node_second.ts <= cpu_node_first.ts or cpu_node_second.inclusive_dur <= 0:
-            err_msg = (f"Invalid timestamps for the second split CPU node derived from {original_cpu_info}\n"
-                       f"\tFirst Split Timestamp: {cpu_node_first.ts}, \n"
-                       f"\tSecond Split Timestamp: {cpu_node_second.ts}, \n"
-                       f"\tSecond Split Inclusive Duration: {cpu_node_second.inclusive_dur}, "
-                       f"\tSecond Split Exclusive Duration: {cpu_node_second.exclusive_dur}.")
-            self.logger.error(err_msg)
-            raise ValueError(err_msg)
-
-        self.logger.debug(f"Second Split CPU Node ID {cpu_node_second.id} ({cpu_node_second.name}), "
-                          f"Inclusive Duration: {cpu_node_second.inclusive_dur}, "
-                          f"Exclusive Duration: {cpu_node_second.exclusive_dur}.")
-
-        cpu_node_first.add_child(cpu_node_second)
-        cpu_node_first.add_child(gpu_node)
-
-        return cpu_node_first, cpu_node_second, gpu_node
-
-    def _update_parent_node_children(self, parent_node_dict: Dict[int, PyTorchNode],
-                                     cpu_node: PyTorchNode,
-                                     cpu_node_first: PyTorchNode) -> None:
-        """
-        Updates the children of the parent node in the given dictionary.
-
-        This method removes the original CPU node from the parent's children list
-        and adds the first split node.
-
-        Args:
-            parent_node_dict (Dict[int, PyTorchNode]): Dictionary containing the
-                parent node.
-            cpu_node (PyTorchNode): Original CPU node being split.
-            cpu_node_first (PyTorchNode): First split node to add to the parent's
-                children.
-        """
-        parent_node = parent_node_dict[cpu_node.parent]
-        parent_node.children = [child for child in parent_node.children
-                                if child.id != cpu_node.id]
-        parent_node.children.extend([cpu_node_first])
+        self,
+        cpu_node: PyTorchNode,
+        gpu_children: List[PyTorchNode]
+    ) -> Dict[int, PyTorchNode]:
+        """
+        Splits a CPU node based on overlaps with GPU children. It correctly assigns
+        the nearest preceding CPU node as the parent for each GPU child. It ensures
+        non-GPU children of the original CPU node are preserved and added to the
+        last part of the split CPU node.
+        """
+        updated_nodes = {}
+        original_end_ts = cpu_node.ts + cpu_node.exclusive_dur
+
+        # Preserving non-GPU children of the original CPU node
+        non_gpu_children = [child for child in cpu_node.children if child not in gpu_children]
+
+        split_points = sorted(set(gpu_child.ts for gpu_child in gpu_children if gpu_child.ts < original_end_ts))
+        last_split_end = cpu_node.ts
+        last_cpu_part_id = None
+
+        for split_ts in split_points:
+            split_duration = split_ts - last_split_end
+            if split_duration > 0:
+                new_cpu_part = copy.deepcopy(cpu_node)
+                new_part_id = self.id_assigner.assign_unique_id(cpu_node.id)
+                new_cpu_part.id = new_part_id
+                new_cpu_part.ts = last_split_end
+                new_cpu_part.exclusive_dur = split_duration
+                new_cpu_part.inclusive_dur = split_duration
+                new_cpu_part.parent = cpu_node.parent if last_cpu_part_id is None else last_cpu_part_id
+                new_cpu_part.children = []
+                new_cpu_part.gpu_children = []
+
+                updated_nodes[new_part_id] = new_cpu_part
+                last_cpu_part_id = new_part_id
+                last_split_end = split_ts
+
+        # Creating the final part if necessary
+        if last_split_end < original_end_ts:
+            final_part_duration = original_end_ts - last_split_end
+            final_cpu_part = copy.deepcopy(cpu_node)
+            final_part_id = self.id_assigner.assign_unique_id(cpu_node.id)
+            final_cpu_part.id = final_part_id
+            final_cpu_part.ts = last_split_end
+            final_cpu_part.exclusive_dur = final_part_duration
+            final_cpu_part.inclusive_dur = final_part_duration
+            final_cpu_part.parent = last_cpu_part_id if last_cpu_part_id else cpu_node.parent
+            final_cpu_part.children = []
+            final_cpu_part.gpu_children = []
+
+            updated_nodes[final_part_id] = final_cpu_part
+            last_cpu_part_id = final_part_id
+
+        # Adding non-GPU children to the last split of the CPU node
+        if last_cpu_part_id:
+            updated_nodes[last_cpu_part_id].children.extend(non_gpu_children)
+
+        # Assigning GPU children to the closest CPU part
+        for gpu_child in gpu_children:
+            closest_cpu_parent_id = max(
+                [id for id, node in updated_nodes.items() if node.ts < gpu_child.ts],
+                key=lambda id: updated_nodes[id].ts,
+                default=cpu_node.parent
+            )
+
+            gpu_child.parent = closest_cpu_parent_id
+            new_gpu_id = self.id_assigner.assign_unique_id(gpu_child.id)
+            gpu_child.id = new_gpu_id
+            updated_nodes[new_gpu_id] = gpu_child
+
+            # Add GPU child to the nearest CPU part's GPU children list
+            updated_nodes[closest_cpu_parent_id].children.append(gpu_child)
+            updated_nodes[closest_cpu_parent_id].gpu_children.append(gpu_child)
+
+        return updated_nodes

     def convert_to_chakra_node(self, pytorch_node: PyTorchNode) -> ChakraNode:
         """
         Converts a PyTorchNode to a ChakraNode.
@@ -708,7 +685,8 @@ def remove_dangling_nodes(self) -> None:
             if node_id not in parent_ids and not node.data_deps:
                 dangling_nodes.append(node)
                 del self.chakra_nodes[node_id]
-                del self.pytorch_nodes[node_id]
+                if node_id in self.pytorch_nodes:
+                    del self.pytorch_nodes[node_id]

         if dangling_nodes:
             self.logger.info(f"Identified and removed {len(dangling_nodes)} dangling nodes:")
diff --git a/et_converter/pytorch_node.py b/et_converter/pytorch_node.py
index 47edfbd0..3afa2dec 100644
--- a/et_converter/pytorch_node.py
+++ b/et_converter/pytorch_node.py
@@ -31,7 +31,7 @@ def __init__(self, node_data: Dict[str, Any]) -> None:
         self.node_data = node_data
         self.data_deps: List['PyTorchNode'] = []
         self.children: List['PyTorchNode'] = []
-        self.child_gpu: Optional['PyTorchNode'] = None
+        self.gpu_children: List['PyTorchNode'] = []
         self.record_param_comms_node: Optional['PyTorchNode'] = None
         self.nccl_node: Optional['PyTorchNode'] = None

@@ -419,7 +419,9 @@ def inclusive_dur(self) -> int:
         Returns:
             int: The inclusive duration of the node.
         """
-        return self.node_data["inclusive_dur"]
+        if "inclusive_dur" in self.node_data:
+            return self.node_data["inclusive_dur"]
+        return 0

     @inclusive_dur.setter
     def inclusive_dur(self, value: int) -> None:
@@ -543,14 +545,14 @@ def add_child(self, child_node: 'PyTorchNode') -> None:
         """
         self.children.append(child_node)

-    def set_child_gpu(self, child_gpu_node: Optional['PyTorchNode']) -> None:
+    def add_gpu_child(self, gpu_child_node: 'PyTorchNode') -> None:
         """
-        Sets a child GPU node for this node.
+        Adds a GPU child node to this node.

         Args:
-            child_gpu_node (Optional[PyTorchNode]): The child GPU node to be set.
+            gpu_child_node (PyTorchNode): The GPU child node to be added.
         """
-        self.child_gpu = child_gpu_node
+        self.gpu_children.append(gpu_child_node)

     def is_record_param_comms_op(self) -> bool:
         """
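The underlying data-structure change in the patch above is the move from a single child_gpu slot to a gpu_children list, so one CPU operator can own any number of GPU kernels. A minimal sketch of the new bookkeeping, using a hypothetical Node stand-in rather than the real PyTorchNode:

from dataclasses import dataclass, field
from typing import List

@dataclass
class Node:  # hypothetical stand-in; the real class is PyTorchNode
    id: int
    gpu_children: List["Node"] = field(default_factory=list)

    def add_gpu_child(self, child: "Node") -> None:
        # Mirrors the list-append semantics of PyTorchNode.add_gpu_child
        self.gpu_children.append(child)

cpu = Node(id=1)
for gid in (10, 11, 12):  # one CPU op launching several kernels
    cpu.add_gpu_child(Node(id=gid))

assert [c.id for c in cpu.gpu_children] == [10, 11, 12]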
""" - self.child_gpu = child_gpu_node + self.gpu_children.append(gpu_child_node) def is_record_param_comms_op(self) -> bool: """ From d8d7a8aba21ffe4b9fce167d5b5b59a0e382aef0 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 13 Mar 2024 08:14:06 -0400 Subject: [PATCH 2/3] Support Tensor(c10::BFloat16) in get_data_type_size --- et_converter/pytorch_node.py | 1 + 1 file changed, 1 insertion(+) diff --git a/et_converter/pytorch_node.py b/et_converter/pytorch_node.py index 3afa2dec..81f245b0 100644 --- a/et_converter/pytorch_node.py +++ b/et_converter/pytorch_node.py @@ -622,6 +622,7 @@ def get_data_type_size(data_type: str) -> int: "Tensor(int64)": 8, "Tensor(long)": 8, "Tensor(c10::Half)": 2, + "Tensor(c10::BFloat16)": 2, "Tensor(unsigned char)": 1, "Tensor(long int)": 8, # TODO: Add more types From 97deba1b06166dea96efc9a7012c45f19a5fe566 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 13 Mar 2024 08:15:04 -0400 Subject: [PATCH 3/3] Ignore overlap between CPU ops and GPU ops --- et_converter/pytorch2chakra_converter.py | 135 ----------------------- 1 file changed, 135 deletions(-) diff --git a/et_converter/pytorch2chakra_converter.py b/et_converter/pytorch2chakra_converter.py index a31fd195..8a5a75cf 100644 --- a/et_converter/pytorch2chakra_converter.py +++ b/et_converter/pytorch2chakra_converter.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -import copy import json import logging from typing import Dict, List, Optional, Tuple, Set @@ -161,8 +160,6 @@ def convert(self) -> None: self.open_chakra_execution_trace() - self.split_cpu_nodes_with_gpu_child() - for pytorch_nid, pytorch_node in self.pytorch_nodes.items(): if (pytorch_node.get_op_type() == PyTorchNodeType.CPU_OP)\ or (pytorch_node.get_op_type() == PyTorchNodeType.LABEL): @@ -317,138 +314,6 @@ def open_chakra_execution_trace(self) -> None: self.logger.error(err_msg) raise Exception(err_msg) - def split_cpu_nodes_with_gpu_child(self) -> None: - """ - Decomposes CPU nodes with GPU child nodes to model execution overlap - accurately. This method addresses scenarios where a CPU node has a GPU - child node, with an overlap in their execution ending at the same time. - The method splits the CPU node into: - 1. Non-Overlapping Part: Segment before the GPU node starts. - 2. Overlapping Part: Segment overlapping with the GPU node. - - Timeline Stages: - Stage 1 - Original Scenario: - |------------ CPU Node ------------| - |--- GPU Node ---| - - Stage 2 - After Split: - |-- Non-Overlap --|--- Overlap ----| - |--- GPU Node ---| - - Raises: - ValueError: If timestamps of GPU and CPU nodes are inconsistent. 
- """ - self.logger.info("Decomposing CPU nodes with GPU child nodes.") - updated_pytorch_nodes: Dict[int, PyTorchNode] = {} - for pytorch_node in self.pytorch_nodes.values(): - if pytorch_node.is_cpu_op(): - cpu_node = pytorch_node - gpu_children = cpu_node.gpu_children - if gpu_children: - if cpu_node.exclusive_dur > 1: - split_nodes = self._split_cpu_node(cpu_node, gpu_children) - updated_pytorch_nodes.update(split_nodes) - else: - new_cpu_id = self.id_assigner.assign_unique_id(cpu_node.id) - cpu_node.id = new_cpu_id - updated_pytorch_nodes[new_cpu_id] = cpu_node - for child in cpu_node.children: - new_child_id = self.id_assigner.assign_unique_id(child.id) - child.id = new_child_id - child.parent = new_cpu_id - updated_pytorch_nodes[new_child_id] = gpu_child - else: - new_id = self.id_assigner.assign_unique_id(cpu_node.id) - cpu_node.id = new_id - for child in cpu_node.children: - child.parent = new_id - updated_pytorch_nodes[new_id] = cpu_node - elif not pytorch_node.is_gpu_op(): - new_id = self.id_assigner.assign_unique_id(pytorch_node.id) - pytorch_node.id = new_id - for child in pytorch_node.children: - child.parent = new_id - updated_pytorch_nodes[new_id] = pytorch_node - - self.pytorch_nodes = updated_pytorch_nodes - - def _split_cpu_node( - self, - cpu_node: PyTorchNode, - gpu_children: List[PyTorchNode] - ) -> Dict[int, PyTorchNode]: - """ - Splits a CPU node based on overlaps with GPU children. It correctly assigns - the nearest preceding CPU node as the parent for each GPU child. It ensures - non-GPU children of the original CPU node are preserved and added to the - last part of the split CPU node. - """ - updated_nodes = {} - original_end_ts = cpu_node.ts + cpu_node.exclusive_dur - - # Preserving non-GPU children of the original CPU node - non_gpu_children = [child for child in cpu_node.children if child not in gpu_children] - - split_points = sorted(set(gpu_child.ts for gpu_child in gpu_children if gpu_child.ts < original_end_ts)) - last_split_end = cpu_node.ts - last_cpu_part_id = None - - for split_ts in split_points: - split_duration = split_ts - last_split_end - if split_duration > 0: - new_cpu_part = copy.deepcopy(cpu_node) - new_part_id = self.id_assigner.assign_unique_id(cpu_node.id) - new_cpu_part.id = new_part_id - new_cpu_part.ts = last_split_end - new_cpu_part.exclusive_dur = split_duration - new_cpu_part.inclusive_dur = split_duration - new_cpu_part.parent = cpu_node.parent if last_cpu_part_id is None else last_cpu_part_id - new_cpu_part.children = [] - new_cpu_part.gpu_children = [] - - updated_nodes[new_part_id] = new_cpu_part - last_cpu_part_id = new_part_id - last_split_end = split_ts - - # Creating the final part if necessary - if last_split_end < original_end_ts: - final_part_duration = original_end_ts - last_split_end - final_cpu_part = copy.deepcopy(cpu_node) - final_part_id = self.id_assigner.assign_unique_id(cpu_node.id) - final_cpu_part.id = final_part_id - final_cpu_part.ts = last_split_end - final_cpu_part.exclusive_dur = final_part_duration - final_cpu_part.inclusive_dur = final_part_duration - final_cpu_part.parent = last_cpu_part_id if last_cpu_part_id else cpu_node.parent - final_cpu_part.children = [] - final_cpu_part.gpu_children = [] - - updated_nodes[final_part_id] = final_cpu_part - last_cpu_part_id = final_part_id - - # Adding non-GPU children to the last split of the CPU node - if last_cpu_part_id: - updated_nodes[last_cpu_part_id].children.extend(non_gpu_children) - - # Assigning GPU children to the closest CPU part - for gpu_child in 
gpu_children: - closest_cpu_parent_id = max( - [id for id, node in updated_nodes.items() if node.ts < gpu_child.ts], - key=lambda id: updated_nodes[id].ts, - default=cpu_node.parent - ) - - gpu_child.parent = closest_cpu_parent_id - new_gpu_id = self.id_assigner.assign_unique_id(gpu_child.id) - gpu_child.id = new_gpu_id - updated_nodes[new_gpu_id] = gpu_child - - # Add GPU child to the nearest CPU part's GPU children list - updated_nodes[closest_cpu_parent_id].children.append(gpu_child) - updated_nodes[closest_cpu_parent_id].gpu_children.append(gpu_child) - - return updated_nodes - def convert_to_chakra_node(self, pytorch_node: PyTorchNode) -> ChakraNode: """ Converts a PyTorchNode to a ChakraNode.
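With the splitting pass removed, a CPU operator and the kernels it launches keep their original timestamps, so their execution windows may overlap in the converted trace. A small sketch of the overlap that is now deliberately left in place, again using a hypothetical Op stand-in with illustrative numbers:

from dataclasses import dataclass

@dataclass
class Op:  # hypothetical stand-in for a trace node
    name: str
    ts: int
    dur: int

def overlap(a: Op, b: Op) -> int:
    # Length of the intersection of the two [ts, ts + dur) windows
    return max(0, min(a.ts + a.dur, b.ts + b.dur) - max(a.ts, b.ts))

cpu = Op("aten::mm", ts=100, dur=30)
gpu = Op("sgemm_kernel", ts=110, dur=40)

# Previously the CPU op would have been split at ts=110; after this patch
# the 20 units of overlap simply remain in the converted trace.
print(overlap(cpu, gpu))  # 20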