Skip to content

Commit

Permalink
Upgrade work queue (#38335)
Browse files Browse the repository at this point in the history
* add align for WorkQueue

* add spinlock

* merge develop

* merge

* Add EventsWaiter

* Revert "Add EventsWaiter"

This reverts commit e206173.

* update EventsWaiter

* fix

* split workqueue files

* add more tests

* fix

* bugfix

* bugfix

* update

Co-authored-by: liutiexing <liutiexing@google.com>
  • Loading branch information
liutiexing and liutiexing committed Dec 23, 2021
1 parent 4221cd3 commit 198d11b
Show file tree
Hide file tree
Showing 17 changed files with 380 additions and 165 deletions.
4 changes: 2 additions & 2 deletions paddle/fluid/framework/new_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ set(INTERPRETERCORE_DEPS op_registry device_context scope framework_proto data_f
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor nan_inf_utils)

add_subdirectory(workqueue)

cc_library(data_transfer SRCS data_transfer.cc DEPS enforce scope glog)
cc_library(workqueue SRCS workqueue.cc workqueue_utils.cc DEPS enforce)
cc_library(new_executor_defs SRCS new_executor_defs.cc DEPS enforce glog scope)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS} executor_gc_helper)
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue new_executor_defs data_transfer)
cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog new_executor_defs)
cc_library(stream_analyzer SRCS stream_analyzer.cc DEPS ${DEVICE_EVENT_LIBS} glog device_context new_executor_defs)
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector stream_analyzer event_manager)
cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore)
cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue)

# skip win32 since wget is not installed by default on windows machine.
# skip COVERAGE_CI since the test runs slowly because of instrumentation.
Expand Down
3 changes: 1 addition & 2 deletions paddle/fluid/framework/new_executor/interpretercore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
new interpreter::AsyncWorkQueue(kHostNumThreads, &main_thread_blocker_));
gc_.reset(new InterpreterCoreGarbageCollector());

exception_notifier_ = main_thread_blocker_.RegisterEvent(
kExceptionCaught, [this]() { return exception_holder_.IsCaught(); });
exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);

create_local_scope_ = FLAGS_new_executor_use_local_scope;
if (FLAGS_new_executor_use_local_scope) {
Expand Down
2 changes: 0 additions & 2 deletions paddle/fluid/framework/new_executor/interpretercore.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/profiler.h"
#include "paddle/fluid/framework/new_executor/stream_analyzer.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <queue>
#include <vector>

#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/device_event.h"

Expand Down
10 changes: 6 additions & 4 deletions paddle/fluid/framework/new_executor/interpretercore_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue/workqueue_utils.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
Expand Down Expand Up @@ -61,12 +61,14 @@ class AsyncWorkQueue {
group_options.emplace_back(/*num_threads*/ host_num_threads,
/*allow_spinning*/ true,
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
/*detached*/ true,
/*events_waiter*/ waiter);
// for launch device Kernel
group_options.emplace_back(/*num_threads*/ 1,
/*allow_spinning*/ true,
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
/*detached*/ true,
/*events_waiter*/ waiter);
queue_group_ = CreateWorkQueueGroup(group_options);
}

Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cc_library(workqueue SRCS workqueue.cc workqueue_utils.cc events_waiter.cc DEPS enforce glog)
cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue)
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
// and won't block, or notifying thread will see state_ change and will unblock
// the waiter, or both. But it can't happen that both threads don't see each
// other changes, which would lead to deadlock.
//
// Changes made by PaddlePaddle:
// 1. Allocate aligned storage for Waiters to get better performance.
// 2. Replace Eigen utils with std utils.

#pragma once

