diff --git a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c index 37f7e5f10..e469d147f 100644 --- a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c +++ b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c @@ -5024,6 +5024,15 @@ jdf_generate_arena_string_from_datatype(string_arena_t *sa, string_arena_free(sa2); } +static int +jdf_datatype_is_auto(jdf_datatransfer_type_t datatype) +{ + if( (JDF_VAR == datatype.type->op) || (JDF_STRING == datatype.type->op) ) { + return (0 == strcmp(datatype.type->jdf_var, "AUTO")); + } + return 0; +} + static void jdf_generate_code_fillup_datatypes(string_arena_t * sa_tmp_arena, string_arena_t * sa_arena, string_arena_t * sa_tmp_type_src, string_arena_t * sa_type_src, @@ -6323,19 +6332,32 @@ jdf_generate_code_datatype_lookup(const jdf_t *jdf, string_arena_init(sa_tmp_arena); string_arena_init(sa_tmp_type); string_arena_init(sa_temp); - jdf_generate_arena_string_from_datatype(sa_temp, dl->datatype_remote); - string_arena_add_string(sa_tmp_arena, " %s->arena ", string_arena_get_string(sa_temp)); - + if( jdf_datatype_is_auto(dl->datatype_remote) ) { + string_arena_add_string(sa_tmp_arena, "PARSEC_REMOTE_DEP_AUTO_ALLOC"); + } else { + jdf_generate_arena_string_from_datatype(sa_temp, dl->datatype_remote); + string_arena_add_string(sa_tmp_arena, " %s->arena ", string_arena_get_string(sa_temp)); + } if( NULL == dl->datatype_remote.layout ) { /* no specific layout */ + if( 0 == strlen(string_arena_get_string(sa_temp)) ) { + jdf_generate_arena_string_from_datatype(sa_temp, dl->datatype_remote); + } string_arena_add_string(sa_tmp_type, "%s->opaque_dtt", string_arena_get_string(sa_temp)); } else { string_arena_add_string(sa_tmp_type, "%s", dump_expr((void**)dl->datatype_remote.layout, &info)); } string_arena_init(sa_tmp_count); - string_arena_add_string(sa_tmp_count, "%s", dump_expr((void**)dl->datatype_remote.count, &info)); + if( jdf_datatype_is_auto(dl->datatype_remote) ) { + string_arena_add_string(sa_tmp_count, "data->remote.src_count"); + } else { + string_arena_add_string(sa_tmp_count, "%s", dump_expr((void**)dl->datatype_remote.count, &info)); + } string_arena_init(sa_tmp_displ); - string_arena_add_string(sa_tmp_displ, "%s", dump_expr((void**)dl->datatype_remote.displ, &info)); - + if( jdf_datatype_is_auto(dl->datatype_remote) ) { + string_arena_add_string(sa_tmp_displ, "0"); + } else { + string_arena_add_string(sa_tmp_displ, "%s", dump_expr((void**)dl->datatype_remote.displ, &info)); + } jdf_generate_code_fillup_datatypes(sa_tmp_arena, sa_arena, sa_tmp_type, sa_type, sa_tmp_displ, sa_displ, @@ -7796,11 +7818,15 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf, string_arena_add_string(sa_tmp_displ_r, "0"); } else { string_arena_init(sa_temp); - jdf_generate_arena_string_from_datatype(sa_temp, dl->datatype_remote); - string_arena_add_string(sa_tmp_arena_r, "%s->arena", string_arena_get_string(sa_temp)); - - /* We select the dtt considering [type_remote] on dependency or dtt on datacopy. - */ + if( jdf_datatype_is_auto(dl->datatype_remote) ) { + string_arena_add_string(sa_tmp_arena_r, "PARSEC_REMOTE_DEP_AUTO_ALLOC"); + string_arena_add_string(sa_tmp_count_r, "data.remote.src_count"); + string_arena_add_string(sa_tmp_displ_r, "0"); + } else { + jdf_generate_arena_string_from_datatype(sa_temp, dl->datatype_remote); + string_arena_add_string(sa_tmp_arena_r, "%s->arena", string_arena_get_string(sa_temp)); + } + /* Keep type information for AUTO; only count comes from runtime header. */ if( DEP_UNDEFINED_DATATYPE == jdf_dep_undefined_type(dl->datatype_remote) /* undefined type on dependency */ && (NULL == dl->datatype_remote.layout) ){ /* User didn't specify a custom layout*/ /* using data dtt or using PARSEC_DATATYPE_NULL if we don't have a data (this is the case @@ -7809,14 +7835,19 @@ jdf_generate_code_iterate_successors_or_predecessors(const jdf_t *jdf, string_arena_add_string(sa_tmp_type_r, "(data.data != NULL ? data.data->dtt : PARSEC_DATATYPE_NULL )"); } else { if( NULL == dl->datatype_remote.layout ){ /* User didn't specify a custom layout*/ + if( 0 == strlen(string_arena_get_string(sa_temp)) ) { + jdf_generate_arena_string_from_datatype(sa_temp, dl->datatype_remote); + } string_arena_add_string(sa_tmp_type_r, "%s->opaque_dtt", string_arena_get_string(sa_temp)); } else { string_arena_add_string(sa_tmp_type_r, "%s", dump_expr((void**)dl->datatype_remote.layout, &info)); } } - assert( dl->datatype_remote.count != NULL ); - string_arena_add_string(sa_tmp_count_r, "%s", dump_expr((void**)dl->datatype_remote.count, &info)); - string_arena_add_string(sa_tmp_displ_r, "%s", dump_expr((void**)dl->datatype_remote.displ, &info)); + if( !jdf_datatype_is_auto(dl->datatype_remote) ) { + assert( dl->datatype_remote.count != NULL ); + string_arena_add_string(sa_tmp_count_r, "%s", dump_expr((void**)dl->datatype_remote.count, &info)); + string_arena_add_string(sa_tmp_displ_r, "%s", dump_expr((void**)dl->datatype_remote.displ, &info)); + } } string_arena_add_string(sa_datatype," if (action_mask & (PARSEC_ACTION_RESHAPE_ON_RELEASE | PARSEC_ACTION_RESHAPE_REMOTE_ON_RELEASE | PARSEC_ACTION_SEND_REMOTE_DEPS)) {\n"); diff --git a/parsec/remote_dep.c b/parsec/remote_dep.c index 1f6920cac..ddc21d804 100644 --- a/parsec/remote_dep.c +++ b/parsec/remote_dep.c @@ -202,10 +202,12 @@ inline void remote_deps_free(parsec_remote_deps_t* deps) #if defined(PARSEC_DEBUG_PARANOID) deps->output[k].data.data = NULL; deps->output[k].data.local.arena = NULL; + deps->output[k].data.local.device_index = 0; deps->output[k].data.local.src_displ = deps->output[k].data.local.dst_displ = 0xFFFFFFFF; deps->output[k].data.local.src_datatype = deps->output[k].data.local.dst_datatype = PARSEC_DATATYPE_NULL; deps->output[k].data.local.src_count = deps->output[k].data.local.dst_count = -1; deps->output[k].data.remote.arena = NULL; + deps->output[k].data.remote.device_index = 0; deps->output[k].data.remote.src_displ = deps->output[k].data.remote.dst_displ = 0xFFFFFFFF; deps->output[k].data.remote.src_datatype = deps->output[k].data.remote.dst_datatype = PARSEC_DATATYPE_NULL; deps->output[k].data.remote.src_count = deps->output[k].data.remote.dst_count = -1; diff --git a/parsec/remote_dep.h b/parsec/remote_dep.h index 931053b7d..8d4a8eaf4 100644 --- a/parsec/remote_dep.h +++ b/parsec/remote_dep.h @@ -59,6 +59,7 @@ typedef struct remote_dep_wire_get_s { struct parsec_dep_type_description_s { struct parsec_arena_s *arena; + int16_t device_index; parsec_datatype_t src_datatype; uint64_t src_count; int64_t src_displ; @@ -264,6 +265,9 @@ int parsec_remote_dep_propagate(parsec_execution_stream_t* es, #define parsec_set_CTL_dep(PDEP_DATA_DESC)\ { (PDEP_DATA_DESC)->data = NULL; (PDEP_DATA_DESC)->remote.src_datatype = PARSEC_DATATYPE_NULL; (PDEP_DATA_DESC)->remote.src_count=0; } +/* Marker used by jdf2c for type_remote=AUTO */ +#define PARSEC_REMOTE_DEP_AUTO_ALLOC ((struct parsec_arena_s*)(intptr_t)-1) + /** @} */ diff --git a/parsec/remote_dep_mpi.c b/parsec/remote_dep_mpi.c index a06a2088c..4b0574a86 100644 --- a/parsec/remote_dep_mpi.c +++ b/parsec/remote_dep_mpi.c @@ -15,10 +15,13 @@ #include "parsec/utils/debug.h" #include "parsec/debug_marks.h" #include "parsec/data.h" +#include "parsec/data_internal.h" #include "parsec/papi_sde.h" #include "parsec/interfaces/dtd/insert_function_internal.h" #include "parsec/remote_dep.h" #include "parsec/class/dequeue.h" +#include "parsec/mca/device/device.h" +#include "parsec/mca/device/device_gpu.h" #include "parsec/parsec_binary_profile.h" @@ -50,6 +53,14 @@ static size_t parsec_param_short_limit = RDEP_MSG_SHORT_LIMIT; static int parsec_param_enable_aggregate = 0; parsec_mempool_t *parsec_remote_dep_cb_data_mempool = NULL; +#define PARSEC_REMOTE_DEP_AUTO_BUCKET_MIN_SHIFT_LIMIT 3 /* 8 bytes */ +#define PARSEC_REMOTE_DEP_AUTO_BUCKET_MAX_SHIFT_LIMIT 24 /* 16 MiB */ +#define PARSEC_REMOTE_DEP_AUTO_BUCKET_COUNT (PARSEC_REMOTE_DEP_AUTO_BUCKET_MAX_SHIFT_LIMIT - PARSEC_REMOTE_DEP_AUTO_BUCKET_MIN_SHIFT_LIMIT + 1) +static parsec_arena_datatype_t parsec_remote_dep_auto_adts[PARSEC_REMOTE_DEP_AUTO_BUCKET_COUNT]; +static int parsec_remote_dep_auto_adts_initialized = 0; +static int parsec_param_auto_bucket_min_shift = 3; +static int parsec_param_auto_bucket_max_shift = 20; +static int parsec_param_auto_gpu_enable = 0; typedef struct remote_dep_cb_data_s { parsec_list_item_t super; @@ -187,6 +198,9 @@ static void remote_dep_mpi_release_delayed_deps(parsec_execution_stream_t* es, /* Perform a memcpy with datatypes by doing a local sendrecv */ static int remote_dep_nothread_memcpy(parsec_execution_stream_t* es, dep_cmd_item_t *item); +static void remote_dep_auto_fallback_copy_release(parsec_data_copy_t *copy, int device); +static void remote_dep_auto_gpu_memory_release(parsec_data_copy_t *copy, int device); +static int remote_dep_auto_pick_gpu_device(const parsec_task_t *task); int remote_dep_ce_reconfigure(parsec_context_t* context); @@ -209,6 +223,38 @@ static void remote_dep_mpi_params(parsec_context_t* context) { #endif parsec_mca_param_reg_int_name("runtime", "comm_aggregate", "Aggregate multiple dependencies in the same short message (1=true,0=false).", false, false, parsec_param_enable_aggregate, &parsec_param_enable_aggregate); + parsec_mca_param_reg_int_name("runtime", "comm_auto_bucket_min_shift", + "Minimum log2 bucket size for type_remote=AUTO receive pooling.", + false, false, + parsec_param_auto_bucket_min_shift, + &parsec_param_auto_bucket_min_shift); + parsec_mca_param_reg_int_name("runtime", "comm_auto_bucket_max_shift", + "Maximum log2 bucket size for type_remote=AUTO receive pooling.", + false, false, + parsec_param_auto_bucket_max_shift, + &parsec_param_auto_bucket_max_shift); + parsec_mca_param_reg_int_name("runtime", "comm_auto_gpu_enable", + "Enable GPU-device allocation for type_remote=AUTO rendezvous receives (1=true,0=false).", + false, false, + parsec_param_auto_gpu_enable, + &parsec_param_auto_gpu_enable); + + if( parsec_param_auto_bucket_min_shift < PARSEC_REMOTE_DEP_AUTO_BUCKET_MIN_SHIFT_LIMIT ) { + parsec_warning("runtime_comm_auto_bucket_min_shift=%d is too small, clamped to %d", + parsec_param_auto_bucket_min_shift, PARSEC_REMOTE_DEP_AUTO_BUCKET_MIN_SHIFT_LIMIT); + parsec_param_auto_bucket_min_shift = PARSEC_REMOTE_DEP_AUTO_BUCKET_MIN_SHIFT_LIMIT; + } + if( parsec_param_auto_bucket_max_shift > PARSEC_REMOTE_DEP_AUTO_BUCKET_MAX_SHIFT_LIMIT ) { + parsec_warning("runtime_comm_auto_bucket_max_shift=%d is too large, clamped to %d", + parsec_param_auto_bucket_max_shift, PARSEC_REMOTE_DEP_AUTO_BUCKET_MAX_SHIFT_LIMIT); + parsec_param_auto_bucket_max_shift = PARSEC_REMOTE_DEP_AUTO_BUCKET_MAX_SHIFT_LIMIT; + } + if( parsec_param_auto_bucket_min_shift > parsec_param_auto_bucket_max_shift ) { + parsec_warning("Invalid AUTO bucket range min=%d max=%d, resetting to defaults min=3 max=20", + parsec_param_auto_bucket_min_shift, parsec_param_auto_bucket_max_shift); + parsec_param_auto_bucket_min_shift = 3; + parsec_param_auto_bucket_max_shift = 20; + } } int @@ -571,10 +617,174 @@ void parsec_remote_dep_memcpy(parsec_execution_stream_t* es, parsec_dequeue_push_back(&dep_cmd_queue, (parsec_list_item_t*) item); } +static void remote_dep_auto_fallback_copy_release(parsec_data_copy_t *copy, int device) +{ + (void)device; + free(copy->device_private); + copy->device_private = NULL; +} + +static void remote_dep_auto_gpu_memory_release(parsec_data_copy_t *copy, int device) +{ + parsec_device_module_t *dev = parsec_mca_device_get((uint32_t)copy->device_index); + if( NULL != dev && PARSEC_DEV_IS_GPU(dev->type) ) { + parsec_device_gpu_module_t *gpu = (parsec_device_gpu_module_t*)dev; + (void)gpu->set_device(gpu); + (void)gpu->memory_free(gpu, copy->device_private); + } + (void)device; + copy->device_private = NULL; +} + +static int remote_dep_auto_pick_gpu_device(const parsec_task_t *task) +{ + parsec_data_ref_t ref; + parsec_data_t *data = NULL; + parsec_device_module_t *dev = NULL; + + if( 0 == parsec_param_auto_gpu_enable ) { + return 0; + } + if( NULL == task || NULL == task->task_class || NULL == task->task_class->data_affinity ) { + return 0; + } + if( 0 == task->task_class->data_affinity(task, &ref) ) { + return 0; + } + if( NULL == ref.dc || NULL == ref.dc->data_of_key ) { + return 0; + } + data = ref.dc->data_of_key(ref.dc, ref.key); + if( NULL == data ) { + return 0; + } + if( data->preferred_device >= 0 ) { + dev = parsec_mca_device_get((uint32_t)data->preferred_device); + if( NULL != dev && PARSEC_DEV_IS_GPU(dev->type) ) { + return dev->device_index; + } + } + if( data->owner_device >= 0 ) { + dev = parsec_mca_device_get((uint32_t)data->owner_device); + if( NULL != dev && PARSEC_DEV_IS_GPU(dev->type) ) { + return dev->device_index; + } + } + return 0; +} + +static inline parsec_data_copy_t* +remote_dep_auto_copy_allocate_fallback(parsec_dep_type_description_t* data) +{ + parsec_data_t* original = parsec_data_new(); + parsec_data_copy_t* dc; + size_t buffer_size = (size_t)data->dst_count; + if( NULL == original || 0 == buffer_size ) { + if( NULL != original ) PARSEC_OBJ_RELEASE(original); + return NULL; + } + + dc = parsec_data_copy_new(original, 0, parsec_datatype_int8_t, + PARSEC_DATA_FLAG_PARSEC_MANAGED | PARSEC_DATA_FLAG_PARSEC_OWNED); + PARSEC_OBJ_RELEASE(original); + if( NULL == dc ) { + return NULL; + } + + dc->device_private = malloc(buffer_size); + if( NULL == dc->device_private ) { + PARSEC_OBJ_RELEASE(dc); + return NULL; + } + dc->coherency_state = PARSEC_DATA_COHERENCY_EXCLUSIVE; + dc->release_cb = remote_dep_auto_fallback_copy_release; + dc->original->nb_elts = buffer_size; + return dc; +} + +static inline parsec_data_copy_t* +remote_dep_auto_copy_allocate_gpu(parsec_dep_type_description_t* data) +{ + parsec_data_t* original = parsec_data_new(); + parsec_data_copy_t* dc; + parsec_device_module_t *dev = parsec_mca_device_get((uint32_t)data->device_index); + size_t bytes = (size_t)data->dst_count; + + if( NULL == original || NULL == dev || !PARSEC_DEV_IS_GPU(dev->type) || 0 == bytes ) { + if( NULL != original ) PARSEC_OBJ_RELEASE(original); + return NULL; + } + + parsec_device_gpu_module_t *gpu = (parsec_device_gpu_module_t*)dev; + dc = parsec_data_copy_new(original, (uint8_t)data->device_index, parsec_datatype_int8_t, + PARSEC_DATA_FLAG_PARSEC_MANAGED | PARSEC_DATA_FLAG_PARSEC_OWNED); + PARSEC_OBJ_RELEASE(original); + if( NULL == dc ) { + return NULL; + } + if( PARSEC_SUCCESS != gpu->set_device(gpu) || + PARSEC_SUCCESS != gpu->memory_allocate(gpu, bytes, &dc->device_private) ) { + PARSEC_OBJ_RELEASE(dc); + return NULL; + } + dc->coherency_state = PARSEC_DATA_COHERENCY_EXCLUSIVE; + dc->release_cb = remote_dep_auto_gpu_memory_release; + dc->original->nb_elts = bytes; + return dc; +} + +static inline parsec_data_copy_t* +remote_dep_auto_copy_allocate(parsec_dep_type_description_t* data) +{ + parsec_data_copy_t* dc; + size_t needed = (size_t)data->dst_count; + int shift = 0; + int idx = -1; + + if( !parsec_remote_dep_auto_adts_initialized || 0 == needed ) { + return NULL; + } + if( data->device_index > 0 ) { + dc = remote_dep_auto_copy_allocate_gpu(data); + if( NULL != dc ) { + return dc; + } + } + + shift = parsec_param_auto_bucket_min_shift; + while( shift <= parsec_param_auto_bucket_max_shift ) { + if( needed <= (((size_t)1) << shift) ) { + idx = shift - PARSEC_REMOTE_DEP_AUTO_BUCKET_MIN_SHIFT_LIMIT; + break; + } + shift++; + } + + if( idx < 0 || idx >= PARSEC_REMOTE_DEP_AUTO_BUCKET_COUNT || + NULL == parsec_remote_dep_auto_adts[idx].arena ) { + return remote_dep_auto_copy_allocate_fallback(data); + } + + dc = parsec_arena_get_copy(parsec_remote_dep_auto_adts[idx].arena, + 1, 0, parsec_datatype_int8_t); + if( NULL == dc ) { + return remote_dep_auto_copy_allocate_fallback(data); + } + dc->coherency_state = PARSEC_DATA_COHERENCY_EXCLUSIVE; + + PARSEC_DEBUG_VERBOSE(20, parsec_comm_output_stream, + "MPI:\tMalloc AUTO remote tile %p need %zu bucket %zu displ = %" PRIi64, + dc, needed, ((size_t)1) << shift, data->dst_displ); + return dc; +} + static inline parsec_data_copy_t* remote_dep_copy_allocate(parsec_dep_type_description_t* data) { parsec_data_copy_t* dc; + if( PARSEC_REMOTE_DEP_AUTO_ALLOC == data->arena ) { + return remote_dep_auto_copy_allocate(data); + } if( NULL == data->arena ) { assert(0 == data->dst_count); return NULL; @@ -760,6 +970,16 @@ remote_dep_mpi_retrieve_datatype(parsec_execution_stream_t *eu, // output->data = *out_data; // } + if( PARSEC_REMOTE_DEP_AUTO_ALLOC == output->data.remote.arena ) { + uint64_t remote_size = output->data.remote.src_count; + output->data.remote.src_count = remote_size; + output->data.remote.dst_count = remote_size; + output->data.remote.src_displ = 0; + output->data.remote.dst_displ = 0; + output->data.remote.device_index = remote_dep_auto_pick_gpu_device(newcontext); + } else { + output->data.remote.device_index = 0; + } parsec_data_t* data_arena = is_read_only(oldcontext, dep); if(NULL == data_arena) { @@ -1852,9 +2072,11 @@ static void remote_dep_mpi_recv_activate(parsec_execution_stream_t* es, /* Check if the data is short-embedded in the activate */ if((length - (*position)) >= (int)data_sizes[ds_idx]) { + parsec_dep_type_description_t host_desc = *type_desc; + host_desc.device_index = 0; /* eager unpack targets host memory */ assert(NULL == data_desc->data); /* we do not support in-place tiles now, make sure it doesn't happen yet */ if(NULL == data_desc->data) { - data_desc->data = remote_dep_copy_allocate(type_desc); + data_desc->data = remote_dep_copy_allocate(&host_desc); } #ifndef PARSEC_PROF_DRY_DEP PARSEC_DEBUG_VERBOSE(10, parsec_comm_output_stream, @@ -2091,24 +2313,55 @@ static void remote_dep_mpi_get_start(parsec_execution_stream_t* es, */ parsec_ce_mem_reg_handle_t receiver_memory_handle; size_t receiver_memory_handle_size; + int rc; if(parsec_ce.capabilites.supports_noncontiguous_datatype) { - parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_NONCONTIGUOUS, - nbdtt, dtt, - -1, - &receiver_memory_handle, &receiver_memory_handle_size); + rc = parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_NONCONTIGUOUS, + nbdtt, dtt, + -1, + &receiver_memory_handle, &receiver_memory_handle_size); } else { /* TODO: Implement converter to pack and unpack * register the whole region including the holes because we don't support sparse * registration. */ ptrdiff_t extent, lb; parsec_type_extent(dtt, &lb, &extent); (void)lb; - parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_CONTIGUOUS, - -1, parsec_datatype_uint8_t, - nbdtt * extent, - &receiver_memory_handle, &receiver_memory_handle_size); + rc = parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_CONTIGUOUS, + -1, parsec_datatype_uint8_t, + nbdtt * extent, + &receiver_memory_handle, &receiver_memory_handle_size); } + if( PARSEC_SUCCESS != rc ) { + if( PARSEC_REMOTE_DEP_AUTO_ALLOC == deps->output[k].data.remote.arena ) { + /* Some comm engines cannot register GPU pointers for this path. + * For AUTO receives only, fallback to host allocation and retry. */ + PARSEC_DATA_COPY_RELEASE(deps->output[k].data.data); + deps->output[k].data.data = NULL; + parsec_dep_type_description_t host_desc = deps->output[k].data.remote; + host_desc.device_index = 0; + deps->output[k].data.data = remote_dep_copy_allocate(&host_desc); + assert(NULL != deps->output[k].data.data); + if(parsec_ce.capabilites.supports_noncontiguous_datatype) { + rc = parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_NONCONTIGUOUS, + nbdtt, dtt, + -1, + &receiver_memory_handle, &receiver_memory_handle_size); + } else { + ptrdiff_t extent, lb; + parsec_type_extent(dtt, &lb, &extent); (void)lb; + rc = parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_CONTIGUOUS, + -1, parsec_datatype_uint8_t, + nbdtt * extent, + &receiver_memory_handle, &receiver_memory_handle_size); + } + if( PARSEC_SUCCESS != rc ) { + parsec_fatal("Failed to register AUTO receive buffer (k=%d rc=%d)", k, rc); + } + } + /* For non-AUTO receives, preserve previous behavior and let the + * comm engine deal with the registration return code. */ + } # if defined(PARSEC_DEBUG_NOISIER) MPI_Type_get_name(dtt, type_name, &len); @@ -2275,6 +2528,36 @@ remote_dep_ce_init(parsec_context_t* context) PARSEC_OBJ_CLASS(remote_dep_cb_data_t), sizeof(remote_dep_cb_data_t), offsetof(remote_dep_cb_data_t, mempool_owner), 1); + if( !parsec_remote_dep_auto_adts_initialized ) { + for(int shift = parsec_param_auto_bucket_min_shift; + shift <= parsec_param_auto_bucket_max_shift; + shift++) { + int i = shift - PARSEC_REMOTE_DEP_AUTO_BUCKET_MIN_SHIFT_LIMIT; + size_t bucket_size = ((size_t)1) << shift; + rc = parsec_arena_datatype_construct(&parsec_remote_dep_auto_adts[i], + bucket_size, + PARSEC_ARENA_ALIGNMENT_SSE, + parsec_datatype_int8_t); + if( PARSEC_SUCCESS != rc ) { + parsec_warning("[CE] Failed to initialize AUTO receive arena bucket %zu (error %d)\n", + bucket_size, rc); + for(int j = 0; j < PARSEC_REMOTE_DEP_AUTO_BUCKET_COUNT; j++) { + if( NULL != parsec_remote_dep_auto_adts[j].arena ) { + PARSEC_OBJ_RELEASE(parsec_remote_dep_auto_adts[j].arena); + parsec_remote_dep_auto_adts[j].arena = NULL; + parsec_remote_dep_auto_adts[j].opaque_dtt = PARSEC_DATATYPE_NULL; + } + } + parsec_mempool_destruct(parsec_remote_dep_cb_data_mempool); + free(parsec_remote_dep_cb_data_mempool); parsec_remote_dep_cb_data_mempool = NULL; + parsec_ce.tag_unregister(PARSEC_CE_REMOTE_DEP_GET_DATA_TAG); + parsec_ce.tag_unregister(PARSEC_CE_REMOTE_DEP_ACTIVATE_TAG); + parsec_comm_engine_fini(&parsec_ce); + return rc; + } + } + parsec_remote_dep_auto_adts_initialized = 1; + } /* Lazy or delayed initializations */ remote_dep_mpi_initialize_execution_stream(context); return PARSEC_SUCCESS; @@ -2298,6 +2581,16 @@ int remote_dep_ce_fini(parsec_context_t* context) free(parsec_mpi_same_pos_items); parsec_mpi_same_pos_items = NULL; parsec_mpi_same_pos_items_size = 0; } + if( parsec_remote_dep_auto_adts_initialized ) { + for(int i = 0; i < PARSEC_REMOTE_DEP_AUTO_BUCKET_COUNT; i++) { + if( NULL != parsec_remote_dep_auto_adts[i].arena ) { + PARSEC_OBJ_RELEASE(parsec_remote_dep_auto_adts[i].arena); + parsec_remote_dep_auto_adts[i].arena = NULL; + parsec_remote_dep_auto_adts[i].opaque_dtt = PARSEC_DATATYPE_NULL; + } + } + parsec_remote_dep_auto_adts_initialized = 0; + } PARSEC_OBJ_DESTRUCT(&dep_activates_fifo); PARSEC_OBJ_DESTRUCT(&dep_activates_noobj_fifo); diff --git a/tests/dsl/ptg/CMakeLists.txt b/tests/dsl/ptg/CMakeLists.txt index ed03f549f..252b1207e 100644 --- a/tests/dsl/ptg/CMakeLists.txt +++ b/tests/dsl/ptg/CMakeLists.txt @@ -14,6 +14,12 @@ endif(PARSEC_HAVE_RANDOM) parsec_addtest_executable(C complex_deps) target_ptg_sources(complex_deps PRIVATE "complex_deps.jdf") +parsec_addtest_executable(C receive_auto) +target_ptg_sources(receive_auto PRIVATE "receive_auto.jdf") + +parsec_addtest_executable(C receive_auto_mixed) +target_ptg_sources(receive_auto_mixed PRIVATE "receive_auto_mixed.jdf") + add_subdirectory(branching) add_subdirectory(choice) add_subdirectory(controlgather) diff --git a/tests/dsl/ptg/receive_auto.jdf b/tests/dsl/ptg/receive_auto.jdf new file mode 100644 index 000000000..1a25e9f29 --- /dev/null +++ b/tests/dsl/ptg/receive_auto.jdf @@ -0,0 +1,234 @@ +extern "C" %{ +/* + * Copyright (c) 2026 + */ + +#include +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) +#include +#endif +#include +#include +#include +#include + +#include "parsec.h" +#include "parsec/data_dist/matrix/two_dim_rectangle_cyclic.h" +#include "parsec/mca/device/device.h" + +static int g_errors = 0; +static int g_gpu_hits = 0; +static unsigned long long g_gpu_mask = 0ULL; + +static int count_bits_ull(unsigned long long x) +{ + int c = 0; + while( x ) { + c += (int)(x & 1ULL); + x >>= 1; + } + return c; +} + +static int count_visible_cuda_devices(void) +{ + int nb = 0; +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + for(uint32_t i = 0; i < parsec_nb_devices; i++) { + parsec_device_module_t *device = parsec_mca_device_get(i); + if( NULL != device && PARSEC_DEV_CUDA == device->type ) { + nb++; + } + } +#endif + return nb; +} +%} + +descA [type = "parsec_matrix_block_cyclic_t*"] +NT [type = int] +MAX_BYTES [type = int] + +SEND(k) +k = 0 .. NT-2 +next = %{ return k + 1; %} +msg_bytes = %{ return 8 + (k % (MAX_BYTES-8)); %} + +: descA(k, 0) + +RW A <- descA(k, 0) + -> A RECV(next) [ type_remote = DEFAULT + layout_remote = parsec_datatype_int8_t + count_remote = %{ return msg_bytes; %} + displ_remote = 0 ] +BODY [type=CUDA] +{ +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + if( k < 2 ) { + fprintf(stderr, "receive_auto SEND CUDA BODY: k=%d msg_bytes=%d gpu=%d\n", + k, msg_bytes, cuda_device->cuda_index); + } + cudaMemset(A, (int)(k & 0xff), (size_t)msg_bytes); +#endif +} +END + +BODY +{ + uint8_t *p = (uint8_t*)A; + memset(p, (int)(k & 0xff), (size_t)msg_bytes); +} +END + +RECV(k) +k = 1 .. NT-1 +prev = %{ return k - 1; %} +expected = %{ return 8 + (prev % (MAX_BYTES-8)); %} + +: descA(k, 0) + +READ A <- A SEND(prev) [ type_remote = AUTO ] +BODY [type=CUDA] +{ +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + if( prev < 2 ) { + fprintf(stderr, "receive_auto CUDA BODY: rank task prev=%d expected=%d gpu=%d\n", + prev, expected, cuda_device->cuda_index); + } + if( cuda_device->cuda_index < 64 ) { + __sync_fetch_and_or(&g_gpu_mask, (1ULL << cuda_device->cuda_index)); + } + uint8_t first = 0, last = 0; + cudaMemcpy(&first, A, 1, cudaMemcpyDeviceToHost); + cudaMemcpy(&last, ((uint8_t*)A) + expected - 1, 1, cudaMemcpyDeviceToHost); + if( first != (uint8_t)(prev & 0xff) || + last != (uint8_t)(prev & 0xff) ) { + g_errors++; + } + g_gpu_hits++; +#endif +} +END + +BODY +{ + uint8_t *p = (uint8_t*)A; + for(int i = 0; i < expected; i++) { + if( p[i] != (uint8_t)(prev & 0xff) ) { + g_errors++; + break; + } + } +} +END + +extern "C" %{ +int main(int argc, char **argv) +{ + parsec_context_t *parsec; + parsec_receive_auto_taskpool_t *tp; + parsec_matrix_block_cyclic_t descA; + int rank = 0, world = 1; + int nt = 64; + int max_bytes = 256; + int rc; + int global_errors = 0; + int global_gpu_hits = 0; + int local_visible_cuda = 0; + int global_visible_cuda = 0; + int global_distinct_gpus = 0; + unsigned long long global_gpu_mask = 0ULL; + + int pargc = 0; + char **pargv = NULL; + for(int i = 1; i < argc; i++) { + if( 0 == strncmp(argv[i], "--", 2) ) { + pargc = argc - i; + pargv = argv + i; + break; + } + if( 0 == strncmp(argv[i], "-n=", 3) ) { + nt = strtol(argv[i] + 3, NULL, 10); + continue; + } + if( 0 == strncmp(argv[i], "-b=", 3) ) { + max_bytes = strtol(argv[i] + 3, NULL, 10); + continue; + } + } + if( nt < 2 ) nt = 2; + if( max_bytes < 16 ) max_bytes = 16; + +#ifdef PARSEC_HAVE_MPI + { + int provided; + MPI_Init_thread(NULL, NULL, MPI_THREAD_SERIALIZED, &provided); + } + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world); +#endif + + parsec = parsec_init(-1, &pargc, &pargv); + assert(NULL != parsec); + local_visible_cuda = count_visible_cuda_devices(); + + parsec_matrix_block_cyclic_init(&descA, PARSEC_MATRIX_BYTE, PARSEC_MATRIX_TILE, + rank, max_bytes, 1, nt * max_bytes, 1, + 0, 0, nt * max_bytes, 1, + 1, world, 1, 1, 0, 0); + descA.mat = parsec_data_allocate(descA.super.nb_local_tiles * + descA.super.bsiz * + parsec_datadist_getsizeoftype(PARSEC_MATRIX_BYTE)); + assert(NULL != descA.mat); + + tp = parsec_receive_auto_new(&descA, nt, max_bytes); + assert(NULL != tp); + parsec_add2arena_rect(&tp->arenas_datatypes[PARSEC_receive_auto_DEFAULT_ADT_IDX], + parsec_datatype_int8_t, + descA.super.mb, descA.super.nb, descA.super.mb); + + rc = parsec_context_add_taskpool(parsec, &tp->super); + PARSEC_CHECK_ERROR(rc, "parsec_context_add_taskpool"); + rc = parsec_context_start(parsec); + PARSEC_CHECK_ERROR(rc, "parsec_context_start"); + rc = parsec_context_wait(parsec); + PARSEC_CHECK_ERROR(rc, "parsec_context_wait"); + + { + global_errors = g_errors; +#ifdef PARSEC_HAVE_MPI + MPI_Allreduce(&g_errors, &global_errors, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce(&g_gpu_hits, &global_gpu_hits, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce(&g_gpu_mask, &global_gpu_mask, 1, MPI_UNSIGNED_LONG_LONG, MPI_BOR, MPI_COMM_WORLD); + MPI_Allreduce(&local_visible_cuda, &global_visible_cuda, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD); +#else + global_gpu_hits = g_gpu_hits; + global_gpu_mask = g_gpu_mask; + global_visible_cuda = local_visible_cuda; +#endif + global_distinct_gpus = count_bits_ull(global_gpu_mask); + } + + parsec_taskpool_free(&tp->super); + free(descA.mat); + parsec_fini(&parsec); + +#ifdef PARSEC_HAVE_MPI + MPI_Finalize(); +#endif + if( global_errors ) { + fprintf(stderr, "receive_auto: data validation failed (%d global errors)\n", global_errors); + return EXIT_FAILURE; + } + if( global_visible_cuda >= 2 && global_distinct_gpus < 2 ) { + fprintf(stderr, "receive_auto: multi-GPU validation failed (visible_cuda=%d distinct_used=%d mask=0x%llx)\n", + global_visible_cuda, global_distinct_gpus, (unsigned long long)global_gpu_mask); + return EXIT_FAILURE; + } + if( rank == 0 ) { + printf("receive_auto: PASS (nt=%d, max_bytes=%d, gpu_hits=%d, distinct_gpus=%d, visible_cuda=%d)\n", + nt, max_bytes, global_gpu_hits, global_distinct_gpus, global_visible_cuda); + } + return EXIT_SUCCESS; +} +%} diff --git a/tests/dsl/ptg/receive_auto_mixed.jdf b/tests/dsl/ptg/receive_auto_mixed.jdf new file mode 100644 index 000000000..6ff963c3c --- /dev/null +++ b/tests/dsl/ptg/receive_auto_mixed.jdf @@ -0,0 +1,259 @@ +extern "C" %{ +/* + * Copyright (c) 2026 + */ + +#include +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) +#include +#endif +#include +#include +#include +#include + +#include "parsec.h" +#include "parsec/data_dist/matrix/two_dim_rectangle_cyclic.h" +#include "parsec/mca/device/device.h" + +static int g_errors = 0; +static int g_gpu_hits = 0; +static unsigned long long g_gpu_mask = 0ULL; + +static int count_bits_ull(unsigned long long x) +{ + int c = 0; + while( x ) { + c += (int)(x & 1ULL); + x >>= 1; + } + return c; +} + +static int count_visible_cuda_devices(void) +{ + int nb = 0; +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + for(uint32_t i = 0; i < parsec_nb_devices; i++) { + parsec_device_module_t *device = parsec_mca_device_get(i); + if( NULL != device && PARSEC_DEV_CUDA == device->type ) { + nb++; + } + } +#endif + return nb; +} +%} + +descA [type = "parsec_matrix_block_cyclic_t*"] +NT [type = int] +MAX_BYTES [type = int] + +SEND(k) +k = 0 .. NT-2 +next = %{ return k + 1; %} +msg_bytes = %{ return 8 + (k % (MAX_BYTES-8)); %} + +: descA(k, 0) + +RW A <- descA(k, 0) + -> A RECV_AUTO(next) [ type_remote = DEFAULT + layout_remote = parsec_datatype_int8_t + count_remote = %{ return msg_bytes; %} + displ_remote = 0 ] + -> A RECV_FIXED(next) [ type_remote = DEFAULT + layout_remote = parsec_datatype_int8_t + count_remote = %{ return msg_bytes; %} + displ_remote = 0 ] +BODY [type=CUDA] +{ +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + if( k < 2 ) { + fprintf(stderr, "receive_auto_mixed SEND CUDA BODY: k=%d msg_bytes=%d gpu=%d\n", + k, msg_bytes, cuda_device->cuda_index); + } + cudaMemset(A, (int)(k & 0xff), (size_t)msg_bytes); +#endif +} +END + +BODY +{ + uint8_t *p = (uint8_t*)A; + memset(p, (int)(k & 0xff), (size_t)msg_bytes); +} +END + +RECV_AUTO(k) +k = 1 .. NT-1 +prev = %{ return k - 1; %} +expected = %{ return 8 + (prev % (MAX_BYTES-8)); %} + +: descA(k, 0) + +READ A <- A SEND(prev) [ type_remote = AUTO ] +BODY [type=CUDA] +{ +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + if( prev < 2 ) { + fprintf(stderr, "receive_auto_mixed CUDA BODY: rank task prev=%d expected=%d gpu=%d\n", + prev, expected, cuda_device->cuda_index); + } + if( cuda_device->cuda_index < 64 ) { + __sync_fetch_and_or(&g_gpu_mask, (1ULL << cuda_device->cuda_index)); + } + uint8_t first = 0, last = 0; + cudaMemcpy(&first, A, 1, cudaMemcpyDeviceToHost); + cudaMemcpy(&last, ((uint8_t*)A) + expected - 1, 1, cudaMemcpyDeviceToHost); + if( first != (uint8_t)(prev & 0xff) || + last != (uint8_t)(prev & 0xff) ) { + g_errors++; + } + g_gpu_hits++; +#endif +} +END + +BODY +{ + uint8_t *p = (uint8_t*)A; + for(int i = 0; i < expected; i++) { + if( p[i] != (uint8_t)(prev & 0xff) ) { + g_errors++; + break; + } + } +} +END + +RECV_FIXED(k) +k = 1 .. NT-1 +prev = %{ return k - 1; %} +expected = %{ return 8 + (prev % (MAX_BYTES-8)); %} + +: descA(k, 0) + +READ A <- A SEND(prev) [ type_remote = DEFAULT + layout_remote = parsec_datatype_int8_t + count_remote = %{ return expected; %} + displ_remote = 0 ] +BODY +{ + uint8_t *p = (uint8_t*)A; + for(int i = 0; i < expected; i++) { + if( p[i] != (uint8_t)(prev & 0xff) ) { + g_errors++; + break; + } + } +} +END + +extern "C" %{ +int main(int argc, char **argv) +{ + parsec_context_t *parsec; + parsec_receive_auto_mixed_taskpool_t *tp; + parsec_matrix_block_cyclic_t descA; + int rank = 0, world = 1; + int nt = 64; + int max_bytes = 256; + int rc; + int global_errors = 0; + int global_gpu_hits = 0; + int local_visible_cuda = 0; + int global_visible_cuda = 0; + int global_distinct_gpus = 0; + unsigned long long global_gpu_mask = 0ULL; + + int pargc = 0; + char **pargv = NULL; + for(int i = 1; i < argc; i++) { + if( 0 == strncmp(argv[i], "--", 2) ) { + pargc = argc - i; + pargv = argv + i; + break; + } + if( 0 == strncmp(argv[i], "-n=", 3) ) { + nt = strtol(argv[i] + 3, NULL, 10); + continue; + } + if( 0 == strncmp(argv[i], "-b=", 3) ) { + max_bytes = strtol(argv[i] + 3, NULL, 10); + continue; + } + } + if( nt < 2 ) nt = 2; + if( max_bytes < 16 ) max_bytes = 16; + +#ifdef PARSEC_HAVE_MPI + { + int provided; + MPI_Init_thread(NULL, NULL, MPI_THREAD_SERIALIZED, &provided); + } + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &world); +#endif + + parsec = parsec_init(-1, &pargc, &pargv); + assert(NULL != parsec); + local_visible_cuda = count_visible_cuda_devices(); + + parsec_matrix_block_cyclic_init(&descA, PARSEC_MATRIX_BYTE, PARSEC_MATRIX_TILE, + rank, max_bytes, 1, nt * max_bytes, 1, + 0, 0, nt * max_bytes, 1, + 1, world, 1, 1, 0, 0); + descA.mat = parsec_data_allocate(descA.super.nb_local_tiles * + descA.super.bsiz * + parsec_datadist_getsizeoftype(PARSEC_MATRIX_BYTE)); + assert(NULL != descA.mat); + + tp = parsec_receive_auto_mixed_new(&descA, nt, max_bytes); + assert(NULL != tp); + parsec_add2arena_rect(&tp->arenas_datatypes[PARSEC_receive_auto_mixed_DEFAULT_ADT_IDX], + parsec_datatype_int8_t, + descA.super.mb, descA.super.nb, descA.super.mb); + + rc = parsec_context_add_taskpool(parsec, &tp->super); + PARSEC_CHECK_ERROR(rc, "parsec_context_add_taskpool"); + rc = parsec_context_start(parsec); + PARSEC_CHECK_ERROR(rc, "parsec_context_start"); + rc = parsec_context_wait(parsec); + PARSEC_CHECK_ERROR(rc, "parsec_context_wait"); + + global_errors = g_errors; +#ifdef PARSEC_HAVE_MPI + MPI_Allreduce(&g_errors, &global_errors, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce(&g_gpu_hits, &global_gpu_hits, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce(&g_gpu_mask, &global_gpu_mask, 1, MPI_UNSIGNED_LONG_LONG, MPI_BOR, MPI_COMM_WORLD); + MPI_Allreduce(&local_visible_cuda, &global_visible_cuda, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD); +#else + global_gpu_hits = g_gpu_hits; + global_gpu_mask = g_gpu_mask; + global_visible_cuda = local_visible_cuda; +#endif + global_distinct_gpus = count_bits_ull(global_gpu_mask); + + parsec_taskpool_free(&tp->super); + free(descA.mat); + parsec_fini(&parsec); + +#ifdef PARSEC_HAVE_MPI + MPI_Finalize(); +#endif + if( global_errors ) { + fprintf(stderr, "receive_auto_mixed: data validation failed (%d global errors)\n", global_errors); + return EXIT_FAILURE; + } + if( global_visible_cuda >= 2 && global_distinct_gpus < 2 ) { + fprintf(stderr, "receive_auto_mixed: multi-GPU validation failed (visible_cuda=%d distinct_used=%d mask=0x%llx)\n", + global_visible_cuda, global_distinct_gpus, (unsigned long long)global_gpu_mask); + return EXIT_FAILURE; + } + if( rank == 0 ) { + printf("receive_auto_mixed: PASS (nt=%d, max_bytes=%d, gpu_hits=%d, distinct_gpus=%d, visible_cuda=%d)\n", + nt, max_bytes, global_gpu_hits, global_distinct_gpus, global_visible_cuda); + } + return EXIT_SUCCESS; +} +%} diff --git a/tests/dsl/ptg/run-receive-auto-tests.sh b/tests/dsl/ptg/run-receive-auto-tests.sh new file mode 100755 index 000000000..2374eb15c --- /dev/null +++ b/tests/dsl/ptg/run-receive-auto-tests.sh @@ -0,0 +1,196 @@ +#!/usr/bin/env bash + +set -euo pipefail + +BUILD_DIR="build" +NP=2 +NT=64 +MAX_BYTES=1024 +DO_BUILD=1 +LAUNCHER="auto" +REQUIRE_GPUS=0 + +usage() { + echo "Usage: $0 [options]" + echo + echo "Options:" + echo " --build-dir Build directory (default: build)" + echo " --np MPI ranks for distributed runs (default: 2)" + echo " --nt Number of task instances (default: 64)" + echo " --max-bytes Max message size in bytes (default: 1024)" + echo " --launcher Distributed launcher: auto|mpirun|srun (default: auto)" + echo " --require-gpus Fail unless nvidia-smi sees exactly N GPUs" + echo " --no-build Skip build step" + echo " -h, --help Show this help" +} + +while [[ $# -gt 0 ]]; do + case "$1" in + --build-dir) + BUILD_DIR="$2" + shift 2 + ;; + --np) + NP="$2" + shift 2 + ;; + --nt) + NT="$2" + shift 2 + ;; + --max-bytes) + MAX_BYTES="$2" + shift 2 + ;; + --launcher) + LAUNCHER="$2" + shift 2 + ;; + --require-gpus) + REQUIRE_GPUS="$2" + shift 2 + ;; + --no-build) + DO_BUILD=0 + shift + ;; + -h|--help) + usage + exit 0 + ;; + *) + echo "Unknown option: $1" >&2 + usage + exit 1 + ;; + esac +done + +EXE_AUTO="./${BUILD_DIR}/tests/dsl/ptg/receive_auto" +EXE_MIXED="./${BUILD_DIR}/tests/dsl/ptg/receive_auto_mixed" +GPU_MCA_ARGS=(-- --mca runtime_comm_auto_gpu_enable 1 --mca device_load_balance_allow_cpu 0 --mca runtime_comm_short_limit 0) +DIST_CMD=() +DIST_GPU_MCA_ARGS=() + +run_cmd() { + echo + echo ">>> $*" + "$@" +} + +extract_gpu_hits() { + local output="$1" + local hits + hits="$(echo "${output}" | sed -n 's/.*gpu_hits=\([0-9][0-9]*\).*/\1/p' | tail -n1)" + if [[ -z "${hits}" ]]; then + echo "Could not parse gpu_hits from output:" >&2 + echo "${output}" >&2 + return 1 + fi + echo "${hits}" +} + +count_visible_nvidia_gpus() { + if ! command -v nvidia-smi >/dev/null 2>&1; then + echo "nvidia-smi not found in PATH" >&2 + return 1 + fi + nvidia-smi -L | sed '/^[[:space:]]*$/d' | wc -l | tr -d '[:space:]' +} + +run_dist_cmd() { + "${DIST_CMD[@]}" "$@" +} + +if [[ "${LAUNCHER}" == "auto" ]]; then + if [[ -n "${SLURM_JOB_ID:-}" ]] && command -v srun >/dev/null 2>&1; then + LAUNCHER="srun" + else + LAUNCHER="mpirun" + fi +fi + +case "${LAUNCHER}" in + mpirun) + DIST_CMD=(env OMPI_MCA_psec=native PMIX_MCA_psec=native mpirun -np "${NP}") + DIST_GPU_MCA_ARGS=() + ;; + srun) + DIST_CMD=(env OMPI_MCA_psec=native PMIX_MCA_psec=native srun -n "${NP}" --gpus-per-task=1) + # One CUDA device per rank prevents each rank from reserving all GPUs. + DIST_GPU_MCA_ARGS=(--mca device_cuda_enabled 1) + ;; + *) + echo "Invalid launcher '${LAUNCHER}'. Use auto|mpirun|srun." >&2 + exit 1 + ;; +esac + +if [[ "${REQUIRE_GPUS}" -gt 0 ]]; then + VISIBLE_GPUS="$(count_visible_nvidia_gpus)" + if [[ "${VISIBLE_GPUS}" -ne "${REQUIRE_GPUS}" ]]; then + echo "GPU precheck failed: expected ${REQUIRE_GPUS}, found ${VISIBLE_GPUS}" >&2 + exit 1 + fi +fi + +if [[ ${DO_BUILD} -eq 1 ]]; then + run_cmd cmake --build "${BUILD_DIR}" --target receive_auto receive_auto_mixed -j8 +fi + +if [[ ! -x "${EXE_AUTO}" || ! -x "${EXE_MIXED}" ]]; then + echo "Missing test binaries. Expected:" >&2 + echo " ${EXE_AUTO}" >&2 + echo " ${EXE_MIXED}" >&2 + echo "Run without --no-build or check --build-dir." >&2 + exit 1 +fi + +echo +echo "== CPU local runs ==" +run_cmd "${EXE_AUTO}" "-n=${NT}" "-b=${MAX_BYTES}" +run_cmd "${EXE_MIXED}" "-n=${NT}" "-b=${MAX_BYTES}" + +echo +echo "== CPU distributed runs (${LAUNCHER} ranks=${NP}) ==" +run_cmd run_dist_cmd "${EXE_AUTO}" "-n=${NT}" "-b=${MAX_BYTES}" +run_cmd run_dist_cmd "${EXE_MIXED}" "-n=${NT}" "-b=${MAX_BYTES}" + +echo +echo "== GPU-forced local runs ==" +gpu_out_auto="$("${EXE_AUTO}" "-n=${NT}" "-b=${MAX_BYTES}" "${GPU_MCA_ARGS[@]}")" +echo "${gpu_out_auto}" +gpu_hits_auto="$(extract_gpu_hits "${gpu_out_auto}")" +if [[ "${gpu_hits_auto}" -le 0 ]]; then + echo "GPU validation failed for receive_auto: gpu_hits=${gpu_hits_auto}" >&2 + exit 1 +fi + +gpu_out_mixed="$("${EXE_MIXED}" "-n=${NT}" "-b=${MAX_BYTES}" "${GPU_MCA_ARGS[@]}")" +echo "${gpu_out_mixed}" +gpu_hits_mixed="$(extract_gpu_hits "${gpu_out_mixed}")" +if [[ "${gpu_hits_mixed}" -le 0 ]]; then + echo "GPU validation failed for receive_auto_mixed: gpu_hits=${gpu_hits_mixed}" >&2 + exit 1 +fi + +echo +echo "== GPU-forced distributed runs (${LAUNCHER} ranks=${NP}) ==" +gpu_mpi_out_auto="$(run_dist_cmd "${EXE_AUTO}" "-n=${NT}" "-b=${MAX_BYTES}" "${GPU_MCA_ARGS[@]}" "${DIST_GPU_MCA_ARGS[@]}")" +echo "${gpu_mpi_out_auto}" +gpu_mpi_hits_auto="$(extract_gpu_hits "${gpu_mpi_out_auto}")" +if [[ "${gpu_mpi_hits_auto}" -le 0 ]]; then + echo "GPU MPI validation failed for receive_auto: gpu_hits=${gpu_mpi_hits_auto}" >&2 + exit 1 +fi + +gpu_mpi_out_mixed="$(run_dist_cmd "${EXE_MIXED}" "-n=${NT}" "-b=${MAX_BYTES}" "${GPU_MCA_ARGS[@]}" "${DIST_GPU_MCA_ARGS[@]}")" +echo "${gpu_mpi_out_mixed}" +gpu_mpi_hits_mixed="$(extract_gpu_hits "${gpu_mpi_out_mixed}")" +if [[ "${gpu_mpi_hits_mixed}" -le 0 ]]; then + echo "GPU MPI validation failed for receive_auto_mixed: gpu_hits=${gpu_mpi_hits_mixed}" >&2 + exit 1 +fi + +echo +echo "All receive_auto tests passed."