Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade work queue #38335

Merged
merged 37 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8744ba7
add align for WorkQueue
liutiexing Sep 22, 2021
4759bc8
Merge branch 'develop' of https://github.com/liutiexing/Paddle into d…
liutiexing Sep 22, 2021
6f00ace
add spinlock
liutiexing Sep 23, 2021
2d6f1cf
merge develop
liutiexing Sep 23, 2021
f5099be
Merge branch 'develop' of https://github.com/liutiexing/Paddle into d…
liutiexing Sep 26, 2021
54aa332
merge develop
liutiexing Oct 12, 2021
1d1bd82
merge
liutiexing Oct 12, 2021
dfbf3e4
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 12, 2021
a5392b3
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 14, 2021
e206173
Add EventsWaiter
liutiexing Oct 15, 2021
0a3dcd9
Revert "Add EventsWaiter"
liutiexing Oct 15, 2021
4689bb5
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 15, 2021
0cec99a
Merge remote-tracking branch 'upstream/develop' into develop
liutiexing Oct 20, 2021
481c4fa
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Oct 27, 2021
83db84e
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Oct 29, 2021
7010e0d
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 16, 2021
ec2a363
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 23, 2021
90a59ec
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 26, 2021
1445bbe
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Nov 29, 2021
a2c74ab
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 1, 2021
1c09b4e
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 2, 2021
cb8cf7d
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 8, 2021
cf0dcd6
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 8, 2021
2f95801
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 14, 2021
14bec1b
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 15, 2021
8a5f7af
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 16, 2021
f0a5915
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 20, 2021
0fe35aa
Merge branch 'PaddlePaddle:develop' into develop
liutiexing Dec 21, 2021
3876d20
update EventsWater
Dec 21, 2021
b8c1bb5
fix
Dec 21, 2021
d26e97f
split workqueue files
Dec 21, 2021
28516da
add more tests
Dec 22, 2021
539038b
fix
Dec 22, 2021
a9669c8
bugfix
Dec 22, 2021
8748ca1
bugfix
Dec 22, 2021
91b289c
Merge branch 'UpgradeWorkQueue' of https://github.com/liutiexing/Padd…
Dec 22, 2021
9211ad2
update
Dec 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions paddle/fluid/framework/new_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ set(INTERPRETERCORE_DEPS op_registry device_context scope framework_proto data_f
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor nan_inf_utils)

add_subdirectory(workqueue)

cc_library(data_transfer SRCS data_transfer.cc DEPS enforce scope glog)
cc_library(workqueue SRCS workqueue.cc workqueue_utils.cc DEPS enforce)
cc_library(new_executor_defs SRCS new_executor_defs.cc DEPS enforce glog scope)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS} executor_gc_helper)
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue new_executor_defs data_transfer)
cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog new_executor_defs)
cc_library(stream_analyzer SRCS stream_analyzer.cc DEPS ${DEVICE_EVENT_LIBS} glog device_context new_executor_defs)
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector stream_analyzer event_manager)
cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore)
cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue)

# skip win32 since wget is not installed by default on windows machine.
# skip COVERAGE_CI since the test runs slowly because of instrumentation.
Expand Down
3 changes: 1 addition & 2 deletions paddle/fluid/framework/new_executor/interpretercore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
new interpreter::AsyncWorkQueue(kHostNumThreads, &main_thread_blocker_));
gc_.reset(new InterpreterCoreGarbageCollector());

exception_notifier_ = main_thread_blocker_.RegisterEvent(
kExceptionCaught, [this]() { return exception_holder_.IsCaught(); });
exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);

