Multi-threaded executor
Xin Pan authored and panyx0718 committed Mar 15, 2018
1 parent a4b801c commit 0c581b1
Showing 20 changed files with 499 additions and 119 deletions.
171 changes: 158 additions & 13 deletions paddle/fluid/framework/executor.cc
@@ -25,6 +25,8 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/framework/threadpool.h"

DECLARE_bool(benchmark);
DEFINE_bool(check_nan_inf, false,
@@ -33,6 +35,10 @@ DEFINE_bool(check_nan_inf,

namespace paddle {
namespace framework {
namespace {
std::mutex reduce_mu;
std::vector<std::string> runned;
}

Executor::Executor(const platform::Place& place) : place_(place) {}

@@ -83,6 +89,12 @@ static void CheckTensorNANOrInf(const std::string& name,
"Tensor %s contains NAN", name);
}

void WaitOnPlace(const platform::Place& place) {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(place);
dev_ctx.Wait();
}

void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
bool create_local_scope, bool create_vars) {
// TODO(tonyyang-svail):
@@ -100,6 +112,7 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
continue;
}


if (var->Persistable()) {
auto* ptr = scope->Var(var->Name());
CreateTensor(ptr, var->GetType());
@@ -122,26 +135,156 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
} // if (create_local_scope)
} // if (create_vars)

for (auto& op_desc : block.AllOps()) {
auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto dev_ctx = pool.Get(place_);
int dev_id_orig = boost::get<platform::CUDAPlace>(place_).GetDeviceId();
int dev_id = dev_id_orig * 1000 + block_id;
if (dev_id_orig == 0) {
std::lock_guard<std::mutex> l2(reduce_mu);
runned.clear();
}

VLOG(3) << place_ << " " << op->DebugStringEx(local_scope);
op->Run(*local_scope, place_);
std::mutex to_runs_mu;
std::deque<OpDesc*> to_runs;
std::unordered_map<int64_t, OpDesc*> running;
int64_t cur_id = 0;
for (auto& op_desc : block.AllOps()) {
op_desc->Reset(dev_id);
if (op_desc->IsReady(dev_id)) {
to_runs.push_back(op_desc);
}
}

if (FLAGS_benchmark) {
VLOG(2) << "Memory used after operator " + op->Type() + " running: "
<< memory::memory_usage(place_);
std::unordered_map<std::string, OpDesc*> reduces;
int cur_reduce = 0;

while (true) {
int64_t old_id = cur_id;
cur_id++;
OpDesc* op_desc = nullptr;
bool is_all_running = false;
bool is_too_many_running = false;
{
std::lock_guard<std::mutex> l(to_runs_mu);
if (to_runs.empty()) {
if (running.empty()) {
break;
} else {
is_all_running = true;
}
} else {
if (running.size() > 50) {
is_all_running = true;
} else {
op_desc = to_runs.front();
running[old_id] = op_desc;
to_runs.pop_front();
}
}
}
if (FLAGS_check_nan_inf) {
for (auto& vname : op->OutputVars(true)) {
auto* var = local_scope->FindVar(vname);
if (var == nullptr) continue;
if (var->IsType<framework::LoDTensor>()) {
CheckTensorNANOrInf(vname, var->Get<framework::LoDTensor>());
if (is_all_running || is_too_many_running) {
std::this_thread::sleep_for(std::chrono::microseconds(4));
continue;
}

if (op_desc->UniqueName().find("ncclAllReduce") !=
op_desc->UniqueName().npos) {
if (dev_id_orig == 0) {
auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
// fprintf(stderr, "%s seq_start1 at %d at idx: %lu\n",
// op_desc->UniqueName().c_str(), dev_id_orig, runned.size());
{
std::lock_guard<std::mutex> l2(reduce_mu);
runned.push_back(op_desc->UniqueName());
}
op->Run(*local_scope, place_);
// fprintf(stderr, "%s seq_done1\n",
// op_desc->UniqueName().c_str());
std::vector<OpDesc*> nexts = op_desc->GetRunnables(dev_id);
{
std::lock_guard<std::mutex> l(to_runs_mu);
for (int i = 0; i < nexts.size(); ++i) {
to_runs.push_back(nexts[i]);
}
running.erase(old_id);
}
} else {
reduces[op_desc->UniqueName()] = op_desc;
bool can_run = false;
{
std::lock_guard<std::mutex> l2(reduce_mu);
can_run = cur_reduce < runned.size() &&
runned[cur_reduce] == op_desc->UniqueName() &&
reduces.find(runned[cur_reduce]) != reduces.end();
}
if (can_run) {
// fprintf(stderr, "to run at idx: %d\n", cur_reduce);
auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
// fprintf(stderr, "%s seq_start2 at %d\n",
// op_desc->UniqueName().c_str(), dev_id_orig);
op->Run(*local_scope, place_);
// fprintf(stderr, "%s seq_done2\n", op_desc->UniqueName().c_str());
std::vector<OpDesc*> nexts = op_desc->GetRunnables(dev_id);
{
std::lock_guard<std::mutex> l(to_runs_mu);
for (int i = 0; i < nexts.size(); ++i) {
to_runs.push_back(nexts[i]);
}
running.erase(old_id);
}
std::lock_guard<std::mutex> l2(reduce_mu);
cur_reduce++;
} else {
std::lock_guard<std::mutex> l(to_runs_mu);
running.erase(old_id);
to_runs.push_back(op_desc);
}
}
continue;
}
std::thread(
[this, &to_runs, &to_runs_mu, op_desc, local_scope, dev_ctx, old_id,
&running, dev_id] {
OpDesc* desc = op_desc;
platform::RecordEvent record_event(
desc->UniqueName(), dev_ctx);
auto op = paddle::framework::OpRegistry::CreateOp(*desc);
// fprintf(stderr, "%s start3 at %d\n",
// desc->UniqueName().c_str(), dev_id);
op->Run(*local_scope, place_);
// fprintf(stderr, "%s done3\n", desc->UniqueName().c_str());

std::vector<OpDesc*> nexts = desc->GetRunnables(dev_id);
std::lock_guard<std::mutex> l(to_runs_mu);
for (int i = 0; i < nexts.size(); ++i) {
to_runs.push_back(nexts[i]);
}
running.erase(old_id);
}).detach();
}
/*
for (auto& op_desc : block.AllOps()) {
auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
VLOG(3) << place_ << " " << op->DebugStringEx(local_scope);
op->Run(*local_scope, place_);
std::thread t([&](){
auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
VLOG(3) << place_ << " " << op->DebugStringEx(local_scope);
op->Run(*local_scope, place_);
});
t.join();
}*/

int64_t no_scheduled = 0;
for (auto& op_desc : block.AllOps()) {
if (!op_desc->Scheduled(dev_id)) {
++no_scheduled;
fprintf(stderr, "%s not scheduled at %d\n",
op_desc->UniqueName().c_str(), dev_id);
}
}

if (create_vars && create_local_scope) {
scope->DeleteScope(local_scope);
}
@@ -247,6 +390,8 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
feed_holder->SetType(proto::VarType::FEED_MINIBATCH);
feed_holder->SetPersistable(true);

// for (auto block : program.Block())

int i = 0;
for (auto& feed_target : feed_targets) {
std::string var_name = feed_target.first;
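The new Executor::Run body above replaces the sequential per-op loop with a dependency-driven scheduler: each op starts with a per-device count of unmet inputs, ready ops sit in a mutex-protected deque, at most roughly 50 ops are in flight at once, each op runs on its own detached thread, and a finished op enqueues whichever successors it has just unblocked (ncclAllReduce ops are additionally serialized in the order device 0 records them). Below is a minimal, self-contained sketch of only that ready-queue pattern; Node, RunGraph, kMaxInFlight and all other names are illustrative stand-ins rather than PaddlePaddle types, and the NCCL-ordering and profiling details are intentionally omitted.

// Minimal sketch (illustrative only) of the ready-queue scheduling pattern:
// ops whose dependency counters reach zero are queued, a bounded number run
// concurrently on detached threads, and each finished op enqueues the
// successors it has just unblocked.
#include <atomic>
#include <chrono>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

struct Node {                          // hypothetical stand-in for an OpDesc
  std::function<void()> run;           // the operator body
  std::vector<Node*> successors;       // ops that consume this op's outputs
  std::atomic<int> pending{0};         // unmet dependencies for this run
};

void RunGraph(const std::vector<Node*>& initially_ready) {
  std::mutex mu;
  std::deque<Node*> ready(initially_ready.begin(), initially_ready.end());
  std::atomic<int> in_flight{0};
  const int kMaxInFlight = 50;         // mirrors the `running.size() > 50` cap

  while (true) {
    Node* node = nullptr;
    {
      std::lock_guard<std::mutex> l(mu);
      if (ready.empty()) {
        if (in_flight.load() == 0) break;   // nothing queued, nothing running
      } else if (in_flight.load() < kMaxInFlight) {
        node = ready.front();
        ready.pop_front();
        ++in_flight;
      }
    }
    if (node == nullptr) {                  // everything running, or too many
      std::this_thread::sleep_for(std::chrono::microseconds(4));
      continue;
    }
    std::thread([&mu, &ready, &in_flight, node] {
      node->run();                          // execute the operator
      std::lock_guard<std::mutex> l(mu);
      for (Node* next : node->successors)   // successors that just became ready
        if (--next->pending == 0) ready.push_back(next);
      --in_flight;
    }).detach();
  }
  // A production version would typically join its workers or reuse a thread
  // pool (the patch itself pulls in threadpool.h) rather than detach.
}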
1 change: 1 addition & 0 deletions paddle/fluid/framework/framework.proto
@@ -53,6 +53,7 @@ message OpDesc {
repeated string arguments = 2;
};

optional string unique_name = 14;
required string type = 3;
repeated Var inputs = 1;
repeated Var outputs = 2;
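The new optional unique_name field lets a serialized OpDesc carry the stable identifier that OpDesc::SetUniqueName generates (see op_desc.cc below). A small sketch of reading and writing the field through the standard proto2-generated C++ accessors; the include path and the TagOp helper are assumptions for illustration only.

// Sketch only: assumes the protobuf-generated header for framework.proto.
#include <string>
#include "paddle/fluid/framework/framework.pb.h"  // assumed generated path

void TagOp(paddle::framework::proto::OpDesc* desc) {
  desc->set_unique_name("mul_b0_42");   // same "<type>_b<block>_<rand>" shape
  if (desc->has_unique_name()) {        // proto2 optional => has_ accessor
    const std::string& name = desc->unique_name();
    (void)name;  // e.g. a key for per-device scheduling or profiler events
  }
}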
88 changes: 87 additions & 1 deletion paddle/fluid/framework/op_desc.cc
@@ -14,7 +14,6 @@ limitations under the License. */

#include "paddle/fluid/framework/op_desc.h"
#include <functional>
#include <mutex>
#include <unordered_map>
#include "glog/logging.h"
#include "paddle/fluid/framework/block_desc.h"
@@ -83,13 +82,32 @@ class CompileTimeInferShapeContext : public InferShapeContext {
const BlockDesc &block_;
};

OpDesc::OpDesc(BlockDesc *block) : block_(block) {
rand_num_ = std::rand();
}

OpDesc::OpDesc() {
rand_num_ = std::rand();
}

OpDesc::OpDesc(const std::string &type, const VariableNameMap &inputs,
const VariableNameMap &outputs, const AttributeMap &attrs) {
desc_.set_type(type);
inputs_ = inputs;
outputs_ = outputs;
attrs_ = attrs;
need_update_ = true;
rand_num_ = std::rand();
}

std::string OpDesc::UniqueName() {
return name_;
}

void OpDesc::SetUniqueName(uint64_t block_id) {
name_ = string::Sprintf("%s_b%lu_%d", Type().c_str(), block_id,
rand_num_);
desc_.set_unique_name(name_);
}

void OpDesc::CopyFrom(const OpDesc &op_desc) {
@@ -100,6 +118,72 @@ void OpDesc::CopyFrom(const OpDesc &op_desc) {
need_update_ = true;
}

std::vector<OpDesc*> OpDesc::GetRunnables(int dev_id) {
std::lock_guard<std::mutex> l(mu_);
std::vector<OpDesc*> runnables;
for (auto op_it : ops_) {
auto op = op_it.second;
if (op->ReduceADependent(dev_id)) {
runnables.push_back(op);
/*
fprintf(stderr, " %s need %d more to scheduled on %d, ready\n",
op->UniqueName().c_str(),
op->CurDependency(dev_id), dev_id);*/
op->ReduceADependent(dev_id);
} else {
/*fprintf(stderr, " %s need %d more to scheduled on %d\n",
op->UniqueName().c_str(),
op->CurDependency(dev_id), dev_id);*/
}
}
return runnables;
}

std::vector<std::string> OpDesc::AllDepOps() {
std::vector<std::string> deps;
for (auto op_it : ops_) {
deps.push_back(op_it.first);
}
return deps;
}

bool OpDesc::IsReady(int dev_id) {
std::lock_guard<std::mutex> l(mu_);
return cur_dependents_.at(dev_id) == 0;
}

bool OpDesc::Scheduled(int dev_id) {
std::lock_guard<std::mutex> l(mu_);
return cur_dependents_.at(dev_id) <= 0;
}

int OpDesc::CurDependency(int dev_id) {
return cur_dependents_.at(dev_id);
}

void OpDesc::AddDependency(OpDesc* op) {
std::lock_guard<std::mutex> l(mu_);
if (ops_.find(op->UniqueName()) != ops_.end()) {
return;
}
ops_[op->UniqueName()] = op;
op->IncreaseADependent();
}

void OpDesc::Reset(int dev_id) {
std::lock_guard<std::mutex> l(mu_);
cur_dependents_[dev_id] = dependent_;
}

void OpDesc::IncreaseADependent() {
dependent_ += 1;
}

bool OpDesc::ReduceADependent(int dev_id) {
cur_dependents_[dev_id] -= 1;
return cur_dependents_.at(dev_id) == 0;
}

OpDesc::OpDesc(const proto::OpDesc &desc, ProgramDesc *prog, BlockDesc *block)
: desc_(desc), need_update_(false) {
// restore inputs_
@@ -134,6 +218,8 @@ OpDesc::OpDesc(const proto::OpDesc &desc, ProgramDesc *prog, BlockDesc *block)
}
}
this->block_ = block;

rand_num_ = std::rand();
}

proto::OpDesc *OpDesc::Proto() {
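Taken together, AddDependency / IncreaseADependent / Reset / IsReady / ReduceADependent / GetRunnables give each OpDesc a per-device countdown of unmet producers: Reset arms the counter before a run, each finished producer decrements it, and GetRunnables reports the consumers that just hit zero. Below is a standalone miniature of that bookkeeping with illustrative names (MiniOp and friends are not from the patch); it also deliberately skips the patch's second ReduceADependent call that marks an op as already scheduled.

// Standalone miniature of the dependency-count bookkeeping (illustrative only).
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <vector>

class MiniOp {
 public:
  explicit MiniOp(std::string name) : name_(std::move(name)) {}

  // Producer-side registration: `consumer` cannot run until *this has run.
  // Graph construction is assumed single-threaded.
  void AddConsumer(MiniOp* consumer) {
    std::lock_guard<std::mutex> l(mu_);
    if (consumers_.count(consumer->name_)) return;   // de-duplicate edges
    consumers_[consumer->name_] = consumer;
    consumer->dependents_ += 1;                      // one more unmet input
  }

  // Arm the per-device counter before a run of the block.
  void Reset(int dev_id) {
    std::lock_guard<std::mutex> l(mu_);
    cur_dependents_[dev_id] = dependents_;
  }

  bool IsReady(int dev_id) {
    std::lock_guard<std::mutex> l(mu_);
    return cur_dependents_.at(dev_id) == 0;
  }

  // Called once by each finished producer; true when the last one finishes.
  bool ReduceADependent(int dev_id) {
    std::lock_guard<std::mutex> l(mu_);
    return --cur_dependents_.at(dev_id) == 0;
  }

  // After *this finishes on dev_id, return consumers that just became ready.
  std::vector<MiniOp*> GetRunnables(int dev_id) {
    std::lock_guard<std::mutex> l(mu_);
    std::vector<MiniOp*> ready;
    for (auto& kv : consumers_)
      if (kv.second->ReduceADependent(dev_id)) ready.push_back(kv.second);
    return ready;
  }

  const std::string& name() const { return name_; }

 private:
  std::string name_;
  int dependents_ = 0;                 // static in-degree
  std::map<int, int> cur_dependents_;  // per-device remaining in-degree
  std::map<std::string, MiniOp*> consumers_;
  std::mutex mu_;
};

int main() {
  MiniOp a("a"), b("b"), c("c");
  a.AddConsumer(&c);                   // c depends on a and b
  b.AddConsumer(&c);
  for (MiniOp* op : {&a, &b, &c}) op->Reset(/*dev_id=*/0);

  assert(a.IsReady(0) && b.IsReady(0) && !c.IsReady(0));
  assert(a.GetRunnables(0).empty());   // c still waits on b
  auto ready = b.GetRunnables(0);      // b was c's last unfinished producer
  assert(ready.size() == 1 && ready[0]->name() == "c");
}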
(Diffs for the remaining 17 of the 20 changed files are not shown here.)
