Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to dash::Future #451

Merged
merged 17 commits into from
Feb 14, 2018
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dart-if/include/dash/dart/if/dart_communication.h
Original file line number Diff line number Diff line change
@@ -598,6 +598,23 @@ dart_ret_t dart_test_local(
dart_handle_t * handle,
int32_t * result) DART_NOTHROW;

/**
* Test for the completion of an operation and ensure remote completion.
* If the transfer completed, the handle is invalidated and may not be used
* in another \c dart_wait or \c dart_test operation.
*
* \param handle The handle of an operation to test for completion.
* \param[out] result \c True if the operation has completed.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
* \threadsafe
* \ingroup DartCommunication
*/
dart_ret_t dart_test(
dart_handle_t * handleptr,
int32_t * is_finished);

/**
* Test for the local completion of operations.
* If the transfers completed, the handles are invalidated and may not be
@@ -617,6 +634,36 @@ dart_ret_t dart_testall_local(
size_t n,
int32_t * result) DART_NOTHROW;

/**
* Test for the completion of operations and ensure remote completion.
* If the transfers completed, the handles are invalidated and may not be
* used in another \c dart_wait or \c dart_test operation.
*
* \param handles Array of handles of operations to test for completion.
* \param n Number of \c handles to test for completion.
* \param[out] result \c True if all operations have completed.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
* \threadsafe
* \ingroup DartCommunication
*/
dart_ret_t dart_testall(
dart_handle_t handles[],
size_t n,
int32_t * is_finished);

/**
* Free the handle without testing or waiting for completion of the operation.
*
* \param handle Pointer to the handle to free.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
*/
dart_ret_t dart_handle_free(
dart_handle_t * handle) DART_NOTHROW;

/** \} */

/**
205 changes: 177 additions & 28 deletions dart-impl/mpi/src/dart_communication.c
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@
* Temporary space allocation:
* - on the stack for allocations <=64B
* - on the heap otherwise
* Mainly meant to be used in dart_waitall_* and dart_testall_local
* Mainly meant to be used in dart_waitall* and dart_testall*
*/
#define ALLOC_TMP(__size) ((__size)<=64) ? alloca((__size)) : malloc((__size))
/**
@@ -172,8 +172,8 @@ dart__mpi__get(
{
if (reqs != NULL) {
return MPI_Rget(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
} else {
return MPI_Get(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count,
@@ -191,8 +191,8 @@ dart__mpi__put(
{
if (reqs != NULL) {
return MPI_Rput(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
} else {
return MPI_Put(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count,
@@ -609,7 +609,7 @@ dart_ret_t dart_accumulate(

CHECK_UNITID_RANGE(team_unit_id, team_data);

DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%d op:%d unit:%d",
DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%ld op:%d unit:%d",
nelem, dtype, op, team_unit_id.id);

dart_segment_info_t *seginfo = dart_segment_get_info(
@@ -703,7 +703,7 @@ dart_ret_t dart_fetch_and_op(

CHECK_UNITID_RANGE(team_unit_id, team_data);

DART_LOG_DEBUG("dart_fetch_and_op() dtype:%d op:%d unit:%d "
DART_LOG_DEBUG("dart_fetch_and_op() dtype:%ld op:%d unit:%d "
"offset:%"PRIu64" segid:%d",
dtype, op, team_unit_id.id,
gptr.addr_or_offs.offset, seg_id);
@@ -755,7 +755,7 @@ dart_ret_t dart_compare_and_swap(

CHECK_UNITID_RANGE(team_unit_id, team_data);

DART_LOG_TRACE("dart_compare_and_swap() dtype:%d unit:%d offset:%"PRIu64,
DART_LOG_TRACE("dart_compare_and_swap() dtype:%ld unit:%d offset:%"PRIu64,
dtype, team_unit_id.id, gptr.addr_or_offs.offset);

dart_segment_info_t *seginfo = dart_segment_get_info(
@@ -1372,6 +1372,27 @@ dart_ret_t dart_waitall_local(
return ret;
}

static
dart_ret_t wait_remote_completion(
dart_handle_t *handles,
size_t n
)
{
for (size_t i = 0; i < n; i++) {
if (handles[i] != DART_HANDLE_NULL && handles[i]->needs_flush) {
DART_LOG_DEBUG("dart_waitall: -- MPI_Win_flush(handle[%zu]: %p, dest: %d))",
i, (void*)handles[i], handles[i]->dest);
/*
* MPI_Win_flush to wait for remote completion if required:
*/
if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) {
return DART_ERR_INVAL;
}
}
}
return DART_OK;
}

