
Commit

resolve conflict with develop
chenwhql committed Nov 12, 2021
2 parents ba49429 + 1773afd commit 001ff55
Showing 168 changed files with 10,605 additions and 2,293 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -21,7 +21,7 @@ PaddlePaddle is originated from industrial practices with dedication and commitm

## Installation

### Latest PaddlePaddle Release: [v2.1](https://github.com/PaddlePaddle/Paddle/tree/release/2.1)
### Latest PaddlePaddle Release: [v2.2](https://github.com/PaddlePaddle/Paddle/tree/release/2.2)

Our vision is to enable deep learning for everyone via PaddlePaddle.
Please refer to our [release announcement](https://github.com/PaddlePaddle/Paddle/releases) to track the latest features of PaddlePaddle.
2 changes: 1 addition & 1 deletion README_cn.md
@@ -19,7 +19,7 @@

## 安装

### PaddlePaddle最新版本: [v2.1](https://github.com/PaddlePaddle/Paddle/tree/release/2.1)
### PaddlePaddle最新版本: [v2.2](https://github.com/PaddlePaddle/Paddle/tree/release/2.2)

跟进PaddlePaddle最新特性请参考我们的[版本说明](https://github.com/PaddlePaddle/Paddle/releases)

9 changes: 8 additions & 1 deletion cmake/external/cinn.cmake
@@ -28,7 +28,14 @@ include(ExternalProject)
set(CINN_SOURCE_DIR ${THIRD_PARTY_PATH}/CINN)
# TODO(zhhsplendid): Modify git tag after we have release tag
set(CINN_GIT_TAG develop)
set(CINN_OPTIONAL_ARGS -DPY_VERSION=${PY_VERSION} -DWITH_CUDA=${WITH_GPU} -DWITH_CUDNN=${WITH_GPU} -DPUBLISH_LIBS=ON -DWITH_TESTING=ON)
set(CINN_OPTIONAL_ARGS -DPY_VERSION=${PY_VERSION}
-DWITH_CUDA=${WITH_GPU}
-DWITH_CUDNN=${WITH_GPU}
-DWITH_MKL_CBLAS=${WITH_MKL}
-DWITH_MKLDNN=${WITH_MKL}
-DPUBLISH_LIBS=ON
-DWITH_TESTING=ON
)
set(CINN_BUILD_COMMAND $(MAKE) cinnapi -j)
ExternalProject_Add(
external_cinn
8 changes: 8 additions & 0 deletions cmake/third_party.cmake
@@ -364,6 +364,14 @@ if (WITH_CINN)
message(STATUS "Compile Paddle with CINN.")
include(external/cinn)
add_definitions(-DPADDLE_WITH_CINN)
if (WITH_GPU)
add_definitions(-DCINN_WITH_CUDA)
add_definitions(-DCINN_WITH_CUDNN)
endif (WITH_GPU)
if (WITH_MKL)
add_definitions(-DCINN_WITH_MKL_CBLAS)
add_definitions(-DCINN_WITH_MKLDNN)
endif (WITH_MKL)
endif (WITH_CINN)

if (WITH_CRYPTO)
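For context, a minimal C++ sketch of how compile definitions such as CINN_WITH_CUDA and CINN_WITH_MKLDNN are typically consumed as preprocessor guards; the helper function below is hypothetical and not part of this commit:

#include <string>

// Hypothetical helper: report which CINN backends were compiled in,
// based on the definitions added by cmake/third_party.cmake above.
std::string CinnBackendSummary() {
  std::string backends = "cpu";
#ifdef CINN_WITH_CUDA
  backends += "+cuda";
#endif
#ifdef CINN_WITH_CUDNN
  backends += "+cudnn";
#endif
#ifdef CINN_WITH_MKL_CBLAS
  backends += "+mkl_cblas";
#endif
#ifdef CINN_WITH_MKLDNN
  backends += "+mkldnn";
#endif
  return backends;
}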
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/fleet_executor/CMakeLists.txt
@@ -16,8 +16,10 @@ cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc

if(WITH_DISTRIBUTE)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(message_bus.h PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(message_bus.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(fleet_executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(carrier.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(interceptor_message_service.h PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(interceptor_message_service.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
44 changes: 38 additions & 6 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -13,14 +13,17 @@
// limitations under the License.

#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
#include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
#include "paddle/fluid/framework/program_desc.h"

namespace paddle {
namespace distributed {

FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
// Initialize Executor
bool parse_flag = exe_desc_.ParseFromString(exe_desc_str);
PADDLE_ENFORCE(parse_flag, platform::errors::PreconditionNotMet(
"Error occurs while parsing string to proto"));
}

FleetExecutor::~FleetExecutor() {
@@ -29,6 +32,40 @@ FleetExecutor::~FleetExecutor() {

void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) {
// Compile and Initialize
InitMessageBus();
}

void FleetExecutor::InitMessageBus() {
std::stringstream ss;
ss << "\nThe DNS table of the message bus is: \n";
int64_t cur_rank = exe_desc_.cur_rank();
std::unordered_map<int64_t, int64_t> interceptor_id_to_rank;
std::unordered_map<int64_t, std::string> rank_to_addr;
std::string addr;
for (const auto& rank_info : exe_desc_.cluster_info()) {
int64_t rank = rank_info.rank();
std::string ip_port = rank_info.ip_port();
ss << rank << "\t->\t" << ip_port << "\n";
// TODO(Yuang): replace the first 'rank' with real interceptor id
interceptor_id_to_rank.insert(std::make_pair(rank, rank));
rank_to_addr.insert(std::make_pair(rank, ip_port));
if (rank == cur_rank) {
addr = ip_port;
}
}
PADDLE_ENFORCE_NE(
addr, "",
platform::errors::NotFound(
"Current rank is %s, which ip_port cannot be found in the config.",
cur_rank));
VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is " << addr
<< ".";
VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << ".";
VLOG(5) << ss.str();
MessageBus& message_bus_instance = MessageBus::Instance();
if (!message_bus_instance.IsInit()) {
message_bus_instance.Init(interceptor_id_to_rank, rank_to_addr, addr);
}
}

void FleetExecutor::Run() {
@@ -44,10 +81,5 @@ std::shared_ptr<Carrier> FleetExecutor::GetCarrier() {
return nullptr;
}

std::shared_ptr<MessageBus> FleetExecutor::GetMessageBus() {
// get message bus
return nullptr;
}

} // namespace distributed
} // namespace paddle
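As orientation for the change above, a minimal sketch of how the new initialization path might be driven; the helper function, the empty ProgramDesc, and the assumption that exe_desc_str holds a serialized FleetExecutorDesc (see the proto change below) are illustrative, not part of this commit:

#include <string>

#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/framework/program_desc.h"

// Hypothetical driver: constructing the executor parses the desc string,
// and Init() builds the rank-to-address table and brings up the message bus.
void RunFleetExecutorSketch(const std::string& exe_desc_str) {
  paddle::distributed::FleetExecutor fleet_exe(exe_desc_str);
  paddle::framework::ProgramDesc program_desc;  // placeholder program
  fleet_exe.Init(program_desc);
  fleet_exe.Run();
}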
3 changes: 1 addition & 2 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -37,14 +37,13 @@ class FleetExecutor final {
void Run();
void Release();
static std::shared_ptr<Carrier> GetCarrier();
static std::shared_ptr<MessageBus> GetMessageBus();

private:
DISABLE_COPY_AND_ASSIGN(FleetExecutor);
FleetExecutorDesc exe_desc_;
std::unique_ptr<RuntimeGraph> runtime_graph_;
void InitMessageBus();
static std::shared_ptr<Carrier> global_carrier_;
static std::shared_ptr<MessageBus> global_message_bus_;
};

} // namespace distributed
paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
@@ -15,7 +15,13 @@
syntax = "proto2";
package paddle.distributed;

message RankInfo {
required int64 rank = 1;
required string ip_port = 2;
}

message FleetExecutorDesc {
optional string grain = 1 [ default = "coarse" ];
repeated string addrs = 2; // "ip:port" of all ranks
optional int64 cur_rank = 2 [ default = 0 ]; // Rank id of current processor
repeated RankInfo cluster_info = 3;
}
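A hedged sketch of how the new schema might be populated through the standard protobuf-generated C++ accessors; the two ranks, the ports, and the generated header name are assumed example values, not taken from this commit:

#include <string>

#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"  // generated header name assumed

// Hypothetical example: describe a two-rank job and serialize it for
// the FleetExecutor constructor.
std::string BuildExampleDesc() {
  paddle::distributed::FleetExecutorDesc desc;
  desc.set_cur_rank(0);
  auto* rank0 = desc.add_cluster_info();
  rank0->set_rank(0);
  rank0->set_ip_port("127.0.0.1:6170");
  auto* rank1 = desc.add_cluster_info();
  rank1->set_rank(1);
  rank1->set_ip_port("127.0.0.1:6171");
  return desc.SerializeAsString();
}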
21 changes: 19 additions & 2 deletions paddle/fluid/distributed/fleet_executor/interceptor.cc
@@ -13,6 +13,7 @@
// limitations under the License.

#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
#include "paddle/fluid/distributed/fleet_executor/message_bus.h"

namespace paddle {
namespace distributed {
@@ -27,6 +28,14 @@ Interceptor::Interceptor(int64_t interceptor_id, TaskNode* node)

Interceptor::~Interceptor() { interceptor_thread_.join(); }

void Interceptor::RegisterMsgHandle(MsgHandle handle) { handle_ = handle; }

void Interceptor::Handle(const InterceptorMessage& msg) {
if (handle_) {
handle_(msg);
}
}

std::condition_variable& Interceptor::GetCondVar() {
// get the conditional var
return cond_var_;
@@ -42,12 +51,18 @@ bool Interceptor::EnqueueRemoteInterceptorMessage(
// Called by Carrier, enqueue an InterceptorMessage to remote mailbox
VLOG(3) << "Enqueue message: " << interceptor_message.message_type()
<< " into " << interceptor_id_ << "'s remote mailbox.";
remote_mailbox_mutex_.lock();
std::unique_lock<std::mutex> lock(remote_mailbox_mutex_);
remote_mailbox_.push(interceptor_message);
remote_mailbox_mutex_.unlock();
return true;
}

void Interceptor::Send(int64_t dst_id,
std::unique_ptr<InterceptorMessage> msg) {
msg->set_src_id(interceptor_id_);
msg->set_dst_id(dst_id);
MessageBus::Instance().Send(*msg.get());
}

void Interceptor::PoolTheMailbox() {
// pool the local mailbox, parse the Message
while (true) {
@@ -68,6 +83,8 @@ void Interceptor::PoolTheMailbox() {
// break the pooling thread
break;
}

Handle(interceptor_message);
}
}

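A minimal sketch of the new handle-registration and Send() APIs in use; the echo handler, the raw pointer, and the assumption of an already-constructed interceptor with a valid TaskNode are illustrative only:

#include <memory>

#include "paddle/fluid/distributed/fleet_executor/interceptor.h"

// Hypothetical handler: echo every incoming message back to its sender.
void RegisterEchoHandle(paddle::distributed::Interceptor* interceptor) {
  using paddle::distributed::InterceptorMessage;
  interceptor->RegisterMsgHandle([interceptor](const InterceptorMessage& msg) {
    VLOG(3) << "Interceptor " << interceptor->GetInterceptorId()
            << " received message type " << msg.message_type();
    auto reply = std::make_unique<InterceptorMessage>();
    // Send() fills in src_id and dst_id; other payload fields would be set here.
    interceptor->Send(msg.src_id(), std::move(reply));
  });
}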
14 changes: 14 additions & 0 deletions paddle/fluid/distributed/fleet_executor/interceptor.h
@@ -15,6 +15,7 @@
#pragma once

#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <queue>
@@ -32,13 +33,21 @@ namespace distributed {
class TaskNode;

class Interceptor {
public:
using MsgHandle = std::function<void(const InterceptorMessage&)>;

public:
Interceptor() = delete;

Interceptor(int64_t interceptor_id, TaskNode* node);

virtual ~Interceptor();

// register interceptor handle
void RegisterMsgHandle(MsgHandle handle);

void Handle(const InterceptorMessage& msg);

// return the interceptor id
int64_t GetInterceptorId() const;

@@ -49,6 +58,8 @@ class Interceptor {
bool EnqueueRemoteInterceptorMessage(
const InterceptorMessage& interceptor_message);

void Send(int64_t dst_id, std::unique_ptr<InterceptorMessage> msg);

DISABLE_COPY_AND_ASSIGN(Interceptor);

private:
@@ -65,6 +76,9 @@ class Interceptor {
// node need to be handled by this interceptor
TaskNode* node_;

// interceptor handle which process message
MsgHandle handle_{nullptr};

// mutex to control read/write conflict for remote mailbox
std::mutex remote_mailbox_mutex_;

48 changes: 35 additions & 13 deletions paddle/fluid/distributed/fleet_executor/message_bus.cc
@@ -21,20 +21,31 @@
namespace paddle {
namespace distributed {

MessageBus::MessageBus(
void MessageBus::Init(
const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
const std::unordered_map<int64_t, std::string>& rank_to_addr,
const std::string& addr)
: interceptor_id_to_rank_(interceptor_id_to_rank),
rank_to_addr_(rank_to_addr),
addr_(addr) {
const std::string& addr) {
PADDLE_ENFORCE_EQ(is_init_, false, platform::errors::AlreadyExists(
"MessageBus is already init."));
is_init_ = true;
interceptor_id_to_rank_ = interceptor_id_to_rank;
rank_to_addr_ = rank_to_addr;
addr_ = addr;

listen_port_thread_ = std::thread([this]() {
VLOG(3) << "Start listen_port_thread_ for message bus";
ListenPort();
});

std::call_once(once_flag_, []() {
std::atexit([]() { MessageBus::Instance().Release(); });
});
}

MessageBus::~MessageBus() {
bool MessageBus::IsInit() const { return is_init_; }

void MessageBus::Release() {
VLOG(3) << "Message bus releases resource.";
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
server_.Stop(1000);
@@ -48,15 +59,25 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
int64_t src_id = interceptor_message.src_id();
int64_t dst_id = interceptor_message.dst_id();
if (IsSameRank(src_id, dst_id)) {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " within a same rank.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are same ranks.";
return SendIntraRank(interceptor_message);
} else {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " between different ranks.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are different ranks.";
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
return SendInterRank(interceptor_message);
int retry_time = 0; // message bus will retry sending for 10 times
while (retry_time < 10) {
++retry_time;
if (SendInterRank(interceptor_message)) {
VLOG(3) << "Message bus sends inter rank successfully with "
<< retry_time << " times retries.";
return true;
}
}
VLOG(3) << "Message bus sends inter rank fail after 10 times retries.";
return false;
#else
PADDLE_THROW(platform::errors::Unavailable(
"Fleet executor does not support sending message between different "
Expand Down Expand Up @@ -134,6 +155,7 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connect_timeout_ms = 1000;
options.timeout_ms = 1000;
options.max_retry = 5;
PADDLE_ENFORCE_EQ(
@@ -149,11 +171,11 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
VLOG(3) << "Message bus: brpc sends success.";
return true;
} else {
VLOG(3) << "Message bus: InterceptorMessageService error.";
VLOG(4) << "Message bus: InterceptorMessageService error.";
return false;
}
} else {
VLOG(3) << "Message bus: brpc sends failed with error text: "
VLOG(4) << "Message bus: brpc sends failed with error text: "
<< ctrl.ErrorText();
return false;
}
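To make the intended lifecycle explicit, a short sketch of bringing up the singleton bus exactly once per process; the two-rank layout and the addresses are assumed example values, not taken from this commit:

#include <cstdint>
#include <string>
#include <unordered_map>

#include "paddle/fluid/distributed/fleet_executor/message_bus.h"

// Hypothetical setup for a two-rank job running on one machine.
void InitMessageBusSketch() {
  std::unordered_map<int64_t, int64_t> interceptor_id_to_rank = {{0, 0}, {1, 1}};
  std::unordered_map<int64_t, std::string> rank_to_addr = {
      {0, "127.0.0.1:6170"}, {1, "127.0.0.1:6171"}};
  auto& bus = paddle::distributed::MessageBus::Instance();
  if (!bus.IsInit()) {
    // Init() also registers Release() via std::atexit, so no explicit
    // teardown call is needed at shutdown.
    bus.Init(interceptor_id_to_rank, rank_to_addr, /*addr=*/"127.0.0.1:6170");
  }
}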