Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions libs/core/async_mpi/tests/unit/algorithm_transform_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

namespace ex = hpx::execution::experimental;
namespace mpi = hpx::mpi::experimental;
namespace tt = hpx::this_thread::experimental;

// This overload is only used to check dispatching. It is not a useful
// implementation.
Expand Down Expand Up @@ -52,15 +51,19 @@ int hpx_main()
// Success path
{
// MPI function pointer
// Use ex::make_future instead of tt::sync_wait: stdexec's
// sync_wait drives its own run_loop and never yields back to
// the HPX thread pool, so the MPI background poller can never
// fire the completion callback. make_future converts the sender
// to an hpx::future whose .get() yields the calling HPX thread.
int data = 0, count = 1;
if (rank == 0)
{
data = 42;
}
auto s = mpi::transform_mpi(
ex::just(&data, count, datatype, 0, comm), MPI_Ibcast);
auto mpi_result = tt::sync_wait(HPX_MOVE(s));
auto result = hpx::get<0>(*mpi_result);
auto result = ex::make_future(HPX_MOVE(s)).get();
if (rank != 0)
{
HPX_TEST_EQ(data, 42);
Expand All @@ -85,8 +88,7 @@ int hpx_main()
return MPI_Ibcast(
data, count, datatype, i, comm, request);
});
auto mpi_result = tt::sync_wait(HPX_MOVE(s));
auto result = hpx::get<0>(*mpi_result);
auto result = ex::make_future(HPX_MOVE(s)).get();
if (rank != 0)
{
HPX_TEST_EQ(data, 42);
Expand All @@ -110,7 +112,7 @@ int hpx_main()
MPI_Comm comm, MPI_Request* request) {
MPI_Ibcast(data, count, datatype, i, comm, request);
});
tt::sync_wait(HPX_MOVE(s));
ex::make_future(HPX_MOVE(s)).get();
if (rank != 0)
{
HPX_TEST_EQ(data, 42);
Expand All @@ -126,7 +128,7 @@ int hpx_main()
c.x = 3;
}
auto s = mpi::transform_mpi(c);
tt::sync_wait(s);
ex::make_future(HPX_MOVE(s)).get();
if (rank == 0)
{
HPX_TEST_EQ(c.x, 3);
Expand All @@ -142,9 +144,10 @@ int hpx_main()
{
data = 42;
}
auto result = hpx::get<0>(
*tt::sync_wait(ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast)));
auto result =
ex::make_future(ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast))
.get();
if (rank != 0)
{
HPX_TEST_EQ(data, 42);
Expand All @@ -161,9 +164,11 @@ int hpx_main()
bool exception_thrown = false;
try
{
tt::sync_wait(mpi::transform_mpi(
error_sender<int*, int, MPI_Datatype, int, MPI_Comm>{},
MPI_Ibcast));
ex::make_future(
mpi::transform_mpi(error_sender<int*, int, MPI_Datatype,
int, MPI_Comm>{},
MPI_Ibcast))
.get();
HPX_TEST(false);
}
catch (std::runtime_error const& e)
Expand All @@ -187,7 +192,7 @@ int hpx_main()
});
try
{
tt::sync_wait(HPX_MOVE(s));
ex::make_future(HPX_MOVE(s)).get();
}
catch (std::runtime_error const& e)
{
Expand All @@ -207,8 +212,10 @@ int hpx_main()
bool exception_thrown = false;
try
{
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
ex::make_future(mpi::transform_mpi(ex::just(data, count,
datatype, -1, comm),
MPI_Ibcast))
.get();
HPX_TEST(false);
}
catch (std::runtime_error const& e)
Expand All @@ -233,8 +240,10 @@ int hpx_main()
bool exception_thrown = false;
try
{
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
ex::make_future(mpi::transform_mpi(ex::just(data, count,
datatype, -1, comm),
MPI_Ibcast))
.get();
HPX_TEST(false);
}
catch (std::runtime_error const&)
Expand All @@ -252,8 +261,6 @@ int hpx_main()

int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
{
// Disable MPI tests because they
// hang due to sync_wait consuming the thread
MPI_Init(&argc, &argv);

auto result = hpx::local::init(hpx_main, argc, argv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ namespace hpx::execution::experimental {
hpx::execution::experimental::completion_signatures<>;

template <typename Env>
#if defined(HPX_CLANG_VERSION)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
friend auto tag_invoke(get_completion_signatures_t,
bulk_sender const&, Env) noexcept -> hpx::execution::
experimental::transform_completion_signatures<
Expand All @@ -68,9 +64,6 @@ namespace hpx::execution::experimental {
hpx::execution::experimental::set_error_t(
std::exception_ptr)>,
default_set_value, default_set_error, disable_set_stopped>;
#if defined(HPX_CLANG_VERSION)
#pragma clang diagnostic pop
#endif

friend constexpr auto tag_invoke(
hpx::execution::experimental::get_env_t,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ namespace hpx::when_all_vector_detail {
hpx::execution::experimental::set_error_t(std::decay_t<Err>)>;

template <typename Env>
#if defined(HPX_CLANG_VERSION)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
friend auto tag_invoke(
hpx::execution::experimental::get_completion_signatures_t,
when_all_vector_sender_type const&, Env const&) noexcept
Expand All @@ -132,9 +128,6 @@ namespace hpx::when_all_vector_detail {
hpx::execution::experimental::set_error_t(
std::exception_ptr)>,
transformed_comp_sigs_identity, decay_set_error>;
#if defined(HPX_CLANG_VERSION)
#pragma clang diagnostic pop
#endif

template <typename Receiver>
struct operation_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,52 @@ namespace hpx::execution::experimental {
HPX_CXX_CORE_EXPORT using stdexec::then;
HPX_CXX_CORE_EXPORT using stdexec::then_t;

// Completion signature manipulators
// Completion signature manipulators: transform_completion_signatures and
// transform_completion_signatures_of.
//
// The stdexec public type-alias variants (stdexec::transform_completion_signatures
// and stdexec::transform_completion_signatures_of) are deprecated in favour of
// the new exec::transform_completion_signatures consteval-function API in
// <exec/completion_signatures.hpp>. HPX re-exports the same functionality using
// the internal (non-deprecated) implementation aliases so that:
// a) Existing HPX call sites that use these as type aliases continue to compile
// without warnings.
// b) New code may include <exec/completion_signatures.hpp> and call
// exec::transform_completion_signatures (the consteval function) directly.

HPX_CXX_CORE_EXPORT using stdexec::completion_signatures_of_t;
HPX_CXX_CORE_EXPORT using stdexec::error_types_of_t;
HPX_CXX_CORE_EXPORT using stdexec::sends_stopped;
HPX_CXX_CORE_EXPORT using stdexec::value_types_of_t;

HPX_CXX_CORE_EXPORT using stdexec::transform_completion_signatures;
HPX_CXX_CORE_EXPORT using stdexec::transform_completion_signatures_of;
// Type-alias variant (kept for existing call sites). Backed by the internal
// (non-deprecated) stdexec::__transform_completion_signatures_t helper.
HPX_CXX_CORE_EXPORT template <class Sigs,
class MoreSigs = stdexec::completion_signatures<>,
template <class...> class ValueTransform =
stdexec::__cmplsigs::__default_set_value,
template <class...> class ErrorTransform =
stdexec::__cmplsigs::__default_set_error,
class StoppedSigs =
stdexec::completion_signatures<stdexec::set_stopped_t()>>
using transform_completion_signatures =
stdexec::__transform_completion_signatures_t<Sigs, MoreSigs,
ValueTransform, ErrorTransform, StoppedSigs>;
Comment thread
arpittkhandelwal marked this conversation as resolved.

// Type-alias variant for the "of_t" variant (sender + env convenience form).
// Backed by stdexec::__transform_completion_signatures_of_t (non-deprecated).
HPX_CXX_CORE_EXPORT template <class Sndr, class Env = stdexec::env<>,
class MoreSigs = stdexec::completion_signatures<>,
template <class...> class ValueTransform =
stdexec::__cmplsigs::__default_set_value,
template <class...> class ErrorTransform =
stdexec::__cmplsigs::__default_set_error,
class StoppedSigs =
stdexec::completion_signatures<stdexec::set_stopped_t()>>
using transform_completion_signatures_of =
stdexec::__transform_completion_signatures_of_t<Sndr, Env, MoreSigs,
ValueTransform, ErrorTransform, StoppedSigs>;

HPX_CXX_CORE_EXPORT using exec::keep_completion;

// Transform sender
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,10 +708,6 @@ namespace hpx::execution::experimental::detail {
using sender_concept = hpx::execution::experimental::sender_t;

template <typename Env>
#if defined(HPX_CLANG_VERSION)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
friend auto tag_invoke(
hpx::execution::experimental::get_completion_signatures_t,
thread_pool_bulk_sender const&, Env const&)
Expand All @@ -720,9 +716,6 @@ namespace hpx::execution::experimental::detail {
hpx::execution::experimental::completion_signatures<
hpx::execution::experimental::set_error_t(
std::exception_ptr)>>;
#if defined(HPX_CLANG_VERSION)
#pragma clang diagnostic pop
#endif

struct env
{
Expand Down
Loading