Encode communicator groups in Chakra traces #140

Merged: 5 commits, Sep 6, 2024
Changes from all commits
8 changes: 6 additions & 2 deletions src/converter/pytorch_converter.py
@@ -225,8 +225,10 @@ def convert_json_to_protobuf_nodes(
protobuf_node_map (Dict[int, ChakraNode]): Dictionary where the converted Protobuf nodes will be stored.
"""
for _, json_node in json_node_map.items():
if (
(json_node.get_op_type() == PyTorchNodeType.CPU_OP)
or (json_node.get_op_type() == PyTorchNodeType.LABEL)
or (json_node.get_op_type() == PyTorchNodeType.METADATA)
):
chakra_node = self.convert_json_to_protobuf_node(json_node_map, protobuf_node_map, json_node)
protobuf_node_map[chakra_node.id] = chakra_node
@@ -242,13 +244,15 @@ def convert_json_to_protobuf_nodes(
[
ChakraAttr(name="comm_type", int64_val=collective_comm_type),
ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size),
*( [ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else [] ),
]
)

elif chakra_gpu_node.type in {COMM_SEND_NODE, COMM_RECV_NODE}:
chakra_gpu_node.attr.extend(
[
ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size),
*( [ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else [] ),
]
)
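The conditional splat in both hunks is what keeps pg_name out of nodes whose trace carried no communicator information. Below is a minimal standalone sketch of the same pattern, assuming the ChakraAttr protobuf message this converter already uses (the import path is an assumption; reuse whatever pytorch_converter.py imports):

# Sketch only: mirrors the conditional-attribute pattern in the hunks above.
# Import path assumed; reuse the ChakraAttr import from pytorch_converter.py.
from chakra.schema.protobuf.et_def_pb2 import ChakraAttr

def build_comm_attrs(comm_type: int, comm_size: int, pg_name: str = "") -> list:
    # pg_name is only encoded when the host trace actually provided one.
    attrs = [
        ChakraAttr(name="comm_type", int64_val=comm_type),
        ChakraAttr(name="comm_size", int64_val=comm_size),
    ]
    if pg_name != "":
        attrs.append(ChakraAttr(name="pg_name", string_val=pg_name))
    return attrs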

11 changes: 10 additions & 1 deletion src/converter/pytorch_node.py
@@ -13,11 +13,13 @@ class PyTorchNodeType(Enum):
CPU_OP (int): Represents a CPU operation.
GPU_OP (int): Represents a GPU operation.
LABEL (int): Represents a non-operator node (e.g., labels).
METADATA (int): Represents a metadata node (e.g., process group initialization).
"""

CPU_OP = 1
GPU_OP = 2
LABEL = 3 # Non-operator nodes
METADATA = 4 # Metadata nodes


class PyTorchNode:
@@ -42,6 +44,7 @@ class PyTorchNode:
inter_thread_dep (Any): Inter-thread dependency of the node.
cat (Any): Category of the node.
stream (int): Stream associated with the node.
pg_name (str): Process group name for inter-GPU communication.
"""

SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4"]
@@ -109,6 +112,10 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None:
self.inter_thread_dep = node_data.get("inter_thread_dep")
self.cat = node_data.get("cat")
self.stream = node_data.get("stream", 0)
# In collective comm nodes, pg_name appears directly in node_data when available.
# In send/recv nodes, pg_name arrives through the attrs list when available.
# Otherwise, pg_name is absent.
self.pg_name = node_data.get("pg_name", "")

for attr in node_data.get("attrs", []):
setattr(self, attr["name"], attr["value"])
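To make the comment concrete, here are two hypothetical, trimmed node_data entries; the values are invented, and only the placement of pg_name follows the layout the comment describes:

# Hypothetical trimmed host-trace entries (invented values).
collective_node_data = {
    "id": 101,
    "name": "nccl:all_reduce",
    "pg_name": "0",  # collectives: pg_name sits directly in node_data
}
send_recv_node_data = {
    "id": 102,
    "name": "nccl:send",
    "attrs": [
        # send/recv: pg_name arrives through the attrs list
        {"name": "pg_name", "type": "string", "value": "0"},
    ],
}
# The parser handles both: node_data.get("pg_name", "") covers the first case,
# and the attrs loop (setattr) covers the second.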
@@ -120,7 +127,9 @@ def get_op_type(self) -> PyTorchNodeType:
Returns
PyTorchNodeType: The type of the PyTorch operation.
"""
if "process_group:init" in self.name:
return PyTorchNodeType.METADATA
elif self.is_gpu_op():
return PyTorchNodeType.GPU_OP
elif hasattr(self, "op_schema") or hasattr(self, "outputs"):
return PyTorchNodeType.CPU_OP
6 changes: 6 additions & 0 deletions src/feeder/et_feeder_node.cpp
@@ -32,6 +32,8 @@ ETFeederNode::ETFeederNode(std::shared_ptr<ChakraProtoMsg::Node> node) {
this->comm_dst_ = static_cast<uint32_t>(attr.int32_val());
} else if (attr_name == "comm_tag") {
this->comm_tag_ = static_cast<uint32_t>(attr.int32_val());
} else if (attr_name == "pg_name") {
this->pg_name_ = static_cast<string>(attr.string_val());
} else {
this->other_attrs_.emplace(attr_name, attr);
}
@@ -138,3 +140,7 @@ uint32_t ETFeederNode::comm_dst() {
uint32_t ETFeederNode::comm_tag() {
return comm_tag_;
}

string ETFeederNode::pg_name() {
return pg_name_;
}
2 changes: 2 additions & 0 deletions src/feeder/et_feeder_node.h
@@ -37,6 +37,7 @@ class ETFeederNode {
uint32_t comm_src();
uint32_t comm_dst();
uint32_t comm_tag();
std::string pg_name();

private:
void assign_attr_val(
@@ -64,6 +65,7 @@
uint32_t comm_src_;
uint32_t comm_dst_;
uint32_t comm_tag_;
std::string pg_name_;
};

} // namespace Chakra
13 changes: 13 additions & 0 deletions src/trace_link/kineto_operator.py
@@ -25,6 +25,7 @@ class KinetoOperator:
stream (Optional[int]): CUDA stream identifier associated with the operator.
rf_id (Optional[int]): Record function identifier.
correlation (int): Identifier used to correlate CUDA runtime and GPU operations.
pg_name (Optional[str]): Process group name for collective communication.
"""

def __init__(self, kineto_op: Dict[str, Any]) -> None:
@@ -51,6 +52,7 @@ def __init__(self, kineto_op: Dict[str, Any]) -> None:
self.stream: Optional[int] = kineto_op.get("args", {}).get("stream", None)
self.rf_id: Optional[int] = kineto_op.get("args", {}).get("Record function id", None)
self.correlation: int = kineto_op.get("args", {}).get("correlation", -1)
self.pg_name: Optional[str] = kineto_op.get("args", {}).get("Process Group Name", None)

def __repr__(self) -> str:
"""
@@ -153,3 +155,14 @@ def is_gpu_op(self) -> bool:
"""
gpu_categories = {"kernel", "gpu_memcpy"}
return self.category in gpu_categories

def is_inter_gpu_comms_op(self) -> bool:
"""
Check if the operator is an inter-GPU communication operator based on its name.

Both point-to-point send/receive primitives and collective communication primitives are considered.

Returns
bool: True if it is an inter-GPU communication operator, otherwise False.
"""
return "ncclDevKernel" in self.name
6 changes: 6 additions & 0 deletions src/trace_link/trace_linker.py
@@ -755,8 +755,14 @@ def process_dependent_gpu_ops(
"exclusive_dur": gpu_op.exclusive_dur,
"ts": gpu_op.timestamp,
"stream": gpu_op.stream,
**(
{"pg_name": gpu_op.pg_name}
if gpu_op.is_inter_gpu_comms_op() and gpu_op.pg_name is not None
else {}
),
}
)

updated_gpu_ops.append(new_gpu_op)

return updated_gpu_ops
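The dict unpacking above only attaches pg_name to communication kernels that actually carry one; a minimal standalone sketch of that pattern:

# Sketch of the conditional dict-unpacking used in process_dependent_gpu_ops above.
def with_optional_pg_name(base: dict, is_comms_op: bool, pg_name) -> dict:
    return {
        **base,
        **({"pg_name": pg_name} if is_comms_op and pg_name is not None else {}),
    }

print(with_optional_pg_name({"stream": 20}, True, "0"))   # {'stream': 20, 'pg_name': '0'}
print(with_optional_pg_name({"stream": 20}, False, "0"))  # {'stream': 20}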
3 changes: 3 additions & 0 deletions tests/trace_link/test_trace_linker.py
@@ -469,6 +469,7 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
gpu_op.inclusive_dur = gpu_op_data["inclusive_dur"]
gpu_op.exclusive_dur = gpu_op_data["exclusive_dur"]
gpu_op.stream = gpu_op_data["stream"]
gpu_op.pg_name = gpu_op_data.get("pg_name", None)
kineto_gpu_op_objects.append(gpu_op)

host_op_id_to_kineto_ops_map = {orig_op_id: kineto_gpu_op_objects}
@@ -497,6 +498,8 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
assert updated_gpu_op["exclusive_dur"] == kineto_gpu_op_objects[i].exclusive_dur
assert updated_gpu_op["ts"] == kineto_gpu_op_objects[i].timestamp
assert updated_gpu_op["stream"] == kineto_gpu_op_objects[i].stream
if kineto_gpu_op_objects[i].is_inter_gpu_comms_op() and kineto_gpu_op_objects[i].pg_name is not None:
assert updated_gpu_op["pg_name"] == kineto_gpu_op_objects[i].pg_name


@patch("builtins.open", new_callable=MagicMock)