[Fleet Executor] Support multi carrier #38535
namespace paddle {
namespace distributed {

template <typename KeyT, typename ValueT>
In that case, is the meaning of each key-type-to-value-type combination fixed?
If we later need other int64-to-int64 mappings, such as interceptor id to thread id or carrier id to stream id, how would this class be reused?
There is a TODO for this; it will be changed in the next PR.
Do you mean the TODO about replacing int64 with an InterceptorId structure? If so, how would you handle an interceptor id to thread id mapping and an interceptor id to carrier id mapping existing at the same time? Or will the global map class itself be updated later, with the TODO placed in that class?
That scenario does not exist right now, and if it ever comes up we can add ThreadID and CarrierID types as well.
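The reuse concern in this thread can be illustrated with a small sketch. It assumes a simplified stand-in for GlobalMap (not the PR's actual implementation) and hypothetical wrapper types InterceptorId, CarrierId and ThreadId. Because each (KeyT, ValueT) instantiation gets its own static table, two mappings keyed by the same integer no longer collide:

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical strong ID wrappers; the PR itself currently uses raw int64_t.
struct InterceptorId { int64_t value; };
struct CarrierId { int64_t value; };
struct ThreadId { int64_t value; };

// Simplified stand-in for the reviewed GlobalMap: one static table per
// (KeyT, ValueT) instantiation, keyed by the wrapped integer.
template <typename KeyT, typename ValueT>
class TypedGlobalMap {
 public:
  // Returns the stored value, or nullptr if the key was never created.
  static ValueT* Get(KeyT id) {
    std::lock_guard<std::mutex> lock(Mutex());
    auto it = Table().find(id.value);
    return it == Table().end() ? nullptr : it->second.get();
  }

  // Stores (or overwrites) the value for the given key.
  static ValueT* Create(KeyT id, ValueT value) {
    std::lock_guard<std::mutex> lock(Mutex());
    auto& slot = Table()[id.value];
    slot.reset(new ValueT(std::move(value)));
    return slot.get();
  }

 private:
  static std::unordered_map<int64_t, std::unique_ptr<ValueT>>& Table() {
    static std::unordered_map<int64_t, std::unique_ptr<ValueT>> table;
    return table;
  }
  static std::mutex& Mutex() {
    static std::mutex mutex;
    return mutex;
  }
};

// Two mappings that would both be GlobalMap<int64_t, int64_t> with raw ints.
using InterceptorToCarrier = TypedGlobalMap<InterceptorId, CarrierId>;
using InterceptorToThread = TypedGlobalMap<InterceptorId, ThreadId>;
```

With these aliases, registering interceptor 3 in both tables keeps the two values separate, at the cost of one wrapper struct per kind of id.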
paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_with_brpc_test.cc
@@ -140,7 +156,9 @@ bool Carrier::Send(const InterceptorMessage& msg) {
   if (src_rank == dst_rank) {
     VLOG(3) << "Send a message from interceptor " << src_id
             << " to interceptor " << dst_id << ", which are in the same ranks.";
-    return EnqueueInterceptorMessage(msg);
+    int64_t carrier_id = *GlobalMap<int64_t, int64_t>::Get(dst_id);
+    return GlobalMap<int64_t, Carrier>::Get(carrier_id)
I don't think different carriers need to communicate with each other.
This is needed because the src interceptor and the dst interceptor may belong to different carriers that sit on the same rank. The src interceptor calls Send on its own carrier, so if that case is not handled here, things will break.
Right, but I still don't think different carriers need to communicate. In your scenario the src and dst are clearly related, so they should belong to the same carrier. Different carriers can correspond to different programs, for example a training program and an inference program: run one epoch of training, then run inference. The two are independent of each other in both timing and relationship.
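The same-rank routing debated above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's code: Message, Registry and SimpleCarrier are hypothetical stand-ins for InterceptorMessage, the two GlobalMap instantiations and Carrier. A sender always calls Send on its own carrier, and the message lands in the inbox of whichever carrier owns the destination interceptor:

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified message; the real InterceptorMessage is a protobuf.
struct Message {
  int64_t src_id;
  int64_t dst_id;
};

class SimpleCarrier;

// Hypothetical registries standing in for GlobalMap<int64_t, int64_t>
// (interceptor id -> carrier id) and GlobalMap<int64_t, Carrier>.
struct Registry {
  std::unordered_map<int64_t, int64_t> interceptor_to_carrier;
  std::unordered_map<int64_t, SimpleCarrier*> carriers;
};

class SimpleCarrier {
 public:
  SimpleCarrier(int64_t id, Registry* registry)
      : id_(id), registry_(registry) {
    registry_->carriers[id_] = this;
  }

  // Records that this carrier owns the given interceptor.
  void AddInterceptor(int64_t interceptor_id) {
    registry_->interceptor_to_carrier[interceptor_id] = id_;
  }

  // Same-rank send: deliver to whichever carrier owns the destination.
  // Returns false when the destination interceptor or its carrier is unknown.
  bool Send(const Message& msg) {
    auto owner = registry_->interceptor_to_carrier.find(msg.dst_id);
    if (owner == registry_->interceptor_to_carrier.end()) return false;
    auto carrier = registry_->carriers.find(owner->second);
    if (carrier == registry_->carriers.end()) return false;
    return carrier->second->Enqueue(msg);
  }

  bool Enqueue(const Message& msg) {
    inbox_.push_back(msg);
    return true;
  }

  const std::vector<Message>& inbox() const { return inbox_; }

 private:
  int64_t id_;
  Registry* registry_;
  std::vector<Message> inbox_;
};
```

If carriers were kept strictly independent, as the reviewer suggests, the two lookups in Send would be unnecessary and the method could enqueue locally.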
msg_bus_->IsInit(), true,
platform::errors::Unavailable("MessageBus has not been init yet."));
GetCarrier()->Start();
for (const auto& item : runtime_graph_->carrier_id_to_interceptor_ids()) {
Are multiple carriers run at the same time?
A CPU carrier and a GPU carrier?
The intent is to start them all at the same time; a carrier that cannot run yet can simply wait.
I think only one carrier should run at any given moment. Concurrency should come from the topological dependencies of the graph: two nodes can run at the same time when the topology allows it.
Only one carrier should run here. Different Programs correspond to different carriers, and an external interface selects which carrier to run.

 private:
  static std::unique_ptr<ValueT>* GetPPtr(KeyT id) {
    static std::mutex mutex;
In the current usage, carriers are all initialized up front and then fixed. With no concurrent reads and writes, the lock is unnecessary. That said, carriers may become dynamic later.
Besides multi-carrier support, this global map is also used for the interceptor id to carrier id map, which does see concurrent access, so I unified the two. Since interceptors are not added or removed dynamically right now, though, building that map could also be moved into initialization.
Let's add a lock-free variant of the class later. Locking on every access like this really is expensive.
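One way to read the "class without a lock" suggestion is a table that is filled during single-threaded initialization and never mutated afterwards. The sketch below is only an assumption about what such a class might look like; FrozenMap and its methods are hypothetical names. Lookups are safe without a mutex only if Freeze() happens before any reader thread is started:

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <unordered_map>
#include <utility>

// Hypothetical read-mostly table: populated single-threaded during
// initialization, then frozen. After Freeze() the table is never mutated,
// so concurrent Find() calls need no mutex, provided the reader threads
// are started after Freeze() returns.
template <typename ValueT>
class FrozenMap {
 public:
  // Adds or overwrites an entry; only allowed before Freeze().
  void Insert(int64_t key, ValueT value) {
    if (frozen_) {
      throw std::logic_error("FrozenMap is read-only after Freeze()");
    }
    table_[key] = std::move(value);
  }

  // Marks the end of the initialization phase.
  void Freeze() { frozen_ = true; }

  // Returns nullptr when the key is absent.
  const ValueT* Find(int64_t key) const {
    auto it = table_.find(key);
    return it == table_.end() ? nullptr : &it->second;
  }

 private:
  bool frozen_ = false;
  std::unordered_map<int64_t, ValueT> table_;
};
```

This trades flexibility for speed: if interceptors or carriers ever become dynamic, as the first comment in this thread anticipates, the locked variant would still be needed.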
if (request->ctrl_message()) {
  carrier_id = 0;
} else {
  carrier_id = *GlobalMap<int64_t, int64_t>::Get(request->dst_id());
The carrier is already looked up here, so the earlier code in Carrier does not need to look it up again.
This is a different scenario from the earlier one in Carrier. This is message passing between different ranks; the earlier one is message passing between different carriers on the same rank.
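The dispatch rule in the snippet under review (control messages go to carrier 0, everything else is looked up by destination interceptor) can be isolated into a small function. This is a hedged sketch: ResolveCarrierId and kControlCarrierId are hypothetical names, the map parameter stands in for GlobalMap<int64_t, int64_t>, and returning -1 for an unknown destination is an assumption rather than the PR's behaviour:

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Hypothetical constant naming the hard-coded carrier for control messages.
constexpr int64_t kControlCarrierId = 0;

// Returns the carrier id for an incoming request, or -1 if the destination
// interceptor is unknown. The map parameter stands in for
// GlobalMap<int64_t, int64_t> (interceptor id -> carrier id).
inline int64_t ResolveCarrierId(
    bool is_ctrl_message, int64_t dst_id,
    const std::unordered_map<int64_t, int64_t>& interceptor_to_carrier) {
  if (is_ctrl_message) return kControlCarrierId;
  auto it = interceptor_to_carrier.find(dst_id);
  return it == interceptor_to_carrier.end() ? -1 : it->second;
}
```

Naming the constant keeps the hard-coded value in one place until the TODO about removing it is addressed.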
// TODO(liyurui): Remove this hard code.
int64_t carrier_id;
if (request->ctrl_message()) {
  carrier_id = 0;
The barrier logic could probably be moved right here now.
By passing the message bus instance down? Putting the message bus in the global map as well would probably also work.
Yeah, though that gets a bit cumbersome too.
"The barrier logic could probably be moved right here now."
Yes.
With the global map, doesn't the carrier no longer need to store the message bus instance? It could just store the id.
// NOTE: need Init msg_bus after carrier SetMsgBus
carrier->Init(0, interceptor_id_to_rank, {0});
msg_bus->Init(0, {{0, ip0}, {1, ip1}}, ip0);
carrier->SetMsgBus(msg_bus);
This line is duplicated.
carrier.SetInterceptor(5, InterceptorFactory::Create("Compute", 5, node_f));
carrier->SetInterceptor(0, InterceptorFactory::Create("Compute", 0, node_a));
carrier->SetInterceptor(1,
InterceptorFactory::Create("Amplifier", 1, node_b));
The line breaks are no longer aligned 🥺
LGTM
PR types
Others
PR changes
Others
Describe