Skip to content

Commit

Permalink
[SYCL] Wait for stream service tasks (#7130)
Browse files Browse the repository at this point in the history
Before this patch, explicitly waiting for a task that is expected to
print a message using sycl::stream did not guarantee that the message was
actually printed (the printing could happen later).
The patch adds such a guarantee by registering the events of stream service
tasks in the user event produced for the original user task, as well as
in the queue this task is submitted to. Then, on explicit calls to
queue::wait and event::wait, we make sure that these additional events
are complete as well.
  • Loading branch information
romanovvlad authored Nov 3, 2022
1 parent c6091df commit 1db0e81
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 40 deletions.
4 changes: 4 additions & 0 deletions sycl/include/sycl/memory_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ static constexpr std::memory_order getStdMemoryOrder(sycl::memory_order order) {
case memory_order::seq_cst:
return std::memory_order_seq_cst;
}
// Return default value here to avoid compiler warnings.
// A default case in the switch doesn't help because some compilers warn about
// having a default case when all values of the enum are handled.
return std::memory_order_acq_rel;
}
#endif // __SYCL_DEVICE_ONLY__

Expand Down
41 changes: 13 additions & 28 deletions sycl/source/detail/event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,22 @@ event_impl::~event_impl() {

void event_impl::waitInternal() {
if (!MHostEvent && MEvent) {
// Wait for the native event
getPlugin().call<PiApiKind::piEventsWait>(1, &MEvent);
return;
}

if (MState == HES_Discarded)
} else if (MState == HES_Discarded) {
// Waiting for the discarded event is invalid
throw sycl::exception(
make_error_code(errc::invalid),
"waitInternal method cannot be used for a discarded event.");
} else if (MState != HES_Complete) {
// Wait for the host event
std::unique_lock<std::mutex> lock(MMutex);
cv.wait(lock, [this] { return MState == HES_Complete; });
}

if (MState == HES_Complete)
return;

std::unique_lock<std::mutex> lock(MMutex);
cv.wait(lock, [this] { return MState == HES_Complete; });
// Wait for connected events (e.g. stream prints)
for (const EventImplPtr &Event : MPostCompleteEvents)
Event->wait(Event);
}

