Skip to content

Commit

Permalink
[SYCL] Small refactoring in scheduler (#7140)
Browse files Browse the repository at this point in the history
1. Change some functions to take std::shared_ptr by const reference
       rather than by value.
    2. Modify the helper for acquiring the graph lock to return a lock
       rather than take it as an out parameter.
    3. Move host task post processing code to a new scheduler API to
       avoid locking graph lock outside of the scheduler.
  • Loading branch information
romanovvlad authored Oct 24, 2022
1 parent 1b79491 commit 6725863
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 80 deletions.
23 changes: 1 addition & 22 deletions sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,28 +325,7 @@ class DispatchHostTask {
EmptyCommand *EmptyCmd = MThisCmd->MEmptyCmd;
assert(EmptyCmd && "No empty command found");

// Completing command's event along with unblocking enqueue readiness of
// empty command may lead to quick deallocation of MThisCmd by some cleanup
// process. Thus we'll copy deps prior to completing of event and unblocking
// of empty command.
// Also, it's possible to have record deallocated prior to enqueue process.
// Thus we employ read-lock of graph.
std::vector<Command *> ToCleanUp;
Scheduler &Sched = Scheduler::getInstance();
{
Scheduler::ReadLockT Lock(Sched.MGraphLock);

std::vector<DepDesc> Deps = MThisCmd->MDeps;

// update self-event status
MThisCmd->MEvent->setComplete();

EmptyCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;

for (const DepDesc &Dep : Deps)
Scheduler::enqueueLeavesOfReqUnlocked(Dep.MDepRequirement, ToCleanUp);
}
Sched.cleanupCommands(ToCleanUp);
Scheduler::getInstance().NotifyHostTaskCompletion(MThisCmd, EmptyCmd);
}
};

Expand Down
8 changes: 4 additions & 4 deletions sycl/source/detail/scheduler/graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ Scheduler::GraphBuilder::addHostAccessor(Requirement *Req,
}