create_local_scope_ = FLAGS_new_executor_use_local_scope;
if (FLAGS_new_executor_use_local_scope) {
Expand Down
2 changes: 0 additions & 2 deletions paddle/fluid/framework/new_executor/interpretercore.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/profiler.h"
#include "paddle/fluid/framework/new_executor/stream_analyzer.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <queue>
#include <vector>

#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/device_event.h"

Expand Down
10 changes: 6 additions & 4 deletions paddle/fluid/framework/new_executor/interpretercore_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue/workqueue_utils.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
Expand Down Expand Up @@ -61,12 +61,14 @@ class AsyncWorkQueue {
group_options.emplace_back(/*num_threads*/ host_num_threads,
/*allow_spinning*/ true,
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
/*detached*/ true,
/*events_waiter*/ waiter);
// for launch device Kernel
group_options.emplace_back(/*num_threads*/ 1,
/*allow_spinning*/ true,
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
/*detached*/ true,
/*events_waiter*/ waiter);
queue_group_ = CreateWorkQueueGroup(group_options);
}

Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cc_library(workqueue SRCS workqueue.cc workqueue_utils.cc events_waiter.cc DEPS enforce glog)
cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue)
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
// and won't block, or notifying thread will see state_ change and will unblock
// the waiter, or both. But it can't happen that both threads don't see each
// other changes, which would lead to deadlock.
//
// What changed by PaddlePaddle
// 1. Allocate aligned storage for Waiters to get better performance.
// 2. Replace Eigen utils with std utils.

#pragma once