void event_impl::setComplete() {
Expand Down Expand Up @@ -236,27 +238,10 @@ void event_impl::wait(std::shared_ptr<sycl::detail::event_impl> Self) {

void event_impl::wait_and_throw(
std::shared_ptr<sycl::detail::event_impl> Self) {
Scheduler &Sched = Scheduler::getInstance();

QueueImplPtr submittedQueue = nullptr;
{
Scheduler::ReadLockT Lock(Sched.MGraphLock);
Command *Cmd = static_cast<Command *>(Self->getCommand());
if (Cmd)
submittedQueue = Cmd->getSubmittedQueue();
}
wait(Self);

{
Scheduler::ReadLockT Lock(Sched.MGraphLock);
for (auto &EventImpl : getWaitList()) {
Command *Cmd = (Command *)EventImpl->getCommand();
if (Cmd)
Cmd->getSubmittedQueue()->throw_asynchronous();
}
}
if (submittedQueue)
submittedQueue->throw_asynchronous();
if (QueueImplPtr SubmittedQueue = MSubmittedQueue.lock())
SubmittedQueue->throw_asynchronous();
}

void event_impl::cleanupCommand(
Expand Down
19 changes: 18 additions & 1 deletion sycl/source/detail/event_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ class event_impl {
MWorkerQueue = WorkerQueue;
};

/// Sets the original queue used for submission.
///
/// Only a weak reference is kept (MSubmittedQueue is a std::weak_ptr), so
/// storing the queue here does not extend its lifetime.
///
/// @param SubmittedQueue is the queue to record as the submission queue.
void setSubmittedQueue(const QueueImplPtr &SubmittedQueue) {
MSubmittedQueue = SubmittedQueue;
};

/// Returns the queue recorded by setSubmittedQueue().
///
/// @return the result of locking the stored weak reference; this is an empty
/// pointer when no queue was recorded or the queue no longer exists, so
/// callers must check it before dereferencing.
QueueImplPtr getSubmittedQueue() const { return MSubmittedQueue.lock(); };

/// Checks if an event is in a fully initialized state. Default-constructed
/// events will return true only after having initialized its native event,
/// while other events will assume that they are fully initialized at
Expand All @@ -234,7 +243,12 @@ class event_impl {
/// state.
bool isInitialized() const noexcept { return MIsInitialized; }

private:
/// Attaches an event that must also be waited for when this event is waited
/// on. waitInternal() calls wait() on every event stored in
/// MPostCompleteEvents after its own wait has finished.
///
/// @param Event is the event to append to MPostCompleteEvents.
void attachEventToComplete(const EventImplPtr &Event) {
// The append is done while holding MMutex.
// NOTE(review): the loop over MPostCompleteEvents in waitInternal() appears
// to run without MMutex held - confirm it cannot race with this push_back.
std::lock_guard<std::mutex> Lock(MMutex);
MPostCompleteEvents.push_back(Event);
}

protected:
// When instrumentation is enabled emits trace event for event wait begin and
// returns the telemetry event generated for the wait
void *instrumentationProlog(std::string &Name, int32_t StreamID,
Expand All @@ -257,11 +271,14 @@ class event_impl {
const bool MIsProfilingEnabled = false;

std::weak_ptr<queue_impl> MWorkerQueue;
std::weak_ptr<queue_impl> MSubmittedQueue;

/// Dependency events prepared for waiting by backend.
std::vector<EventImplPtr> MPreparedDepsEvents;
std::vector<EventImplPtr> MPreparedHostDepsEvents;

std::vector<EventImplPtr> MPostCompleteEvents;

/// Indicates that the task associated with this event has been submitted by
/// the queue to the device.
std::atomic<bool> MIsFlushed = false;
Expand Down
9 changes: 9 additions & 0 deletions sycl/source/detail/queue_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,15 @@ void queue_impl::wait(const detail::code_location &CodeLoc) {
for (event &Event : SharedEvents)
Event.wait();
}

std::vector<EventImplPtr> StreamsServiceEvents;
{
std::lock_guard<std::mutex> Lock(MMutex);
StreamsServiceEvents.swap(MStreamsServiceEvents);
}
for (const EventImplPtr &Event : StreamsServiceEvents)
Event->wait(Event);

#ifdef XPTI_ENABLE_INSTRUMENTATION
instrumentationEpilog(TelemetryEvent, Name, StreamID, IId);
#endif
Expand Down
11 changes: 9 additions & 2 deletions sycl/source/detail/queue_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ class queue_impl {
return MAssertHappenedBuffer;
}

/// Records the event of a stream service task on this queue.
/// queue_impl::wait() swaps the recorded events out of MStreamsServiceEvents
/// and waits for each of them.
///
/// @param Event is the event to append to MStreamsServiceEvents.
void registerStreamServiceEvent(const EventImplPtr &Event) {
// The append is done while holding MMutex, the same mutex queue_impl::wait()
// takes when it swaps the vector out.
std::lock_guard<std::mutex> Lock(MMutex);
MStreamsServiceEvents.push_back(Event);
}

protected:
// template is needed for proper unit testing
template <typename HandlerType = handler>
Expand Down Expand Up @@ -480,7 +485,7 @@ class queue_impl {
EventRet = Handler.finalize();
}

private:
protected:
/// Helper function for checking whether a device is either a member of a
/// context or a descendant of its member.
/// \return True iff the device or its parent is a member of the context.
Expand Down Expand Up @@ -610,12 +615,14 @@ class queue_impl {

const bool MIsInorder;

std::vector<EventImplPtr> MStreamsServiceEvents;

public:
// Queue constructed with the discard_events property
const bool MDiscardEvents;
const bool MIsProfilingEnabled;

private:
protected:
// This flag says if we can discard events based on a queue "setup" which will
// be common for all operations submitted to the queue. This is a must
// condition for discarding, but even if it's true, in some cases, we won't be
Expand Down
6 changes: 3 additions & 3 deletions sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ Command::Command(CommandType Type, QueueImplPtr Queue)
MPreparedDepsEvents(MEvent->getPreparedDepsEvents()),
MPreparedHostDepsEvents(MEvent->getPreparedHostDepsEvents()),
MType(Type) {
MSubmittedQueue = MQueue;
MWorkerQueue = MQueue;
MEvent->setWorkerQueue(MWorkerQueue);
MEvent->setSubmittedQueue(MWorkerQueue);
MEvent->setCommand(this);
MEvent->setContextImpl(MQueue->getContextImplPtr());
MEvent->setStateIncomplete();
Expand Down Expand Up @@ -1712,8 +1712,8 @@ ExecCGCommand::ExecCGCommand(std::unique_ptr<detail::CG> CommandGroup,
: Command(CommandType::RUN_CG, std::move(Queue)),
MCommandGroup(std::move(CommandGroup)) {
if (MCommandGroup->getType() == detail::CG::CodeplayHostTask) {
MSubmittedQueue =
static_cast<detail::CGHostTask *>(MCommandGroup.get())->MQueue;
MEvent->setSubmittedQueue(
static_cast<detail::CGHostTask *>(MCommandGroup.get())->MQueue);
MEvent->setNeedsCleanupAfterWait(true);
} else if (MCommandGroup->getType() == CG::CGTYPE::Kernel &&
(static_cast<CGExecKernel *>(MCommandGroup.get())->hasStreams() ||
Expand Down
3 changes: 0 additions & 3 deletions sycl/source/detail/scheduler/commands.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ class Command {

const QueueImplPtr &getQueue() const { return MQueue; }

const QueueImplPtr &getSubmittedQueue() const { return MSubmittedQueue; }

const EventImplPtr &getEvent() const { return MEvent; }

// Methods needed to support SYCL instrumentation
Expand Down Expand Up @@ -216,7 +214,6 @@ class Command {

protected:
QueueImplPtr MQueue;
QueueImplPtr MSubmittedQueue;
EventImplPtr MEvent;
QueueImplPtr MWorkerQueue;

Expand Down
2 changes: 1 addition & 1 deletion sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
cleanupCommands(ToCleanUp);

for (auto StreamImplPtr : Streams) {
StreamImplPtr->flush();
StreamImplPtr->flush(NewEvent);
}

return NewEvent;
Expand Down
12 changes: 10 additions & 2 deletions sycl/source/detail/stream_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//
//===----------------------------------------------------------------------===//

#include <detail/queue_impl.hpp>
#include <detail/scheduler/scheduler.hpp>
#include <detail/stream_impl.hpp>
#include <sycl/queue.hpp>
Expand Down Expand Up @@ -68,13 +69,13 @@ size_t stream_impl::get_size() const { return BufferSize_; }

size_t stream_impl::get_max_statement_size() const { return MaxStatementSize_; }

void stream_impl::flush() {
void stream_impl::flush(const EventImplPtr &LeadEvent) {
// We don't want stream flushing to be blocking operation that is why submit a
// host task to print stream buffer. It will fire up as soon as the kernel
// finishes execution.
auto Q = detail::createSyclObjFromImpl<queue>(
sycl::detail::Scheduler::getInstance().getDefaultHostQueue());
Q.submit([&](handler &cgh) {
event Event = Q.submit([&](handler &cgh) {
auto BufHostAcc =
detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
Expand All @@ -96,7 +97,14 @@ void stream_impl::flush() {
fflush(stdout);
});
});
if (LeadEvent) {
LeadEvent->attachEventToComplete(detail::getSyclObjImpl(Event));
LeadEvent->getSubmittedQueue()->registerStreamServiceEvent(
detail::getSyclObjImpl(Event));
}
}

// Overload without a lead event. It forwards a null LeadEvent, so
// flush(const EventImplPtr &) skips registering the print task's event with
// any event or queue. The header marks this overload for removal at the next
// ABI-breaking window.
void stream_impl::flush() { flush(nullptr); }
} // namespace detail
} // __SYCL_INLINE_VER_NAMESPACE(_V1)
} // namespace sycl
6 changes: 6 additions & 0 deletions sycl/source/detail/stream_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class __SYCL_EXPORT stream_impl {
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH);

// Enqueue task to copy stream buffer to the host and print the contents
// The host task event is then registered for post-processing in LeadEvent
// as well as in the queue that LeadEvent is associated with.
void flush(const EventImplPtr &LeadEvent);

// Enqueue task to copy stream buffer to the host and print the contents
// Remove during next ABI breaking window
void flush();

size_t get_size() const;
Expand Down
1 change: 1 addition & 0 deletions sycl/test/abi/sycl_symbols_linux.dump
Original file line number Diff line number Diff line change
Expand Up @@ -3756,6 +3756,7 @@ _ZN4sycl3_V16detail11make_kernelEmRKNS0_7contextENS0_7backendE
_ZN4sycl3_V16detail11stream_impl15accessGlobalBufERNS0_7handlerE
_ZN4sycl3_V16detail11stream_impl18accessGlobalOffsetERNS0_7handlerE
_ZN4sycl3_V16detail11stream_impl20accessGlobalFlushBufERNS0_7handlerE
_ZN4sycl3_V16detail11stream_impl5flushERKSt10shared_ptrINS1_10event_implEE
_ZN4sycl3_V16detail11stream_impl5flushEv
_ZN4sycl3_V16detail11stream_implC1EmmRKNS0_13property_listE
_ZN4sycl3_V16detail11stream_implC1EmmRNS0_7handlerE
Expand Down
Loading

0 comments on commit 1db0e81

Please sign in to comment.