[SYCL] Remove OoO Emulation #17943

Merged

merged 9 commits on Apr 15, 2025
80 changes: 10 additions & 70 deletions sycl/source/detail/queue_impl.cpp
@@ -60,8 +60,7 @@ template <>
 uint32_t queue_impl::get_info<info::queue::reference_count>() const {
   ur_result_t result = UR_RESULT_SUCCESS;
   getAdapter()->call<UrApiKind::urQueueGetInfo>(
-      MQueues[0], UR_QUEUE_INFO_REFERENCE_COUNT, sizeof(result), &result,
-      nullptr);
+      MQueue, UR_QUEUE_INFO_REFERENCE_COUNT, sizeof(result), &result, nullptr);
   return result;
 }

@@ -303,55 +302,14 @@ void queue_impl::addEvent(const event &Event) {
   const EventImplPtr &EImpl = getSyclObjImpl(Event);
   assert(EImpl && "Event implementation is missing");
   auto *Cmd = static_cast<Command *>(EImpl->getCommand());
-  if (!Cmd) {
-    // if there is no command on the event, we cannot track it with MEventsWeak
-    // as that will leave it with no owner. Track in MEventsShared only if we're
-    // unable to call urQueueFinish during wait.
-    if (MEmulateOOO)
-      addSharedEvent(Event);
-  }
-  // As long as the queue supports urQueueFinish we only need to store events
-  // for undiscarded, unenqueued commands and host tasks.
-  else if (MEmulateOOO ||
-           (EImpl->getHandle() == nullptr && !EImpl->isDiscarded())) {
+  if (Cmd != nullptr && EImpl->getHandle() == nullptr &&
+      !EImpl->isDiscarded()) {
     std::weak_ptr<event_impl> EventWeakPtr{EImpl};
     std::lock_guard<std::mutex> Lock{MMutex};
     MEventsWeak.push_back(std::move(EventWeakPtr));
   }
 }

-/// addSharedEvent - queue_impl tracks events with weak pointers
-/// but some events have no other owner. In this case,
-/// addSharedEvent will have the queue track the events via a shared pointer.
-void queue_impl::addSharedEvent(const event &Event) {
-  assert(MEmulateOOO);
-  std::lock_guard<std::mutex> Lock(MMutex);
-  // Events stored in MEventsShared are not released anywhere else aside from
-  // calls to queue::wait/wait_and_throw, which a user application might not
-  // make, and ~queue_impl(). If the number of events grows large enough,
-  // there's a good chance that most of them are already completed and ownership
-  // of them can be released.
-  const size_t EventThreshold = 128;
-  if (MEventsShared.size() >= EventThreshold) {
-    // Generally, the vector is ordered so that the oldest events are in the
-    // front and the newer events are in the end. So, search to find the first
-    // event that isn't yet complete. All the events prior to that can be
-    // erased. This could leave some few events further on that have completed
-    // not yet erased, but that is OK. This cleanup doesn't have to be perfect.
-    // This also keeps the algorithm linear rather than quadratic because it
-    // doesn't continually recheck things towards the back of the list that
-    // really haven't had time to complete.
-    MEventsShared.erase(
-        MEventsShared.begin(),
-        std::find_if(
-            MEventsShared.begin(), MEventsShared.end(), [](const event &E) {
-              return E.get_info<info::event::command_execution_status>() !=
-                     info::event_command_status::complete;
-            }));
-  }
-  MEventsShared.push_back(Event);
-}

 event queue_impl::submit_impl(const detail::type_erased_cgfo_ty &CGF,
                               const std::shared_ptr<queue_impl> &Self,
                               const std::shared_ptr<queue_impl> &PrimaryQueue,
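Note: the cleanup heuristic inside the deleted addSharedEvent is a generally useful idiom. As a minimal, self-contained sketch (the Event type and isComplete helper are illustrative stand-ins, not the SYCL API), it amounts to erasing the completed prefix of an oldest-first list once it crosses a size threshold:

#include <algorithm>
#include <cstddef>
#include <vector>

// Stand-in for sycl::event; only completion status matters for this sketch.
struct Event {
  bool Complete = false;
  bool isComplete() const { return Complete; }
};

// Once the list passes a threshold, erase the completed prefix. The list is
// roughly ordered oldest-first, so a single linear scan from the front frees
// most finished events without repeatedly rechecking newer ones (keeping the
// overall cost linear rather than quadratic).
void pruneCompletedEvents(std::vector<Event> &Events,
                          std::size_t Threshold = 128) {
  if (Events.size() < Threshold)
    return;
  Events.erase(Events.begin(),
               std::find_if(Events.begin(), Events.end(),
                            [](const Event &E) { return !E.isComplete(); }));
}

The scan stops at the first incomplete event, which may leave a few completed stragglers behind the cursor; as the original comment noted, the cleanup does not have to be perfect, only cheap.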
@@ -487,9 +445,7 @@ event queue_impl::submitMemOpHelper(const std::shared_ptr<queue_impl> &Self,
                            : MExtGraphDeps.LastEventPtr;
         EventToStoreIn = EventImpl;
       }
-      // Track only if we won't be able to handle it with urQueueFinish.
-      if (MEmulateOOO)
-        addSharedEvent(ResEvent);
 
       return discard_or_return(ResEvent);
     }
   }
@@ -597,10 +553,9 @@ void queue_impl::wait(const detail::code_location &CodeLoc) {
 
   // Additionally, we can clean up the event lists that we would have
   // otherwise cleared.
-  if (!MEventsWeak.empty() || !MEventsShared.empty()) {
+  if (!MEventsWeak.empty()) {
     std::lock_guard<std::mutex> Lock(MMutex);
     MEventsWeak.clear();
-    MEventsShared.clear();
   }
   if (!MStreamsServiceEvents.empty()) {
     std::lock_guard<std::mutex> Lock(MStreamsServiceEventsMutex);
@@ -609,11 +564,9 @@
   }
 
   std::vector<std::weak_ptr<event_impl>> WeakEvents;
-  std::vector<event> SharedEvents;
   {
     std::lock_guard<std::mutex> Lock(MMutex);
     WeakEvents.swap(MEventsWeak);
-    SharedEvents.swap(MEventsShared);
 
     {
       std::lock_guard<std::mutex> RequestLock(MMissedCleanupRequestsMtx);
@@ -627,27 +580,19 @@
   // directly. Otherwise, only wait for unenqueued or host task events, starting
   // from the latest submitted task in order to minimize total amount of calls,
   // then handle the rest with urQueueFinish.
-  const bool SupportsPiFinish = !MEmulateOOO;
   for (auto EventImplWeakPtrIt = WeakEvents.rbegin();
        EventImplWeakPtrIt != WeakEvents.rend(); ++EventImplWeakPtrIt) {
     if (std::shared_ptr<event_impl> EventImplSharedPtr =
             EventImplWeakPtrIt->lock()) {
       // A nullptr UR event indicates that urQueueFinish will not cover it,
       // either because it's a host task event or an unenqueued one.
-      if (!SupportsPiFinish || nullptr == EventImplSharedPtr->getHandle()) {
+      if (nullptr == EventImplSharedPtr->getHandle()) {
         EventImplSharedPtr->wait(EventImplSharedPtr);
       }
     }
   }
-  if (SupportsPiFinish) {
-    const AdapterPtr &Adapter = getAdapter();
-    Adapter->call<UrApiKind::urQueueFinish>(getHandleRef());
-    assert(SharedEvents.empty() && "Queues that support calling piQueueFinish "
-                                   "shouldn't have shared events");
-  } else {
-    for (event &Event : SharedEvents)
-      Event.wait();
-  }
+  const AdapterPtr &Adapter = getAdapter();
+  Adapter->call<UrApiKind::urQueueFinish>(getHandleRef());
 
   std::vector<EventImplPtr> StreamsServiceEvents;
   {
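With the emulation path gone, wait() always ends in a single urQueueFinish. A rough sketch of the resulting flow, using hypothetical stand-ins (EventImpl, NativeQueue, nativeQueueFinish, waitAll) rather than the real event_impl/UR types:

#include <memory>
#include <vector>

// Stand-ins for event_impl and the native queue; not the real SYCL/UR types.
struct EventImpl {
  void *NativeHandle = nullptr; // nullptr => host task or unenqueued command
  void *getHandle() const { return NativeHandle; }
  void wait() { /* block until this event completes */ }
};
struct NativeQueue {};
void nativeQueueFinish(NativeQueue &) { /* e.g. urQueueFinish */ }

// Wait directly only on events a queue finish cannot cover (no native
// handle), iterating newest-first as the runtime does to minimize calls,
// then drain everything else with one finish on the backend queue.
void waitAll(std::vector<std::weak_ptr<EventImpl>> &WeakEvents,
             NativeQueue &Q) {
  for (auto It = WeakEvents.rbegin(); It != WeakEvents.rend(); ++It)
    if (std::shared_ptr<EventImpl> E = It->lock())
      if (E->getHandle() == nullptr)
        E->wait();
  nativeQueueFinish(Q);
}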
@@ -727,7 +672,7 @@ ur_native_handle_t queue_impl::getNative(int32_t &NativeHandleDesc) const {
                            nullptr, nullptr};
   UrNativeDesc.pNativeData = &NativeHandleDesc;
 
-  Adapter->call<UrApiKind::urQueueGetNativeHandle>(MQueues[0], &UrNativeDesc,
+  Adapter->call<UrApiKind::urQueueGetNativeHandle>(MQueue, &UrNativeDesc,
                                                    &Handle);
   if (getContextImplPtr()->getBackend() == backend::opencl)
     __SYCL_OCL_CALL(clRetainCommandQueue, ur::cast<cl_command_queue>(Handle));
@@ -756,18 +701,13 @@ bool queue_impl::ext_oneapi_empty() const {
   // Check the status of the backend queue if this is not a host queue.
   ur_bool_t IsReady = false;
   getAdapter()->call<UrApiKind::urQueueGetInfo>(
-      MQueues[0], UR_QUEUE_INFO_EMPTY, sizeof(IsReady), &IsReady, nullptr);
+      MQueue, UR_QUEUE_INFO_EMPTY, sizeof(IsReady), &IsReady, nullptr);
   if (!IsReady)
     return false;
 
   // We may have events like host tasks which are not submitted to the backend
   // queue so we need to get their status separately.
   std::lock_guard<std::mutex> Lock(MMutex);
-  for (event Event : MEventsShared)
-    if (Event.get_info<info::event::command_execution_status>() !=
-        info::event_command_status::complete)
-      return false;
-
   for (auto EventImplWeakPtrIt = MEventsWeak.begin();
        EventImplWeakPtrIt != MEventsWeak.end(); ++EventImplWeakPtrIt)
     if (std::shared_ptr<event_impl> EventImplSharedPtr =
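The simplified ext_oneapi_empty() now combines one backend query with a scan of the runtime-tracked events, since host-task events never reach the backend queue. A sketch of that shape, with hypothetical stand-ins (EventImpl, backendQueueEmpty, queueEmpty) for event_impl, the urQueueGetInfo(UR_QUEUE_INFO_EMPTY) query, and the member function:

#include <memory>
#include <vector>

// Stand-ins for event_impl and the backend query; not the real SYCL/UR API.
struct EventImpl {
  bool Complete = false;
  bool isCompleted() const { return Complete; }
};
// Models urQueueGetInfo(..., UR_QUEUE_INFO_EMPTY, ...).
bool backendQueueEmpty() { return true; }

// The backend only knows about commands it was handed; host-task events are
// tracked on the runtime side, so both checks are needed.
bool queueEmpty(const std::vector<std::weak_ptr<EventImpl>> &WeakEvents) {
  if (!backendQueueEmpty())
    return false;
  for (const auto &Weak : WeakEvents)
    if (std::shared_ptr<EventImpl> E = Weak.lock())
      if (!E->isCompleted())
        return false;
  return true;
}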
76 changes: 10 additions & 66 deletions sycl/source/detail/queue_impl.hpp
@@ -166,7 +166,7 @@ class queue_impl {
     }
     const QueueOrder QOrder =
         MIsInorder ? QueueOrder::Ordered : QueueOrder::OOO;
-    MQueues.push_back(createQueue(QOrder));
+    MQueue = createQueue(QOrder);
     // This section is the second part of the instrumentation that uses the
     // tracepoint information and notifies

@@ -191,13 +191,13 @@
           "discard_events and enable_profiling.");
     }
 
-    MQueues.push_back(UrQueue);
+    MQueue = UrQueue;
 
     ur_device_handle_t DeviceUr{};
     const AdapterPtr &Adapter = getAdapter();
     // TODO catch an exception and put it to list of asynchronous exceptions
     Adapter->call<UrApiKind::urQueueGetInfo>(
-        MQueues[0], UR_QUEUE_INFO_DEVICE, sizeof(DeviceUr), &DeviceUr, nullptr);
+        MQueue, UR_QUEUE_INFO_DEVICE, sizeof(DeviceUr), &DeviceUr, nullptr);
     MDevice = MContext->findMatchingDeviceImpl(DeviceUr);
     if (MDevice == nullptr) {
       throw sycl::exception(
@@ -264,7 +264,7 @@
       destructorNotification();
 #endif
       throw_asynchronous();
-      getAdapter()->call<UrApiKind::urQueueRelease>(MQueues[0]);
+      getAdapter()->call<UrApiKind::urQueueRelease>(MQueue);
     } catch (std::exception &e) {
       __SYCL_REPORT_EXCEPTION_TO_STREAM("exception in ~queue_impl", e);
     }
@@ -274,7 +274,7 @@
 
   cl_command_queue get() {
     ur_native_handle_t nativeHandle = 0;
-    getAdapter()->call<UrApiKind::urQueueGetNativeHandle>(MQueues[0], nullptr,
+    getAdapter()->call<UrApiKind::urQueueGetNativeHandle>(MQueue, nullptr,
                                                           &nativeHandle);
     __SYCL_OCL_CALL(clRetainCommandQueue, ur::cast<cl_command_queue>(nativeHandle));
     return ur::cast<cl_command_queue>(nativeHandle);
@@ -322,9 +322,7 @@
           "flush cannot be called for a queue which is "
           "recording to a command graph.");
     }
-    for (const auto &queue : MQueues) {
-      getAdapter()->call<UrApiKind::urQueueFlush>(queue);
-    }
+    getAdapter()->call<UrApiKind::urQueueFlush>(MQueue);
   }

/// Submits a command group function object to the queue, in order to be
@@ -540,62 +538,15 @@
               .get_index();
       Properties.pNext = &IndexProperties;
     }
-    ur_result_t Error = Adapter->call_nocheck<UrApiKind::urQueueCreate>(
-        Context, Device, &Properties, &Queue);
-
-    // If creating out-of-order queue failed and this property is not
-    // supported (for example, on FPGA), it will return
-    // UR_RESULT_ERROR_INVALID_QUEUE_PROPERTIES and will try to create in-order
-    // queue.
-    if (!MEmulateOOO && Error == UR_RESULT_ERROR_INVALID_QUEUE_PROPERTIES) {
-      MEmulateOOO = true;
-      Queue = createQueue(QueueOrder::Ordered);
-    } else {
-      Adapter->checkUrResult(Error);
-    }
+    Adapter->call<UrApiKind::urQueueCreate>(Context, Device, &Properties,
+                                            &Queue);
 
     return Queue;
   }
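For readers unfamiliar with the deleted emulation: queue creation used to fall back to an in-order queue when the backend rejected the out-of-order property. A self-contained sketch under assumed names (Result, QueueHandle, and createNativeQueue model the urQueueCreate call and its UR_RESULT_ERROR_INVALID_QUEUE_PROPERTIES error code; they are not the real API):

#include <stdexcept>

// Illustrative stand-ins for UR handles and result codes.
enum class Result { Success, InvalidQueueProperties };
struct QueueHandle { bool OutOfOrder = false; };

// Models a backend (e.g. FPGA) that rejects the out-of-order property.
Result createNativeQueue(bool OutOfOrder, QueueHandle &Q) {
  if (OutOfOrder)
    return Result::InvalidQueueProperties;
  Q = QueueHandle{false};
  return Result::Success;
}

// The deleted path in essence: request an out-of-order queue, and on
// InvalidQueueProperties fall back to in-order creation while flagging the
// queue so the rest of the runtime emulates out-of-order semantics.
QueueHandle createQueueWithFallback(bool &EmulateOOO) {
  QueueHandle Q;
  Result R = createNativeQueue(/*OutOfOrder=*/true, Q);
  if (!EmulateOOO && R == Result::InvalidQueueProperties) {
    EmulateOOO = true;
    R = createNativeQueue(/*OutOfOrder=*/false, Q);
  }
  if (R != Result::Success)
    throw std::runtime_error("native queue creation failed");
  return Q;
}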

-  /// \return a raw UR handle for a free queue. The returned handle is not
-  /// retained. It is caller responsibility to make sure queue is still alive.
-  ur_queue_handle_t &getExclusiveUrQueueHandleRef() {
-    ur_queue_handle_t *PIQ = nullptr;
-    bool ReuseQueue = false;
-    {
-      std::lock_guard<std::mutex> Lock(MMutex);
-
-      // To achieve parallelism for FPGA with in order execution model with
-      // possibility of two kernels to share data with each other we shall
-      // create a queue for every kernel enqueued.
-      if (MQueues.size() < MaxNumQueues) {
-        MQueues.push_back({});
-        PIQ = &MQueues.back();
-      } else {
-        // If the limit of OpenCL queues is going to be exceeded - take the
-        // earliest used queue, wait until it finished and then reuse it.
-        PIQ = &MQueues[MNextQueueIdx];
-        MNextQueueIdx = (MNextQueueIdx + 1) % MaxNumQueues;
-        ReuseQueue = true;
-      }
-    }
-
-    if (!ReuseQueue)
-      *PIQ = createQueue(QueueOrder::Ordered);
-    else
-      getAdapter()->call<UrApiKind::urQueueFinish>(*PIQ);
-
-    return *PIQ;
-  }
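The deleted pool logic above is the core of the emulation. Reduced to a sketch with hypothetical names (QueueHandle, createInOrderQueue, finishQueue, and EmulatedOOOQueuePool stand in for the UR handle, the UR calls, and the queue_impl members; the limit of 4 is illustrative):

#include <cstddef>
#include <vector>

// Illustrative stand-ins; not the real UR API.
using QueueHandle = int;
QueueHandle createInOrderQueue() { static QueueHandle Next = 0; return ++Next; }
void finishQueue(QueueHandle) { /* wait for all work, e.g. urQueueFinish */ }

// The deleted scheme in essence: grow up to MaxNumQueues in-order native
// queues, then recycle them round-robin, draining the oldest one before
// handing it out again so reuse cannot reorder still-running work.
struct EmulatedOOOQueuePool {
  static constexpr std::size_t MaxNumQueues = 4; // illustrative limit
  std::vector<QueueHandle> Queues;
  std::size_t NextIdx = 0;

  QueueHandle &acquire() {
    if (Queues.size() < MaxNumQueues) {
      Queues.push_back(createInOrderQueue());
      return Queues.back();
    }
    QueueHandle &Q = Queues[NextIdx];
    NextIdx = (NextIdx + 1) % MaxNumQueues;
    finishQueue(Q); // drain earlier work before reuse
    return Q;
  }
};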

   /// \return a raw UR queue handle. The returned handle is not retained. It
   /// is caller responsibility to make sure queue is still alive.
-  ur_queue_handle_t &getHandleRef() {
-    if (!MEmulateOOO)
-      return MQueues[0];
-
-    return getExclusiveUrQueueHandleRef();
-  }
+  ur_queue_handle_t &getHandleRef() { return MQueue; }

/// \return true if the queue was constructed with property specified by
/// PropertyT.
@@ -998,19 +949,12 @@
   /// Events without data dependencies (such as USM) need an owner,
   /// additionally, USM operations are not added to the scheduler command graph,
   /// queue is the only owner on the runtime side.
-  std::vector<event> MEventsShared;
   exception_list MExceptions;
   const async_handler MAsyncHandler;
   const property_list MPropList;
 
-  /// List of queues created for FPGA device from a single SYCL queue.
-  std::vector<ur_queue_handle_t> MQueues;
-  /// Iterator through MQueues.
-  size_t MNextQueueIdx = 0;
-
-  /// Indicates that a native out-of-order queue could not be created and we
-  /// need to emulate it with multiple native in-order queues.
-  bool MEmulateOOO = false;
+  ur_queue_handle_t MQueue;
 
   // Access should be guarded with MMutex
   struct DependencyTrackingItems {
1 change: 0 additions & 1 deletion sycl/unittests/queue/CMakeLists.txt
@@ -1,6 +1,5 @@
 add_sycl_unittest(QueueTests OBJECT
   DeviceCheck.cpp
-  EventClear.cpp
   Hash.cpp
   USM.cpp
   Wait.cpp