From 0ec18c959089d2e2da11ef5128148eca26b13f8a Mon Sep 17 00:00:00 2001 From: liuyuang Date: Wed, 16 Mar 2022 11:18:49 +0800 Subject: [PATCH] fleet executor for npu --- .../distributed/fleet_executor/CMakeLists.txt | 2 +- .../distributed/fleet_executor/message_bus.cc | 12 ++++-------- .../distributed/fleet_executor/message_bus.h | 9 +++------ .../fleet_executor/message_service.cc | 3 +-- .../fleet_executor/message_service.h | 3 +-- .../fluid/inference/api/analysis_predictor.cc | 18 ++++++------------ .../fluid/inference/api/analysis_predictor.h | 9 +++------ 7 files changed, 19 insertions(+), 37 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index 3e734b1b9ed24..8641b36a1be8e 100644 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -4,7 +4,7 @@ if(WITH_PYTHON) endif() proto_library(interceptor_message_proto SRCS interceptor_message.proto) -if(WITH_DISTRIBUTE AND WITH_PSCORE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL)) +if(WITH_DISTRIBUTE AND WITH_PSCORE) set(BRPC_DEPS brpc ssl crypto protobuf zlib leveldb snappy gflags glog) else() set(BRPC_DEPS "") diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 8d2ec5c41d864..80a6b4667aa1a 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -67,8 +67,7 @@ bool MessageBus::IsInit() const { return is_init_; } MessageBus::~MessageBus() { VLOG(3) << "Message bus releases resource."; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) server_.Stop(1000); server_.Join(); #endif @@ -87,8 +86,7 @@ bool MessageBus::Send(int64_t dst_rank, IsInit(), true, platform::errors::PreconditionNotMet( "Using message bus since it has not been initialized.")); -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) int retry_time = 0; // message bus will retry sending for 10 times while (retry_time < 10) { ++retry_time; @@ -173,8 +171,7 @@ void MessageBus::ListenPort() { LOG(INFO) << "No need listen to port since training on single card."; return; } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) // function keep listen the port and handle the message PADDLE_ENFORCE_EQ( server_.AddService(&message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), 0, @@ -203,8 +200,7 @@ void MessageBus::ListenPort() { #endif } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) bool MessageBus::SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message) { const auto& dst_addr = GetAddr(dst_rank); diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index d805ac81606b8..dfd65fdbc00d4 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -20,8 +20,7 @@ #include #include -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) #include "brpc/channel.h" #include "brpc/server.h" #include "paddle/fluid/distributed/fleet_executor/message_service.h" @@ -64,8 +63,7 @@ class MessageBus final { const std::string& GetAddr(int64_t rank) const; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) // send the message inter rank (dst is different rank with src) bool SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message); @@ -81,8 +79,7 @@ class MessageBus final { // the ip needs to be listened std::string addr_; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) MessageServiceImpl message_service_; // brpc server brpc::Server server_; diff --git a/paddle/fluid/distributed/fleet_executor/message_service.cc b/paddle/fluid/distributed/fleet_executor/message_service.cc index c3fff98f684ad..1c66d83ea34d7 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.cc +++ b/paddle/fluid/distributed/fleet_executor/message_service.cc @@ -11,8 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) #include "paddle/fluid/distributed/fleet_executor/message_service.h" #include "brpc/server.h" #include "paddle/fluid/distributed/fleet_executor/global.h" diff --git a/paddle/fluid/distributed/fleet_executor/message_service.h b/paddle/fluid/distributed/fleet_executor/message_service.h index 02f73471e3b91..5ab687ff93dc4 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.h +++ b/paddle/fluid/distributed/fleet_executor/message_service.h @@ -11,8 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) #pragma once #include "brpc/server.h" diff --git a/paddle/fluid/inference/api/analysis_predictor.cc b/paddle/fluid/inference/api/analysis_predictor.cc index 871ed596a3ee9..f367c1ab412f0 100644 --- a/paddle/fluid/inference/api/analysis_predictor.cc +++ b/paddle/fluid/inference/api/analysis_predictor.cc @@ -50,8 +50,7 @@ #include "paddle/phi/api/ext/op_meta_info.h" #include "paddle/utils/string/split.h" -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h" #include "paddle/fluid/distributed/fleet_executor/task_node.h" @@ -374,8 +373,7 @@ static void DisablePrepareDataOpt( } bool AnalysisPredictor::PrepareExecutor() { -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) if (config_.dist_config().use_dist_model()) { VLOG(3) << "use_dist_model is enabled, will init FleetExecutor."; return PrepareFleetExecutor(); @@ -393,8 +391,7 @@ bool AnalysisPredictor::PrepareExecutor() { return true; } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) bool AnalysisPredictor::PrepareFleetExecutor() { VLOG(3) << "AnalysisPredictor::PrepareFleetExecutor()"; if (config_.dist_config().nranks() > 1 && !CommInit()) { @@ -1189,8 +1186,7 @@ std::vector AnalysisPredictor::GetOutputNames() { std::unique_ptr AnalysisPredictor::GetInputTensor( const std::string &name) { framework::Scope *scope; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) if (config_.dist_config().use_dist_model()) { scope = scope_.get(); } else { @@ -1239,8 +1235,7 @@ std::unique_ptr AnalysisPredictor::GetInputTensor( std::unique_ptr AnalysisPredictor::GetOutputTensor( const std::string &name) { framework::Scope *scope; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) if (config_.dist_config().use_dist_model()) { scope = scope_.get(); } else { @@ -1287,8 +1282,7 @@ std::unique_ptr AnalysisPredictor::GetOutputTensor( } bool AnalysisPredictor::ZeroCopyRun() { -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) if (config_.dist_config().use_dist_model()) { VLOG(3) << "ZeroCopyRun will use the fleet executor."; inference::Timer timer; diff --git a/paddle/fluid/inference/api/analysis_predictor.h b/paddle/fluid/inference/api/analysis_predictor.h index 21a7e9658bbee..d9992f3fbef9d 100644 --- a/paddle/fluid/inference/api/analysis_predictor.h +++ b/paddle/fluid/inference/api/analysis_predictor.h @@ -18,8 +18,7 @@ #include #include #include -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" #endif #include "paddle/fluid/framework/naive_executor.h" @@ -395,8 +394,7 @@ class AnalysisPredictor : public PaddlePredictor { void StatisticShapeRangeInfo(); void CollectShapeRangeInfo(); -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) // fleet exe related /// @@ -488,8 +486,7 @@ class AnalysisPredictor : public PaddlePredictor { std::map>> shape_info_; static int clone_num_; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ - !defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) // fleet executor related distributed::FleetExecutorDesc executor_desc_; std::shared_ptr fleet_exe_;