Expand Down
147 changes: 147 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue/events_waiter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/workqueue/events_waiter.h"
#include <glog/logging.h>
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace framework {

EventsWaiter::EventsWaiter()
: trigger_event_(nullptr), counter_(0), waiting_(false), cv_(1) {}

std::shared_ptr<EventsWaiter::EventNotifier> EventsWaiter::RegisterEvent(
const std::string& name, EventChecker checker) {
auto counter = counter_.fetch_add(1);
auto id = std::hash<std::string>()(name + std::to_string(counter));
VLOG(10) << "Register event id:" << id << " name:" << name;
auto notifier = std::shared_ptr<EventNotifier>(new EventNotifier(id, this));
EventInfo evt{id, name, TriggerType::LevelTriggered, std::move(checker)};
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_[id] = std::move(evt);
return notifier;
}

std::shared_ptr<EventsWaiter::EventNotifier> EventsWaiter::RegisterEvent(
const std::string& name) {
auto counter = counter_.fetch_add(1);
auto id = std::hash<std::string>()(name + std::to_string(counter));
VLOG(10) << "Register event id:" << id << " name:" << name;
auto notifier = std::shared_ptr<EventNotifier>(new EventNotifier(id, this));
EventInfo evt{id, name, TriggerType::EdgeTriggered, []() { return false; }};
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_[id] = std::move(evt);
return notifier;
}

void EventsWaiter::UnregisterEvent(const EventId& id) {
VLOG(10) << "Unregister event id:" << id;
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_.erase(id);
}

std::string EventsWaiter::WaitEvent() {
// only one user can wait at any time
bool waiting = false;
if (!waiting_.compare_exchange_strong(waiting, true,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
PADDLE_THROW(
platform::errors::ResourceExhausted("Another thread is waiting."));
}
auto w = cv_.GetWaiter(0);
cv_.Prewait();
std::string* triggered = trigger_event_;
if (triggered == nullptr) {
// checkers
{
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
for (auto& kv : events_) {
auto& evt = kv.second;
if (TriggerType::LevelTriggered == evt.type && evt.checker()) {
triggered = new std::string(evt.name);
break;
}
}
}
if (triggered != nullptr) {
std::string* prev = nullptr;
if (!trigger_event_.compare_exchange_strong(prev, triggered,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
delete triggered;
triggered = prev;
}
}
}
if (triggered) {
cv_.CancelWait();
} else {
cv_.CommitWait(w);
triggered = trigger_event_;
}
trigger_event_.store(nullptr, std::memory_order_relaxed);
waiting_.store(false);
auto trigger_event = *triggered;
delete triggered;
return trigger_event;
}

int EventsWaiter::Clear() {
bool waiting = false;
if (!waiting_.compare_exchange_strong(waiting, true,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
return -1;
}
trigger_event_.store(nullptr, std::memory_order_relaxed);
waiting_.store(false);
return 0;
}

void EventsWaiter::TriggerEvent(const EventId& id) {
VLOG(10) << "Try to trigger event id:" << id;
std::string* trigger_event = new std::string;
{
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
auto iter = events_.find(id);
if (iter == events_.end()) {
delete trigger_event;
return;
}
*trigger_event = iter->second.name;
}
std::string* prev = nullptr;
if (!trigger_event_.compare_exchange_strong(prev, trigger_event,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
delete trigger_event;
return;
}
VLOG(10) << "Triggered event id:" << id << " name:" << *trigger_event;
cv_.Notify(true);
}

std::string EventsWaiter::GetEventName(const EventId& id) {
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
auto iter = events_.find(id);
if (iter == events_.end()) {
return "Unregistered";
}
return iter->second.name;
}

} // namespace framework
} // namespace paddle
111 changes: 111 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue/events_waiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <atomic>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include "paddle/fluid/framework/new_executor/workqueue/event_count.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"

namespace paddle {
namespace framework {

// A multiplexing waiter, be able to wait multiple kinds of events
// simultaneously.
// Muti-Producer single-consumer single-slot message-queue.
class EventsWaiter {
public:
using EventId = std::size_t;

using EventChecker = std::function<bool()>;

// Make sure EventsWaiter has a longer lifetime than EventNotifier.
class EventNotifier {
public:
void NotifyEvent() { waiter_.TriggerEvent(id_); }

void UnregisterEvent() { waiter_.UnregisterEvent(id_); }

EventId GetEventId() { return id_; }

// return "Unregistered" if the corresponding event was unregistered.
std::string GetEventName() { return waiter_.GetEventName(id_); }

private:
friend EventsWaiter;
EventNotifier(EventId id, EventsWaiter* waiter)
: id_(id), waiter_(*waiter) {}
EventNotifier(const EventNotifier&) = delete;
void operator=(const EventNotifier&) = delete;

EventId id_;
EventsWaiter& waiter_;
};

EventsWaiter();
EventsWaiter(const EventsWaiter&) = delete;
EventsWaiter& operator=(const EventsWaiter&) = delete;

// Register a level-triggered event. If the checker returns true or
// EventNotifier::NotifyEvent is called, the corresponding event will be
// distributed.
std::shared_ptr<EventNotifier> RegisterEvent(const std::string& name,
EventChecker checker);

// Register an edge-triggered event. The corresponding event will be
// distributed when EventNotifier::NotifyEvent is called.
std::shared_ptr<EventNotifier> RegisterEvent(const std::string& name);

void UnregisterEvent(const EventId& id);

// Blocking the calling thread to wait any of the registered events.
std::string WaitEvent();

// Nonblocking.
// Clear the slot, no matter whether there is an event.
// Return value:
// -1 : another thread is waiting.
// 0 : succ.
int Clear();

private:
friend EventNotifier;

enum class TriggerType { LevelTriggered, EdgeTriggered };

struct EventInfo {
EventId id;
std::string name;
TriggerType type;
EventChecker checker;
};

void TriggerEvent(const EventId& id);

std::string GetEventName(const EventId& id);

std::unordered_map<EventId, EventInfo> events_;
paddle::memory::SpinLock events_lock_;
std::atomic<std::string*> trigger_event_;
std::atomic<uint64_t> counter_;
std::atomic<bool> waiting_;
EventCount cv_;
};

} // namespace framework
} // namespace paddle
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,14 @@
#include <atomic>
#include <cstdlib>
#include <vector>
#include "paddle/fluid/framework/new_executor/event_count.h"
#include "paddle/fluid/framework/new_executor/run_queue.h"
#include "paddle/fluid/framework/new_executor/thread_environment.h"
#include "paddle/fluid/framework/new_executor/workqueue/event_count.h"
#include "paddle/fluid/framework/new_executor/workqueue/run_queue.h"
#include "paddle/fluid/framework/new_executor/workqueue/thread_environment.h"
#include "paddle/fluid/platform/os_info.h"

namespace paddle {
namespace framework {

template <typename Notifier>
class TaskTracker {
public:
TaskTracker() = default;

explicit TaskTracker(Notifier& notifier) : notifier_(&notifier) {}

TaskTracker(const TaskTracker&) = delete;

TaskTracker& operator=(const TaskTracker&) = delete;

~TaskTracker() = default;

void AddCounter() { num_tasks_.fetch_add(1, std::memory_order_relaxed); }

void SubCounter() {
if (1 == num_tasks_.fetch_sub(1, std::memory_order_relaxed)) {
if (notifier_ != nullptr) {
notifier_->NotifyEvent();
}
}
}

uint64_t PendingTaskNum() { return num_tasks_.load(); }

private:
alignas(64) std::atomic<uint64_t> num_tasks_{0};
Notifier* notifier_{nullptr};
};

template <typename Environment>
class ThreadPoolTempl {
public:
Expand Down
Loading