Skip to content

Commit

Permalink
Refactor StreamAnalyzer and EventManager from InterpreterCore
Browse files Browse the repository at this point in the history
  • Loading branch information
Aurelius84 committed Sep 14, 2021
1 parent 2bb4431 commit 3590bc9
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 171 deletions.
4 changes: 3 additions & 1 deletion paddle/fluid/framework/new_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ graph_to_program_pass variable_helper timer monitor)
cc_library(workqueue SRCS workqueue.cc DEPS enforce)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS})
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS})
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector)
cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog)
cc_library(stream_analyzer SRCS stream_analyzer.cc DEPS ${DEVICE_EVENT_LIBS} glog device_context)
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector stream_analyzer event_manager)
cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore)
cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue)
# cc_binary(standalone_executor_test SRCS standalone_executor_test.cc DEPS interpretercore standalone_executor operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler)
58 changes: 58 additions & 0 deletions paddle/fluid/framework/new_executor/event_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/event_manager.h"

namespace paddle {
namespace framework {

void EventManager::WaitEvent(const Instruction& instruction,
const platform::Place& place) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(place)) return;

VLOG(3) << "Deal StreamWaitEventOrSync for "
<< instruction.kernel_func_.operator_base_->Type();
auto* dev_ctx = instruction.dev_ctx_;

WaitOrSync(instruction.intput_events_, dev_ctx);
}

void EventManager::RecordEvent(const Instruction& instruction,
const OpFuncNode& op_func_node,
const platform::Place& place) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(place)) return;

for (auto& event : instruction.output_events_) {
VLOG(3) << "Record event in out_var_id: " << event.var_id_;
event.event_->Record(instruction.dev_ctx_);
}
}

void EventManager::WaitOrSync(const std::vector<EventInter>& events,
const platform::DeviceContext* dev_ctx) {
for (auto& event_iter : events) {
if (event_iter.is_sync_) {
VLOG(3) << "host sync wait in_var_id " << event_iter.var_id_;
event_iter.event_->Wait(platform::kCPU, dev_ctx);
} else {
VLOG(3) << "stream async wait in_var_id " << event_iter.var_id_;
event_iter.event_->Wait(platform::kCUDA, dev_ctx);
}
}
}

} // namespace framework
} // namespace paddle
35 changes: 35 additions & 0 deletions paddle/fluid/framework/new_executor/event_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"