Expand Down
147 changes: 147 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue/events_waiter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/new_executor/workqueue/events_waiter.h"
#include <glog/logging.h>
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace framework {

// Builds a waiter with an empty event slot (trigger_event_ == nullptr), an
// id counter starting at 0 and no thread marked as waiting.
// NOTE(review): cv_(1) presumably sizes the EventCount for the single waiter
// fetched with GetWaiter(0) in WaitEvent — confirm against event_count.h.
EventsWaiter::EventsWaiter()
: trigger_event_(nullptr), counter_(0), waiting_(false), cv_(1) {}

std::shared_ptr<EventsWaiter::EventNotifier> EventsWaiter::RegisterEvent(
const std::string& name, EventChecker checker) {
auto counter = counter_.fetch_add(1);
auto id = std::hash<std::string>()(name + std::to_string(counter));
VLOG(10) << "Register event id:" << id << " name:" << name;
auto notifier = std::shared_ptr<EventNotifier>(new EventNotifier(id, this));
EventInfo evt{id, name, TriggerType::LevelTriggered, std::move(checker)};
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_[id] = std::move(evt);
return notifier;
}

std::shared_ptr<EventsWaiter::EventNotifier> EventsWaiter::RegisterEvent(
const std::string& name) {
auto counter = counter_.fetch_add(1);
auto id = std::hash<std::string>()(name + std::to_string(counter));
VLOG(10) << "Register event id:" << id << " name:" << name;
auto notifier = std::shared_ptr<EventNotifier>(new EventNotifier(id, this));
EventInfo evt{id, name, TriggerType::EdgeTriggered, []() { return false; }};
std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
events_[id] = std::move(evt);
return notifier;
}

// Removes the event registered under `id`; unknown ids are ignored.
void EventsWaiter::UnregisterEvent(const EventId& id) {
  VLOG(10) << "Unregister event id:" << id;
  std::lock_guard<paddle::memory::SpinLock> lock(events_lock_);
  const auto found = events_.find(id);
  if (found != events_.end()) {
    events_.erase(found);
  }
}

// Blocks the calling thread until one registered event fires and returns the
// name of that event.
//
// An event fires either because a producer published it through
// TriggerEvent, or because the checker of a level-triggered event returned
// true while this function was polling.
//
// Throws (via PADDLE_THROW) ResourceExhausted if another thread is already
// inside WaitEvent or Clear.
std::string EventsWaiter::WaitEvent() {
  // only one user can wait at any time
  bool waiting = false;
  if (!waiting_.compare_exchange_strong(waiting, true,
                                        std::memory_order_seq_cst,
                                        std::memory_order_relaxed)) {
    PADDLE_THROW(
        platform::errors::ResourceExhausted("Another thread is waiting."));
  }
  auto w = cv_.GetWaiter(0);
  std::string* triggered = nullptr;
  // Loop until the slot really holds an event. A producer whose event was
  // already consumed through the fast path may still deliver its Notify
  // later; that wakes CommitWait while the slot is empty, and dereferencing
  // the slot unconditionally would then be a null-pointer dereference.
  // NOTE(review): assumes EventCount follows the Eigen Prewait/CommitWait
  // protocol (re-arming with Prewait after CommitWait is allowed) — confirm
  // against event_count.h.
  while (triggered == nullptr) {
    cv_.Prewait();
    triggered = trigger_event_.load();
    if (triggered == nullptr) {
      // checkers
      {
        std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
        for (auto& kv : events_) {
          auto& evt = kv.second;
          if (TriggerType::LevelTriggered == evt.type && evt.checker()) {
            triggered = new std::string(evt.name);
            break;
          }
        }
      }
      if (triggered != nullptr) {
        // Publish the polled event; if a producer won the race, keep the
        // producer's event and drop ours.
        std::string* prev = nullptr;
        if (!trigger_event_.compare_exchange_strong(
                prev, triggered, std::memory_order_seq_cst,
                std::memory_order_relaxed)) {
          delete triggered;
          triggered = prev;
        }
      }
    }
    if (triggered != nullptr) {
      cv_.CancelWait();
    } else {
      cv_.CommitWait(w);
      // May still be nullptr after a stale wake-up; the loop re-arms.
      triggered = trigger_event_.load();
    }
  }
  // The slot is non-null here, so producers cannot overwrite it (their CAS
  // expects nullptr); emptying it hands ownership of the string to us.
  trigger_event_.store(nullptr, std::memory_order_relaxed);
  waiting_.store(false);
  auto trigger_event = *triggered;
  delete triggered;
  return trigger_event;
}

// Non-blocking: empties the event slot whether or not it holds an event.
//
// Returns -1 if another thread currently holds the waiting flag (it is inside
// WaitEvent or Clear), 0 on success.
int EventsWaiter::Clear() {
  bool waiting = false;
  if (!waiting_.compare_exchange_strong(waiting, true,
                                        std::memory_order_seq_cst,
                                        std::memory_order_relaxed)) {
    return -1;
  }
  // The slot owns a heap-allocated name published by TriggerEvent/WaitEvent.
  // Swap it out atomically and free it; merely overwriting the pointer with
  // nullptr would leak the string. Deleting nullptr is a no-op.
  delete trigger_event_.exchange(nullptr);
  waiting_.store(false);
  return 0;
}

// Publishes the event registered under `id` into the single event slot and
// wakes the waiter.
//
// Does nothing if `id` is not registered, or if the slot already holds an
// event that has not been consumed yet.
void EventsWaiter::TriggerEvent(const EventId& id) {
  VLOG(10) << "Try to trigger event id:" << id;
  // Keep a private copy of the name: once the CAS below succeeds the
  // consumer owns the published string and may delete it at any moment, so
  // it must not be read again from this thread.
  std::string name;
  {
    std::lock_guard<paddle::memory::SpinLock> guard(events_lock_);
    auto iter = events_.find(id);
    if (iter == events_.end()) {
      return;
    }
    name = iter->second.name;
  }
  std::string* trigger_event = new std::string(name);
  std::string* prev = nullptr;
  if (!trigger_event_.compare_exchange_strong(prev, trigger_event,
                                              std::memory_order_seq_cst,
                                              std::memory_order_relaxed)) {
    // Slot already occupied: this event is dropped.
    delete trigger_event;
    return;
  }
  VLOG(10) << "Triggered event id:" << id << " name:" << name;
  cv_.Notify(true);
}

// Looks up the name registered for `id`; yields "Unregistered" when the id is
// not (or no longer) present in the event table.
std::string EventsWaiter::GetEventName(const EventId& id) {
  std::lock_guard<paddle::memory::SpinLock> lock(events_lock_);
  const auto found = events_.find(id);
  return found != events_.end() ? found->second.name
                                : std::string("Unregistered");
}

} // namespace framework
} // namespace paddle
111 changes: 111 additions & 0 deletions paddle/fluid/framework/new_executor/workqueue/events_waiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include "paddle/fluid/framework/new_executor/workqueue/event_count.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"

