Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fleet_executor] Add entrance of FleetExecutor in AnalysisPredictor for distributed inference #39992

Merged
merged 7 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions paddle/fluid/distributed/fleet_executor/carrier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>

#include "paddle/fluid/distributed/fleet_executor/carrier.h"
#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
Expand Down Expand Up @@ -46,7 +48,8 @@ void Carrier::Init(
const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
const framework::ProgramDesc& program, framework::Scope* scope,
int64_t num_micro_batches, const platform::Place& place) {
int64_t num_micro_batches, const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars) {
rank_ = rank;
interceptor_id_to_rank_ = interceptor_id_to_rank;
interceptor_id_to_node_ = interceptor_id_to_node;
Expand All @@ -60,7 +63,7 @@ void Carrier::Init(
microbatch_scopes_.resize(num_micro_batches);
for (int i = 0; i < num_micro_batches; ++i) {
microbatch_scopes_[i] = &minibatch_scope_->NewScope();
CopyParameters(i, program);
CopyParameters(i, program, inference_root_scope_vars);
}

// TODO(fleet_exe dev): thread pool
Expand All @@ -80,12 +83,23 @@ void Carrier::Release() {

Carrier::~Carrier() { VLOG(3) << "Carrier's destructor."; }

void Carrier::CopyParameters(int microbatch_id,
const framework::ProgramDesc& program) {
void Carrier::CopyParameters(
int microbatch_id, const framework::ProgramDesc& program,
const std::vector<std::string>& inference_root_scope_vars) {
auto& global_block = program.Block(0);

std::map<std::string, int> inference_root_scope_var_map;
for (auto var_name : inference_root_scope_vars) {
inference_root_scope_var_map.insert({var_name, 1});
}
for (auto& var : global_block.AllVars()) {
if (var->Persistable() && microbatch_id == 0) {
std::string var_name = var->Name();
bool force_root = inference_root_scope_var_map.find(var_name) !=
inference_root_scope_var_map.end();
if (force_root) {
VLOG(4) << var_name << " will be forced to be created in the root scope.";
}
if ((var->Persistable() || force_root) && microbatch_id == 0) {
auto* ptr = root_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
VLOG(5) << "Create persistable var: " << var->Name()
Expand Down
7 changes: 5 additions & 2 deletions paddle/fluid/distributed/fleet_executor/carrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ class Carrier final {
const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
const framework::ProgramDesc& program, framework::Scope* scope,
int64_t num_micro_batches, const platform::Place& place);
int64_t num_micro_batches, const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars = {});

void CopyParameters(int microbatch_id, const framework::ProgramDesc& program);
void CopyParameters(
int microbatch_id, const framework::ProgramDesc& program,
const std::vector<std::string>& inference_root_scope_vars);

void Release();
void Wait();
Expand Down
48 changes: 41 additions & 7 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>

#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/global.h"
Expand Down Expand Up @@ -52,7 +53,8 @@ void FleetExecutor::Init(
const std::string& carrier_id, const framework::ProgramDesc& program_desc,
framework::Scope* scope, const platform::Place& place,
int64_t num_micro_batches, const std::vector<TaskNode*>& task_nodes,
const std::unordered_map<int64_t, int64_t>& task_id_to_rank) {
const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
const std::vector<std::string>& inference_root_scope_vars) {
PADDLE_ENFORCE_GT(task_nodes.size(), 0,
platform::errors::InvalidArgument(
"Fleet executor is inited with empty task node"));
Expand All @@ -64,6 +66,37 @@ void FleetExecutor::Init(
}
}
auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
// NOTE: For inference, the vars in inference_root_scope_vars
// shouldn't be deleted during inf, for that they may be the result of the
// inf. If they are GCed, it will cause error during ZeroCopy the result.
std::vector<const framework::OperatorBase*> changed_ops;
for (auto pair : unused_vars) {
const framework::OperatorBase* op = pair.first;
std::vector<std::string> unused = pair.second;
for (auto name : inference_root_scope_vars) {
auto iter = std::find(unused.begin(), unused.end(), name);
if (iter != unused.end()) {
VLOG(3) << "Removing var: [" << name
<< "] from the unused vars list of op: [" << op->Type() << "]";
unused.erase(iter);
if (std::find(changed_ops.begin(), changed_ops.end(), op) ==
changed_ops.end()) {
// record the op whose unused vars have been updated
changed_ops.emplace_back(op);
}
}
}
// update the unused vars list in the map
unused_vars[op] = unused;
}
for (auto op : changed_ops) {
auto iter = unused_vars.find(op);
if (iter->second.empty()) {
// remove those ops in the map that have empty unused vars list
VLOG(3) << "Removing op: [" << op->Type() << "] from unused_vars map.";
unused_vars.erase(iter);
}
}
runtime_graph_ = std::make_shared<RuntimeGraph>();
std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
for (auto task_node : task_nodes) {
Expand All @@ -82,17 +115,18 @@ void FleetExecutor::Init(
carrier_ids_.insert(carrier_id);
// Set current running carrier
GlobalVal<std::string>::Set(new std::string(carrier_id));
InitCarrier(carrier, scope, place, num_micro_batches, program_desc);
InitCarrier(carrier, scope, place, num_micro_batches, program_desc,
inference_root_scope_vars);
GlobalVal<MessageBus>::Get()->Barrier();
}

void FleetExecutor::InitCarrier(Carrier* carrier, framework::Scope* scope,
const platform::Place& place,
int64_t num_micro_batches,
const framework::ProgramDesc& program_desc) {
void FleetExecutor::InitCarrier(
Carrier* carrier, framework::Scope* scope, const platform::Place& place,
int64_t num_micro_batches, const framework::ProgramDesc& program_desc,
const std::vector<std::string>& inference_root_scope_vars) {
carrier->Init(exe_desc_.cur_rank(), runtime_graph_->interceptor_id_to_rank(),
runtime_graph_->interceptor_id_to_node(), program_desc, scope,
num_micro_batches, place);
num_micro_batches, place, inference_root_scope_vars);
}

void FleetExecutor::InitMessageBus() {
Expand Down
10 changes: 6 additions & 4 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ class FleetExecutor final {
const framework::ProgramDesc& program_desc, framework::Scope* scope,
const platform::Place& place, int64_t num_micro_batches,
const std::vector<TaskNode*>& task_nodes,
const std::unordered_map<int64_t, int64_t>& task_id_to_rank);
const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
const std::vector<std::string>& inference_root_scope_vars = {});
void Run(const std::string& carrier_id);

private:
DISABLE_COPY_AND_ASSIGN(FleetExecutor);
void InitMessageBus();
void InitCarrier(Carrier* carrier, framework::Scope* scope,
const platform::Place& place, int64_t num_micro_batches,
const framework::ProgramDesc& program_desc);
void InitCarrier(
Carrier* carrier, framework::Scope* scope, const platform::Place& place,
int64_t num_micro_batches, const framework::ProgramDesc& program_desc,
const std::vector<std::string>& inference_root_scope_vars = {});
FleetExecutorDesc exe_desc_;
std::shared_ptr<RuntimeGraph> runtime_graph_;
std::unordered_set<std::string> carrier_ids_;
Expand Down
11 changes: 10 additions & 1 deletion paddle/fluid/distributed/fleet_executor/task_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,20 @@ void TaskNode::SetProgram(paddle::framework::ProgramDesc* program) {
program_ = program;
}

void TaskNode::Init() {
void TaskNode::Init(bool use_feed_fetch_ops) {
if (!use_feed_fetch_ops) {
VLOG(3) << "TaskNode will be inited without feed and fetch ops";
}
if (ops_.empty()) {
// Q (for fleet executor dev): should we need another reset funct?
VLOG(3) << "Task node will be inited by calling Init().";
for (const auto& op_desc : program_->Block(0).AllOps()) {
if (!use_feed_fetch_ops &&
(op_desc->Type() == "feed" || op_desc->Type() == "fetch")) {
VLOG(3) << "TaskNode will skip [" << op_desc->Input("X")[0] << "], "
<< op_desc->Type() << " -> " << op_desc->Output("Out")[0];
continue;
}
ops_vec_.emplace_back(framework::OpRegistry::CreateOp(*op_desc));
}
for (const auto& op : ops_vec_) {
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/fleet_executor/task_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TaskNode final {
~TaskNode() = default;

void SetProgram(paddle::framework::ProgramDesc* program);
void Init();
void Init(bool use_feed_fetch_ops = true);
int64_t rank() const { return rank_; }
int64_t task_id() const { return task_id_; }
int32_t role() const { return role_; }
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/inference/api/analysis_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ AnalysisConfig::AnalysisConfig(const AnalysisConfig &other) {
CP_MEMBER(ipu_available_memory_proportion_);
CP_MEMBER(ipu_enable_half_partial_);

// fleet exe related
CP_MEMBER(dist_config_);

if (use_gpu_) {
PADDLE_ENFORCE_EQ(use_xpu_, false,
platform::errors::InvalidArgument(
Expand Down
Loading