diff --git a/sycl/include/sycl/ext/oneapi/experimental/graph/node.hpp b/sycl/include/sycl/ext/oneapi/experimental/graph/node.hpp index 91c756e32049f..fc9312569b436 100644 --- a/sycl/include/sycl/ext/oneapi/experimental/graph/node.hpp +++ b/sycl/include/sycl/ext/oneapi/experimental/graph/node.hpp @@ -42,7 +42,8 @@ enum class node_type { host_task = 9, native_command = 10, async_malloc = 11, - async_free = 12 + async_free = 12, + host_sync = 13 }; /// Class representing a node in the graph, returned by command_graph::add(). diff --git a/sycl/source/detail/event_impl.cpp b/sycl/source/detail/event_impl.cpp index b0c838cdd890c..f9124fd280c1f 100644 --- a/sycl/source/detail/event_impl.cpp +++ b/sycl/source/detail/event_impl.cpp @@ -278,10 +278,30 @@ void event_impl::wait(bool *Success) { throw sycl::exception(make_error_code(errc::invalid), "wait method cannot be used for a discarded event."); + printf("FOOBAR3 %d\n", MGraph.expired()); + + if (!MGraph.expired()) { - throw sycl::exception(make_error_code(errc::invalid), - "wait method cannot be used for an event associated " - "with a command graph."); + auto GraphImpl = MGraph.lock(); + + // Add a host sync node to the graph to create a partition point. + // TODO: test if partitioned wait bits are set + if (GraphImpl) { + auto EmptyCG = std::make_shared( + detail::CGType::None, + detail::CG::StorageInitHelper{} + ); + + std::vector EmptyDeps; + ext::oneapi::experimental::detail::node_impl &HostSyncNode = GraphImpl->add( + ext::oneapi::experimental::node_type::host_sync, + EmptyCG, + EmptyDeps + ); + + printf("FOOBAR2\n"); + + } } #ifdef XPTI_ENABLE_INSTRUMENTATION diff --git a/sycl/source/detail/graph/graph_impl.cpp b/sycl/source/detail/graph/graph_impl.cpp index 9a36bbb0b9476..fa1cf1a4800d2 100644 --- a/sycl/source/detail/graph/graph_impl.cpp +++ b/sycl/source/detail/graph/graph_impl.cpp @@ -18,6 +18,7 @@ #include // for kernel_impl #include // ProgramManager #include // for queue_impl +#include // for debug output #include // for SYCLMemObjT #include // for stack #include // for tls_code_loc_t etc.. @@ -68,6 +69,8 @@ inline const char *nodeTypeToString(node_type NodeType) { return "async_malloc"; case node_type::async_free: return "async_free"; + case node_type::host_sync: + return "host_sync"; } assert(false && "Unhandled node type"); return {}; @@ -140,19 +143,19 @@ void propagatePartitionUp(node_impl &Node, int PartitionNum) { /// remain. /// @param Node Node to assign to the partition. /// @param PartitionNum Number to propagate. -/// @param HostTaskList List of host tasks that have already been processed and +/// @param CutVertexList List of tasks that have already been processed and /// are encountered as successors to the node Node. void propagatePartitionDown(node_impl &Node, int PartitionNum, - std::list &HostTaskList) { + std::list &CutVertexList) { if (Node.MCGType == sycl::detail::CGType::CodeplayHostTask) { if (Node.MPartitionNum != -1) { - HostTaskList.push_front(&Node); + CutVertexList.push_front(&Node); } return; } Node.MPartitionNum = PartitionNum; for (node_impl &Successor : Node.successors()) { - propagatePartitionDown(Successor, PartitionNum, HostTaskList); + propagatePartitionDown(Successor, PartitionNum, CutVertexList); } } @@ -180,24 +183,50 @@ void partition::updateSchedule() { void exec_graph_impl::makePartitions() { int CurrentPartition = -1; - std::list HostTaskList; + std::list CutVertexList; + +#define SYCL_GRAPH_DEBUG 1 +#ifdef SYCL_GRAPH_DEBUG + // Debug: Print total number of nodes + std::cout << "[DEBUG] makePartitions: Starting with " << MNodeStorage.size() << " nodes" << std::endl; + + // Debug: Print all nodes and their types + int nodeIndex = 0; + for (node_impl &Node : nodes()) { + std::cout << "[DEBUG] Node " << nodeIndex << ": Type=" << nodeTypeToString(Node.MNodeType) + << ", CGType=" << static_cast(Node.MCGType) << std::endl; + nodeIndex++; + } +#endif + + // A cut vertex is a node that, when removed, increases the number of connected components + // in the graph. In our case, cut vertices are host-tasks / sync tasks that separate partitions + auto const IsCutVertex = [](node_impl const& node) { + return node.MCGType == sycl::detail::CGType::CodeplayHostTask || + node.MNodeType == node_type::host_sync; + }; + // find all the host-tasks in the graph for (node_impl &Node : nodes()) { - if (Node.MCGType == sycl::detail::CGType::CodeplayHostTask) { - HostTaskList.push_back(&Node); + if (IsCutVertex(Node)) { + CutVertexList.push_back(&Node); } } - MContainsHostTask = HostTaskList.size() > 0; + MContainsHostTask = CutVertexList.size() > 0; +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Found " << CutVertexList.size() << " host tasks, MContainsHostTask=" + << (MContainsHostTask ? "true" : "false") << std::endl; +#endif // Annotate nodes // The first step in graph partitioning is to annotate all nodes of the graph // with a temporary partition or group number. This step allows us to group // the graph nodes into sets of nodes with kind of meta-dependencies that must // be enforced by the runtime. For example, Group 2 depends on Groups 0 and 1, // which means that we should not try to run Group 2 before Groups 0 and 1 - // have finished executing. Since host-tasks are currently the only tasks that + // have finished executing. Since host-tasks and sync-tasks are the only tasks that // require runtime dependency handling, groups of nodes are created from - // host-task nodes. We therefore loop over all the host-task nodes, and for + // these nodes. We therefore loop over all the host-task and sync-task nodes, and for // each node: // - Its predecessors are assigned to group number `n-1` // - The node itself constitutes a group, group number `n` @@ -213,23 +242,32 @@ void exec_graph_impl::makePartitions() { // case, the host-task node `A` must be reprocessed after the node `B` and the // group that includes the predecessor of `B` can be merged with the group of // the predecessors of the node `A`. - while (HostTaskList.size() > 0) { - node_impl &Node = *HostTaskList.front(); - HostTaskList.pop_front(); + while (CutVertexList.size() > 0) { + node_impl &Node = *CutVertexList.front(); + CutVertexList.pop_front(); +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Processing host task node, CurrentPartition=" << CurrentPartition << std::endl; +#endif CurrentPartition++; for (node_impl &Predecessor : Node.predecessors()) { propagatePartitionUp(Predecessor, CurrentPartition); } CurrentPartition++; Node.MPartitionNum = CurrentPartition; +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Assigned host task to partition " << CurrentPartition << std::endl; +#endif CurrentPartition++; - auto TmpSize = HostTaskList.size(); + auto TmpSize = CutVertexList.size(); for (node_impl &Successor : Node.successors()) { - propagatePartitionDown(Successor, CurrentPartition, HostTaskList); + propagatePartitionDown(Successor, CurrentPartition, CutVertexList); } - if (HostTaskList.size() > TmpSize) { + if (CutVertexList.size() > TmpSize) { +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Host task list size increased, merging partitions" << std::endl; +#endif // At least one HostTask has been re-numbered so group merge opportunities - for (node_impl *HT : HostTaskList) { + for (node_impl *HT : CutVertexList) { auto HTPartitionNum = HT->MPartitionNum; if (HTPartitionNum != -1) { // can merge predecessors of node `Node` with predecessors of node @@ -246,14 +284,32 @@ void exec_graph_impl::makePartitions() { } } +#ifdef SYCL_GRAPH_DEBUG + // Debug: Print node partition assignments before creating partitions + std::cout << "[DEBUG] Node partition assignments:" << std::endl; + nodeIndex = 0; + for (node_impl &Node : nodes()) { + std::cout << "[DEBUG] Node " << nodeIndex << ": Partition=" << Node.MPartitionNum + << ", Type=" << nodeTypeToString(Node.MNodeType) << std::endl; + nodeIndex++; + } +#endif + // Create partitions int PartitionFinalNum = 0; +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Creating partitions from " << -1 << " to " << CurrentPartition << std::endl; +#endif for (int i = -1; i <= CurrentPartition; i++) { const std::shared_ptr &Partition = std::make_shared(); + int nodesInPartition = 0; + int rootsInPartition = 0; for (node_impl &Node : nodes()) { if (Node.MPartitionNum == i) { + nodesInPartition++; MPartitionNodes[&Node] = PartitionFinalNum; if (isPartitionRoot(Node)) { + rootsInPartition++; Partition->MRoots.insert(&Node); if (Node.MCGType == CGType::CodeplayHostTask) { Partition->MIsHostTask = true; @@ -261,21 +317,37 @@ void exec_graph_impl::makePartitions() { } } } +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Partition " << i << ": " << nodesInPartition << " nodes, " + << rootsInPartition << " roots" << std::endl; +#endif if (Partition->MRoots.size() > 0) { Partition->updateSchedule(); Partition->MIsInOrderGraph = Partition->checkIfGraphIsSinglePath(); MPartitions.push_back(Partition); MRootPartitions.push_back(Partition); +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Added partition " << PartitionFinalNum << " (original " << i + << "), IsHostTask=" << (Partition->MIsHostTask ? "true" : "false") + << ", IsInOrder=" << (Partition->MIsInOrderGraph ? "true" : "false") << std::endl; +#endif PartitionFinalNum++; } } // Add an empty partition if there is no partition, i.e. empty graph if (MPartitions.empty()) { +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] No partitions created, adding empty partition" << std::endl; +#endif MPartitions.push_back(std::make_shared()); MRootPartitions.push_back(MPartitions[0]); } +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Final result: " << MPartitions.size() << " partitions created" << std::endl; +#endif + // Make global schedule list for (const auto &Partition : MPartitions) { MSchedule.insert(MSchedule.end(), Partition->MSchedule.begin(), @@ -283,22 +355,39 @@ void exec_graph_impl::makePartitions() { } // Compute partition dependencies + int partitionIdx = 0; for (const auto &Partition : MPartitions) { + int predecessorCount = 0; + int successorCount = 0; for (node_impl &Root : Partition->roots()) { for (node_impl &NodeDep : Root.predecessors()) { auto &Predecessor = MPartitions[MPartitionNodes[&NodeDep]]; Partition->MPredecessors.push_back(Predecessor.get()); Predecessor->MSuccessors.push_back(Partition.get()); + predecessorCount++; } } + for (auto &Succ : Partition->MSuccessors) { + successorCount++; + } +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] Partition " << partitionIdx << " dependencies: " + << predecessorCount << " predecessors, " << successorCount << " successors" << std::endl; +#endif + partitionIdx++; } // Reset node groups (if node have to be re-processed - e.g. subgraph) for (node_impl &Node : nodes()) { Node.MPartitionNum = -1; } + +#ifdef SYCL_GRAPH_DEBUG + std::cout << "[DEBUG] makePartitions completed" << std::endl; +#endif } + graph_impl::graph_impl(const sycl::context &SyclContext, const sycl::device &SyclDevice, const sycl::property_list &PropList) @@ -685,6 +774,7 @@ std::vector graph_impl::getExitNodesEvents( void graph_impl::beginRecording(sycl::detail::queue_impl &Queue) { graph_impl::WriteLock Lock(MMutex); + printf("Graph %p beginRecording on Queue %p\n", this, &Queue); if (!Queue.hasCommandGraph()) { Queue.setCommandGraph(shared_from_this()); addQueue(Queue); diff --git a/sycl/source/detail/queue_impl.cpp b/sycl/source/detail/queue_impl.cpp index 52ee77d251eb2..751df62e35a2a 100644 --- a/sycl/source/detail/queue_impl.cpp +++ b/sycl/source/detail/queue_impl.cpp @@ -9,6 +9,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -586,11 +589,31 @@ void queue_impl::wait(const detail::code_location &CodeLoc) { auto StreamID = detail::getActiveXPTIStreamID(); TelemetryEvent = instrumentationProlog(CodeLoc, Name, StreamID, IId); #endif + printf("FOOBAR4 %d\n", MGraph.expired()); + if (!MGraph.expired()) { - throw sycl::exception(make_error_code(errc::invalid), - "wait cannot be called for a queue which is " - "recording to a command graph."); + auto GraphImpl = MGraph.lock(); + + // TODO: test if partitioned wait bits are set + if (GraphImpl) { + + auto EmptyCG = std::make_shared( + detail::CGType::None, + detail::CG::StorageInitHelper{}, + CodeLoc + ); + + printf("FOOBAR1\n"); + + std::vector EmptyDeps; + GraphImpl->add( + ext::oneapi::experimental::node_type::host_sync, + EmptyCG, + EmptyDeps + ); + } + return; } // If there is an external event set, we know we are using an in-order queue diff --git a/sycl/test-e2e/Graph/RecordReplay/partitioned_wait.cpp b/sycl/test-e2e/Graph/RecordReplay/partitioned_wait.cpp new file mode 100644 index 0000000000000..695e7a3fd8ce0 --- /dev/null +++ b/sycl/test-e2e/Graph/RecordReplay/partitioned_wait.cpp @@ -0,0 +1,129 @@ +// RUN: %{build} -o %t.out +// RUN: %{run} %t.out +// Extra run to check for leaks in Level Zero using UR_L0_LEAKS_DEBUG +// RUN: %if level_zero %{%{l0_leak_check} %{run} %t.out 2>&1 | FileCheck %s --implicit-check-not=LEAK %} + +// Tests partitioned wait feature in SYCL Graph. + +#include "../graph_common.hpp" + +#include + +int main() { + property_list Properties{property::queue::in_order{}}; + queue Queue{Properties}; + + exp_ext::command_graph Graph{Queue.get_context(), Queue.get_device()}; + + const size_t N = 100; + int *A = malloc_device(N, Queue); + int *B = malloc_device(N, Queue); + int *C = malloc_device(N, Queue); + int *D = malloc_device(N, Queue); + + Queue.submit([&](handler &CGH) { + CGH.parallel_for(N, [=](id<1> it) { + A[it] = static_cast(it); + B[it] = 0; + C[it] = 0; + D[it] = 0; + }); + }).wait(); + + // Begin recording the graph + Graph.begin_recording(Queue); + + // Part 1: "Before" subgraph operations + auto Event1 = Queue.submit([&](handler &CGH) { + CGH.parallel_for(N, [=](id<1> it) { + B[it] = A[it] * 2; + }); + }); + + auto Event2 = Queue.submit([&](handler &CGH) { + CGH.depends_on(Event1); + CGH.parallel_for(N, [=](id<1> it) { + C[it] = B[it] + 1; + }); + }); + + // should create a dummy barrier node in the graph + Queue.wait(); + + // Part 2: "After" subgraph operations + auto Event3 = Queue.submit([&](handler &CGH) { + CGH.parallel_for(N, [=](id<1> it) { + D[it] = C[it] * 3; + }); + }); + + Queue.wait(); + + Queue.submit([&](handler &CGH) { + CGH.parallel_for(N, [=](id<1> it) { + D[it] = D[it] + A[it]; + }); + }); + + Graph.end_recording(); + + auto ExecGraph = Graph.finalize(); + Queue.submit([&](handler &CGH) { CGH.ext_oneapi_graph(ExecGraph); }); + Queue.wait_and_throw(); + + // Verify results + std::vector OutputA(N), OutputB(N), OutputC(N), OutputD(N); + Queue.memcpy(OutputA.data(), A, N * sizeof(int)).wait(); + Queue.memcpy(OutputB.data(), B, N * sizeof(int)).wait(); + Queue.memcpy(OutputC.data(), C, N * sizeof(int)).wait(); + Queue.memcpy(OutputD.data(), D, N * sizeof(int)).wait(); + + for (size_t i = 0; i < N; i++) { + int expected_a = static_cast(i); + int expected_b = expected_a * 2; + int expected_c = expected_b + 1; + int expected_d = expected_c * 3 + expected_a; + + assert(check_value(i, expected_a, OutputA[i], "A")); + assert(check_value(i, expected_b, OutputB[i], "B")); + assert(check_value(i, expected_c, OutputC[i], "C")); + assert(check_value(i, expected_d, OutputD[i], "D")); + } + + // Reset data and verify with new input + Queue.submit([&](handler &CGH) { + CGH.parallel_for(N, [=](id<1> it) { + A[it] = static_cast(it) + 10; // Different input + B[it] = 0; + C[it] = 0; + D[it] = 0; + }); + }).wait(); + + Queue.submit([&](handler &CGH) { CGH.ext_oneapi_graph(ExecGraph); }); + Queue.wait_and_throw(); + + Queue.memcpy(OutputA.data(), A, N * sizeof(int)).wait(); + Queue.memcpy(OutputB.data(), B, N * sizeof(int)).wait(); + Queue.memcpy(OutputC.data(), C, N * sizeof(int)).wait(); + Queue.memcpy(OutputD.data(), D, N * sizeof(int)).wait(); + + for (size_t i = 0; i < N; i++) { + int expected_a = static_cast(i) + 10; + int expected_b = expected_a * 2; + int expected_c = expected_b + 1; + int expected_d = expected_c * 3 + expected_a; + + assert(check_value(i, expected_a, OutputA[i], "A (second execution)")); + assert(check_value(i, expected_b, OutputB[i], "B (second execution)")); + assert(check_value(i, expected_c, OutputC[i], "C (second execution)")); + assert(check_value(i, expected_d, OutputD[i], "D (second execution)")); + } + + sycl::free(A, Queue); + sycl::free(B, Queue); + sycl::free(C, Queue); + sycl::free(D, Queue); + + return 0; +}