namespace paddle {
namespace framework {

// A multiplexing waiter, able to wait for multiple kinds of events
// simultaneously.
// Multi-producer single-consumer single-slot message queue.
class EventsWaiter {
 public:
  using EventId = std::size_t;

  using EventChecker = std::function<bool()>;

  // Handle through which a producer fires or unregisters one event.
  // Make sure EventsWaiter has a longer lifetime than EventNotifier.
  class EventNotifier {
   public:
    // Fires the event this notifier was created for.
    void NotifyEvent() { waiter_.TriggerEvent(id_); }

    // Removes the event from the owning waiter.
    void UnregisterEvent() { waiter_.UnregisterEvent(id_); }

    EventId GetEventId() { return id_; }

    // return "Unregistered" if the corresponding event was unregistered.
    std::string GetEventName() { return waiter_.GetEventName(id_); }

   private:
    friend EventsWaiter;
    // Only EventsWaiter::RegisterEvent creates notifiers.
    EventNotifier(EventId id, EventsWaiter* waiter)
        : id_(id), waiter_(*waiter) {}
    EventNotifier(const EventNotifier&) = delete;
    void operator=(const EventNotifier&) = delete;

    EventId id_;
    EventsWaiter& waiter_;
  };

  EventsWaiter();
  EventsWaiter(const EventsWaiter&) = delete;
  EventsWaiter& operator=(const EventsWaiter&) = delete;