dart_ret_t dart_waitall(
dart_handle_t handles[],
size_t n)
@@ -1444,19 +1465,10 @@ dart_ret_t dart_waitall(
* wait for completion of MPI requests at origins and targets:
*/
DART_LOG_DEBUG("dart_waitall: waiting for remote completion");
for (size_t i = 0; i < n; i++) {
if (handles[i] != DART_HANDLE_NULL && handles[i]->needs_flush) {
DART_LOG_DEBUG("dart_waitall: -- MPI_Win_flush(handle[%zu]: %p, dest: %d))",
i, (void*)handles[i], handles[i]->dest);
/*
* MPI_Win_flush to wait for remote completion if required:
*/
if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed");
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
return DART_ERR_INVAL;
}
}
if (DART_OK != wait_remote_completion(handles, n)) {
DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed");
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
return DART_ERR_OTHER;
}

/*
@@ -1480,6 +1492,39 @@ dart_ret_t dart_waitall(
return DART_OK;
}

/**
* Wrapper around MPI_Testall to account for broken MPICH implementation.
* MPICH <= 3.2.1 and its derivatives seem to be affected
*/
inline static
int
dart__mpi__testall(int num_reqs, MPI_Request *reqs, int *flag_ptr)
{
#if defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300
int flag_result = 1;
for (int i = 0; i < num_reqs; ++i) {
int flag;
/*
* if the test succeeds the request is set to MPI_REQUEST_NULL,
* which can be safely passed to MPI_Test again.
* Eventually we will have all requests tested succesfully.
*/
int ret = MPI_Test(&reqs[i], &flag, MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) {
return ret;
}
// one incomplete request will flip the flag to 0
flag_result &= flag;
}
*flag_ptr = flag_result;
// we checked all requests succesfully
return MPI_SUCCESS;
#else
return MPI_Testall(num_reqs, reqs,
flag_ptr, MPI_STATUSES_IGNORE);
#endif //defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300
}

dart_ret_t dart_test_local(
dart_handle_t * handleptr,
int32_t * is_finished)
@@ -1496,11 +1541,9 @@ dart_ret_t dart_test_local(
*is_finished = 0;

dart_handle_t handle = *handleptr;
if (MPI_Testall(handle->num_reqs, handle->reqs,
&flag, MPI_STATUSES_IGNORE) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_test_local: MPI_Test failed!");
return DART_ERR_OTHER;
}
CHECK_MPI_RET(
dart__mpi__testall(handle->num_reqs, handle->reqs, &flag),
"MPI_Testall");

if (flag) {
// deallocate handle
@@ -1512,6 +1555,43 @@ dart_ret_t dart_test_local(
return DART_OK;
}


dart_ret_t dart_test(
dart_handle_t * handleptr,
int32_t * is_finished)
{
int flag;

DART_LOG_DEBUG("dart_test()");
if (handleptr == NULL ||
*handleptr == DART_HANDLE_NULL ||
(*handleptr)->num_reqs == 0) {
*is_finished = 1;
return DART_OK;
}
*is_finished = 0;

dart_handle_t handle = *handleptr;
CHECK_MPI_RET(
dart__mpi__testall(handle->num_reqs, handle->reqs, &flag),
"MPI_Testall");

if (flag) {
if (handle->needs_flush) {
CHECK_MPI_RET(
MPI_Win_flush(handle->dest, handle->win),
"MPI_Win_flush"
);
}
// deallocate handle
free(handle);
*handleptr = DART_HANDLE_NULL;
*is_finished = 1;
}
DART_LOG_DEBUG("dart_test > finished");
return DART_OK;
}

dart_ret_t dart_testall_local(
dart_handle_t handles[],
size_t n,
@@ -1541,8 +1621,7 @@ dart_ret_t dart_testall_local(
}

if (r_n) {
if (MPI_Testall(r_n, mpi_req, &flag,
MPI_STATUSES_IGNORE) != MPI_SUCCESS){
if (dart__mpi__testall(r_n, mpi_req, &flag) != MPI_SUCCESS){
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
DART_LOG_ERROR("dart_testall_local: MPI_Testall failed!");
return DART_ERR_OTHER;
@@ -1566,6 +1645,76 @@ dart_ret_t dart_testall_local(
return DART_OK;
}


dart_ret_t dart_testall(
dart_handle_t handles[],
size_t n,
int32_t * is_finished)
{
DART_LOG_DEBUG("dart_testall_local()");
if (handles == NULL || n == 0) {
DART_LOG_DEBUG("dart_testall_local: empty handles");
return DART_OK;
}

MPI_Request *mpi_req = ALLOC_TMP(2 * n * sizeof (MPI_Request));
size_t r_n = 0;
for (size_t i = 0; i < n; ++i) {
if (handles[i] != DART_HANDLE_NULL) {
for (uint8_t j = 0; j < handles[i]->num_reqs; ++j) {
if (handles[i]->reqs[j] != MPI_REQUEST_NULL){
mpi_req[r_n] = handles[i]->reqs[j];
++r_n;
}
}
}
}

if (r_n) {
DART_LOG_TRACE(" MPI_Testall on %zu requests", r_n);
if (dart__mpi__testall(r_n, mpi_req, is_finished) != MPI_SUCCESS){
DART_LOG_ERROR("dart_testall: MPI_Testall failed");
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
return DART_ERR_OTHER;
}

if (*is_finished) {
/*
* wait for completion of MPI requests at origins and targets:
*/
DART_LOG_DEBUG("dart_testall: waiting for remote completion");
if (DART_OK != wait_remote_completion(handles, n)) {
DART_LOG_ERROR("dart_testall: MPI_Win_flush failed");
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
return DART_ERR_OTHER;
}

for (size_t i = 0; i < n; i++) {
if (handles[i] != DART_HANDLE_NULL) {
// free the handle
free(handles[i]);
handles[i] = DART_HANDLE_NULL;
}
}
}
} else {
*is_finished = 1;
}
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
DART_LOG_DEBUG("dart_testall_local > finished");
return DART_OK;
}

dart_ret_t dart_handle_free(
dart_handle_t * handleptr)
{
if (handleptr != NULL && *handleptr != DART_HANDLE_NULL) {
free(*handleptr);
*handleptr = DART_HANDLE_NULL;
}
return DART_OK;
}

/* -- Dart collective operations -- */

static int _dart_barrier_count = 0;
75 changes: 47 additions & 28 deletions dash/include/dash/Future.h
Original file line number Diff line number Diff line change
@@ -16,14 +16,17 @@ template<typename ResultT>
class Future
{
private:
typedef Future<ResultT> self_t;
typedef std::function<ResultT (void)> func_t;
typedef Future<ResultT> self_t;
typedef std::function<ResultT (void)> get_func_t;
typedef std::function<bool (ResultT*)> test_func_t;
typedef std::function<void (void)> destroy_func_t;

private:
func_t _func;
ResultT _value;
bool _ready = false;
bool _has_func = false;
get_func_t _get_func;
test_func_t _test_func;
destroy_func_t _destroy_func;
ResultT _value;
bool _ready = false;

public:
// For ostream output
@@ -33,55 +36,71 @@ class Future
const Future<ResultT_> & future);

public:

Future()
: _ready(false),
_has_func(false)
: _ready(false)
{ }

Future(ResultT & result)
: _value(result),
_ready(true)
{ }

Future(const func_t & func)
: _func(func),
_ready(false),
_has_func(true)
Future(const get_func_t & func)
: _get_func(func)
{ }

Future(
const self_t & other)
: _func(other._func),
_value(other._value),
_ready(other._ready),
_has_func(other._has_func)
const get_func_t & get_func,
const test_func_t & test_func)
: _get_func(get_func),
_test_func(test_func)
{ }

Future<ResultT> & operator=(const self_t & other)
{
if (this != &other) {
_func = other._func;
_value = other._value;
_ready = other._ready;
_has_func = other._has_func;
Future(
const get_func_t & get_func,
const test_func_t & test_func,
const destroy_func_t & destroy_func)
: _get_func(get_func),
_test_func(test_func),
_destroy_func(destroy_func)
{ }

Future(const self_t& other) = delete;
Future(self_t&& other) = default;

~Future() {
if (_destroy_func) {
_destroy_func();
}
return *this;
}

/// copy-assignment is not permitted
Future<ResultT> & operator=(const self_t& other) = delete;
Future<ResultT> & operator=(self_t&& other) = default;

void wait()
{
DASH_LOG_TRACE_VAR("Future.wait()", _ready);
if (_ready) {
return;
}
if (!_has_func) {
if (!_get_func) {
DASH_LOG_ERROR("Future.wait()", "No function");
DASH_THROW(
dash::exception::RuntimeError,
"Future not initialized with function");
}
_value = _func();
_value = _get_func();
_ready = true;
DASH_LOG_TRACE_VAR("Future.wait >", _ready);
}

bool test() const
bool test()
{
if (!_ready && _test_func) {
_ready = _test_func(&_value);
}
return _ready;
}

578 changes: 192 additions & 386 deletions dash/include/dash/algorithm/Copy.h

Large diffs are not rendered by default.

98 changes: 92 additions & 6 deletions dash/test/algorithm/CopyTest.cc
Original file line number Diff line number Diff line change
@@ -414,7 +414,7 @@ TEST_F(CopyTest, BlockingLocalToGlobalBlock)
array.barrier();
}

TEST_F(CopyTest, AsyncLocalToGlobPtr)
TEST_F(CopyTest, AsyncLocalToGlobPtrWait)
{
// Copy all elements contained in a single, continuous block.
const int num_elem_per_unit = 5;
@@ -443,14 +443,14 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr)

glob_ptr_t gptr_dest = static_cast<glob_ptr_t>(
array.begin() + global_offset);
LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtr: call copy_async");
LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrWait: call copy_async");

auto copy_fut = dash::copy_async(local_range,
local_range + num_elem_per_unit,
gptr_dest);

// Blocks until remote completion:
LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtr: call fut.wait");
LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrWait: call fut.wait");
copy_fut.wait();

array.barrier();
@@ -463,6 +463,56 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr)
array.barrier();
}


TEST_F(CopyTest, AsyncLocalToGlobPtrTest)
{
// Copy all elements contained in a single, continuous block.
const int num_elem_per_unit = 5;
size_t num_elem_total = _dash_size * num_elem_per_unit;

// Global target range:
dash::Array<int> array(num_elem_total, dash::BLOCKED);
// Local range to copy:
int local_range[num_elem_per_unit];

// Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ... ]
for (auto l = 0; l < num_elem_per_unit; ++l) {
array.local[l] = ((dash::myid() + 1) * 110000) + l;
local_range[l] = ((dash::myid() + 1) * 1000) + l;
}
array.barrier();

// Copy values from local range to remote global range.
// All units (u) copy into block (nblocks-1-u), so unit 0 copies into
// last block.
auto block_offset = (dash::myid() + 1) % dash::size();
auto global_offset = block_offset * num_elem_per_unit;

using glob_it_t = decltype(array.begin());
using glob_ptr_t = typename glob_it_t::pointer;

glob_ptr_t gptr_dest = static_cast<glob_ptr_t>(
array.begin() + global_offset);
LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrTest: call copy_async");

auto copy_fut = dash::copy_async(local_range,
local_range + num_elem_per_unit,
gptr_dest);

// Blocks until remote completion:
LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrTest: call fut.test");
while (!copy_fut.test()) {}

array.barrier();

for (auto l = 0; l < num_elem_per_unit; ++l) {
// Compare local buffer and global array dest range:
EXPECT_EQ_U(local_range[l],
static_cast<int>(array[global_offset + l]));
}
array.barrier();
}

TEST_F(CopyTest, BlockingGlobalToLocalSubBlock)
{
// Copy all elements contained in a single, continuous block,
@@ -729,7 +779,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles)
auto req = dash::copy_async(gblock_a.begin(),
gblock_a.end(),
matrix_b_dest);
req_handles.push_back(req);
req_handles.push_back(std::move(req));
dst_pointers.push_back(matrix_b_dest);
}

@@ -743,7 +793,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles)
// To prevent compiler from removing work load loop in optimization:
LOG_MESSAGE("Dummy result: %f", m);

for (auto req : req_handles) {
for (auto& req : req_handles) {
// Wait for completion of async copy operation.
// Returns pointer to final element copied into target range:
value_t * copy_dest_end = req.get();
@@ -769,7 +819,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles)
}
}

TEST_F(CopyTest, AsyncGlobalToLocalBlock)
TEST_F(CopyTest, AsyncGlobalToLocalBlockWait)
{
// Copy all elements contained in a single, continuous block.
const int num_elem_per_unit = 20;
@@ -803,6 +853,42 @@ TEST_F(CopyTest, AsyncGlobalToLocalBlock)
}
}

TEST_F(CopyTest, AsyncGlobalToLocalTest)
{
// Copy all elements contained in a single, continuous block.
const int num_elem_per_unit = 20;
size_t num_elem_total = _dash_size * num_elem_per_unit;

dash::Array<int> array(num_elem_total, dash::BLOCKED);

EXPECT_EQ_U(num_elem_per_unit, array.local.size());
EXPECT_EQ_U(num_elem_per_unit, array.lsize());

// Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ... ]
for (auto l = 0; l < num_elem_per_unit; ++l) {
array.local[l] = ((dash::myid() + 1) * 1000) + l;
}
array.barrier();

// Local range to store copy:
int local_copy[num_elem_per_unit];

// Copy values from global range to local memory.
// All units copy first block, so unit 0 tests local-to-local copying.
auto dest_end = dash::copy_async(array.begin(),
array.begin() + num_elem_per_unit,
local_copy);

// spin until the transfer is completed
while (!dest_end.test()) { }

EXPECT_EQ_U(num_elem_per_unit, dest_end.get() - local_copy);
for (auto l = 0; l < num_elem_per_unit; ++l) {
EXPECT_EQ_U(static_cast<int>(array[l]),
local_copy[l]);
}
}

#if 0
// TODO
TEST_F(CopyTest, AsyncAllToLocalVector)