Merge pull request #77 from mlcommons/trace-linker-pytest
Add unit tests for TraceLinker methods
srinivas212 authored Jun 10, 2024
2 parents 05dc6e6 + fe66519 commit 4f7945e
Showing 2 changed files with 338 additions and 31 deletions.
112 changes: 81 additions & 31 deletions src/trace_link/trace_linker.py
@@ -99,23 +99,29 @@ def load_traces(self) -> None:
"""
Loads both PyTorch Execution Traces and Kineto Traces.
"""
self.load_pytorch_et()
self.load_kineto_trace()
self.pytorch_ops = self.load_pytorch_et()
kineto_data = self.load_kineto_trace()
self.update_kineto_data(kineto_data)

def load_pytorch_et(self) -> None:
def load_pytorch_et(self) -> List[PyTorchOperator]:
"""
Loads and processes the PyTorch Execution Trace.
This method handles multiple iterations in the trace and extracts the nodes,
considering the specified annotation for segmenting the iterations.
Returns:
List[PyTorchOperator]: List of PyTorch operators.
"""
self.logger.info("Starting to load PyTorch Execution Trace.")
pytorch_et = load_execution_trace_file(self.pytorch_et_file)

root_node = pytorch_et.get_nodes()[1] # Root node is usually 1-based
self.pytorch_ops = self.extract_pytorch_ops(root_node)
self.logger.info(f"Original ops in PyTorch ET: {len(self.pytorch_ops)}")
pytorch_ops = self.extract_pytorch_ops(root_node)
self.logger.info(f"Original ops in PyTorch ET: {len(pytorch_ops)}")
self.logger.info("PyTorch Execution Trace loaded successfully.")

return pytorch_ops

def extract_pytorch_ops(self, node: PyTorchOperator) -> List[PyTorchOperator]:
"""
Extracts and sorts nodes from the PyTorch execution trace recursively.
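The refactor above is what makes the new tests possible: load_pytorch_et now returns the operator list instead of assigning it as a side effect. A minimal pytest sketch of how such a test could look (the constructor arguments and mocked values are hypothetical, not taken from this commit's test file, and the module is assumed importable as trace_link.trace_linker):

import unittest.mock as mock

from trace_link.trace_linker import TraceLinker


def test_load_pytorch_et_returns_extracted_ops():
    # Hypothetical constructor signature; adjust to the real one.
    linker = TraceLinker(pytorch_et_file="et.json", kineto_file="kineto.json")
    fake_ops = [mock.MagicMock(id=i) for i in range(3)]

    with mock.patch("trace_link.trace_linker.load_execution_trace_file") as mock_load, \
            mock.patch.object(TraceLinker, "extract_pytorch_ops", return_value=fake_ops):
        # get_nodes()[1] is the root node the method reads.
        mock_load.return_value.get_nodes.return_value = [None, mock.MagicMock()]
        assert linker.load_pytorch_et() == fake_ops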
@@ -139,11 +145,14 @@ def traverse(node: PyTorchOperator):
         traverse(node)
         return sorted(nodes, key=lambda x: x.id)
 
-    def load_kineto_trace(self) -> None:
+    def load_kineto_trace(self) -> Dict:
         """
         Loads and processes the Kineto Trace.
         This method parses the Kineto trace file, creating KinetoOperator instances for each operator in the trace.
         It then categorizes and segments these operators for further processing and linking with PyTorch operators.
+
+        Returns:
+            Dict: Dictionary containing various data structures needed for linking traces.
         """
         self.logger.info("Starting to load Kineto Trace.")
         kineto_trace_data = read_dictionary_from_json_file(self.kineto_file)
@@ -152,20 +161,21 @@ def load_kineto_trace(self) -> None:
             key=lambda op: op.timestamp,
         )
 
-        self.construct_kineto_data_structures(sorted_kineto_ops)
-        self.calculate_exclusive_dur()
+        kineto_data = self.construct_kineto_data_structures(sorted_kineto_ops)
+        self.calculate_exclusive_dur(kineto_data["kineto_tid_cpu_ops_map"])
 
-        self.sorted_kineto_cpu_ops = sorted(self.kineto_cpu_ops, key=lambda op: op.timestamp)
-        self.sorted_kineto_cpu_op_ts = [op.timestamp for op in self.sorted_kineto_cpu_ops]
+        kineto_data["sorted_kineto_cpu_ops"] = sorted(kineto_data["kineto_cpu_ops"], key=lambda op: op.timestamp)
+        kineto_data["sorted_kineto_cpu_op_ts"] = [op.timestamp for op in kineto_data["sorted_kineto_cpu_ops"]]
 
         self.logger.info(
-            f"Processed Kineto trace with {len(self.kineto_cpu_ops)} CPU ops, "
-            f"{len(self.kineto_id_cuda_launch_op_map)} CPU launcher ops, "
-            f"and {len(self.kineto_gpu_ops)} GPU ops."
+            f"Processed Kineto trace with {len(kineto_data['kineto_cpu_ops'])} CPU ops, "
+            f"{len(kineto_data['kineto_id_cuda_launch_op_map'])} CPU launcher ops, "
+            f"and {len(kineto_data['kineto_gpu_ops'])} GPU ops."
         )
         self.logger.info("Kineto Trace loaded successfully.")
+        return kineto_data
 
-    def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> None:
+    def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> Dict:
         """
         Constructs necessary data structures required for trace linking from the provided Kineto operators. This method
         identifies process start time, end time, thread start time, and end time, and also categorizes operators into
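Since load_kineto_trace now returns its results as a dictionary, a test can stub the heavy helpers and verify only the sorted views it adds on top. A sketch under the same hypothetical-constructor assumption ("traceEvents" is assumed to be the key the loader reads from the Kineto JSON):

import unittest.mock as mock

from trace_link.trace_linker import TraceLinker


def test_load_kineto_trace_adds_sorted_views():
    linker = TraceLinker(pytorch_et_file="et.json", kineto_file="kineto.json")
    early, late = mock.MagicMock(timestamp=5), mock.MagicMock(timestamp=50)
    kineto_data = {
        "kineto_cpu_ops": [late, early],  # deliberately unsorted
        "kineto_tid_cpu_ops_map": {1: [late, early]},
        "kineto_id_cuda_launch_op_map": {},
        "kineto_gpu_ops": [],
    }

    with mock.patch("trace_link.trace_linker.read_dictionary_from_json_file",
                    return_value={"traceEvents": []}), \
            mock.patch.object(TraceLinker, "construct_kineto_data_structures",
                              return_value=kineto_data), \
            mock.patch.object(TraceLinker, "calculate_exclusive_dur"):
        result = linker.load_kineto_trace()

    assert result["sorted_kineto_cpu_ops"] == [early, late]
    assert result["sorted_kineto_cpu_op_ts"] == [5, 50]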
@@ -174,31 +184,38 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) ->
         Args:
             kineto_ops (List[KinetoOperator]): List of Kineto operators to categorize.
 
         Raises:
             ValueError: If duplicate correlation IDs are found in 'cuda_runtime' category operators.
+
+        Returns:
+            Dict: Dictionary containing categorized operators and timing boundaries.
         """
         self.logger.info("Categorizing Kineto operators and calculating timing boundaries.")
         process_start_time = sys.maxsize
         process_end_time = 0
         thread_info = {}
 
+        kineto_cpu_ops = []
+        kineto_tid_cpu_ops_map = {}
+        kineto_correlation_cuda_runtime_map = {}
+        kineto_gpu_ops = []
+        kineto_id_arrow_op_map = {}
+        kineto_id_cuda_launch_op_map = {}
+
         for op in kineto_ops:
             if op.is_cpu_op():
-                self.kineto_cpu_ops.append(op)
-                self.kineto_tid_cpu_ops_map.setdefault(op.tid, []).append(op)
+                kineto_cpu_ops.append(op)
+                kineto_tid_cpu_ops_map.setdefault(op.tid, []).append(op)
                 self.logger.debug(f"Added CPU or user annotation op: {op.name}")
 
             elif op.is_cuda_launch_op():
-                self.kineto_id_cuda_launch_op_map[op.external_id] = op
-                if op.correlation in self.kineto_correlation_cuda_runtime_map:
+                kineto_id_cuda_launch_op_map[op.external_id] = op
+                if op.correlation in kineto_correlation_cuda_runtime_map:
                     raise ValueError(
                         f"Duplicate correlation ID {op.correlation} found in kineto_correlation_cuda_runtime_map."
                     )
-                self.kineto_correlation_cuda_runtime_map[op.correlation] = op
+                kineto_correlation_cuda_runtime_map[op.correlation] = op
                 self.logger.debug(f"Added CPU launcher op: {op.name}")
 
             elif op.is_gpu_op():
-                self.kineto_gpu_ops.append(op)
+                kineto_gpu_ops.append(op)
                 self.logger.debug(f"Added GPU op: {op.name}")
 
             elif op.is_arrow_op():
@@ -211,7 +228,8 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) ->
                 )
                 self.logger.error(error_msg)
                 raise KeyError(error_msg)
-            self.kineto_id_arrow_op_map[op.id] = op
+
+            kineto_id_arrow_op_map[op.id] = op
 
             # Update timing boundaries
             if op.tid is not None:
@@ -221,19 +239,30 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) ->
                 thread_start_end[0] = min(thread_start_end[0], op.timestamp)
                 thread_start_end[1] = max(thread_start_end[1], op.timestamp + op.inclusive_dur)
 
-        self.kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in self.kineto_cpu_ops if op.rf_id is not None}
+        kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in kineto_cpu_ops if op.rf_id is not None}
 
-        # Apply collected timing info
-        self.kineto_process_start_time = process_start_time
-        self.kineto_process_end_time = process_end_time
-        self.kineto_thread_info = thread_info
         self.logger.info("Kineto operators categorized and timing boundaries calculated.")
+        return {
+            "kineto_cpu_ops": kineto_cpu_ops,
+            "kineto_tid_cpu_ops_map": kineto_tid_cpu_ops_map,
+            "kineto_correlation_cuda_runtime_map": kineto_correlation_cuda_runtime_map,
+            "kineto_gpu_ops": kineto_gpu_ops,
+            "kineto_id_arrow_op_map": kineto_id_arrow_op_map,
+            "kineto_id_cuda_launch_op_map": kineto_id_cuda_launch_op_map,
+            "kineto_process_start_time": process_start_time,
+            "kineto_process_end_time": process_end_time,
+            "kineto_thread_info": thread_info,
+            "kineto_rf_id_to_kineto_op_map": kineto_rf_id_to_kineto_op_map,
+        }
 
-    def calculate_exclusive_dur(self) -> None:
+    def calculate_exclusive_dur(self, kineto_tid_cpu_ops_map: Dict[int, List[KinetoOperator]]) -> None:
         """
         Calculates the exclusive duration of each operator in the Kineto traces in parallel. The exclusive duration is
         defined as the total duration of the operator minus any time spent in child operators, effectively representing
         the time spent exclusively in that operator.
+
+        Args:
+            kineto_tid_cpu_ops_map (Dict[int, List[KinetoOperator]]): Map of thread IDs to their corresponding Kineto
+                operators.
         """
         self.logger.info("Calculating exclusive durations for Kineto operators in parallel.")

@@ -277,7 +306,7 @@ def process_ops_for_thread(ops: List[KinetoOperator]) -> None:
             )
 
         with ThreadPoolExecutor() as executor:
-            futures = [executor.submit(process_ops_for_thread, ops) for ops in self.kineto_tid_cpu_ops_map.values()]
+            futures = [executor.submit(process_ops_for_thread, ops) for ops in kineto_tid_cpu_ops_map.values()]
 
             for future in as_completed(futures):
                 future.result()  # Wait for all threads to complete and handle any exceptions
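Passing the thread map in as a parameter, instead of reading self.kineto_tid_cpu_ops_map, lets the parallel path be exercised with synthetic operators. A smoke-test sketch; it only asserts that the call completes, since the per-op bookkeeping lives in code collapsed out of this view:

import unittest.mock as mock

from trace_link.trace_linker import TraceLinker


def test_calculate_exclusive_dur_accepts_injected_map():
    # Hypothetical constructor signature; adjust to the real one.
    linker = TraceLinker(pytorch_et_file="et.json", kineto_file="kineto.json")
    op = mock.MagicMock(timestamp=0, inclusive_dur=50, exclusive_dur=50)
    # One thread, one op with no children: should complete without raising.
    linker.calculate_exclusive_dur({1: [op]})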
@@ -313,6 +342,27 @@ def merge_overlapping_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[

         return merged
 
+    def update_kineto_data(self, kineto_data: Dict) -> None:
+        """
+        Updates the instance variables of the TraceLinker class using the data structures from the kineto_data
+        dictionary.
+
+        Args:
+            kineto_data (Dict): Dictionary containing categorized operators and timing boundaries.
+        """
+        self.kineto_cpu_ops = kineto_data["kineto_cpu_ops"]
+        self.kineto_tid_cpu_ops_map = kineto_data["kineto_tid_cpu_ops_map"]
+        self.kineto_correlation_cuda_runtime_map = kineto_data["kineto_correlation_cuda_runtime_map"]
+        self.kineto_gpu_ops = kineto_data["kineto_gpu_ops"]
+        self.kineto_id_arrow_op_map = kineto_data["kineto_id_arrow_op_map"]
+        self.kineto_id_cuda_launch_op_map = kineto_data["kineto_id_cuda_launch_op_map"]
+        self.kineto_process_start_time = kineto_data["kineto_process_start_time"]
+        self.kineto_process_end_time = kineto_data["kineto_process_end_time"]
+        self.kineto_thread_info = kineto_data["kineto_thread_info"]
+        self.kineto_rf_id_to_kineto_op_map = {op.rf_id: op for op in self.kineto_cpu_ops if op.rf_id is not None}
+        self.sorted_kineto_cpu_ops = kineto_data["sorted_kineto_cpu_ops"]
+        self.sorted_kineto_cpu_op_ts = kineto_data["sorted_kineto_cpu_op_ts"]
+
     def enforce_inter_thread_order(self, threshold: int = 1000) -> None:
         """
         Enforces order between groups of operators in different threads. In Kineto traces with multiple threads,
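update_kineto_data is now the single place where the linker's state is mutated, so one test can pin down the dict-to-attribute wiring end to end (constructor arguments again hypothetical). Note that the method recomputes kineto_rf_id_to_kineto_op_map from the CPU ops rather than reading the key returned by construct_kineto_data_structures, so the assertion below checks the recomputed value:

import unittest.mock as mock

from trace_link.trace_linker import TraceLinker


def test_update_kineto_data_populates_instance_state():
    linker = TraceLinker(pytorch_et_file="et.json", kineto_file="kineto.json")
    cpu_op = mock.MagicMock(rf_id=5, timestamp=10)
    kineto_data = {
        "kineto_cpu_ops": [cpu_op],
        "kineto_tid_cpu_ops_map": {1: [cpu_op]},
        "kineto_correlation_cuda_runtime_map": {},
        "kineto_gpu_ops": [],
        "kineto_id_arrow_op_map": {},
        "kineto_id_cuda_launch_op_map": {},
        "kineto_process_start_time": 0,
        "kineto_process_end_time": 100,
        "kineto_thread_info": {},
        "kineto_rf_id_to_kineto_op_map": {5: cpu_op},
        "sorted_kineto_cpu_ops": [cpu_op],
        "sorted_kineto_cpu_op_ts": [10],
    }

    linker.update_kineto_data(kineto_data)

    assert linker.kineto_cpu_ops == [cpu_op]
    assert linker.kineto_rf_id_to_kineto_op_map == {5: cpu_op}
    assert linker.kineto_process_end_time == 100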
(The second changed file, the new TraceLinker unit test suite accounting for the remaining 257 added lines, did not load in this view.)