  // Frees an event name that was published into the slot but never consumed;
  // without this the heap-allocated string would leak when the waiter is
  // destroyed. Deleting nullptr is a no-op.
  ~EventsWaiter() { delete trigger_event_.load(); }

  // Register a level-triggered event. If the checker returns true or
  // EventNotifier::NotifyEvent is called, the corresponding event will be
  // delivered to the waiter.
  std::shared_ptr<EventNotifier> RegisterEvent(const std::string& name,
                                               EventChecker checker);

  // Register an edge-triggered event. The corresponding event will be
  // delivered to the waiter when EventNotifier::NotifyEvent is called.
  std::shared_ptr<EventNotifier> RegisterEvent(const std::string& name);

  void UnregisterEvent(const EventId& id);

  // Blocks the calling thread until any of the registered events fires and
  // returns the name of that event.
  std::string WaitEvent();

  // Nonblocking.
  // Clear the slot, no matter whether there is an event.
  // Return value:
  // -1 : another thread is waiting.
  // 0 : succ.
  int Clear();

 private:
  friend EventNotifier;

  enum class TriggerType { LevelTriggered, EdgeTriggered };

  struct EventInfo {
    EventId id;
    std::string name;
    TriggerType type;
    EventChecker checker;
  };

  void TriggerEvent(const EventId& id);

  std::string GetEventName(const EventId& id);

  // Registered events keyed by id; guarded by events_lock_.
  std::unordered_map<EventId, EventInfo> events_;
  paddle::memory::SpinLock events_lock_;
  // The single message slot: owning pointer to the name of the pending
  // event, or nullptr when the slot is empty.
  std::atomic<std::string*> trigger_event_;
  // Sequence number mixed into event ids at registration time.
  std::atomic<uint64_t> counter_;
  // True while a thread is inside WaitEvent or Clear.
  std::atomic<bool> waiting_;
  EventCount cv_;
};

} // namespace framework
} // namespace paddle
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,14 @@
#include <atomic>
#include <cstdlib>
#include <vector>
#include "paddle/fluid/framework/new_executor/event_count.h"
#include "paddle/fluid/framework/new_executor/run_queue.h"
#include "paddle/fluid/framework/new_executor/thread_environment.h"
#include "paddle/fluid/framework/new_executor/workqueue/event_count.h"
#include "paddle/fluid/framework/new_executor/workqueue/run_queue.h"
#include "paddle/fluid/framework/new_executor/workqueue/thread_environment.h"
#include "paddle/fluid/platform/os_info.h"

namespace paddle {
namespace framework {

// Counts pending tasks and, when a notifier was supplied, calls its
// NotifyEvent() each time the count drops from one to zero.
template <typename Notifier>
class TaskTracker {
 public:
  TaskTracker() = default;

  // The notifier is stored by address, not copied.
  explicit TaskTracker(Notifier& notifier) : notifier_(&notifier) {}

  TaskTracker(const TaskTracker&) = delete;

  TaskTracker& operator=(const TaskTracker&) = delete;

  ~TaskTracker() = default;

  // Records one more pending task.
  void AddCounter() { num_tasks_.fetch_add(1, std::memory_order_relaxed); }

  // Records one finished task; notifies when it was the last pending one.
  void SubCounter() {
    const uint64_t before = num_tasks_.fetch_sub(1, std::memory_order_relaxed);
    if (before != 1 || notifier_ == nullptr) {
      return;
    }
    notifier_->NotifyEvent();
  }

  // Current number of pending tasks.
  uint64_t PendingTaskNum() {
    return num_tasks_.load(std::memory_order_seq_cst);
  }

 private:
  alignas(64) std::atomic<uint64_t> num_tasks_{0};
  Notifier* notifier_{nullptr};
};

template <typename Environment>
class ThreadPoolTempl {
public:
Expand Down
Loading

0 comments on commit 198d11b

Please sign in to comment.