namespace paddle {
namespace framework {

class EventManager {
public:
void RecordEvent(const Instruction& instruction,
const OpFuncNode& op_func_node,
const platform::Place& place);

void WaitEvent(const Instruction& instruction, const platform::Place& place);

private:
void WaitOrSync(const std::vector<EventInter>& events,
const platform::DeviceContext* dev_ctx);
};

} // namespace framework
} // namespace paddle
157 changes: 6 additions & 151 deletions paddle/fluid/framework/new_executor/interpretercore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,101 +20,6 @@
namespace paddle {
namespace framework {

namespace {

/*
* Parse the var_ids that need to be associated with an event.
* The caller should guarantee front_op and back_op satisfy the
* following conditions:
* 1. kQueueAsync -> kQueueAsync
* 2. kQueueAsync -> kQueueSync
*
* For example: matmul(gpu) -> out_var -> memcpy_d2h
* out_var should be associated with an event.
*/
std::vector<size_t> ParseEventVarIds(const Instruction& cur_instr,
const Instruction& next_instr) {
std::unordered_set<size_t> unique_var_ids;
for (auto& item : cur_instr.output_index_) {
unique_var_ids.insert(item.second.begin(), item.second.end());
}

std::vector<size_t> new_event_var_ids;
for (auto& item : next_instr.input_index_) {
for (auto var_id : item.second) {
if (unique_var_ids.count(var_id) > 0) {
new_event_var_ids.push_back(var_id);
}
}
}
return new_event_var_ids;
}

void AssociateInputWithEvents(
const platform::Place& place, const std::vector<size_t>& new_event_var_id,
Instruction* next_instr,
std::map<size_t, std::shared_ptr<platform::DeviceEvent>>* var_id2event,
bool is_sync) {
for (auto var_id : new_event_var_id) {
if (var_id2event->count(var_id) == 0) {
auto device_event = std::make_shared<platform::DeviceEvent>(
place, platform::GenerateDeviceEventFlag());
var_id2event->emplace(var_id, std::move(device_event));
}
// Add events for next_instr.inputs
next_instr->intput_events_.emplace_back(var_id, var_id2event->at(var_id),
is_sync);
}
}

void ParseDirectAndEventRunOps(
const platform::Place& place, const std::vector<OpFuncNode>& op_func_nodes,
const std::vector<size_t>& downstream_ops, size_t op_index,
std::map<size_t, std::shared_ptr<platform::DeviceEvent>>* var_id2event,
std::vector<Instruction>* instructions) {
auto& op_func_type = op_func_nodes[op_index].type_;
auto& cur_instr = instructions->at(op_index);
auto& next_instruction = cur_instr.next_instruction_;

if (op_func_type == OpFuncType::kQueueSync) {
// all downstream ops of kQueueSync can directly run, such as CPU -> Any
next_instruction.direct_run_ = downstream_ops;
} else { // kQueueAsync
std::vector<size_t> event_var_ids;
for (auto next_op_id : downstream_ops) {
auto& next_instr = instructions->at(next_op_id);
// case 1: GPU -> GPU(same stream)
if (cur_instr.dev_ctx_ == next_instr.dev_ctx_) {
next_instruction.direct_run_.emplace_back(next_op_id);
continue;
}
// Always insert events between different stream
auto new_event_var_ids = ParseEventVarIds(cur_instr, next_instr);
event_var_ids.insert(event_var_ids.end(), new_event_var_ids.begin(),
new_event_var_ids.end());

bool is_sync =
(op_func_nodes[next_op_id].type_ == OpFuncType::kQueueSync);
AssociateInputWithEvents(place, new_event_var_ids, &next_instr,
var_id2event, is_sync);

if (is_sync) { // GPU -> CPU
next_instruction.synchronize_run_.emplace_back(next_op_id);
} else { // GPU -> GPU(different stream)
next_instruction.event_wait_run_.emplace_back(next_op_id);
}
}
// Create events for these cross-stream vars
VLOG(3) << cur_instr.kernel_func_.operator_base_->Type()
<< " event_var_ids.size: " << event_var_ids.size();
for (auto var_id : event_var_ids) {
cur_instr.output_events_.emplace_back(var_id, var_id2event->at(var_id),
false /*not used*/);
}
}
}
} // namespace

InterpreterCore::InterpreterCore(const platform::Place& place,
const ProgramDesc& main_prog,
VariableScope* global_scope,
Expand All @@ -123,8 +28,7 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
: place_(place),
main_program_(main_prog),
global_scope_(global_scope),
d2h_ctx_pool_({place}),
h2d_ctx_pool_({place}) {
stream_analyzer_(place) {
is_build_ = false;

feed_names_ = feed_names;
Expand Down Expand Up @@ -199,7 +103,7 @@ void InterpreterCore::Convert() {
Instruction temp_inst;
auto* op_base = op_list_[i];
temp_inst.dev_ctx_ =
ParseDeviceContextForInstruction(vec_func_list_[i], *op_base);
stream_analyzer_.ParseDeviceContext(vec_func_list_[i], *op_base);
temp_inst.kernel_func_.compute_func_ = vec_func_list_[i].kernel_func_;
temp_inst.kernel_func_.operator_base_ = op_base;
temp_inst.input_index_ = vec_func_list_[i].input_index;
Expand Down Expand Up @@ -270,8 +174,8 @@ void InterpreterCore::Convert() {
}
}

ParseDirectAndEventRunOps(place_, vec_func_list_, filter_next, i,
&var_id2event_, &vec_instruction_);
stream_analyzer_.Schedule(vec_func_list_, filter_next, i,
&vec_instruction_);

for (auto inst_id : filter_next) {
dependecy_count_[inst_id]++;
Expand Down Expand Up @@ -361,7 +265,7 @@ void InterpreterCore::ExecuteInstructionList(
working_queue.pop();
auto& instr_node = vec_instr[instr_id];
// step1 : stream_wait (non-block host) or sync (block host)
StreamWaitEventOrSync(instr_node);
event_manager_.WaitEvent(instr_node, place_);
// step2: run instruction
RunInstruction(instr_node);
++run_op_number;
Expand All @@ -371,7 +275,7 @@ void InterpreterCore::ExecuteInstructionList(
}

// step3: insert event for out_vars if needed
RecordEventInstruction(instr_node, vec_func_list_[instr_id]);
event_manager_.RecordEvent(instr_node, vec_func_list_[instr_id], place_);

// step4: update working_queue
auto& next_instr = instr_node.next_instruction_.all_next_ops_;
Expand Down Expand Up @@ -450,54 +354,5 @@ const CostInfo& InterpreterCore::DryRun(
return dry_run_profiler_.GetCostInfo();
}

platform::DeviceContext* InterpreterCore::ParseDeviceContextForInstruction(
const OpFuncNode& op_func_node, const OperatorBase& op_base) {
auto& op_type = op_base.Type();
auto* dev_ctx = op_func_node.dev_ctx_;
if (op_type == interpretercore::kMemcpyH2D) {
VLOG(3) << "Get dev_ctx from d2h_context_pool_";
dev_ctx = d2h_ctx_pool_.Get(place_);
} else if (op_type == interpretercore::kMemcpyD2H) {
VLOG(3) << "Get dev_ctx from h2d_context_pool_";
dev_ctx = h2d_ctx_pool_.Get(place_);
}

return dev_ctx;
}

void InterpreterCore::RecordEventInstruction(const Instruction& instruction,
const OpFuncNode& op_func_node) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(place_)) return;

for (auto& event : instruction.output_events_) {
VLOG(3) << "Record event in out_var_id: " << event.var_id_;
event.event_->Record(instruction.dev_ctx_);
}
}

void InterpreterCore::WaitOrSync(const std::vector<EventInter>& events,
const platform::DeviceContext* dev_ctx) {
for (auto& event_iter : events) {
if (event_iter.is_sync_) {
VLOG(3) << "host sync wait in_var_id " << event_iter.var_id_;
event_iter.event_->Wait(platform::kCPU, dev_ctx);
} else {
VLOG(3) << "stream async wait in_var_id " << event_iter.var_id_;
event_iter.event_->Wait(platform::kCUDA, dev_ctx);
}
}
}

void InterpreterCore::StreamWaitEventOrSync(const Instruction& instruction) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(place_)) return;

VLOG(3) << "Deal StreamWaitEventOrSync for "
<< instruction.kernel_func_.operator_base_->Type();
auto* dev_ctx = instruction.dev_ctx_;

WaitOrSync(instruction.intput_events_, dev_ctx);
}
} // namespace framework
} // namespace paddle
20 changes: 4 additions & 16 deletions paddle/fluid/framework/new_executor/interpretercore.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
#include <unordered_map>
#include <vector>

#include "paddle/fluid/framework/new_executor/event_manager.h"
#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/profiler.h"
#include "paddle/fluid/framework/new_executor/stream_analyzer.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/tensor.h"
Expand Down Expand Up @@ -64,17 +66,6 @@ class InterpreterCore {
const VariableScope& var_scope, const platform::Place& place,
std::vector<VariableMetaInfo>& working_var_ref); // NOLINT

platform::DeviceContext* ParseDeviceContextForInstruction(
const OpFuncNode& op_func_node, const OperatorBase& op_base);

void RecordEventInstruction(const Instruction& instruction,
const OpFuncNode& op_func_node);

void WaitOrSync(const std::vector<EventInter>& events,
const platform::DeviceContext* dev_ctx);

void StreamWaitEventOrSync(const Instruction& instruction);

void AddFetch(const std::vector<std::string>& fetch_names);

bool is_build_;
Expand All @@ -83,9 +74,6 @@ class InterpreterCore {
ProgramDesc main_program_;
VariableScope* global_scope_;

platform::DeviceContextPool d2h_ctx_pool_;
platform::DeviceContextPool h2d_ctx_pool_;

std::vector<Instruction> vec_instruction_;
InstructionInfo instruction_info_;
std::vector<size_t> dependecy_count_;
Expand All @@ -99,8 +87,8 @@ class InterpreterCore {
std::vector<std::string> feed_names_;

InterpreterProfiler dry_run_profiler_;

std::map<size_t, std::shared_ptr<platform::DeviceEvent>> var_id2event_;
StreamAnalyzer stream_analyzer_;
EventManager event_manager_;

InterpreterCoreGarbageCollector gc_;
std::vector<paddle::platform::DeviceEvent> gc_event_;
Expand Down
3 changes: 0 additions & 3 deletions paddle/fluid/framework/new_executor/interpretercore_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,6 @@ class RuntimeInferShapeContext : public InferShapeContext {

namespace interpretercore {

static constexpr char kMemcpyH2D[] = "memcpy_h2d";
static constexpr char kMemcpyD2H[] = "memcpy_d2h";

std::string get_memcpy_type(const platform::Place& src_place,
const platform::Place& dst_place);

Expand Down
5 changes: 5 additions & 0 deletions paddle/fluid/framework/new_executor/new_executor_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
namespace paddle {
namespace framework {

namespace interpretercore {
static constexpr char kMemcpyH2D[] = "memcpy_h2d";
static constexpr char kMemcpyD2H[] = "memcpy_d2h";
} // namespace interpretercore

using OpKernelComputeFunc = std::function<void(const ExecutionContext&)>;
using OpKernelMap =
std::unordered_map<OpKernelType, OpKernelComputeFunc, OpKernelType::Hash>;
Expand Down
Loading

1 comment on commit 3590bc9

@paddle-bot-old
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Congratulation! Your pull request passed all required CI. You could ask reviewer(s) to approve and merge. 🎉

Please sign in to comment.