Command *Scheduler::GraphBuilder::addCGUpdateHost(
std::unique_ptr<detail::CG> CommandGroup, QueueImplPtr HostQueue,
std::unique_ptr<detail::CG> CommandGroup, const QueueImplPtr &HostQueue,
std::vector<Command *> &ToEnqueue) {

auto UpdateHost = static_cast<CGUpdateHost *>(CommandGroup.get());
Expand Down Expand Up @@ -668,7 +668,7 @@ static bool checkHostUnifiedMemory(const ContextImplPtr &Ctx) {
// Note, creation of new allocation command can lead to the current context
// (Record->MCurContext) change.
AllocaCommandBase *Scheduler::GraphBuilder::getOrCreateAllocaForReq(
MemObjRecord *Record, const Requirement *Req, QueueImplPtr Queue,
MemObjRecord *Record, const Requirement *Req, const QueueImplPtr &Queue,
std::vector<Command *> &ToEnqueue) {

AllocaCommandBase *AllocaCmd = findAllocaForReq(
Expand Down Expand Up @@ -919,7 +919,7 @@ static void combineAccessModesOfReqs(std::vector<Requirement *> &Reqs) {

Command *
Scheduler::GraphBuilder::addCG(std::unique_ptr<detail::CG> CommandGroup,
QueueImplPtr Queue,
const QueueImplPtr &Queue,
std::vector<Command *> &ToEnqueue) {
std::vector<Requirement *> &Reqs = CommandGroup->MRequirements;
const std::vector<detail::EventImplPtr> &Events = CommandGroup->MEvents;
Expand Down Expand Up @@ -1302,7 +1302,7 @@ void Scheduler::GraphBuilder::removeRecordForMemObj(SYCLMemObjI *MemObject) {
// requirement.
// Optionality of Dep is set by Dep.MDepCommand equal to nullptr.
Command *Scheduler::GraphBuilder::connectDepEvent(
Command *const Cmd, EventImplPtr DepEvent, const DepDesc &Dep,
Command *const Cmd, const EventImplPtr &DepEvent, const DepDesc &Dep,
std::vector<Command *> &ToCleanUp) {
assert(Cmd->getWorkerContext() != DepEvent->getContextImpl());

Expand Down
2 changes: 1 addition & 1 deletion sycl/source/detail/scheduler/graph_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static Command *getCommand(const EventImplPtr &Event) {
return (Command *)Event->getCommand();
}

void Scheduler::GraphProcessor::waitForEvent(EventImplPtr Event,
void Scheduler::GraphProcessor::waitForEvent(const EventImplPtr &Event,
ReadLockT &GraphReadLock,
std::vector<Command *> &ToCleanUp,
bool LockTheLock) {
Expand Down
76 changes: 38 additions & 38 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
}

EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
QueueImplPtr Queue) {
const QueueImplPtr &Queue) {
EventImplPtr NewEvent = nullptr;
const CG::CGTYPE Type = CommandGroup->getType();
std::vector<Command *> AuxiliaryCmds;
Expand All @@ -93,8 +93,7 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
}

{
WriteLockT Lock(MGraphLock, std::defer_lock);
acquireWriteLock(Lock);
WriteLockT Lock = acquireWriteLock();

Command *NewCmd = nullptr;
switch (Type) {
Expand All @@ -115,7 +114,7 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,

std::vector<Command *> ToCleanUp;
{
ReadLockT Lock(MGraphLock);
ReadLockT Lock = acquireReadLock();

Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());

Expand Down Expand Up @@ -172,8 +171,7 @@ EventImplPtr Scheduler::addCopyBack(Requirement *Req) {
std::vector<Command *> AuxiliaryCmds;
Command *NewCmd = nullptr;
{
WriteLockT Lock(MGraphLock, std::defer_lock);
acquireWriteLock(Lock);
WriteLockT Lock = acquireWriteLock();
NewCmd = MGraphBuilder.addCopyBack(Req, AuxiliaryCmds);
// Command was not created because there were no operations with
// buffer.
Expand All @@ -183,7 +181,7 @@ EventImplPtr Scheduler::addCopyBack(Requirement *Req) {

std::vector<Command *> ToCleanUp;
try {
ReadLockT Lock(MGraphLock);
ReadLockT Lock = acquireReadLock();
EnqueueResultT Res;
bool Enqueued;

Expand All @@ -210,8 +208,8 @@ Scheduler &Scheduler::getInstance() {
return GlobalHandler::instance().getScheduler();
}

void Scheduler::waitForEvent(EventImplPtr Event) {
ReadLockT Lock(MGraphLock);
void Scheduler::waitForEvent(const EventImplPtr &Event) {
ReadLockT Lock = acquireReadLock();
// It's fine to leave the lock unlocked upon return from waitForEvent as
// there's no more actions to do here with graph
std::vector<Command *> ToCleanUp;
Expand All @@ -230,7 +228,7 @@ static void deallocateStreams(
StreamImplPtr->get());
}

void Scheduler::cleanupFinishedCommands(EventImplPtr FinishedEvent) {
void Scheduler::cleanupFinishedCommands(const EventImplPtr &FinishedEvent) {
// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands if any and deallocate buffers for these stream
// objects, this is needed to guarantee that streamed data is printed and
Expand Down Expand Up @@ -276,7 +274,7 @@ void Scheduler::removeMemoryObject(detail::SYCLMemObjI *MemObj) {
{
// This only needs a shared mutex as it only involves enqueueing and
// awaiting for events
ReadLockT Lock(MGraphLock);
ReadLockT Lock = acquireReadLock();

Record = MGraphBuilder.getMemObjRecord(MemObj);
if (!Record)
Expand All @@ -287,8 +285,7 @@ void Scheduler::removeMemoryObject(detail::SYCLMemObjI *MemObj) {
}

{
WriteLockT Lock(MGraphLock, std::defer_lock);
acquireWriteLock(Lock);
WriteLockT Lock = acquireWriteLock();
MGraphBuilder.decrementLeafCountersForRecord(Record);
MGraphBuilder.cleanupCommandsForRecord(Record, StreamsToDeallocate,
AuxResourcesToDeallocate);
Expand All @@ -303,8 +300,7 @@ EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {
EventImplPtr NewCmdEvent = nullptr;

{
WriteLockT Lock(MGraphLock, std::defer_lock);
acquireWriteLock(Lock);
WriteLockT Lock = acquireWriteLock();

Command *NewCmd = MGraphBuilder.addHostAccessor(Req, AuxiliaryCmds);
if (!NewCmd)
Expand All @@ -314,7 +310,7 @@ EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {

std::vector<Command *> ToCleanUp;
{
ReadLockT ReadLock(MGraphLock);
ReadLockT Lock = acquireReadLock();
EnqueueResultT Res;
bool Enqueued;

Expand Down Expand Up @@ -342,7 +338,7 @@ void Scheduler::releaseHostAccessor(Requirement *Req) {

std::vector<Command *> ToCleanUp;
{
ReadLockT Lock(MGraphLock);
ReadLockT Lock = acquireReadLock();

assert(BlockedCmd && "Can't find appropriate command to unblock");

Expand Down Expand Up @@ -416,27 +412,6 @@ Scheduler::~Scheduler() {
cleanupCommands({});
}

void Scheduler::acquireWriteLock(WriteLockT &Lock) {
#ifdef _WIN32
// Avoiding deadlock situation for MSVC. std::shared_timed_mutex specification
// does not specify a priority for shared and exclusive accesses. It will be a
// deadlock in MSVC's std::shared_timed_mutex implementation, if exclusive
// access occurs after shared access.
// TODO: after switching to C++17, change std::shared_timed_mutex to
// std::shared_mutex and use std::lock_guard here both for Windows and Linux.
while (!Lock.try_lock_for(std::chrono::milliseconds(10))) {
// Without yield while loop acts like endless while loop and occupies the
// whole CPU when multiple command groups are created in multiple host
// threads
std::this_thread::yield();
}
#else
// It is a deadlock on UNIX in implementation of lock and lock_shared, if
// try_lock in the loop above will be executed, so using a single lock here
Lock.lock();
#endif // _WIN32
}

MemObjRecord *Scheduler::getMemObjRecord(const Requirement *const Req) {
return Req->MSYCLMemObj->MRecord.get();
}
Expand Down Expand Up @@ -472,6 +447,31 @@ void Scheduler::cleanupCommands(const std::vector<Command *> &Cmds) {
}
}

// Completes a finished host task: marks its event complete, unblocks the
// associated empty command, and enqueues leaves of the requirements the host
// task depended on. Called by host-task dispatch code so that the graph lock
// is only taken inside the scheduler.
//
// \param Cmd is the command representing the finished host task.
// \param BlockingCmd is the empty command whose enqueue was blocked on Cmd.
void Scheduler::NotifyHostTaskCompletion(Command *Cmd, Command *BlockingCmd) {
// Completing command's event along with unblocking enqueue readiness of
// empty command may lead to quick deallocation of MThisCmd by some cleanup
// process. Thus we'll copy deps prior to completing of event and unblocking
// of empty command.
// Also, it's possible to have record deallocated prior to enqueue process.
// Thus we employ read-lock of graph.

std::vector<Command *> ToCleanUp;
{
ReadLockT Lock = acquireReadLock();

// Copy the deps first: completing the event below may let a concurrent
// cleanup deallocate Cmd, which would invalidate Cmd->MDeps.
std::vector<DepDesc> Deps = Cmd->MDeps;

// update self-event status
Cmd->getEvent()->setComplete();

// Allow the blocked empty command to be enqueued.
BlockingCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;

// Try to enqueue commands that were waiting on each dependent requirement;
// commands that become eligible for cleanup are collected into ToCleanUp.
for (const DepDesc &Dep : Deps)
Scheduler::enqueueLeavesOfReqUnlocked(Dep.MDepRequirement, ToCleanUp);
}
// Cleanup is performed outside the read lock (cleanup may need exclusive
// access to graph structures).
cleanupCommands(ToCleanUp);
}

} // namespace detail
} // __SYCL_INLINE_VER_NAMESPACE(_V1)
} // namespace sycl
51 changes: 36 additions & 15 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class Scheduler {
/// \param CommandGroup is a unique_ptr to a command group to be added.
/// \return an event object to wait on for command group completion.
EventImplPtr addCG(std::unique_ptr<detail::CG> CommandGroup,
QueueImplPtr Queue);
const QueueImplPtr &Queue);

/// Registers a command group, that copies most recent memory to the memory
/// pointed by the requirement.
Expand All @@ -380,7 +380,7 @@ class Scheduler {
/// corresponding function of device API.
///
/// \param Event is a pointer to event to wait on.
void waitForEvent(EventImplPtr Event);
void waitForEvent(const EventImplPtr &Event);

/// Removes buffer from the graph.
///
Expand All @@ -404,7 +404,7 @@ class Scheduler {
/// \sa GraphBuilder::cleanupFinishedCommands
///
/// \param FinishedEvent is a cleanup candidate event.
void cleanupFinishedCommands(EventImplPtr FinishedEvent);
void cleanupFinishedCommands(const EventImplPtr &FinishedEvent);

/// Adds nodes to the graph, that update the requirement with the pointer
/// to the host memory.
Expand Down Expand Up @@ -441,26 +441,45 @@ class Scheduler {

QueueImplPtr getDefaultHostQueue() { return DefaultHostQueue; }

const QueueImplPtr &getDefaultHostQueue() const { return DefaultHostQueue; }

static MemObjRecord *getMemObjRecord(const Requirement *const Req);

Scheduler();
~Scheduler();

protected:
// TODO: after switching to C++17, change std::shared_timed_mutex to
// std::shared_mutex
using RWLockT = std::shared_timed_mutex;
using ReadLockT = std::shared_lock<RWLockT>;
using WriteLockT = std::unique_lock<RWLockT>;

/// Provides exclusive access to std::shared_timed_mutex object with deadlock
/// avoidance
///
/// \param Lock is an instance of WriteLockT, created with \c std::defer_lock
void acquireWriteLock(WriteLockT &Lock);
WriteLockT acquireWriteLock() {
#ifdef _WIN32
// Avoiding deadlock situation for MSVC. std::shared_timed_mutex
// specification does not specify a priority for shared and exclusive
// accesses. It will be a deadlock in MSVC's std::shared_timed_mutex
// implementation, if exclusive access occurs after shared access.
WriteLockT Lock(MGraphLock, std::defer_lock);
while (!Lock.try_lock_for(std::chrono::milliseconds(10))) {
// Without yield while loop acts like endless while loop and occupies the
// whole CPU when multiple command groups are created in multiple host
// threads
std::this_thread::yield();
}
#else
// It is a deadlock on UNIX in implementation of lock and lock_shared if
// try_lock is executed in a loop as above, so acquire the lock with a
// single blocking call here.
WriteLockT Lock(MGraphLock);
#endif // _WIN32
// Return the named local directly: an explicit std::move here would be a
// pessimizing move that inhibits copy elision; implicit move applies.
return Lock;
}

/// Provides shared access to std::shared_timed_mutex object with deadlock
/// avoidance
ReadLockT acquireReadLock() { return ReadLockT{MGraphLock}; }

void cleanupCommands(const std::vector<Command *> &Cmds);

void NotifyHostTaskCompletion(Command *Cmd, Command *BlockingCmd);

static void enqueueLeavesOfReqUnlocked(const Requirement *const Req,
std::vector<Command *> &ToCleanUp);

Expand All @@ -479,15 +498,16 @@ class Scheduler {
/// \sa queue::submit, Scheduler::addCG
///
/// \return a command that represents command group execution.
Command *addCG(std::unique_ptr<detail::CG> CommandGroup, QueueImplPtr Queue,
Command *addCG(std::unique_ptr<detail::CG> CommandGroup,
const QueueImplPtr &Queue,
std::vector<Command *> &ToEnqueue);

/// Registers a \ref CG "command group" that updates host memory to the
/// latest state.
///
/// \return a command that represents command group execution.
Command *addCGUpdateHost(std::unique_ptr<detail::CG> CommandGroup,
QueueImplPtr HostQueue,
const QueueImplPtr &HostQueue,
std::vector<Command *> &ToEnqueue);

/// Enqueues a command to update memory to the latest state.
Expand All @@ -506,7 +526,7 @@ class Scheduler {

/// [Provisional] Optimizes subgraph that consists of command associated
/// with Event passed and its dependencies.
void optimize(EventImplPtr Event);
void optimize(const EventImplPtr &Event);

void cleanupCommand(Command *Cmd);

Expand All @@ -523,7 +543,7 @@ class Scheduler {
/// used when the user provides a "secondary" queue to the submit method
/// which may be used when the command fails to enqueue/execute in the
/// primary queue.
void rescheduleCommand(Command *Cmd, QueueImplPtr Queue);
void rescheduleCommand(Command *Cmd, const QueueImplPtr &Queue);

/// \return a pointer to the corresponding memory object record for the
/// SYCL memory object provided, or nullptr if it does not exist.
Expand Down Expand Up @@ -566,7 +586,7 @@ class Scheduler {
/// \returns the connecting command which is to be enqueued
///
/// Optionality of Dep is set by Dep.MDepCommand equal to nullptr.
Command *connectDepEvent(Command *const Cmd, EventImplPtr DepEvent,
Command *connectDepEvent(Command *const Cmd, const EventImplPtr &DepEvent,
const DepDesc &Dep,
std::vector<Command *> &ToCleanUp);

Expand Down Expand Up @@ -631,7 +651,7 @@ class Scheduler {
/// If none found, creates new one.
AllocaCommandBase *
getOrCreateAllocaForReq(MemObjRecord *Record, const Requirement *Req,
QueueImplPtr Queue,
const QueueImplPtr &Queue,
std::vector<Command *> &ToEnqueue);

void markModifiedIfWrite(MemObjRecord *Record, Requirement *Req);
Expand Down Expand Up @@ -738,7 +758,8 @@ class Scheduler {
///
/// The function may unlock and lock GraphReadLock as needed. Upon return
/// the lock is left in locked state if and only if LockTheLock is true.
static void waitForEvent(EventImplPtr Event, ReadLockT &GraphReadLock,
static void waitForEvent(const EventImplPtr &Event,
ReadLockT &GraphReadLock,
std::vector<Command *> &ToCleanUp,
bool LockTheLock = true);

Expand Down

0 comments on commit 6725863

Please sign in to comment.