Add entrance of the DistModel in the Analysis Predictor.
FeixLiu committed Feb 25, 2022
1 parent 4fe465c commit 852c91f
Showing 16 changed files with 573 additions and 46 deletions.
24 changes: 19 additions & 5 deletions paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>

#include "paddle/fluid/distributed/fleet_executor/carrier.h"
#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
@@ -46,7 +48,8 @@ void Carrier::Init(
const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
const framework::ProgramDesc& program, framework::Scope* scope,
int64_t num_micro_batches, const platform::Place& place) {
int64_t num_micro_batches, const platform::Place& place,
const std::vector<std::string>& force_root_scope_var_names) {
rank_ = rank;
interceptor_id_to_rank_ = interceptor_id_to_rank;
interceptor_id_to_node_ = interceptor_id_to_node;
@@ -60,7 +63,7 @@
microbatch_scopes_.resize(num_micro_batches);
for (int i = 0; i < num_micro_batches; ++i) {
microbatch_scopes_[i] = &minibatch_scope_->NewScope();
CopyParameters(i, program);
CopyParameters(i, program, force_root_scope_var_names);
}

// TODO(fleet_exe dev): thread pool
Expand All @@ -80,12 +83,23 @@ void Carrier::Release() {

Carrier::~Carrier() { VLOG(3) << "Carrier's destructor."; }

void Carrier::CopyParameters(int microbatch_id,
const framework::ProgramDesc& program) {
void Carrier::CopyParameters(
int microbatch_id, const framework::ProgramDesc& program,
const std::vector<std::string>& force_root_scope_var_names) {
auto& global_block = program.Block(0);

std::map<std::string, int> force_root_tmp_map;
for (auto var_name : force_root_scope_var_names) {
force_root_tmp_map.insert({var_name, 1});
}
for (auto& var : global_block.AllVars()) {
if (var->Persistable() && microbatch_id == 0) {
std::string var_name = var->Name();
bool force_root =
force_root_tmp_map.find(var_name) != force_root_tmp_map.end();
if (force_root) {
VLOG(3) << var_name << " will be forced to be created in the root scope.";
}
if ((var->Persistable() || force_root) && microbatch_id == 0) {
auto* ptr = root_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
VLOG(5) << "Create persistable var: " << var->Name()
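
The new force_root_scope_var_names argument lets callers pin selected non-persistable variables (e.g. feed/fetch vars) to the root scope. Below is a minimal standalone sketch of the membership check performed in CopyParameters, using std::unordered_set for the lookup (an illustrative assumption, not code from this commit):

#include <string>
#include <unordered_set>
#include <vector>

// Returns true when a variable should live in the root scope: either it is
// persistable or its name was passed in force_root_scope_var_names.
bool ShouldCreateInRootScope(
    const std::string& var_name, bool persistable,
    const std::vector<std::string>& force_root_scope_var_names) {
  const std::unordered_set<std::string> force_root(
      force_root_scope_var_names.begin(), force_root_scope_var_names.end());
  return persistable || force_root.count(var_name) > 0;
}
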
7 changes: 5 additions & 2 deletions paddle/fluid/distributed/fleet_executor/carrier.h
@@ -57,9 +57,12 @@ class Carrier final {
const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
const framework::ProgramDesc& program, framework::Scope* scope,
int64_t num_micro_batches, const platform::Place& place);
int64_t num_micro_batches, const platform::Place& place,
const std::vector<std::string>& force_root_scope_var_names = {});

void CopyParameters(int microbatch_id, const framework::ProgramDesc& program);
void CopyParameters(
int microbatch_id, const framework::ProgramDesc& program,
const std::vector<std::string>& force_root_scope_var_names);

void Release();
void Wait();
206 changes: 190 additions & 16 deletions paddle/fluid/distributed/fleet_executor/dist_model.cc
@@ -25,6 +25,7 @@
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/utils/string/split.h"

namespace paddle {
namespace distributed {
@@ -53,7 +54,8 @@ bool LoadDataFromDistModelTensor(const DistModelTensor &input_data,
} else if (input_data.dtype == DistModelDataType::INT32) {
input_tensor_ptr = input_tensor->mutable_data<int32_t>(dims, place);
} else if (input_data.dtype == DistModelDataType::FLOAT16) {
input_tensor_ptr = input_tensor->mutable_data<float16>(dims, place);
input_tensor_ptr =
input_tensor->mutable_data<paddle::platform::float16>(dims, place);
} else {
LOG(ERROR) << "unsupported feed type " << input_data.dtype;
return false;
@@ -136,11 +138,14 @@ class DistModelTimer {

bool DistModel::Init() {
carrier_id_ = "inference";
bool init_method = (!config_.model_dir.empty() || config_.program_desc);
PADDLE_ENFORCE_EQ(init_method, true,
platform::errors::InvalidArgument(
"One of model dir or program desc must be provided to "
"dist model inference."));
bool init_method =
!config_.model_dir.empty() || config_.program_desc ||
(!config_.program_path.empty() && !config_.param_path.empty());
PADDLE_ENFORCE_EQ(
init_method, true,
platform::errors::InvalidArgument(
"One of model dir, program desc or (program_path, param_path) pair "
"must be provided to dist model inference."));
if (config_.program_desc) {
PADDLE_ENFORCE_NOT_NULL(
config_.scope, platform::errors::InvalidArgument(
@@ -178,6 +183,7 @@ bool DistModel::Init() {
}
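
With this change, DistModel::Init() accepts three ways of locating the model: a model_dir prefix, an in-memory program_desc plus scope, or an explicit (program_path, param_path) pair. A rough configuration sketch, assuming DistModelConfig is a plain struct with the public fields referenced in this diff (paths are placeholders):

#include "paddle/fluid/distributed/fleet_executor/dist_model.h"

void ConfigureDistModelExamples() {
  // 1) Prefix form: loads <model_dir>.pdmodel and <model_dir>.pdiparams.
  paddle::distributed::DistModelConfig by_dir;
  by_dir.model_dir = "/path/to/model_prefix";

  // 2) Explicit file form enabled by this change.
  paddle::distributed::DistModelConfig by_files;
  by_files.program_path = "/path/to/model.pdmodel";
  by_files.param_path = "/path/to/model.pdiparams";
}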

bool DistModel::PreparePlace() {
VLOG(3) << "DistModel is going to set place for: " << config_.place;
if (config_.place == "GPU") {
place_ = paddle::platform::CUDAPlace(config_.device_id);
} else if (config_.place == "CPU") {
@@ -186,10 +192,151 @@
PADDLE_THROW(platform::errors::InvalidArgument(
"Place must be choosen from GPU or CPU, but got %s.", config_.place));
}
VLOG(3) << "DistModel prepare place success";
return true;
}

bool DistModel::LoadConverterConfig() {
VLOG(3) << "Going to load converter config from: " << config_.comm_init_config
<< "\n";
std::ifstream fin(config_.comm_init_config, std::ios::in);
PADDLE_ENFORCE_EQ(
static_cast<bool>(fin.is_open()), true,
platform::errors::NotFound(
"Cannot open file %s, please confirm whether the file is normal.",
config_.comm_init_config));
std::string line;
bool ring_to_rank;
// Reading the config from file; the config file should follow this format:
// [ring_id -> ranks]
// 0,0,1,2,3
// 1,0,1
// 2,2,3
// 21,0,1
// 22,1,2
// 23,2,3
// [rank -> ring_ids]
// 0,0,1,21
// 1,0,1,21,22
// 2,0,2,22,23
// 3,0,2,23
while (std::getline(fin, line)) {
std::vector<std::string> one_line = paddle::string::Split(line, ',');
if (one_line.size() == 1) {
// start a new section of the config
if (line == "[ring_id -> ranks]") {
ring_to_rank = true;
} else if (line == "[rank -> ring_ids]") {
ring_to_rank = false;
}
} else {
// parse key - values pairs in one section
int64_t key = std::stoll(one_line[0]);
for (size_t i = 1; i < one_line.size(); ++i) {
int64_t val = std::stoll(one_line[i]);
if (ring_to_rank) {
if (config_.ring_id_to_ranks_.find(key) ==
config_.ring_id_to_ranks_.end()) {
config_.ring_id_to_ranks_[key] = std::vector<int64_t>();
}
config_.ring_id_to_ranks_[key].emplace_back(val);
} else {
if (config_.rank_to_ring_ids_.find(key) ==
config_.rank_to_ring_ids_.end()) {
config_.rank_to_ring_ids_[key] = std::vector<int64_t>();
}
config_.rank_to_ring_ids_[key].emplace_back(val);
}
// NOTE: add more configuration sections here
}
}
}
std::stringstream ss;
ss << "Loaded the following converter config:\n";
ss << "ring_id_to_ranks:\n";
for (auto pair : config_.ring_id_to_ranks_) {
int64_t key = pair.first;
ss << "\t" << key << "\t->\t";
for (auto value : pair.second) {
ss << value << "\t";
}
ss << "\n";
}
ss << "rank_to_ring_ids:\n";
for (auto pair : config_.rank_to_ring_ids_) {
int64_t key = pair.first;
ss << "\t" << key << "\t->\t";
for (auto value : pair.second) {
ss << value << "\t";
}
ss << "\n";
}
VLOG(3) << ss.str();
return true;
}
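
Each non-header line of the converter config is split on ',' with the first field as the key and the remaining fields as values. A self-contained sketch of that per-line step (hypothetical helper, not part of the commit):

#include <cstdint>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Parse a line such as "2,2,3" into {key = 2, values = {2, 3}}.
std::pair<int64_t, std::vector<int64_t>> ParseConfigLine(
    const std::string& line) {
  std::stringstream ss(line);
  std::string token;
  int64_t key = 0;
  std::vector<int64_t> values;
  bool first = true;
  while (std::getline(ss, token, ',')) {
    if (first) {
      key = std::stoll(token);
      first = false;
    } else {
      values.emplace_back(std::stoll(token));
    }
  }
  return {key, values};
}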

std::vector<std::string> DistModel::GetOutputNames() {
std::vector<std::string> rst;
std::stringstream ss;
ss << "DistModel GetOutputNames: ";
for (const auto &pair : idx_to_fetches_) {
ss << pair.second << ", ";
rst.emplace_back(pair.second);
}
ss << "\n";
VLOG(3) << ss.str();
return rst;
}

std::vector<std::string> DistModel::GetInputNames() {
std::vector<std::string> rst;
std::stringstream ss;
ss << "DistModel GetInputNames: ";
for (const auto &pair : idx_to_feeds_) {
ss << pair.second << ", ";
rst.emplace_back(pair.second);
}
ss << "\n";
VLOG(3) << ss.str();
return rst;
}

framework::Scope *DistModel::GetScope() {
VLOG(3) << "DistModel GetScope()";
return scope_.get();
}

paddle::platform::Place DistModel::GetPlace() {
VLOG(3) << "DistModel GetPlace()";
return place_;
}

bool DistModel::ZeroCopyRun() {
VLOG(3) << "DistModel run with ZeroCopy.";

DistModelTimer timer;
timer.tic();
double start_time = timer.toc();

fleet_exe->Run(carrier_id_);

double end_time = timer.toc();
if (config_.enable_timer) {
LOG(INFO) << "DistModel finish inf, cost " << end_time - start_time << "ms";
} else {
VLOG(3) << "DistModel finish inf.";
}
return true;
}
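
ZeroCopyRun(), together with GetInputNames/GetOutputNames/GetScope/GetPlace above, is the entry surface the analysis predictor drives. A rough usage sketch over an already-initialized DistModel, using only methods shown in this diff (feeding input tensors into the root scope is elided):

#include <iostream>
#include <string>

#include "paddle/fluid/distributed/fleet_executor/dist_model.h"

bool RunDistModelOnce(paddle::distributed::DistModel* model) {
  for (const std::string& name : model->GetInputNames()) {
    std::cout << "input:  " << name << std::endl;
  }
  for (const std::string& name : model->GetOutputNames()) {
    std::cout << "output: " << name << std::endl;
  }
  // Input tensors are expected to be written into the root scope
  // (model->GetScope()) before this call.
  return model->ZeroCopyRun();
}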

bool DistModel::CommInit() {
VLOG(3) << "DistModel CommInit()";
if (!config_.comm_init_config.empty()) {
if (!LoadConverterConfig()) {
VLOG(3) << "Load converter config failed, DistModel init failed.";
return false;
}
}
std::unique_ptr<framework::ProgramDesc> comm_init_program(
new framework::ProgramDesc());
framework::BlockDesc *comm_init_block = comm_init_program->MutableBlock(0);
@@ -278,25 +425,32 @@ void DistModel::InsertCommOp(std::string tmp_var_name, int nranks, int rank,
}

bool DistModel::PrepareScope() {
VLOG(3) << "DistModel PrepareScope()";
scope_.reset(new framework::Scope());
VLOG(3) << "DistModel prepare scope success";
return true;
}

bool DistModel::PrepareProgram() {
VLOG(3) << "DistModel PrepareProgram()";
if (!LoadProgram()) {
return false;
}
if (!LoadParameters()) {
return false;
}
VLOG(3) << "DistModel prepare program success";
return true;
}

bool DistModel::LoadProgram() {
VLOG(3) << "Loading program from " << config_.model_dir;
PADDLE_ENFORCE_NE(config_.model_dir, "", platform::errors::InvalidArgument(
"Model dir must be provided."));
std::string model_path = config_.model_dir + ".pdmodel";
std::string model_path = config_.model_dir.empty()
? config_.program_path
: (config_.model_dir + ".pdmodel");
PADDLE_ENFORCE_NE(model_path, "",
platform::errors::InvalidArgument(
"One of model dir or program_path must be provided."));
VLOG(3) << "Loading program from " << model_path;
framework::proto::ProgramDesc program_proto;
std::string pb_content;
// Read binary
Expand All @@ -318,7 +472,6 @@ bool DistModel::LoadProgram() {
}

bool DistModel::LoadParameters() {
VLOG(3) << "Loading parameters from " << config_.model_dir;
PADDLE_ENFORCE_NOT_NULL(program_.get(),
platform::errors::PreconditionNotMet(
"The program should be loaded first."));
@@ -346,7 +499,13 @@
}
}

std::string param_path = config_.model_dir + ".pdiparams";
std::string param_path = config_.model_dir.empty()
? config_.param_path
: (config_.model_dir + ".pdiparams");
PADDLE_ENFORCE_NE(param_path, "",
platform::errors::InvalidArgument(
"One of model dir or param_path must be provided."));
VLOG(3) << "Loading parameters from " << param_path;
// sort paramlist to have consistent ordering
std::sort(params.begin(), params.end());
// append just the load_combine op
Expand All @@ -370,10 +529,11 @@ bool DistModel::LoadParameters() {
}
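
Both LoadProgram() and LoadParameters() now resolve their file path the same way: prefer the model_dir prefix, otherwise require the explicit path. A minimal sketch of that selection rule (hypothetical helper, not in the commit):

#include <stdexcept>
#include <string>

std::string ResolveModelFile(const std::string& model_dir,
                             const std::string& suffix,
                             const std::string& explicit_path) {
  // Prefer the model_dir prefix; otherwise require the explicit path.
  const std::string path =
      model_dir.empty() ? explicit_path : model_dir + suffix;
  if (path.empty()) {
    throw std::invalid_argument(
        "One of model dir or the explicit path must be provided.");
  }
  return path;
}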

bool DistModel::PrepareFleetExe() {
VLOG(3) << "DistModel PrepareFleetExe()";
task_node_.reset(new TaskNode(program_.get(), config_.local_rank));
// With auto cut, there is no concept of pp, no need to add dependency.
task_node_->SetType("Compute");
task_node_->Init();
task_node_->Init(config_.use_feed_fetch_ops);
executor_desc_ = FleetExecutorDesc();
executor_desc_.set_cur_rank(config_.local_rank);
std::unordered_map<int64_t, int64_t> id_to_rank;
@@ -385,11 +545,13 @@
}
fleet_exe.reset(new FleetExecutor(executor_desc_));
fleet_exe->Init(carrier_id_, *(program_.get()), scope_.get(), place_, 1,
{task_node_.get()}, id_to_rank);
{task_node_.get()}, id_to_rank, force_root_scope_var_names_);
VLOG(3) << "DistModel prepare fleet exe success.";
return true;
}

bool DistModel::PrepareFeedAndFetch() {
VLOG(3) << "DistModel PrepareFeedAndFetch()";
for (auto *op : program_->Block(0).AllOps()) {
if (op->Type() == "feed") {
VLOG(3) << "feed op with feed var: " << op->Output("Out")[0];
@@ -399,6 +561,15 @@
}
feeds_[idx] = op;
std::string var_name = op->Output("Out")[0];
// NOTE: Vars of feed/fetch ops are not persistable, so the fleet
// executor would create them in the sub-scope (micro-batch scope).
// The GetInputTensor/GetOutputTensor functions in the analysis
// predictor would then fail to find them, since DistModel only
// returns the root scope. Therefore, those vars must be forced to
// be created in the root scope instead of in the micro-batch scope.
force_root_scope_var_names_.emplace_back(var_name);
feed_names_[var_name] = idx;
idx_to_feeds_[idx] = var_name;
framework::VarDesc *real_var = program_->Block(0).FindVar(var_name);
@@ -428,7 +599,9 @@ bool DistModel::PrepareFeedAndFetch() {
fetches_.resize(idx + 1);
}
fetches_[idx] = op;
idx_to_fetches_[idx] = op->Input("X")[0];
std::string var_name = op->Input("X")[0];
force_root_scope_var_names_.emplace_back(var_name);
idx_to_fetches_[idx] = var_name;
}
}

@@ -440,6 +613,7 @@
LOG(ERROR) << "No fetch op in the inf program, please check the program.";
return false;
}
VLOG(3) << "DistModel prepare feed and fetch success.";
return true;
}

Expand Down Expand Up @@ -508,7 +682,7 @@ bool DistModel::FetchResults(std::vector<DistModelTensor> *output_data,
rst = FetchResult<int32_t>(fetch, output);
output->dtype = DistModelDataType::INT32;
} else if (type == framework::proto::VarType::FP16) {
rst = FetchResult<float16>(fetch, output);
rst = FetchResult<paddle::platform::float16>(fetch, output);
output->dtype = DistModelDataType::FLOAT16;
} else {
LOG(ERROR) << "DistModel meets unknown fetch data type. DistModel only "