diff --git a/offload/include/Shared/APITypes.h b/offload/include/Shared/APITypes.h
index 978b53d5d69b9..b0cfce86f7b0d 100644
--- a/offload/include/Shared/APITypes.h
+++ b/offload/include/Shared/APITypes.h
@@ -21,6 +21,7 @@
 #include
 #include
+#include <mutex>
 
 extern "C" {
 
@@ -75,6 +76,11 @@ struct __tgt_async_info {
   /// should be freed after finalization.
   llvm::SmallVector AssociatedAllocations;
 
+  /// Mutex to guard access to AssociatedAllocations and the Queue.
+  /// This is only used for liboffload and should be ignored in libomptarget
+  /// code.
+  std::mutex Mutex;
+
   /// The kernel launch environment used to issue a kernel. Stored here to
   /// ensure it is a valid location while the transfer to the device is
   /// happening.
diff --git a/offload/liboffload/src/OffloadImpl.cpp b/offload/liboffload/src/OffloadImpl.cpp
index ffc9016bca0a3..2db2186bec7de 100644
--- a/offload/liboffload/src/OffloadImpl.cpp
+++ b/offload/liboffload/src/OffloadImpl.cpp
@@ -206,7 +206,7 @@ Error initPlugins(OffloadContext &Context) {
 }
 
 Error olInit_impl() {
-  std::lock_guard Lock{OffloadContextValMutex};
+  std::lock_guard Lock(OffloadContextValMutex);
 
   if (isOffloadInitialized()) {
     OffloadContext::get().RefCount++;
@@ -224,7 +224,7 @@ Error olInit_impl() {
 }
 
 Error olShutDown_impl() {
-  std::lock_guard Lock{OffloadContextValMutex};
+  std::lock_guard Lock(OffloadContextValMutex);
 
   if (--OffloadContext::get().RefCount != 0)
     return Error::success();
@@ -487,16 +487,12 @@ Error olWaitQueue_impl(ol_queue_handle_t Queue) {
   // Host plugin doesn't have a queue set so it's not safe to call synchronize
   // on it, but we have nothing to synchronize in that situation anyway.
   if (Queue->AsyncInfo->Queue) {
-    if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo))
+    // We don't need to release the queue and we would like the ability for
+    // other offload threads to submit work concurrently, so pass "false" here.
+ if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo, false)) return Err; } - // Recreate the stream resource so the queue can be reused - // TODO: Would be easier for the synchronization to (optionally) not release - // it to begin with. - if (auto Res = Queue->Device->Device->initAsyncInfo(&Queue->AsyncInfo)) - return Res; - return Error::success(); } @@ -717,7 +713,7 @@ Error olGetSymbol_impl(ol_program_handle_t Program, const char *Name, ol_symbol_kind_t Kind, ol_symbol_handle_t *Symbol) { auto &Device = Program->Image->getDevice(); - std::lock_guard Lock{Program->SymbolListMutex}; + std::lock_guard Lock(Program->SymbolListMutex); switch (Kind) { case OL_SYMBOL_KIND_KERNEL: { diff --git a/offload/libomptarget/interface.cpp b/offload/libomptarget/interface.cpp index ea354400f2e99..e9b148d8a2605 100644 --- a/offload/libomptarget/interface.cpp +++ b/offload/libomptarget/interface.cpp @@ -116,7 +116,7 @@ targetData(ident_t *Loc, int64_t DeviceId, int32_t ArgNum, void **ArgsBase, TargetDataFuncPtrTy TargetDataFunction, const char *RegionTypeMsg, const char *RegionName) { assert(PM && "Runtime not initialized"); - static_assert(std::is_convertible_v, + static_assert(std::is_convertible_v, "TargetAsyncInfoTy must be convertible to AsyncInfoTy."); TIMESCOPE_WITH_DETAILS_AND_IDENT("Runtime: Data Copy", @@ -311,7 +311,7 @@ static inline int targetKernel(ident_t *Loc, int64_t DeviceId, int32_t NumTeams, int32_t ThreadLimit, void *HostPtr, KernelArgsTy *KernelArgs) { assert(PM && "Runtime not initialized"); - static_assert(std::is_convertible_v, + static_assert(std::is_convertible_v, "Target AsyncInfoTy must be convertible to AsyncInfoTy."); DP("Entering target region for device %" PRId64 " with entry point " DPxMOD "\n", diff --git a/offload/plugins-nextgen/amdgpu/src/rtl.cpp b/offload/plugins-nextgen/amdgpu/src/rtl.cpp index b2fd950c9d500..03bd373df08d0 100644 --- a/offload/plugins-nextgen/amdgpu/src/rtl.cpp +++ 
b/offload/plugins-nextgen/amdgpu/src/rtl.cpp @@ -2227,16 +2227,11 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy { /// Get the stream of the asynchronous info structure or get a new one. Error getStream(AsyncInfoWrapperTy &AsyncInfoWrapper, AMDGPUStreamTy *&Stream) { - // Get the stream (if any) from the async info. - Stream = AsyncInfoWrapper.getQueueAs(); - if (!Stream) { - // There was no stream; get an idle one. - if (auto Err = AMDGPUStreamManager.getResource(Stream)) - return Err; - - // Modify the async info's stream. - AsyncInfoWrapper.setQueueAs(Stream); - } + auto WrapperStream = + AsyncInfoWrapper.getOrInitQueue(AMDGPUStreamManager); + if (!WrapperStream) + return WrapperStream.takeError(); + Stream = *WrapperStream; return Plugin::success(); } @@ -2291,7 +2286,8 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy { } /// Synchronize current thread with the pending operations on the async info. - Error synchronizeImpl(__tgt_async_info &AsyncInfo) override { + Error synchronizeImpl(__tgt_async_info &AsyncInfo, + bool ReleaseQueue) override { AMDGPUStreamTy *Stream = reinterpret_cast(AsyncInfo.Queue); assert(Stream && "Invalid stream"); @@ -2302,8 +2298,11 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy { // Once the stream is synchronized, return it to stream pool and reset // AsyncInfo. This is to make sure the synchronization only works for its // own tasks. - AsyncInfo.Queue = nullptr; - return AMDGPUStreamManager.returnResource(Stream); + if (ReleaseQueue) { + AsyncInfo.Queue = nullptr; + return AMDGPUStreamManager.returnResource(Stream); + } + return Plugin::success(); } /// Query for the completion of the pending operations on the async info. 
diff --git a/offload/plugins-nextgen/common/include/PluginInterface.h b/offload/plugins-nextgen/common/include/PluginInterface.h index 162b149ab483e..a836970fe2eba 100644 --- a/offload/plugins-nextgen/common/include/PluginInterface.h +++ b/offload/plugins-nextgen/common/include/PluginInterface.h @@ -59,6 +59,7 @@ struct GenericPluginTy; struct GenericKernelTy; struct GenericDeviceTy; struct RecordReplayTy; +template class GenericDeviceResourceManagerTy; /// Class that wraps the __tgt_async_info to simply its usage. In case the /// object is constructed without a valid __tgt_async_info, the object will use @@ -93,6 +94,20 @@ struct AsyncInfoWrapperTy { AsyncInfoPtr->Queue = Queue; } + /// Get the queue, using the provided resource manager to initialise it if it + /// doesn't exist. + template + Expected + getOrInitQueue(GenericDeviceResourceManagerTy &ResourceManager) { + std::lock_guard Lock(AsyncInfoPtr->Mutex); + if (!AsyncInfoPtr->Queue) { + if (auto Err = ResourceManager.getResource( + *reinterpret_cast(&AsyncInfoPtr->Queue))) + return Err; + } + return getQueueAs(); + } + /// Synchronize with the __tgt_async_info's pending operations if it's the /// internal async info. The error associated to the asynchronous operations /// issued in this queue must be provided in \p Err. This function will update @@ -104,6 +119,7 @@ struct AsyncInfoWrapperTy { /// Register \p Ptr as an associated allocation that is freed after /// finalization. void freeAllocationAfterSynchronization(void *Ptr) { + std::lock_guard AllocationGuard{AsyncInfoPtr->Mutex}; AsyncInfoPtr->AssociatedAllocations.push_back(Ptr); } @@ -771,9 +787,12 @@ struct GenericDeviceTy : public DeviceAllocatorTy { Error setupRPCServer(GenericPluginTy &Plugin, DeviceImageTy &Image); /// Synchronize the current thread with the pending operations on the - /// __tgt_async_info structure. 
-  Error synchronize(__tgt_async_info *AsyncInfo);
-  virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo) = 0;
+  /// __tgt_async_info structure. If ReleaseQueue is false, then the
+  /// underlying queue will not be released. In this case, additional
+  /// work may be submitted to the queue whilst a synchronize is running.
+  Error synchronize(__tgt_async_info *AsyncInfo, bool ReleaseQueue = true);
+  virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo,
+                                bool ReleaseQueue) = 0;
 
   /// Invokes any global constructors on the device if present and is required
   /// by the target.
diff --git a/offload/plugins-nextgen/common/src/PluginInterface.cpp b/offload/plugins-nextgen/common/src/PluginInterface.cpp
index 81b9d423e13d8..934eb1b483da7 100644
--- a/offload/plugins-nextgen/common/src/PluginInterface.cpp
+++ b/offload/plugins-nextgen/common/src/PluginInterface.cpp
@@ -1329,18 +1329,25 @@ Error PinnedAllocationMapTy::unlockUnmappedHostBuffer(void *HstPtr) {
   return eraseEntry(*Entry);
 }
 
-Error GenericDeviceTy::synchronize(__tgt_async_info *AsyncInfo) {
-  if (!AsyncInfo || !AsyncInfo->Queue)
-    return Plugin::error(ErrorCode::INVALID_ARGUMENT,
-                         "invalid async info queue");
+Error GenericDeviceTy::synchronize(__tgt_async_info *AsyncInfo,
+                                   bool ReleaseQueue) {
+  if (!AsyncInfo || !AsyncInfo->Queue)
+    return Plugin::error(ErrorCode::INVALID_ARGUMENT,
+                         "invalid async info queue");
+
+  SmallVector<void *> AllocsToDelete{};
+  {
+    std::lock_guard AllocationGuard{AsyncInfo->Mutex};
 
-  if (auto Err = synchronizeImpl(*AsyncInfo))
-    return Err;
+    if (auto Err = synchronizeImpl(*AsyncInfo, ReleaseQueue))
+      return Err;
+
+    std::swap(AllocsToDelete, AsyncInfo->AssociatedAllocations);
+  }
 
-  for (auto *Ptr : AsyncInfo->AssociatedAllocations)
+  for (auto *Ptr : AllocsToDelete)
     if (auto Err = dataDelete(Ptr, TargetAllocTy::TARGET_ALLOC_DEVICE))
       return Err;
-  AsyncInfo->AssociatedAllocations.clear();
   return Plugin::success();
 }
diff --git a/offload/plugins-nextgen/cuda/src/rtl.cpp 
b/offload/plugins-nextgen/cuda/src/rtl.cpp index b787376eb1770..42a9a6a33c41d 100644 --- a/offload/plugins-nextgen/cuda/src/rtl.cpp +++ b/offload/plugins-nextgen/cuda/src/rtl.cpp @@ -522,16 +522,11 @@ struct CUDADeviceTy : public GenericDeviceTy { /// Get the stream of the asynchronous info structure or get a new one. Error getStream(AsyncInfoWrapperTy &AsyncInfoWrapper, CUstream &Stream) { - // Get the stream (if any) from the async info. - Stream = AsyncInfoWrapper.getQueueAs(); - if (!Stream) { - // There was no stream; get an idle one. - if (auto Err = CUDAStreamManager.getResource(Stream)) - return Err; - - // Modify the async info's stream. - AsyncInfoWrapper.setQueueAs(Stream); - } + auto WrapperStream = + AsyncInfoWrapper.getOrInitQueue(CUDAStreamManager); + if (!WrapperStream) + return WrapperStream.takeError(); + Stream = *WrapperStream; return Plugin::success(); } @@ -642,17 +637,20 @@ struct CUDADeviceTy : public GenericDeviceTy { } /// Synchronize current thread with the pending operations on the async info. - Error synchronizeImpl(__tgt_async_info &AsyncInfo) override { + Error synchronizeImpl(__tgt_async_info &AsyncInfo, + bool ReleaseQueue) override { CUstream Stream = reinterpret_cast(AsyncInfo.Queue); CUresult Res; Res = cuStreamSynchronize(Stream); - // Once the stream is synchronized, return it to stream pool and reset - // AsyncInfo. This is to make sure the synchronization only works for its - // own tasks. - AsyncInfo.Queue = nullptr; - if (auto Err = CUDAStreamManager.returnResource(Stream)) - return Err; + // Once the stream is synchronized and we want to release the queue, return + // it to stream pool and reset AsyncInfo. This is to make sure the + // synchronization only works for its own tasks. 
+    if (ReleaseQueue) {
+      AsyncInfo.Queue = nullptr;
+      if (auto Err = CUDAStreamManager.returnResource(Stream))
+        return Err;
+    }
 
     return Plugin::check(Res, "error in cuStreamSynchronize: %s");
   }
diff --git a/offload/plugins-nextgen/host/src/rtl.cpp b/offload/plugins-nextgen/host/src/rtl.cpp
index d950572265b4c..0dbf78d6eaed8 100644
--- a/offload/plugins-nextgen/host/src/rtl.cpp
+++ b/offload/plugins-nextgen/host/src/rtl.cpp
@@ -297,7 +297,8 @@ struct GenELF64DeviceTy : public GenericDeviceTy {
 
   /// All functions are already synchronous. No need to do anything on this
   /// synchronization function.
-  Error synchronizeImpl(__tgt_async_info &AsyncInfo) override {
+  Error synchronizeImpl(__tgt_async_info &AsyncInfo,
+                        bool ReleaseQueue) override {
     return Plugin::success();
   }
diff --git a/offload/unittests/OffloadAPI/common/Fixtures.hpp b/offload/unittests/OffloadAPI/common/Fixtures.hpp
index 546921164f691..4fe57bd80d704 100644
--- a/offload/unittests/OffloadAPI/common/Fixtures.hpp
+++ b/offload/unittests/OffloadAPI/common/Fixtures.hpp
@@ -9,6 +9,7 @@
 #include
 #include
 #include
+#include <thread>
 
 #include "Environment.hpp"
 
@@ -57,6 +58,23 @@ inline std::string SanitizeString(const std::string &Str) {
   return NewStr;
 }
 
+template <typename Fn> inline void threadify(Fn body) {
+  std::vector<std::thread> Threads;
+  for (size_t I = 0; I < 20; I++) {
+    Threads.emplace_back(
+        [&body](size_t I) {
+          std::string ScopeMsg{"Thread #"};
+          ScopeMsg.append(std::to_string(I));
+          SCOPED_TRACE(ScopeMsg);
+          body(I);
+        },
+        I);
+  }
+  for (auto &T : Threads) {
+    T.join();
+  }
+}
+
 struct OffloadTest : ::testing::Test {
   ol_device_handle_t Host = TestEnvironment::getHostDevice();
 };
diff --git a/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp b/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp
index e7e608f2a64d4..3e128d1e84645 100644
--- a/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp
+++ b/offload/unittests/OffloadAPI/kernel/olLaunchKernel.cpp
@@ -104,6 +104,29 @@ TEST_P(olLaunchKernelFooTest, Success) {
 ASSERT_SUCCESS(olMemFree(Mem));
 }
 
+TEST_P(olLaunchKernelFooTest, SuccessThreaded) {
+  threadify([&](size_t) {
+    void *Mem;
+    ASSERT_SUCCESS(olMemAlloc(Device, OL_ALLOC_TYPE_MANAGED,
+                              LaunchArgs.GroupSize.x * sizeof(uint32_t), &Mem));
+    struct {
+      void *Mem;
+    } Args{Mem};
+
+    ASSERT_SUCCESS(olLaunchKernel(Queue, Device, Kernel, &Args, sizeof(Args),
+                                  &LaunchArgs, nullptr));
+
+    ASSERT_SUCCESS(olWaitQueue(Queue));
+
+    uint32_t *Data = (uint32_t *)Mem;
+    for (uint32_t i = 0; i < LaunchArgs.GroupSize.x; i++) {
+      ASSERT_EQ(Data[i], i);
+    }
+
+    ASSERT_SUCCESS(olMemFree(Mem));
+  });
+}
+
 TEST_P(olLaunchKernelNoArgsTest, Success) {
   ASSERT_SUCCESS(
       olLaunchKernel(Queue, Device, Kernel, nullptr, 0, &LaunchArgs, nullptr));