diff --git a/VERSION b/VERSION
index 207c18e0e50..8f7f24187f1 100644
--- a/VERSION
+++ b/VERSION
@@ -6,6 +6,8 @@
 # Copyright (c) 2016-2021 IBM Corporation.  All rights reserved.
 # Copyright (c) 2017      Los Alamos National Security, LLC.  All rights
 #                         reserved.
+# Copyright (c) 2020      Huawei Technologies Co., Ltd.  All rights
+#                         reserved.
 
 # This is the VERSION file for Open MPI, describing the precise
 # version of Open MPI in this distribution.  The various components of
@@ -97,6 +99,7 @@ libompitrace_so_version=0:0:0
 # OMPI layer
 libmca_ompi_common_ompio_so_version=0:0:0
 libmca_ompi_common_monitoring_so_version=0:0:0
+libmca_ompi_common_ucx_so_version=0:0:0
 
 # OPAL layer
 libmca_opal_common_cuda_so_version=0:0:0
diff --git a/configure.ac b/configure.ac
index cae1ed71b3a..b3a980a0a0f 100644
--- a/configure.ac
+++ b/configure.ac
@@ -30,6 +30,7 @@
 #                         reserved.
 # Copyright (c) 2020      Amazon.com, Inc. or its affiliates.
 #                         All Rights reserved.
+# Copyright (c) 2020      Huawei Technologies Co., Ltd.  All Rights reserved.
 # $COPYRIGHT$
 #
 # Additional copyrights may follow
@@ -196,6 +197,7 @@ AC_SUBST(libmca_opal_common_sm_so_version)
 AC_SUBST(libmca_opal_common_ugni_so_version)
 AC_SUBST(libmca_ompi_common_ompio_so_version)
 AC_SUBST(libmca_ompi_common_monitoring_so_version)
+AC_SUBST(libmca_ompi_common_ucx_so_version)
 AC_SUBST(libmca_opal_common_ucx_so_version)
 
 #
diff --git a/ompi/mca/common/ucx/Makefile.am b/ompi/mca/common/ucx/Makefile.am
new file mode 100644
index 00000000000..6aa7dd4c5bf
--- /dev/null
+++ b/ompi/mca/common/ucx/Makefile.am
@@ -0,0 +1,97 @@
+#
+# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+#                         University Research and Technology
+#                         Corporation.  All rights reserved.
+# Copyright (c) 2004-2007 The University of Tennessee and The University
+#                         of Tennessee Research Foundation.  All rights
+#                         reserved.
+# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+#                         University of Stuttgart.  All rights reserved.
+# Copyright (c) 2004-2005 The Regents of the University of California.
+#                         All rights reserved.
+# Copyright (c) 2008-2018 University of Houston. All rights reserved.
+# Copyright (c) 2016      IBM Corporation.  All rights reserved.
+# Copyright (c) 2017-2018 Research Organization for Information Science
+#                         and Technology (RIST).  All rights reserved.
+# Copyright (c) 2020      Huawei Technologies Co., Ltd.  All rights
+#                         reserved.
+#
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+headers = \
+        common_ucx_freelist.h \
+        common_ucx_datatype.h \
+        common_ucx.h
+
+sources = \
+        common_ucx_datatype.c \
+        common_ucx.c
+
+
+# To simplify components that link to this library, we will *always*
+# have an output libtool library named libmca_<type>_<name>.la -- even
+# for case 2) described above (i.e., so there's no conditional logic
+# necessary in component Makefile.am's that link to this library).
+# Hence, if we're creating a noinst version of this library (i.e.,
+# case 2), we sym link it to the libmca_<type>_<name>.la name
+# (libtool will do the Right Things under the covers).  See the
+# all-local and clean-local rules, below, for how this is effected.
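+# Editor's note -- illustrative example, not part of the original patch:
+# when the component is built statically, the all-local rule below leaves
+# a sym link behind so that dependent Makefile.am's can always link
+# against the same name, e.g.:
+#
+#     libmca_common_ucx.la -> libmca_common_ucx_noinst.la
+#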
+
+lib_LTLIBRARIES =
+noinst_LTLIBRARIES =
+comp_inst = libmca_common_ucx.la
+comp_noinst = libmca_common_ucx_noinst.la
+
+if MCA_BUILD_ompi_common_ucx_DSO
+lib_LTLIBRARIES += $(comp_inst)
+else
+noinst_LTLIBRARIES += $(comp_noinst)
+endif
+
+libmca_common_ucx_la_SOURCES = $(headers) $(sources)
+libmca_common_ucx_la_CPPFLAGS = $(common_ucx_CPPFLAGS)
+libmca_common_ucx_la_LDFLAGS = \
+        -version-info $(libmca_ompi_common_ucx_so_version) \
+        $(common_ucx_LDFLAGS)
+libmca_common_ucx_la_LIBADD = \
+        $(common_ucx_LIBS) \
+        $(OPAL_TOP_BUILDDIR)/opal/mca/common/ucx/lib@OPAL_LIB_PREFIX@mca_common_ucx.la
+libmca_common_ucx_noinst_la_SOURCES = $(headers) $(sources)
+libmca_common_ucx_noinst_la_CPPFLAGS = $(common_ucx_CPPFLAGS)
+libmca_common_ucx_noinst_la_LDFLAGS = $(common_ucx_LDFLAGS)
+libmca_common_ucx_noinst_la_LIBADD = $(common_ucx_LIBS) \
+        $(OPAL_TOP_BUILDDIR)/opal/mca/common/ucx/lib@OPAL_LIB_PREFIX@mca_common_ucx.la
+
+# Conditionally install the header files
+
+if WANT_INSTALL_HEADERS
+ompidir = $(ompiincludedir)/ompi/mca/common/ucx
+ompi_HEADERS = $(headers)
+else
+ompidir = $(includedir)
+endif
+
+
+# These two rules will sym link the "noinst" libtool library filename
+# to the installable libtool library filename in the case where we are
+# compiling this component statically (case 2, described above).
+V=0
+OMPI_V_LN_SCOMP = $(ompi__v_LN_SCOMP_$V)
+ompi__v_LN_SCOMP_ = $(ompi__v_LN_SCOMP_$AM_DEFAULT_VERBOSITY)
+ompi__v_LN_SCOMP_0 = @echo "  LN_S    " `basename $(comp_inst)`;
+
+all-local:
+	$(OMPI_V_LN_SCOMP) if test -z "$(lib_LTLIBRARIES)"; then \
+	  rm -f "$(comp_inst)"; \
+	  $(LN_S) "$(comp_noinst)" "$(comp_inst)"; \
+	fi
+
+clean-local:
+	if test -z "$(lib_LTLIBRARIES)"; then \
+	  rm -f "$(comp_inst)"; \
+	fi
diff --git a/ompi/mca/common/ucx/common_ucx.c b/ompi/mca/common/ucx/common_ucx.c
new file mode 100644
index 00000000000..ea751c02950
--- /dev/null
+++ b/ompi/mca/common/ucx/common_ucx.c
@@ -0,0 +1,262 @@
+/*
+ * Copyright (c) 2020      Huawei Technologies Co., Ltd.  All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "common_ucx.h"
+
+#include "ompi/communicator/communicator.h"
+#include "ompi/mca/topo/base/base.h"
+#include "ompi/op/op.h"
+
+ompi_common_ucx_module_t ompi_common_ucx = {0};
+
+static void mca_common_ucx_request_init_common(ompi_request_t* ompi_req,
+                                               bool req_persistent,
+                                               ompi_request_state_t state,
+                                               ompi_request_free_fn_t req_free,
+                                               ompi_request_cancel_fn_t req_cancel)
+{
+    OMPI_REQUEST_INIT(ompi_req, req_persistent);
+    ompi_req->req_state  = state;
+    ompi_req->req_start  = mca_common_ucx_start;
+    ompi_req->req_free   = req_free;
+    ompi_req->req_cancel = req_cancel;
+    /* This field is used to attach a persistent request to a temporary req.
+     * Receive (ucp_tag_recv_nb) may call the completion callback before the
+     * field is set. If the field is not NULL then
+     * mca_common_ucx_preq_completion() will try to complete a bogus
+     * persistent request.
+     */
+    ompi_req->req_complete_cb_data = NULL;
+}
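+/*
+ * Editor's note -- explanatory sketch, not part of the original patch:
+ * UCX invokes the request_init/request_cleanup hooks registered in
+ * mca_common_ucx_open() below for every request it allocates, with
+ *
+ *     ucp_params.request_size    = sizeof(ompi_request_t);
+ *     ucp_params.request_init    = mca_common_ucx_request_init;
+ *     ucp_params.request_cleanup = mca_common_ucx_request_cleanup;
+ *
+ * so every UCX request carries a fully constructed ompi_request_t in its
+ * user area, and pointers returned by ucp_tag_recv_nb() and friends can
+ * be cast directly to ompi_request_t* and handed to the MPI request
+ * machinery.
+ */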
+
+static int mca_common_ucx_request_cancel(ompi_request_t *req, int flag)
+{
+    ucp_request_cancel(opal_common_ucx.ucp_worker, req);
+    return OMPI_SUCCESS;
+}
+
+static void mca_common_ucx_request_init(void *request)
+{
+    ompi_request_t* ompi_req = request;
+    OBJ_CONSTRUCT(ompi_req, ompi_request_t);
+    mca_common_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
+                                       mca_common_ucx_request_free,
+                                       mca_common_ucx_request_cancel);
+}
+
+static void mca_common_ucx_request_cleanup(void *request)
+{
+    ompi_request_t* ompi_req = request;
+    ompi_req->req_state = OMPI_REQUEST_INVALID;
+    OMPI_REQUEST_FINI(ompi_req);
+    OBJ_DESTRUCT(ompi_req);
+}
+
+int mca_common_ucx_start(size_t count, ompi_request_t** requests)
+{
+    mca_common_ucx_persistent_request_t *preq;
+    ompi_request_t *tmp_req;
+    size_t i;
+
+    for (i = 0; i < count; ++i) {
+        preq = (mca_common_ucx_persistent_request_t *)requests[i];
+
+        if ((preq == NULL) ||
+            ((OMPI_REQUEST_PML != preq->ompi.req_type) &&
+             (OMPI_REQUEST_COLL != preq->ompi.req_type))) {
+            /* Skip irrelevant requests */
+            continue;
+        }
+
+        MCA_COMMON_UCX_ASSERT(preq->ompi.req_state != OMPI_REQUEST_INVALID);
+        preq->ompi.req_state = OMPI_REQUEST_ACTIVE;
+        mca_common_ucx_request_reset(&preq->ompi);
+
+        tmp_req = preq->start_cb(preq);
+
+        if (tmp_req == NULL) {
+            MCA_COMMON_UCX_VERBOSE(8, "completed immediately, completing persistent request %p",
+                                   (void*)preq);
+            preq->ompi.req_status.MPI_ERROR = MPI_SUCCESS;
+            ompi_request_complete(&preq->ompi, true);
+        } else if (!UCS_PTR_IS_ERR(tmp_req)) {
+            if (REQUEST_COMPLETE(tmp_req)) {
+                /* tmp_req is already completed */
+                MCA_COMMON_UCX_VERBOSE(8, "completing persistent request %p", (void*)preq);
+                mca_common_ucx_persistent_request_complete(preq, tmp_req);
+            } else {
+                /* tmp_req will be completed by the callback, which in turn
+                 * triggers completion of preq */
+                MCA_COMMON_UCX_VERBOSE(8, "temporary request %p will complete persistent request %p",
+                                       (void*)tmp_req, (void*)preq);
+                tmp_req->req_complete_cb_data = preq;
+                preq->tmp_req = tmp_req;
+            }
+        } else {
+            MCA_COMMON_UCX_ERROR("request failed: %s",
+                                 ucs_status_string(UCS_PTR_STATUS(tmp_req)));
+            return OMPI_ERROR;
+        }
+    }
+
+    return OMPI_SUCCESS;
+}
+
+static int mca_common_ucx_persistent_request_free(ompi_request_t **rptr)
+{
+    mca_common_ucx_persistent_request_t* preq = (mca_common_ucx_persistent_request_t*)*rptr;
+    ompi_request_t *tmp_req = preq->tmp_req;
+
+    preq->ompi.req_state = OMPI_REQUEST_INVALID;
+    if (tmp_req != NULL) {
+        mca_common_ucx_persistent_request_detach(preq, tmp_req);
+        ucp_request_free(tmp_req);
+    }
+
+    COMMON_UCX_FREELIST_RETURN(&ompi_common_ucx.requests, &preq->ompi.super);
+    *rptr = MPI_REQUEST_NULL;
+    return OMPI_SUCCESS;
+}
+
+static int mca_common_ucx_persistent_request_cancel(ompi_request_t *req, int flag)
+{
+    mca_common_ucx_persistent_request_t* preq = (mca_common_ucx_persistent_request_t*)req;
+
+    if (preq->tmp_req != NULL) {
+        ucp_request_cancel(opal_common_ucx.ucp_worker, preq->tmp_req);
+    }
+    return OMPI_SUCCESS;
+}
+
+static void mca_common_ucx_persistent_request_construct(mca_common_ucx_persistent_request_t* req)
+{
+    mca_common_ucx_request_init_common(&req->ompi, true, OMPI_REQUEST_INACTIVE,
+                                       mca_common_ucx_persistent_request_free,
+                                       mca_common_ucx_persistent_request_cancel);
+    req->tmp_req = NULL;
+}
+
+static void mca_common_ucx_persistent_request_destruct(mca_common_ucx_persistent_request_t* req)
+{
+    req->ompi.req_state = OMPI_REQUEST_INVALID;
+    OMPI_REQUEST_FINI(&req->ompi);
+}
+
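+/*
+ * Editor's note -- a minimal caller-side sketch (hypothetical names, not
+ * part of the original patch) of how a PML/COLL component is expected to
+ * drive the persistent-request API above: register a start callback that
+ * posts the actual UCX operation, then let MPI_Start() funnel into
+ * mca_common_ucx_start() via preq->ompi.req_start:
+ *
+ *     static ompi_request_t *
+ *     my_psend_start(mca_common_ucx_persistent_request_t *preq)
+ *     {
+ *         // Post the UCX operation and return the temporary request,
+ *         // or NULL if it completed immediately; ep/buf/count/dt/tag
+ *         // would come from the component's own persistent context.
+ *         return (ompi_request_t*)ucp_tag_send_nb(ep, buf, count, dt, tag,
+ *                                                 my_send_completion);
+ *     }
+ *
+ *     mca_common_ucx_persistent_request_t *preq;
+ *     mca_common_ucx_persistent_request_init(OMPI_REQUEST_PML, comm,
+ *                                            my_psend_start, &preq);
+ *     // later: MPI_Start() -> preq->ompi.req_start -> mca_common_ucx_start()
+ */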
+OBJ_CLASS_INSTANCE(mca_common_ucx_persistent_request_t,
+                   ompi_request_t,
+                   mca_common_ucx_persistent_request_construct,
+                   mca_common_ucx_persistent_request_destruct);
+
+static int mca_common_completed_request_free(struct ompi_request_t** rptr)
+{
+    *rptr = MPI_REQUEST_NULL;
+    return OMPI_SUCCESS;
+}
+
+static int mca_common_completed_request_cancel(struct ompi_request_t* ompi_req, int flag)
+{
+    return OMPI_SUCCESS;
+}
+
+static void mca_common_ucx_completed_request_init(ompi_request_t *ompi_req)
+{
+    mca_common_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
+                                       mca_common_completed_request_free,
+                                       mca_common_completed_request_cancel);
+    ompi_req->req_mpi_object.comm = &ompi_mpi_comm_world.comm;
+    ompi_request_complete(ompi_req, false);
+}
+
+int mca_common_ucx_open(const char *prefix, size_t *request_size)
+{
+    ucp_params_t ucp_params;
+
+    /* Initialize UCX context */
+    ucp_params.field_mask        = UCP_PARAM_FIELD_FEATURES |
+                                   UCP_PARAM_FIELD_REQUEST_SIZE |
+                                   UCP_PARAM_FIELD_REQUEST_INIT |
+                                   UCP_PARAM_FIELD_REQUEST_CLEANUP |
+                                   UCP_PARAM_FIELD_TAG_SENDER_MASK |
+                                   UCP_PARAM_FIELD_MT_WORKERS_SHARED |
+                                   UCP_PARAM_FIELD_ESTIMATED_NUM_EPS;
+    ucp_params.features          = UCP_FEATURE_TAG |
+                                   /* The features below are for SPML-UCX */
+                                   UCP_FEATURE_RMA |
+                                   UCP_FEATURE_AMO32 |
+                                   UCP_FEATURE_AMO64;
+    ucp_params.request_size      = sizeof(ompi_request_t);
+    ucp_params.request_init      = mca_common_ucx_request_init;
+    ucp_params.request_cleanup   = mca_common_ucx_request_cleanup;
+    ucp_params.tag_sender_mask   = MCA_COMMON_UCX_SPECIFIC_SOURCE_MASK;
+    ucp_params.mt_workers_shared = 0; /* we do not need mt support for context
+                                         since it will be protected by worker */
+    ucp_params.estimated_num_eps = ompi_process_info.myprocid.rank;
+
+#if HAVE_DECL_UCP_PARAM_FIELD_ESTIMATED_NUM_PPN
+    ucp_params.estimated_num_ppn = opal_process_info.num_local_peers + 1;
+    ucp_params.field_mask       |= UCP_PARAM_FIELD_ESTIMATED_NUM_PPN;
+#endif
+
+    return opal_common_ucx_open(prefix, &ucp_params, request_size);
+}
+
+int mca_common_ucx_close(void)
+{
+    return opal_common_ucx_close();
+}
+
+void mca_common_ucx_enable(void)
+{
+    if (ompi_common_ucx.is_initialized) {
+        return;
+    }
+    ompi_common_ucx.is_initialized = 1;
+
+    OBJ_CONSTRUCT(&ompi_common_ucx.requests, mca_common_ucx_freelist_t);
+
+    COMMON_UCX_FREELIST_INIT(&ompi_common_ucx.requests,
+                             mca_common_ucx_persistent_request_t,
+                             MCA_COMMON_UCX_PERSISTENT_REQUEST_SLACK,
+                             128, -1, 128);
+
+    OBJ_CONSTRUCT(&ompi_common_ucx.completed_request, ompi_request_t);
+    mca_common_ucx_completed_request_init(&ompi_common_ucx.completed_request);
+
+    OBJ_CONSTRUCT(&ompi_common_ucx.datatype_ctx, mca_common_ucx_datatype_ctx_t);
+}
+
+static void mca_common_ucx_common_finalize(void)
+{
+    if (!ompi_common_ucx.is_initialized) {
+        return;
+    }
+    ompi_common_ucx.is_initialized = 0;
+
+    OBJ_DESTRUCT(&ompi_common_ucx.datatype_ctx);
+
+    ompi_common_ucx.completed_request.req_state = OMPI_REQUEST_INVALID;
+    OMPI_REQUEST_FINI(&ompi_common_ucx.completed_request);
+    OBJ_DESTRUCT(&ompi_common_ucx.completed_request);
+
+    OBJ_DESTRUCT(&ompi_common_ucx.requests);
+}
+
+int mca_common_ucx_init(const mca_base_component_t *version)
+{
+    return opal_common_ucx_init(ompi_mpi_thread_multiple, version);
+}
+
+int mca_common_ucx_cleanup(void)
+{
+    mca_common_ucx_common_finalize();
+
+    return opal_common_ucx_cleanup();
+}
diff --git a/ompi/mca/common/ucx/common_ucx.h b/ompi/mca/common/ucx/common_ucx.h new file mode 100644 index 00000000000..8fcf382a647 --- /dev/null +++ b/ompi/mca/common/ucx/common_ucx.h @@ -0,0 +1,136
@@ +/* -*- Mode: C; c-basic-offset:4 ; -*- */ +/* + * Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_COMMON_UCX_H +#define MCA_COMMON_UCX_H + +#include "common_ucx_datatype.h" + +#define MCA_COMMON_UCX_SPECIFIC_SOURCE_MASK 0x800000fffffffffful + +/* + * Below is the slack reserved after each UCX common persistent request. This + * addresses the need for higher-level components, i.e. PML and COLL, to have + * a longer persistent context while sharing this piece of code. + */ +#define MCA_COMMON_UCX_PERSISTENT_REQUEST_SLACK (64) + +typedef struct mca_common_ucx_persistent_request mca_common_ucx_persistent_request_t; +typedef ompi_request_t* (*mca_common_ucx_persistent_start_cb_f) + (mca_common_ucx_persistent_request_t *preq); + +struct mca_common_ucx_persistent_request { + ompi_request_t ompi; + mca_common_ucx_persistent_start_cb_f start_cb; + ompi_request_t *tmp_req; +}; + +OBJ_CLASS_DECLARATION(mca_common_ucx_persistent_request_t); + +__opal_attribute_always_inline__ +static inline void mca_common_ucx_set_status(ompi_status_public_t* mpi_status, + ucs_status_t status) +{ + if (OPAL_LIKELY(status == UCS_OK)) { + mpi_status->MPI_ERROR = MPI_SUCCESS; + mpi_status->_cancelled = false; + } else if (status == UCS_ERR_CANCELED) { + mpi_status->_cancelled = true; + } else { + mpi_status->MPI_ERROR = MPI_ERR_INTERN; + } +} + +static inline int +mca_common_ucx_persistent_request_init(ompi_request_type_t req_type, + struct ompi_communicator_t* comm, + mca_common_ucx_persistent_start_cb_f cb, + mca_common_ucx_persistent_request_t **request) +{ + mca_common_ucx_persistent_request_t *req; + + req = (mca_common_ucx_persistent_request_t*) + COMMON_UCX_FREELIST_GET(&ompi_common_ucx.requests); + if (req == NULL) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + req->ompi.req_type = req_type; + req->ompi.req_state = OMPI_REQUEST_INACTIVE; + req->ompi.req_mpi_object.comm = comm; + req->start_cb = cb; + + *request = req; + return OMPI_SUCCESS; +} + +static inline void mca_common_ucx_request_reset(ompi_request_t *req) +{ + req->req_complete = REQUEST_PENDING; +} + +static inline int mca_common_ucx_request_free(ompi_request_t **rptr) +{ + ompi_request_t *req = *rptr; + + MCA_COMMON_UCX_VERBOSE(9, "free request *%p=%p", (void*)rptr, (void*)req); + + *rptr = MPI_REQUEST_NULL; + mca_common_ucx_request_reset(req); + ucp_request_free(req); + return OMPI_SUCCESS; +} + +static void +mca_common_ucx_persistent_request_detach(mca_common_ucx_persistent_request_t *preq, + ompi_request_t *tmp_req) +{ + tmp_req->req_complete_cb_data = NULL; + preq->tmp_req = NULL; +} + +static inline void +mca_common_ucx_persistent_request_complete(mca_common_ucx_persistent_request_t *preq, + ompi_request_t *tmp_req) +{ + preq->ompi.req_status = tmp_req->req_status; + mca_common_ucx_request_reset(tmp_req); + mca_common_ucx_persistent_request_detach(preq, tmp_req); + ucp_request_free(tmp_req); + ompi_request_complete(&preq->ompi, true); +} + +static inline void mca_common_ucx_preq_completion(ompi_request_t *tmp_req) +{ + mca_common_ucx_persistent_request_t *preq; + + ompi_request_complete(tmp_req, false); + preq = (mca_common_ucx_persistent_request_t*)tmp_req->req_complete_cb_data; + if (preq != NULL) { + MCA_COMMON_UCX_ASSERT(preq->tmp_req != NULL); + mca_common_ucx_persistent_request_complete(preq, tmp_req); + } +} + +int mca_common_ucx_open(const char *prefix, size_t *request_size); + +int mca_common_ucx_close(void); + +void 
mca_common_ucx_enable(void); + +int mca_common_ucx_init(const mca_base_component_t *version); + +int mca_common_ucx_cleanup(void); + +int mca_common_ucx_start(size_t count, ompi_request_t** requests); + +int mca_common_ucx_dump(struct ompi_communicator_t* comm, int verbose); + +#endif /* MCA_COMMON_UCX_H */ diff --git a/ompi/mca/common/ucx/common_ucx_datatype.c b/ompi/mca/common/ucx/common_ucx_datatype.c new file mode 100644 index 00000000000..a1857707031 --- /dev/null +++ b/ompi/mca/common/ucx/common_ucx_datatype.c @@ -0,0 +1,336 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * Copyright (c) 2019 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "common_ucx_datatype.h" + +#include "ompi/proc/proc.h" +#include "ompi/runtime/mpiruntime.h" +#include "ompi/attribute/attribute.h" + +#include +#include + +#ifdef HAVE_UCP_REQUEST_PARAM_T +#define COMMON_UCX_DATATYPE_SET_VALUE(_datatype, _val) \ + (_datatype)->op_param.send._val; \ + (_datatype)->op_param.bsend._val; \ + (_datatype)->op_param.recv._val; +#endif + +static void* common_ucx_generic_datatype_start_pack(void *context, const void *buffer, + size_t count) +{ + ompi_datatype_t *datatype = context; + mca_common_ucx_convertor_t *convertor; + + convertor = (mca_common_ucx_convertor_t *) + COMMON_UCX_FREELIST_GET(&ompi_common_ucx.datatype_ctx.convs); + + OMPI_DATATYPE_RETAIN(datatype); + convertor->datatype = datatype; + opal_convertor_copy_and_prepare_for_send(ompi_proc_local_proc->super.proc_convertor, + &datatype->super, count, buffer, 0, + &convertor->opal_conv); + return convertor; +} + +static void* common_ucx_generic_datatype_start_unpack(void *context, void *buffer, + size_t count) +{ + ompi_datatype_t *datatype = context; + mca_common_ucx_convertor_t *convertor; + + convertor = (mca_common_ucx_convertor_t *) + COMMON_UCX_FREELIST_GET(&ompi_common_ucx.datatype_ctx.convs); + + OMPI_DATATYPE_RETAIN(datatype); + convertor->datatype = datatype; + convertor->offset = 0; + opal_convertor_copy_and_prepare_for_recv(ompi_proc_local_proc->super.proc_convertor, + &datatype->super, count, buffer, 0, + &convertor->opal_conv); + return convertor; +} + +static size_t common_ucx_generic_datatype_packed_size(void *state) +{ + mca_common_ucx_convertor_t *convertor = state; + size_t size; + + opal_convertor_get_packed_size(&convertor->opal_conv, &size); + return size; +} + +static size_t common_ucx_generic_datatype_pack(void *state, size_t offset, + void *dest, size_t max_length) +{ + mca_common_ucx_convertor_t *convertor = state; + uint32_t iov_count; + struct iovec iov; + size_t length; + + iov_count = 1; + iov.iov_base = dest; + iov.iov_len = max_length; + + opal_convertor_set_position(&convertor->opal_conv, &offset); + length = max_length; + opal_convertor_pack(&convertor->opal_conv, &iov, &iov_count, &length); + return length; +} + +static ucs_status_t common_ucx_generic_datatype_unpack(void *state, size_t offset, + const void *src, size_t length) +{ + mca_common_ucx_convertor_t *convertor = state; + + uint32_t iov_count; + struct iovec iov; + opal_convertor_t conv; + + iov_count = 1; + iov.iov_base = (void*)src; + iov.iov_len = length; + + /* in case if unordered 
message arrived - create separate convertor to + * unpack data. */ + if (offset != convertor->offset) { + OBJ_CONSTRUCT(&conv, opal_convertor_t); + opal_convertor_copy_and_prepare_for_recv(ompi_proc_local_proc->super.proc_convertor, + &convertor->datatype->super, + convertor->opal_conv.count, + convertor->opal_conv.pBaseBuf, 0, + &conv); + opal_convertor_set_position(&conv, &offset); + opal_convertor_unpack(&conv, &iov, &iov_count, &length); + opal_convertor_cleanup(&conv); + OBJ_DESTRUCT(&conv); + /* permanently switch to un-ordered mode */ + convertor->offset = 0; + } else { + opal_convertor_unpack(&convertor->opal_conv, &iov, &iov_count, &length); + convertor->offset += length; + } + return UCS_OK; +} + +static void common_ucx_generic_datatype_finish(void *state) +{ + mca_common_ucx_convertor_t *convertor = state; + + opal_convertor_cleanup(&convertor->opal_conv); + OMPI_DATATYPE_RELEASE(convertor->datatype); + COMMON_UCX_FREELIST_RETURN(&ompi_common_ucx.datatype_ctx.convs, + &convertor->super); +} + +static ucp_generic_dt_ops_t common_ucx_generic_datatype_ops = { + .start_pack = common_ucx_generic_datatype_start_pack, + .start_unpack = common_ucx_generic_datatype_start_unpack, + .packed_size = common_ucx_generic_datatype_packed_size, + .pack = common_ucx_generic_datatype_pack, + .unpack = common_ucx_generic_datatype_unpack, + .finish = common_ucx_generic_datatype_finish +}; + +int mca_common_ucx_datatype_attr_del_fn(ompi_datatype_t* datatype, int keyval, + void *attr_val, void *extra) +{ + ucp_datatype_t ucp_datatype = (ucp_datatype_t)attr_val; + +#ifdef HAVE_UCP_REQUEST_PARAM_T + free((void*)datatype->pml_data); +#else + COMMON_UCX_ASSERT((uint64_t)ucp_datatype == datatype->pml_data); +#endif + ucp_dt_destroy(ucp_datatype); + datatype->pml_data = COMMON_UCX_DATATYPE_INVALID; + return OMPI_SUCCESS; +} + +__opal_attribute_always_inline__ +static inline int mca_common_ucx_datatype_is_contig(ompi_datatype_t *datatype) +{ + ptrdiff_t lb; + + ompi_datatype_type_lb(datatype, &lb); + + return (datatype->super.flags & OPAL_DATATYPE_FLAG_CONTIGUOUS) && + (datatype->super.flags & OPAL_DATATYPE_FLAG_NO_GAPS) && + (lb == 0); +} + +#ifdef HAVE_UCP_REQUEST_PARAM_T +__opal_attribute_always_inline__ static inline +mca_common_ucx_datatype_t *mca_common_ucx_init_nbx_datatype(ompi_datatype_t *datatype, + ucp_datatype_t ucp_datatype, + size_t size) +{ + mca_common_ucx_datatype_t *ucx_datatype; + int is_contig_pow2; + + ucx_datatype = malloc(sizeof(*ucx_datatype)); + if (ucx_datatype == NULL) { + int err = MPI_ERR_INTERN; + MCA_COMMON_UCX_ERROR("Failed to allocate datatype structure"); + /* TODO: this error should return to the caller and invoke an error + * handler from the MPI API call. + * For now, it is fatal. 
*/ + ompi_mpi_errors_are_fatal_comm_handler(NULL, &err, "Failed to allocate datatype structure"); + } + + /* clone an initial template of the datatype */ + memcpy(ucx_datatype, &ompi_common_ucx.datatype_init, sizeof(*ucx_datatype)); + + ucx_datatype->datatype = ucp_datatype; + + is_contig_pow2 = mca_common_ucx_datatype_is_contig(datatype) && + (size && !(size & (size - 1))); /* is_pow2(size) */ + if (is_contig_pow2) { + ucx_datatype->size_shift = (int)(log(size) / log(2.0)); /* log2(size) */ + } else { + ucx_datatype->size_shift = 0; + COMMON_UCX_DATATYPE_SET_VALUE(ucx_datatype, op_attr_mask |= UCP_OP_ATTR_FIELD_DATATYPE); + COMMON_UCX_DATATYPE_SET_VALUE(ucx_datatype, datatype = ucp_datatype); + } + + return ucx_datatype; +} +#endif + +ucp_datatype_t mca_common_ucx_init_datatype(ompi_datatype_t *datatype) +{ + size_t size = 0; /* init to suppress compiler warning */ + ucp_datatype_t ucp_datatype; + ucs_status_t status; + int ret; + + if (mca_common_ucx_datatype_is_contig(datatype)) { + ompi_datatype_type_size(datatype, &size); + ucp_datatype = ucp_dt_make_contig(size); + goto out; + } + + status = ucp_dt_create_generic(&common_ucx_generic_datatype_ops, + datatype, &ucp_datatype); + if (status != UCS_OK) { + int err = MPI_ERR_INTERN; + MCA_COMMON_UCX_ERROR("Failed to create UCX datatype for %s", datatype->name); + /* TODO: this error should return to the caller and invoke an error + * handler from the MPI API call. + * For now, it is fatal. */ + ompi_mpi_errors_are_fatal_comm_handler(NULL, &err, "Failed to allocate datatype structure"); + } + + /* Add custom attribute, to clean up UCX resources when OMPI datatype is + * released. + */ + if (ompi_datatype_is_predefined(datatype)) { + MCA_COMMON_UCX_ASSERT(datatype->id < OMPI_DATATYPE_MAX_PREDEFINED); + ompi_common_ucx.datatype_ctx.predefined_types[datatype->id] = ucp_datatype; + } else { + ret = ompi_attr_set_c(TYPE_ATTR, datatype, &datatype->d_keyhash, + ompi_common_ucx.datatype_ctx.datatype_attr_keyval, + (void*)ucp_datatype, false); + if (ret != OMPI_SUCCESS) { + int err = MPI_ERR_INTERN; + MCA_COMMON_UCX_ERROR("Failed to add UCX datatype attribute for %s: %d", + datatype->name, ret); + /* TODO: this error should return to the caller and invoke an error + * handler from the MPI API call. + * For now, it is fatal. 
*/ + ompi_mpi_errors_are_fatal_comm_handler(NULL, &err, "Failed to allocate datatype structure"); + } + } +out: + MCA_COMMON_UCX_VERBOSE(7, "created generic UCX datatype 0x%"PRIx64, ucp_datatype) + +#ifdef HAVE_UCP_REQUEST_PARAM_T + UCS_STATIC_ASSERT(sizeof(datatype->pml_data) >= sizeof(mca_common_ucx_datatype_t*)); + datatype->pml_data = (uint64_t)mca_common_ucx_init_nbx_datatype(datatype, + ucp_datatype, + size); +#else + datatype->pml_data = ucp_datatype; +#endif + + return ucp_datatype; +} + +static void mca_common_ucx_convertor_construct(mca_common_ucx_convertor_t *convertor) +{ + OBJ_CONSTRUCT(&convertor->opal_conv, opal_convertor_t); +} + +static void mca_common_ucx_convertor_destruct(mca_common_ucx_convertor_t *convertor) +{ + OBJ_DESTRUCT(&convertor->opal_conv); +} + +OBJ_CLASS_INSTANCE(mca_common_ucx_convertor_t, + opal_free_list_item_t, + mca_common_ucx_convertor_construct, + mca_common_ucx_convertor_destruct); + + +static void mca_common_ucx_datatype_ctx_construct(mca_common_ucx_datatype_ctx_t *ctx) +{ + ompi_attribute_fn_ptr_union_t copy_fn; + ompi_attribute_fn_ptr_union_t del_fn; + int ret, i; + + /* Create a key for adding custom attributes to datatypes */ + copy_fn.attr_datatype_copy_fn = + (MPI_Type_internal_copy_attr_function*)MPI_TYPE_NULL_COPY_FN; + del_fn.attr_datatype_delete_fn = mca_common_ucx_datatype_attr_del_fn; + ret = ompi_attr_create_keyval(TYPE_ATTR, copy_fn, del_fn, + &ctx->datatype_attr_keyval, NULL, 0, + NULL); + if (ret != OMPI_SUCCESS) { + return; + } + + for (i = 0; i < OMPI_DATATYPE_MAX_PREDEFINED; ++i) { + ctx->predefined_types[i] = COMMON_UCX_DATATYPE_INVALID; + } + + OBJ_CONSTRUCT(&ctx->convs, mca_common_ucx_freelist_t); + + COMMON_UCX_FREELIST_INIT(&ctx->convs, mca_common_ucx_convertor_t, 0, 128, -1, 128); +} + +static void mca_common_ucx_datatype_ctx_destruct(mca_common_ucx_datatype_ctx_t *ctx) +{ + int i; + + if (ctx->datatype_attr_keyval != MPI_KEYVAL_INVALID) { + ompi_attr_free_keyval(TYPE_ATTR, &ctx->datatype_attr_keyval, false); + } + + for (i = 0; i < OMPI_DATATYPE_MAX_PREDEFINED; ++i) { + if (ctx->predefined_types[i] != COMMON_UCX_DATATYPE_INVALID) { + ucp_dt_destroy(ctx->predefined_types[i]); + ctx->predefined_types[i] = COMMON_UCX_DATATYPE_INVALID; + } + } + + OBJ_DESTRUCT(&ctx->convs); +} + +OBJ_CLASS_INSTANCE(mca_common_ucx_datatype_ctx_t, + opal_object_t, + mca_common_ucx_datatype_ctx_construct, + mca_common_ucx_datatype_ctx_destruct); diff --git a/ompi/mca/common/ucx/common_ucx_datatype.h b/ompi/mca/common/ucx/common_ucx_datatype.h new file mode 100644 index 00000000000..98db6f946d1 --- /dev/null +++ b/ompi/mca/common/ucx/common_ucx_datatype.h @@ -0,0 +1,113 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef COMMON_UCX_DATATYPE_H_ +#define COMMON_UCX_DATATYPE_H_ + +#include "common_ucx_freelist.h" + +#include "ompi/request/request.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/datatype/ompi_datatype_internal.h" + +#include "opal/mca/common/ucx/common_ucx.h" + +#define COMMON_UCX_DATATYPE_INVALID 0 + +#ifdef HAVE_UCP_REQUEST_PARAM_T +typedef struct { + ucp_datatype_t datatype; + int size_shift; + struct { + ucp_request_param_t send; + ucp_request_param_t bsend; + ucp_request_param_t recv; + } op_param; +} mca_common_ucx_datatype_t; +#endif + +typedef struct mca_common_ucx_datatype_ctx { + opal_object_t super; + int datatype_attr_keyval; + ucp_datatype_t predefined_types[OMPI_DATATYPE_MPI_MAX_PREDEFINED]; + mca_common_ucx_freelist_t convs; /* Converters pool */ +} mca_common_ucx_datatype_ctx_t; + +typedef struct mca_common_ucx_convertor { + opal_free_list_item_t super; + ompi_datatype_t *datatype; + opal_convertor_t opal_conv; + size_t offset; +} mca_common_ucx_convertor_t; + +typedef struct ompi_common_ucx_module { + ompi_request_t completed_request; + mca_common_ucx_freelist_t requests; + mca_common_ucx_datatype_ctx_t datatype_ctx; + mca_common_ucx_datatype_t datatype_init; + int is_initialized; +} ompi_common_ucx_module_t; + +extern ompi_common_ucx_module_t ompi_common_ucx; + +ucp_datatype_t mca_common_ucx_init_datatype(ompi_datatype_t *datatype); + +int mca_common_ucx_datatype_attr_del_fn(ompi_datatype_t* datatype, int keyval, + void *attr_val, void *extra); + +OBJ_CLASS_DECLARATION(mca_common_ucx_convertor_t); +OBJ_CLASS_DECLARATION(mca_common_ucx_datatype_ctx_t); + + +__opal_attribute_always_inline__ +static inline ucp_datatype_t mca_common_ucx_get_datatype(ompi_datatype_t *datatype) +{ +#ifdef HAVE_UCP_REQUEST_PARAM_T + mca_common_ucx_datatype_t *ucp_type = (mca_common_ucx_datatype_t*)datatype->pml_data; + + if (OPAL_LIKELY(ucp_type != COMMON_UCX_DATATYPE_INVALID)) { + return ucp_type->datatype; + } +#else + ucp_datatype_t ucp_type = datatype->pml_data; + + if (OPAL_LIKELY(ucp_type != COMMON_UCX_DATATYPE_INVALID)) { + return ucp_type; + } +#endif + + return mca_common_ucx_init_datatype(datatype); +} + +#ifdef HAVE_UCP_REQUEST_PARAM_T +__opal_attribute_always_inline__ +static inline mca_common_ucx_datatype_t* +mca_common_ucx_get_op_data(ompi_datatype_t *datatype) +{ + mca_common_ucx_datatype_t *ucp_type = (mca_common_ucx_datatype_t*)datatype->pml_data; + + if (OPAL_LIKELY(ucp_type != COMMON_UCX_DATATYPE_INVALID)) { + return ucp_type; + } + + mca_common_ucx_init_datatype(datatype); + return (mca_common_ucx_datatype_t*)datatype->pml_data; +} + +__opal_attribute_always_inline__ +static inline size_t mca_common_ucx_get_data_size(mca_common_ucx_datatype_t *op_data, + size_t count) +{ + return count << op_data->size_shift; +} +#endif + +#endif /* COMMON_UCX_DATATYPE_H_ */ diff --git a/ompi/mca/common/ucx/common_ucx_freelist.h b/ompi/mca/common/ucx/common_ucx_freelist.h new file mode 100644 index 00000000000..8916910bcfc --- /dev/null +++ b/ompi/mca/common/ucx/common_ucx_freelist.h @@ -0,0 +1,30 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef COMMON_UCX_FREELIST_H_ +#define COMMON_UCX_FREELIST_H_ + +#include "opal/class/opal_free_list.h" + + +#define mca_common_ucx_freelist_t opal_free_list_t + +#define COMMON_UCX_FREELIST_GET(_freelist) \ + opal_free_list_get (_freelist) + +#define COMMON_UCX_FREELIST_RETURN(_freelist, _item) \ + opal_free_list_return(_freelist, _item) + +#define COMMON_UCX_FREELIST_INIT(_fl, _type, _headroom, _initial, _max, _batch) \ + opal_free_list_init(_fl, sizeof(_type) + _headroom, 8, OBJ_CLASS(_type), \ + 0, 0, _initial, _max, _batch, NULL, 0, NULL, NULL, NULL) + + +#endif /* COMMON_UCX_FREELIST_H_ */ diff --git a/ompi/mca/common/ucx/configure.m4 b/ompi/mca/common/ucx/configure.m4 new file mode 100644 index 00000000000..ab937ae44e1 --- /dev/null +++ b/ompi/mca/common/ucx/configure.m4 @@ -0,0 +1,31 @@ +# -*- shell-script -*- +# +# Copyright (c) 2018 Mellanox Technologies. All rights reserved. +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# MCA_ompi_common_ucx_CONFIG([action-if-can-compile], +# [action-if-cant-compile]) +# ------------------------------------------------ +AC_DEFUN([MCA_ompi_common_ucx_CONFIG],[ + AC_CONFIG_FILES([ompi/mca/common/ucx/Makefile]) + common_ucx_happy="no" + OMPI_CHECK_UCX([common_ucx], + [common_ucx_happy="yes"], + [common_ucx_happy="no"]) + + AS_IF([test "$common_ucx_happy" = "yes"], + [$1], + [$2]) + + + # substitute in the things needed to build common_ucx + AC_SUBST([common_ucx_CPPFLAGS]) + AC_SUBST([common_ucx_LDFLAGS]) + AC_SUBST([common_ucx_LIBS]) +]) diff --git a/ompi/mca/pml/ucx/Makefile.am b/ompi/mca/pml/ucx/Makefile.am index 6b8355031f2..018b235f008 100644 --- a/ompi/mca/pml/ucx/Makefile.am +++ b/ompi/mca/pml/ucx/Makefile.am @@ -38,7 +38,7 @@ mcacomponentdir = $(ompilibdir) mcacomponent_LTLIBRARIES = $(component_install) mca_pml_ucx_la_SOURCES = $(local_sources) mca_pml_ucx_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la $(pml_ucx_LIBS) \ - $(OPAL_TOP_BUILDDIR)/opal/mca/common/ucx/lib@OPAL_LIB_PREFIX@mca_common_ucx.la + $(OMPI_TOP_BUILDDIR)/ompi/mca/common/ucx/libmca_common_ompi_ucx.la mca_pml_ucx_la_LDFLAGS = -module -avoid-version $(pml_ucx_LDFLAGS) noinst_LTLIBRARIES = $(component_noinst) diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c index 6bee4dea29f..5389fcd99cd 100644 --- a/ompi/mca/pml/ucx/pml_ucx.c +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -7,6 +7,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2018 IBM Corporation. All rights reserved. * Copyright (c) 2019 Intel, Inc. All rights reserved. + * Copyright (c) 2021 Huawei Technologies Co., Ltd. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -22,6 +23,7 @@ #include "ompi/message/message.h" #include "ompi/runtime/ompi_spc.h" #include "ompi/mca/pml/base/pml_base_bsend.h" +#include "ompi/mca/common/ucx/common_ucx.h" #include "opal/mca/common/ucx/common_ucx.h" #if OPAL_CUDA_SUPPORT #include "opal/mca/common/cuda/common_cuda.h" @@ -80,261 +82,22 @@ mca_pml_ucx_module_t ompi_pml_ucx = { .pml_max_contextid = (1ul << (PML_UCX_CONTEXT_BITS)) - 1, .pml_max_tag = (1ul << (PML_UCX_TAG_BITS - 1)) - 1, .pml_flags = 0 /* flags */ - }, - .ucp_context = NULL, - .ucp_worker = NULL + } }; #define PML_UCX_REQ_ALLOCA() \ ((char *)alloca(ompi_pml_ucx.request_size) + ompi_pml_ucx.request_size); -#if HAVE_UCP_WORKER_ADDRESS_FLAGS -static int mca_pml_ucx_send_worker_address_type(int addr_flags, int modex_scope) -{ - ucs_status_t status; - ucp_worker_attr_t attrs; - int rc; - - attrs.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS | - UCP_WORKER_ATTR_FIELD_ADDRESS_FLAGS; - attrs.address_flags = addr_flags; - - status = ucp_worker_query(ompi_pml_ucx.ucp_worker, &attrs); - if (UCS_OK != status) { - PML_UCX_ERROR("Failed to query UCP worker address"); - return OMPI_ERROR; - } - - OPAL_MODEX_SEND(rc, modex_scope, &mca_pml_ucx_component.pmlm_version, - (void*)attrs.address, attrs.address_length); - - ucp_worker_release_address(ompi_pml_ucx.ucp_worker, attrs.address); - - if (OMPI_SUCCESS != rc) { - return OMPI_ERROR; - } - - PML_UCX_VERBOSE(2, "Pack %s worker address, size %ld", - (modex_scope == PMIX_LOCAL) ? "local" : "remote", - attrs.address_length); - - return OMPI_SUCCESS; -} -#endif - -static int mca_pml_ucx_send_worker_address(void) -{ - ucs_status_t status; - -#if !HAVE_UCP_WORKER_ADDRESS_FLAGS - ucp_address_t *address; - size_t addrlen; - int rc; - - status = ucp_worker_get_address(ompi_pml_ucx.ucp_worker, &address, &addrlen); - if (UCS_OK != status) { - PML_UCX_ERROR("Failed to get worker address"); - return OMPI_ERROR; - } - - PML_UCX_VERBOSE(2, "Pack worker address, size %ld", addrlen); - - OPAL_MODEX_SEND(rc, PMIX_GLOBAL, - &mca_pml_ucx_component.pmlm_version, (void*)address, addrlen); - - ucp_worker_release_address(ompi_pml_ucx.ucp_worker, address); - - if (OMPI_SUCCESS != rc) { - goto err; - } -#else - /* Pack just network device addresses for remote node peers */ - status = mca_pml_ucx_send_worker_address_type(UCP_WORKER_ADDRESS_FLAG_NET_ONLY, - PMIX_REMOTE); - if (UCS_OK != status) { - goto err; - } - - status = mca_pml_ucx_send_worker_address_type(0, PMIX_LOCAL); - if (UCS_OK != status) { - goto err; - } -#endif - - return OMPI_SUCCESS; - -err: - PML_UCX_ERROR("Open MPI couldn't distribute EP connection details"); - return OMPI_ERROR; -} - -static int mca_pml_ucx_recv_worker_address(ompi_proc_t *proc, - ucp_address_t **address_p, - size_t *addrlen_p) -{ - int ret; - - *address_p = NULL; - OPAL_MODEX_RECV(ret, &mca_pml_ucx_component.pmlm_version, &proc->super.proc_name, - (void**)address_p, addrlen_p); - if (ret < 0) { - PML_UCX_ERROR("Failed to receive UCX worker address: %s (%d)", - opal_strerror(ret), ret); - } - - PML_UCX_VERBOSE(2, "Got proc %d address, size %ld", - proc->super.proc_name.vpid, *addrlen_p); - return ret; -} - -int mca_pml_ucx_open(void) -{ - unsigned major_version, minor_version, release_number; - ucp_context_attr_t attr; - ucp_params_t params; - ucp_config_t *config; - ucs_status_t status; - - /* Check version */ - ucp_get_version(&major_version, &minor_version, &release_number); - PML_UCX_VERBOSE(1, "mca_pml_ucx_open: UCX version %u.%u.%u", - major_version, 
minor_version, release_number); - - if ((major_version == 1) && (minor_version == 8)) { - /* disabled due to issue #8321 */ - PML_UCX_VERBOSE(1, "UCX PML is disabled because the run-time UCX version " - "is 1.8, which has a known catastrophic issue"); - return OMPI_ERROR; - } - - /* Read options */ - status = ucp_config_read("MPI", NULL, &config); - if (UCS_OK != status) { - return OMPI_ERROR; - } - - /* Initialize UCX context */ - params.field_mask = UCP_PARAM_FIELD_FEATURES | - UCP_PARAM_FIELD_REQUEST_SIZE | - UCP_PARAM_FIELD_REQUEST_INIT | - UCP_PARAM_FIELD_REQUEST_CLEANUP | - UCP_PARAM_FIELD_TAG_SENDER_MASK | - UCP_PARAM_FIELD_MT_WORKERS_SHARED | - UCP_PARAM_FIELD_ESTIMATED_NUM_EPS; - params.features = UCP_FEATURE_TAG; - params.request_size = sizeof(ompi_request_t); - params.request_init = mca_pml_ucx_request_init; - params.request_cleanup = mca_pml_ucx_request_cleanup; - params.tag_sender_mask = PML_UCX_SPECIFIC_SOURCE_MASK; - params.mt_workers_shared = 0; /* we do not need mt support for context - since it will be protected by worker */ - params.estimated_num_eps = ompi_proc_world_size(); - -#if HAVE_DECL_UCP_PARAM_FIELD_ESTIMATED_NUM_PPN - params.estimated_num_ppn = opal_process_info.num_local_peers + 1; - params.field_mask |= UCP_PARAM_FIELD_ESTIMATED_NUM_PPN; -#endif - - status = ucp_init(¶ms, config, &ompi_pml_ucx.ucp_context); - ucp_config_release(config); - - if (UCS_OK != status) { - return OMPI_ERROR; - } - - /* Query UCX attributes */ - attr.field_mask = UCP_ATTR_FIELD_REQUEST_SIZE; -#if HAVE_UCP_ATTR_MEMORY_TYPES - attr.field_mask |= UCP_ATTR_FIELD_MEMORY_TYPES; -#endif - status = ucp_context_query(ompi_pml_ucx.ucp_context, &attr); - if (UCS_OK != status) { - ucp_cleanup(ompi_pml_ucx.ucp_context); - ompi_pml_ucx.ucp_context = NULL; - return OMPI_ERROR; - } - - ompi_pml_ucx.request_size = attr.request_size; - ompi_pml_ucx.cuda_initialized = false; - -#if HAVE_UCP_ATTR_MEMORY_TYPES && OPAL_CUDA_SUPPORT - if (attr.memory_types & UCS_BIT(UCS_MEMORY_TYPE_CUDA)) { - mca_common_cuda_stage_one_init(); - ompi_pml_ucx.cuda_initialized = true; - } -#endif - return OMPI_SUCCESS; -} - -int mca_pml_ucx_close(void) -{ - PML_UCX_VERBOSE(1, "mca_pml_ucx_close"); - -#if OPAL_CUDA_SUPPORT - if (ompi_pml_ucx.cuda_initialized) { - mca_common_cuda_fini(); - } -#endif - if (ompi_pml_ucx.ucp_context != NULL) { - ucp_cleanup(ompi_pml_ucx.ucp_context); - ompi_pml_ucx.ucp_context = NULL; - } - return OMPI_SUCCESS; -} - int mca_pml_ucx_init(int enable_mpi_threads) { - ucp_worker_params_t params; - ucp_worker_attr_t attr; - ucs_status_t status; int i, rc; PML_UCX_VERBOSE(1, "mca_pml_ucx_init"); - /* TODO check MPI thread mode */ - params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; - if (enable_mpi_threads) { - params.thread_mode = UCS_THREAD_MODE_MULTI; - } else { - params.thread_mode = UCS_THREAD_MODE_SINGLE; - } - -#if HAVE_DECL_UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK - if (!ompi_pml_ucx.request_leak_check) { - params.field_mask |= UCP_WORKER_PARAM_FIELD_FLAGS; - params.flags |= UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK; - } -#endif - - status = ucp_worker_create(ompi_pml_ucx.ucp_context, ¶ms, - &ompi_pml_ucx.ucp_worker); - if (UCS_OK != status) { - PML_UCX_ERROR("Failed to create UCP worker"); - rc = OMPI_ERROR; - goto err; - } - - attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE; - status = ucp_worker_query(ompi_pml_ucx.ucp_worker, &attr); - if (UCS_OK != status) { - PML_UCX_ERROR("Failed to query UCP worker thread level"); - rc = OMPI_ERROR; - goto err_destroy_worker; - } - - if (enable_mpi_threads 
&& (attr.thread_mode != UCS_THREAD_MODE_MULTI)) { - /* UCX does not support multithreading, disqualify current PML for now */ - /* TODO: we should let OMPI to fallback to THREAD_SINGLE mode */ - PML_UCX_WARN("UCP worker does not support MPI_THREAD_MULTIPLE. " - "PML UCX could not be selected"); - rc = OMPI_ERR_NOT_SUPPORTED; - goto err_destroy_worker; - } - - rc = mca_pml_ucx_send_worker_address(); + rc = mca_common_ucx_init(&mca_pml_ucx_component.pmlm_version); if (rc < 0) { - goto err_destroy_worker; - } + return rc; + } ompi_pml_ucx.datatype_attr_keyval = MPI_KEYVAL_INVALID; for (i = 0; i < OMPI_DATATYPE_MAX_PREDEFINED; ++i) { @@ -352,15 +115,9 @@ int mca_pml_ucx_init(int enable_mpi_threads) opal_progress_register(mca_pml_ucx_progress); PML_UCX_VERBOSE(2, "created ucp context %p, worker %p", - (void *)ompi_pml_ucx.ucp_context, - (void *)ompi_pml_ucx.ucp_worker); + (void *)opal_common_ucx.ucp_context, + (void *)opal_common_ucx.ucp_worker); return OMPI_SUCCESS; - -err_destroy_worker: - ucp_worker_destroy(ompi_pml_ucx.ucp_worker); -err: - ompi_pml_ucx.ucp_worker = NULL; - return rc; } int mca_pml_ucx_cleanup(void) @@ -369,7 +126,7 @@ int mca_pml_ucx_cleanup(void) PML_UCX_VERBOSE(1, "mca_pml_ucx_cleanup"); - opal_progress_unregister(mca_pml_ucx_progress); + (void) mca_common_ucx_cleanup(); if (ompi_pml_ucx.datatype_attr_keyval != MPI_KEYVAL_INVALID) { ompi_attr_free_keyval(TYPE_ATTR, &ompi_pml_ucx.datatype_attr_keyval, false); @@ -389,11 +146,6 @@ int mca_pml_ucx_cleanup(void) OBJ_DESTRUCT(&ompi_pml_ucx.convs); OBJ_DESTRUCT(&ompi_pml_ucx.persistent_reqs); - if (ompi_pml_ucx.ucp_worker != NULL) { - ucp_worker_destroy(ompi_pml_ucx.ucp_worker); - ompi_pml_ucx.ucp_worker = NULL; - } - return OMPI_SUCCESS; } @@ -406,17 +158,19 @@ static ucp_ep_h mca_pml_ucx_add_proc_common(ompi_proc_t *proc) ucp_ep_h ep; int ret; - ret = mca_pml_ucx_recv_worker_address(proc, &address, &addrlen); + const opal_process_name_t *proc_name = &proc->super.proc_name; + + ret = opal_common_ucx_recv_worker_address(proc_name, &address, &addrlen); if (ret < 0) { return NULL; } - PML_UCX_VERBOSE(2, "connecting to proc. %d", proc->super.proc_name.vpid); + PML_UCX_VERBOSE(2, "connecting to proc. 
%d", proc_name->vpid); ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS; ep_params.address = address; - status = ucp_ep_create(ompi_pml_ucx.ucp_worker, &ep_params, &ep); + status = ucp_ep_create(opal_common_ucx.ucp_worker, &ep_params, &ep); free(address); if (UCS_OK != status) { PML_UCX_ERROR("ucp_ep_create(proc=%d) failed: %s", @@ -483,7 +237,7 @@ int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs) } ret = opal_common_ucx_del_procs(del_procs, nprocs, OMPI_PROC_MY_NAME->vpid, - ompi_pml_ucx.num_disconnect, ompi_pml_ucx.ucp_worker); + ompi_pml_ucx.num_disconnect, opal_common_ucx.ucp_worker); free(del_procs); return ret; @@ -518,7 +272,7 @@ int mca_pml_ucx_enable(bool enable) int mca_pml_ucx_progress(void) { - return ucp_worker_progress(ompi_pml_ucx.ucp_worker); + return ucp_worker_progress(opal_common_ucx.ucp_worker); } int mca_pml_ucx_add_comm(struct ompi_communicator_t* comm) @@ -575,11 +329,11 @@ int mca_pml_ucx_irecv(void *buf, size_t count, ompi_datatype_t *datatype, PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); #if HAVE_DECL_UCP_TAG_RECV_NBX - req = (ompi_request_t*)ucp_tag_recv_nbx(ompi_pml_ucx.ucp_worker, buf, + req = (ompi_request_t*)ucp_tag_recv_nbx(opal_common_ucx.ucp_worker, buf, mca_pml_ucx_get_data_size(op_data, count), ucp_tag, ucp_tag_mask, param); #else - req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, buf, count, + req = (ompi_request_t*)ucp_tag_recv_nb(opal_common_ucx.ucp_worker, buf, count, mca_pml_ucx_get_datatype(datatype), ucp_tag, ucp_tag_mask, mca_pml_ucx_recv_completion); @@ -620,15 +374,15 @@ int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); #if HAVE_DECL_UCP_TAG_RECV_NBX - ucp_tag_recv_nbx(ompi_pml_ucx.ucp_worker, buf, + ucp_tag_recv_nbx(opal_common_ucx.ucp_worker, buf, mca_pml_ucx_get_data_size(op_data, count), ucp_tag, ucp_tag_mask, ¶m); #else - ucp_tag_recv_nbr(ompi_pml_ucx.ucp_worker, buf, count, + ucp_tag_recv_nbr(opal_common_ucx.ucp_worker, buf, count, mca_pml_ucx_get_datatype(datatype), ucp_tag, ucp_tag_mask, req); #endif - MCA_COMMON_UCX_PROGRESS_LOOP(ompi_pml_ucx.ucp_worker) { + MCA_COMMON_UCX_PROGRESS_LOOP(opal_common_ucx.ucp_worker) { status = ucp_request_test(req, &info); if (status != UCS_INPROGRESS) { result = mca_pml_ucx_set_recv_status_safe(mpi_status, status, &info); @@ -876,7 +630,9 @@ mca_pml_ucx_send_nb(ucp_ep_h ep, const void *buf, size_t count, return OMPI_SUCCESS; } else if (!UCS_PTR_IS_ERR(req)) { PML_UCX_VERBOSE(8, "got request %p", (void*)req); - MCA_COMMON_UCX_WAIT_LOOP(req, ompi_pml_ucx.ucp_worker, "ucx send", ompi_request_free(&req)); + MCA_COMMON_UCX_WAIT_LOOP(req, OPAL_COMMON_UCX_REQUEST_TYPE_UCP, + opal_common_ucx.ucp_worker, "ucx send", + ompi_request_free(&req)); } else { PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); return OMPI_ERROR; @@ -919,7 +675,8 @@ mca_pml_ucx_send_nbr(ucp_ep_h ep, const void *buf, size_t count, } #endif - MCA_COMMON_UCX_WAIT_LOOP(req, ompi_pml_ucx.ucp_worker, "ucx send nbr", (void)0); + MCA_COMMON_UCX_WAIT_LOOP(req, OPAL_COMMON_UCX_REQUEST_TYPE_UCP, + opal_common_ucx.ucp_worker, "ucx send nbr", (void)0); } #endif @@ -970,14 +727,14 @@ int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm, PML_UCX_TRACE_PROBE("iprobe", src, tag, comm); PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); - ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, + ucp_msg = 
ucp_tag_probe_nb(opal_common_ucx.ucp_worker, ucp_tag, ucp_tag_mask, 0, &info); if (ucp_msg != NULL) { *matched = 1; mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); } else { (++progress_count % opal_common_ucx.progress_iterations) ? - (void)ucp_worker_progress(ompi_pml_ucx.ucp_worker) : opal_progress(); + (void)ucp_worker_progress(opal_common_ucx.ucp_worker) : opal_progress(); *matched = 0; } return OMPI_SUCCESS; @@ -994,8 +751,8 @@ int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm, PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); - MCA_COMMON_UCX_PROGRESS_LOOP(ompi_pml_ucx.ucp_worker) { - ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, + MCA_COMMON_UCX_PROGRESS_LOOP(opal_common_ucx.ucp_worker) { + ucp_msg = ucp_tag_probe_nb(opal_common_ucx.ucp_worker, ucp_tag, ucp_tag_mask, 0, &info); if (ucp_msg != NULL) { mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); @@ -1017,7 +774,7 @@ int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm, PML_UCX_TRACE_PROBE("improbe", src, tag, comm); PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); - ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, + ucp_msg = ucp_tag_probe_nb(opal_common_ucx.ucp_worker, ucp_tag, ucp_tag_mask, 1, &info); if (ucp_msg != NULL) { PML_UCX_MESSAGE_NEW(comm, ucp_msg, &info, message); @@ -1026,7 +783,7 @@ int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm, mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); } else { (++progress_count % opal_common_ucx.progress_iterations) ? - (void)ucp_worker_progress(ompi_pml_ucx.ucp_worker) : opal_progress(); + (void)ucp_worker_progress(opal_common_ucx.ucp_worker) : opal_progress(); *matched = 0; } return OMPI_SUCCESS; @@ -1043,8 +800,8 @@ int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm, PML_UCX_TRACE_PROBE("mprobe", src, tag, comm); PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); - MCA_COMMON_UCX_PROGRESS_LOOP(ompi_pml_ucx.ucp_worker) { - ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, + MCA_COMMON_UCX_PROGRESS_LOOP(opal_common_ucx.ucp_worker) { + ucp_msg = ucp_tag_probe_nb(opal_common_ucx.ucp_worker, ucp_tag, ucp_tag_mask, 1, &info); if (ucp_msg != NULL) { PML_UCX_MESSAGE_NEW(comm, ucp_msg, &info, message); @@ -1063,7 +820,7 @@ int mca_pml_ucx_imrecv(void *buf, size_t count, ompi_datatype_t *datatype, PML_UCX_TRACE_MRECV("imrecv", buf, count, datatype, message); - req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count, + req = (ompi_request_t*)ucp_tag_msg_recv_nb(opal_common_ucx.ucp_worker, buf, count, mca_pml_ucx_get_datatype(datatype), (*message)->req_ptr, mca_pml_ucx_recv_completion); @@ -1086,7 +843,7 @@ int mca_pml_ucx_mrecv(void *buf, size_t count, ompi_datatype_t *datatype, PML_UCX_TRACE_MRECV("mrecv", buf, count, datatype, message); - req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count, + req = (ompi_request_t*)ucp_tag_msg_recv_nb(opal_common_ucx.ucp_worker, buf, count, mca_pml_ucx_get_datatype(datatype), (*message)->req_ptr, mca_pml_ucx_recv_completion); @@ -1129,7 +886,7 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests) mca_pml_ucx_psend_completion); } else { PML_UCX_VERBOSE(8, "start recv request %p", (void*)preq); - tmp_req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, + tmp_req = (ompi_request_t*)ucp_tag_recv_nb(opal_common_ucx.ucp_worker, preq->buffer, preq->count, 
preq->datatype.datatype, preq->tag, diff --git a/ompi/mca/pml/ucx/pml_ucx.h b/ompi/mca/pml/ucx/pml_ucx.h index 7ad9f646d1d..983f9d5c4c3 100644 --- a/ompi/mca/pml/ucx/pml_ucx.h +++ b/ompi/mca/pml/ucx/pml_ucx.h @@ -39,10 +39,6 @@ typedef struct pml_ucx_convertor mca_pml_ucx_convertor_t; struct mca_pml_ucx_module { mca_pml_base_module_t super; - /* UCX global objects */ - ucp_context_h ucp_context; - ucp_worker_h ucp_worker; - /* Datatypes */ int datatype_attr_keyval; ucp_datatype_t predefined_types[OMPI_DATATYPE_MPI_MAX_PREDEFINED]; @@ -64,8 +60,6 @@ struct mca_pml_ucx_module { extern mca_pml_base_component_2_1_0_t mca_pml_ucx_component; extern mca_pml_ucx_module_t ompi_pml_ucx; -int mca_pml_ucx_open(void); -int mca_pml_ucx_close(void); int mca_pml_ucx_init(int enable_mpi_threads); int mca_pml_ucx_cleanup(void); diff --git a/ompi/mca/pml/ucx/pml_ucx_component.c b/ompi/mca/pml/ucx/pml_ucx_component.c index 0ffe641ca80..1265548482e 100644 --- a/ompi/mca/pml/ucx/pml_ucx_component.c +++ b/ompi/mca/pml/ucx/pml_ucx_component.c @@ -1,5 +1,6 @@ /* * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * Copyright (c) 2021 Huawei Technologies Co., Ltd. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -10,6 +11,7 @@ #include "pml_ucx.h" #include "opal/mca/memory/base/base.h" +#include "ompi/mca/common/ucx/common_ucx.h" static int mca_pml_ucx_component_register(void); @@ -87,14 +89,14 @@ static int mca_pml_ucx_component_open(void) { opal_common_ucx_mca_register(); - return mca_pml_ucx_open(); + return mca_common_ucx_open("MPI", &ompi_pml_ucx.request_size); } static int mca_pml_ucx_component_close(void) { int rc; - rc = mca_pml_ucx_close(); + rc = mca_common_ucx_close(); if (rc != 0) { return rc; } @@ -110,7 +112,7 @@ mca_pml_ucx_component_init(int* priority, bool enable_progress_threads, opal_common_ucx_support_level_t support_level; int ret; - support_level = opal_common_ucx_support_level(ompi_pml_ucx.ucp_context); + support_level = opal_common_ucx_support_level(opal_common_ucx.ucp_context); if (support_level == OPAL_COMMON_UCX_SUPPORT_NONE) { return NULL; } diff --git a/ompi/mca/pml/ucx/pml_ucx_request.c b/ompi/mca/pml/ucx/pml_ucx_request.c index 744d5803587..5a66a32dbe8 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.c +++ b/ompi/mca/pml/ucx/pml_ucx_request.c @@ -31,7 +31,7 @@ static int mca_pml_ucx_request_free(ompi_request_t **rptr) static int mca_pml_ucx_request_cancel(ompi_request_t *req, int flag) { - ucp_request_cancel(ompi_pml_ucx.ucp_worker, req); + ucp_request_cancel(opal_common_ucx.ucp_worker, req); return OMPI_SUCCESS; } @@ -230,7 +230,7 @@ static int mca_pml_ucx_persistent_request_cancel(ompi_request_t *req, int flag) mca_pml_ucx_persistent_request_t* preq = (mca_pml_ucx_persistent_request_t*)req; if (preq->tmp_req != NULL) { - ucp_request_cancel(ompi_pml_ucx.ucp_worker, preq->tmp_req); + ucp_request_cancel(opal_common_ucx.ucp_worker, preq->tmp_req); } return OMPI_SUCCESS; } diff --git a/opal/mca/common/ucx/common_ucx.c b/opal/mca/common/ucx/common_ucx.c index 52a704c2f24..901399b47af 100644 --- a/opal/mca/common/ucx/common_ucx.c +++ b/opal/mca/common/ucx/common_ucx.c @@ -3,6 +3,8 @@ * Copyright (c) 2019 Intel, Inc. All rights reserved. * Copyright (c) 2019 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights + * reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -31,7 +33,9 @@ opal_common_ucx_module_t opal_common_ucx = {.verbose = 0, .progress_iterations = 100, .registered = 0, .opal_mem_hooks = 0, - .tls = NULL}; + .tls = NULL, + .ref_count = 0, + .first_version = NULL}; static void opal_common_ucx_mem_release_cb(void *buf, size_t length, void *cbdata, bool from_alloc) { @@ -41,13 +45,14 @@ static void opal_common_ucx_mem_release_cb(void *buf, size_t length, void *cbdat OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component) { static const char *default_tls = "rc_verbs,ud_verbs,rc_mlx5,dc_mlx5,cuda_ipc,rocm_ipc"; - static const char *default_devices = "mlx*"; + static const char *default_devices = "mlx*,hns*"; static int registered = 0; static int hook_index; static int verbose_index; static int progress_index; static int tls_index; static int devices_index; + static int request_leak_check; if (!registered) { verbose_index = mca_base_var_register("opal", "opal_common", "ucx", "verbose", @@ -89,6 +94,20 @@ OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t * "bump its priority above ob1. Special values: any (any available)", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_LOCAL, opal_common_ucx.devices); + +#if HAVE_DECL_UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK + opal_common_ucx.request_leak_check = false; + request_leak_check = mca_base_var_register( + "opal", "opal_common", "ucx", "request_leak_check", + "Enable showing a warning during MPI_Finalize if some " + "non-blocking MPI requests have not been released", + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_LOCAL, + &opal_common_ucx.request_leak_check); +#else + /* If UCX does not support ignoring leak check, then it's always enabled */ + opal_common_ucx.request_leak_check = true; +#endif + registered = 1; } if (component) { @@ -107,6 +126,9 @@ OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t * mca_base_var_register_synonym(devices_index, component->mca_project_name, component->mca_type_name, component->mca_component_name, "devices", 0); + mca_base_var_register_synonym(request_leak_check, component->mca_project_name, + component->mca_type_name, component->mca_component_name, + "request_leak_check", 0); } } @@ -221,8 +243,7 @@ OPAL_DECLSPEC opal_common_ucx_support_level_t opal_common_ucx_support_level(ucp_ /* Check for special value "any" */ if (is_any_tl && is_any_device) { - MCA_COMMON_UCX_VERBOSE(1, "ucx is enabled on any transport or device", - *opal_common_ucx.tls); + MCA_COMMON_UCX_VERBOSE(1, "ucx is enabled on any transport or device"); support_level = OPAL_COMMON_UCX_SUPPORT_DEVICE; goto out; } @@ -390,13 +411,14 @@ OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker) return ret; } -static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker_h worker) +static void opal_common_ucx_wait_all_requests(void **reqs, int count, + ucp_worker_h worker, enum opal_common_ucx_req_type type) { int i; MCA_COMMON_UCX_VERBOSE(2, "waiting for %d disconnect requests", count); for (i = 0; i < count; ++i) { - opal_common_ucx_wait_request(reqs[i], worker, "ucp_disconnect_nb"); + opal_common_ucx_wait_request(reqs[i], worker, type, "ucp_disconnect_nb"); reqs[i] = NULL; } } @@ -439,7 +461,8 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t * } else { dreqs[num_reqs++] = dreq; if (num_reqs >= max_disconnect) { - opal_common_ucx_wait_all_requests(dreqs, 
@@ -439,7 +461,8 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *
         } else {
             dreqs[num_reqs++] = dreq;
             if (num_reqs >= max_disconnect) {
-                opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
+                opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker,
+                                                  OPAL_COMMON_UCX_REQUEST_TYPE_UCP);
                 num_reqs = 0;
             }
         }
@@ -448,7 +471,8 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *
     /* num_reqs == 0 is processed by opal_common_ucx_wait_all_requests routine,
      * so suppress coverity warning */
     /* coverity[uninit_use_in_call] */
-    opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
+    opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker,
+                                      OPAL_COMMON_UCX_REQUEST_TYPE_UCP);
 
     free(dreqs);
     return OPAL_SUCCESS;
@@ -462,3 +486,309 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s
 
     return opal_common_ucx_mca_pmix_fence(worker);
 }
+
+
+#if HAVE_UCP_WORKER_ADDRESS_FLAGS
+static int opal_common_ucx_send_worker_address_type(const mca_base_component_t *version,
+                                                    int addr_flags, int modex_scope)
+{
+    ucs_status_t status;
+    ucp_worker_attr_t attrs;
+    int rc;
+
+    attrs.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS |
+                       UCP_WORKER_ATTR_FIELD_ADDRESS_FLAGS;
+    attrs.address_flags = addr_flags;
+
+    status = ucp_worker_query(opal_common_ucx.ucp_worker, &attrs);
+    if (UCS_OK != status) {
+        MCA_COMMON_UCX_ERROR("Failed to query UCP worker address");
+        return OPAL_ERROR;
+    }
+
+    OPAL_MODEX_SEND(rc, modex_scope, version, (void*)attrs.address, attrs.address_length);
+
+    ucp_worker_release_address(opal_common_ucx.ucp_worker, attrs.address);
+
+    if (OPAL_SUCCESS != rc) {
+        return OPAL_ERROR;
+    }
+
+    MCA_COMMON_UCX_VERBOSE(2, "Pack %s worker address, size %zu",
+                           (modex_scope == PMIX_LOCAL) ? "local" : "remote",
+                           attrs.address_length);
+
+    return OPAL_SUCCESS;
+}
+#endif
+
+static int opal_common_ucx_send_worker_address(const mca_base_component_t *version)
+{
+#if !HAVE_UCP_WORKER_ADDRESS_FLAGS
+    ucs_status_t status;
+    ucp_address_t *address;
+    size_t addrlen;
+    int rc;
+
+    status = ucp_worker_get_address(opal_common_ucx.ucp_worker, &address, &addrlen);
+    if (UCS_OK != status) {
+        MCA_COMMON_UCX_ERROR("Failed to get worker address");
+        return OPAL_ERROR;
+    }
+
+    MCA_COMMON_UCX_VERBOSE(2, "Pack worker address, size %zu", addrlen);
+
+    OPAL_MODEX_SEND(rc, PMIX_GLOBAL, version, (void*)address, addrlen);
+
+    ucp_worker_release_address(opal_common_ucx.ucp_worker, address);
+
+    if (OPAL_SUCCESS != rc) {
+        goto err;
+    }
+#else
+    int rc;
+
+    /* Pack just network device addresses for remote node peers */
+    rc = opal_common_ucx_send_worker_address_type(version,
+                                                  UCP_WORKER_ADDRESS_FLAG_NET_ONLY,
+                                                  PMIX_REMOTE);
+    if (OPAL_SUCCESS != rc) {
+        goto err;
+    }
+
+    rc = opal_common_ucx_send_worker_address_type(version, 0, PMIX_LOCAL);
+    if (OPAL_SUCCESS != rc) {
+        goto err;
+    }
+#endif
+
+    return OPAL_SUCCESS;
+
+err:
+    MCA_COMMON_UCX_ERROR("Open MPI couldn't distribute EP connection details");
+    return OPAL_ERROR;
+}
+
+int opal_common_ucx_recv_worker_address(const opal_process_name_t *proc_name,
+                                        ucp_address_t **address_p,
+                                        size_t *addrlen_p)
+{
+    int ret;
+
+    const mca_base_component_t *version = opal_common_ucx.first_version;
+
+    *address_p = NULL;
+    OPAL_MODEX_RECV(ret, version, proc_name, (void**)address_p, addrlen_p);
+    if (ret < 0) {
+        MCA_COMMON_UCX_ERROR("Failed to receive UCX worker address: %s (%d)",
+                             opal_strerror(ret), ret);
+    }
+
+    return ret;
+}
+
+int opal_common_ucx_open(const char *prefix,
+                         const ucp_params_t *ucp_params,
+                         size_t *request_size)
+{
+    unsigned major_version, minor_version, release_number;
+    ucp_context_attr_t attr;
+    ucs_status_t status;
+    int just_query = 0;
+
+    if (opal_common_ucx.ref_count++ > 0) {
+        just_query = 1;
+        goto query;
+    }
+
+    /* Check version */
+    ucp_get_version(&major_version, &minor_version, &release_number);
+    MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_open: UCX version %u.%u.%u",
+                           major_version, minor_version, release_number);
+
+    if ((major_version == 1) && (minor_version == 8)) {
+        /* disabled due to issue #8321 */
+        MCA_COMMON_UCX_VERBOSE(1, "UCX is disabled because the run-time UCX"
+                                  " version is 1.8, which has a known catastrophic"
+                                  " issue");
+        goto open_error;
+    }
+
+    ucp_config_t *config;
+    status = ucp_config_read(prefix, NULL, &config);
+    if (UCS_OK != status) {
+        goto open_error;
+    }
+
+    status = ucp_init(ucp_params, config, &opal_common_ucx.ucp_context);
+    ucp_config_release(config);
+
+    if (UCS_OK != status) {
+        goto open_error;
+    }
+
+query:
+    /* Query UCX attributes */
+    attr.field_mask = UCP_ATTR_FIELD_REQUEST_SIZE;
+#if HAVE_UCP_ATTR_MEMORY_TYPES
+    attr.field_mask |= UCP_ATTR_FIELD_MEMORY_TYPES;
+#endif
+    status = ucp_context_query(opal_common_ucx.ucp_context, &attr);
+    if (UCS_OK != status) {
+        goto cleanup_ctx;
+    }
+
+    *request_size = attr.request_size;
+    if (just_query) {
+        return OPAL_SUCCESS;
+    }
+
+    /* Initialize CUDA, if supported */
+    opal_common_ucx.cuda_initialized = false;
+#if HAVE_UCP_ATTR_MEMORY_TYPES && OPAL_CUDA_SUPPORT
+    if (attr.memory_types & UCS_BIT(UCS_MEMORY_TYPE_CUDA)) {
+        mca_common_cuda_stage_one_init();
+        opal_common_ucx.cuda_initialized = true;
+    }
+#endif
+
+    return OPAL_SUCCESS;
+
+cleanup_ctx:
+    ucp_cleanup(opal_common_ucx.ucp_context);
+
+open_error:
+    opal_common_ucx.ucp_context = NULL; /* In case anyone comes querying */
+    opal_common_ucx.ref_count--;        /* A failed open must not hold a reference */
+    return OPAL_ERROR;
+}
+
+int opal_common_ucx_close(void)
+{
+    MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_close");
+
+    MCA_COMMON_UCX_ASSERT(opal_common_ucx.ref_count > 0);
+
+    if (--opal_common_ucx.ref_count > 0) {
+        return OPAL_SUCCESS;
+    }
+
+#if OPAL_CUDA_SUPPORT
+    if (opal_common_ucx.cuda_initialized) {
+        mca_common_cuda_fini();
+    }
+#endif
+
+    if (opal_common_ucx.ucp_context != NULL) {
+        ucp_cleanup(opal_common_ucx.ucp_context);
+        opal_common_ucx.ucp_context = NULL;
+    }
+
+    return OPAL_SUCCESS;
+}
+
+static int opal_common_ucx_init_worker(int enable_mpi_threads)
+{
+    ucp_worker_params_t params;
+    ucp_worker_attr_t attr;
+    ucs_status_t status;
+    int rc;
+
+    /* TODO check MPI thread mode */
+    params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
+    if (enable_mpi_threads) {
+        params.thread_mode = UCS_THREAD_MODE_MULTI;
+    } else {
+        params.thread_mode = UCS_THREAD_MODE_SINGLE;
+    }
+
+#if HAVE_DECL_UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK
+    /* must be set before the worker is created */
+    if (!opal_common_ucx.request_leak_check) {
+        params.field_mask |= UCP_WORKER_PARAM_FIELD_FLAGS;
+        params.flags       = UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK;
+    }
+#endif
+
+    status = ucp_worker_create(opal_common_ucx.ucp_context, &params,
+                               &opal_common_ucx.ucp_worker);
+    if (UCS_OK != status) {
+        MCA_COMMON_UCX_ERROR("Failed to create UCP worker");
+        return OPAL_ERROR;
+    }
+
+    attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
+    status = ucp_worker_query(opal_common_ucx.ucp_worker, &attr);
+    if (UCS_OK != status) {
+        MCA_COMMON_UCX_ERROR("Failed to query UCP worker thread level");
+        rc = OPAL_ERROR;
+        goto err_destroy_worker;
+    }
+
+    if (enable_mpi_threads && (attr.thread_mode != UCS_THREAD_MODE_MULTI)) {
+        /* UCX does not support multithreading, disqualify component for now */
+        /* TODO: we should let OMPI fall back to THREAD_SINGLE mode */
+        MCA_COMMON_UCX_WARN("UCP worker does not support MPI_THREAD_MULTIPLE");
+        rc = OPAL_ERR_NOT_SUPPORTED;
+        goto err_destroy_worker;
+    }
+
+    MCA_COMMON_UCX_VERBOSE(2, "created ucp context %p, worker %p",
+                           (void *)opal_common_ucx.ucp_context,
+                           (void *)opal_common_ucx.ucp_worker);
+
+    return OPAL_SUCCESS;
+
+err_destroy_worker:
+    ucp_worker_destroy(opal_common_ucx.ucp_worker);
+    return rc;
+}
+
+static int opal_common_ucx_progress(void)
+{
+    return (int) ucp_worker_progress(opal_common_ucx.ucp_worker);
+}
+
+int opal_common_ucx_init(int enable_mpi_threads,
+                         const mca_base_component_t *version)
+{
+    int rc;
+
+    if (opal_common_ucx.first_version != NULL) {
+        return OPAL_SUCCESS;
+    }
+
+    rc = opal_common_ucx_init_worker(enable_mpi_threads);
+    if (rc < 0) {
+        return rc;
+    }
+
+    rc = opal_common_ucx_send_worker_address(version);
+    if (rc < 0) {
+        MCA_COMMON_UCX_ERROR("Failed to send worker address");
+        ucp_worker_destroy(opal_common_ucx.ucp_worker);
+        return rc;
+    }
+
+    opal_common_ucx.first_version = version;
+    opal_progress_register(opal_common_ucx_progress);
+
+    return OPAL_SUCCESS;
+}
+
+int opal_common_ucx_cleanup(void)
+{
+    if (opal_common_ucx.ref_count > 1) {
+        return OPAL_SUCCESS;
+    }
+
+    opal_progress_unregister(opal_common_ucx_progress);
+
+    if (opal_common_ucx.ucp_worker != NULL) {
+        ucp_worker_destroy(opal_common_ucx.ucp_worker);
+        opal_common_ucx.ucp_worker = NULL;
+    }
+
+    return OPAL_SUCCESS;
+}
diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h
index a7312b27f9e..18aa60de192 100644
--- a/opal/mca/common/ucx/common_ucx.h
+++ b/opal/mca/common/ucx/common_ucx.h
@@ -5,6 +5,8 @@
  * and Technology (RIST). All rights reserved.
  * Copyright (c) 2019-2020 High Performance Computing Center Stuttgart,
  *                         University of Stuttgart. All rights reserved.
+ * Copyright (c) 2020      Huawei Technologies Co., Ltd. All rights
+ *                         reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -24,6 +26,7 @@
 #include "opal/class/opal_list.h"
 #include "opal/include/opal/constants.h"
 #include "opal/mca/mca.h"
+#include "opal/util/proc.h"
 #include "opal/runtime/opal_progress.h"
 #include "opal/util/output.h"
@@ -56,42 +60,57 @@ BEGIN_C_DECLS
             __FILE__ ":" MCA_COMMON_UCX_QUOTE(__LINE__) " " __VA_ARGS__); \
     }
 
-/* progress loop to allow call UCX/opal progress */
+enum opal_common_ucx_req_type {
+    OPAL_COMMON_UCX_REQUEST_TYPE_UCP = 0
+};
+
+/* progress loop that alternates UCX and opal progress while testing requests
+ * by type; progress_iterations is rounded up to a power-of-two mask so the
+ * test is a cheap bitwise AND ("| 1" keeps __builtin_clz away from 0) */
 /* used C99 for-statement variable initialization */
-#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker) \
-    for (unsigned iter = 0;; (++iter % opal_common_ucx.progress_iterations) \
-         ? (void) ucp_worker_progress(_worker) \
-         : opal_progress())
-
-#define MCA_COMMON_UCX_WAIT_LOOP(_request, _worker, _msg, _completed) \
-    do { \
-        ucs_status_t status; \
-        /* call UCX progress */ \
-        MCA_COMMON_UCX_PROGRESS_LOOP(_worker) \
-        { \
-            status = opal_common_ucx_request_status(_request); \
-            if (UCS_INPROGRESS != status) { \
-                _completed; \
-                if (OPAL_LIKELY(UCS_OK == status)) { \
-                    return OPAL_SUCCESS; \
-                } else { \
-                    MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", (_msg) ? (_msg) : __func__, \
-                                           UCS_PTR_STATUS(_request), \
-                                           ucs_status_string(UCS_PTR_STATUS(_request))); \
-                    return OPAL_ERROR; \
-                } \
-            } \
-        } \
+#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker) \
+    unsigned iter_mask = (unsigned) opal_common_ucx.progress_iterations | 1; \
+    iter_mask = UCS_MASK(8 * sizeof(unsigned) - __builtin_clz(iter_mask)); \
+    for (unsigned iter = 0;; ((++iter & iter_mask) == 0) ? opal_progress() : \
+                             (void)ucp_worker_progress(_worker))
+
+#define MCA_COMMON_UCX_WAIT_LOOP(_request, _req_type, \
+                                 _worker, _msg, _completed) \
+    do { \
+        ucs_status_t status; \
+        /* call UCX progress */ \
+        MCA_COMMON_UCX_PROGRESS_LOOP(_worker) { \
+            status = opal_common_ucx_request_status(_request, _req_type); \
+            if (UCS_INPROGRESS != status) { \
+                _completed; \
+                if (OPAL_LIKELY(UCS_OK == status)) { \
+                    return OPAL_SUCCESS; \
+                } else { \
+                    MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", \
+                                           (_msg) ? (_msg) : __func__, \
+                                           status, ucs_status_string(status)); \
+                    return OPAL_ERROR; \
+                } \
+            } \
+        } \
     } while (0)
 
 typedef struct opal_common_ucx_module {
-    int output;
-    int verbose;
-    int progress_iterations;
-    int registered;
-    bool opal_mem_hooks;
-    char **tls;
-    char **devices;
+    int output;
+    int verbose;
+    int progress_iterations;
+    int registered;
+    bool opal_mem_hooks;
+    char **tls;
+    char **devices;
+
+    ucp_worker_h ucp_worker;
+    ucp_context_h ucp_context;
+
+    unsigned ref_count;
+    const mca_base_component_t *first_version;
+    bool cuda_initialized;
+    bool request_leak_check;
 } opal_common_ucx_module_t;
 
 typedef struct opal_common_ucx_del_proc {
@@ -128,6 +146,18 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *
                                                     ucp_worker_h worker);
 OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
+
+OPAL_DECLSPEC int opal_common_ucx_open(const char *prefix,
+                                       const ucp_params_t *ucp_params,
+                                       size_t *request_size);
+OPAL_DECLSPEC int opal_common_ucx_close(void);
+OPAL_DECLSPEC int opal_common_ucx_init(int enable_mpi_threads,
+                                       const mca_base_component_t *version);
+OPAL_DECLSPEC int opal_common_ucx_cleanup(void);
+OPAL_DECLSPEC int opal_common_ucx_recv_worker_address(const opal_process_name_t *proc_name,
+                                                      ucp_address_t **address_p,
+                                                      size_t *addrlen_p);
+
 /**
  * Load an integer value of \c size bytes from \c ptr and cast it to uint64_t.
  */
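
Taken together, these declarations give dependent components a single, reference-counted entry into UCX. A hypothetical sketch of how a component's setup path might drive them (the component pointer, prefix string, and feature set are placeholders; error handling is trimmed):

#include "opal/mca/common/ucx/common_ucx.h"

static int my_component_setup(const mca_base_component_t *comp, int enable_mpi_threads)
{
    ucp_params_t params;
    size_t request_size;
    int rc;

    params.field_mask = UCP_PARAM_FIELD_FEATURES;
    params.features   = UCP_FEATURE_TAG | UCP_FEATURE_RMA | UCP_FEATURE_AMO64;

    /* first caller runs ucp_init(); later callers only bump the refcount
     * and query the request size */
    rc = opal_common_ucx_open("mycomp", &params, &request_size);
    if (OPAL_SUCCESS != rc) {
        return rc;
    }

    /* first caller creates the shared worker and publishes its address
     * through the modex; later callers return immediately */
    rc = opal_common_ucx_init(enable_mpi_threads, comp);
    if (OPAL_SUCCESS != rc) {
        opal_common_ucx_close();   /* drop our reference on failure */
    }

    return rc;
}
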
@@ -160,19 +190,32 @@ static inline void opal_common_ucx_store_uint64(uint64_t value, void *ptr, size_
     }
 }
 
-static inline ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
+static inline
+ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request,
+                                            enum opal_common_ucx_req_type type)
 {
+    switch (type) {
+    case OPAL_COMMON_UCX_REQUEST_TYPE_UCP: {
 #if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
-    ucp_tag_recv_info_t info;
+        ucp_tag_recv_info_t info;
 
-    return ucp_request_test(request, &info);
+        return ucp_request_test(request, &info);
 #else
-    return ucp_request_check_status(request);
+        return ucp_request_check_status(request);
 #endif
+    }
+    default:
+        break;
+    }
+
+    /* unknown request type: report a genuine ucs_status_t error value */
+    return UCS_ERR_UNSUPPORTED;
 }
 
-static inline int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
-                                               const char *msg)
+static inline
+int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
+                                 enum opal_common_ucx_req_type type,
+                                 const char *msg)
 {
     /* check for request completed or failed */
     if (OPAL_LIKELY(UCS_OK == request)) {
@@ -183,7 +224,7 @@ static inline int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_wor
         return OPAL_ERROR;
     }
 
-    MCA_COMMON_UCX_WAIT_LOOP(request, worker, msg, ucp_request_free(request));
+    MCA_COMMON_UCX_WAIT_LOOP(request, type, worker, msg, ucp_request_free(request));
 }
 
 static inline int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
@@ -192,7 +233,9 @@ static inline int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
     ucs_status_ptr_t request;
 
     request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
-    return opal_common_ucx_wait_request(request, worker, "ucp_ep_flush_nb");
+    return opal_common_ucx_wait_request(request, worker,
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                        "ucp_ep_flush_nb");
 #else
     ucs_status_t status;
 
@@ -207,7 +250,10 @@ static inline int opal_common_ucx_worker_flush(ucp_worker_h worker)
     ucs_status_ptr_t request;
 
     request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
-    return opal_common_ucx_wait_request(request, worker, "ucp_worker_flush_nb");
+
+    return opal_common_ucx_wait_request(request, worker,
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                        "ucp_worker_flush_nb");
 #else
     ucs_status_t status;
 
@@ -223,9 +269,13 @@ static inline int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_
 {
     ucs_status_ptr_t request;
 
-    request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size, remote_addr, rkey,
+    request = ucp_atomic_fetch_nb(ep, opcode, value, result,
+                                  op_size, remote_addr, rkey,
                                   opal_common_ucx_empty_complete_cb);
-    return opal_common_ucx_wait_request(request, worker, "ucp_atomic_fetch_nb");
+
+    return opal_common_ucx_wait_request(request, worker,
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                        "ucp_atomic_fetch_nb");
 }
 
 static inline ucs_status_ptr_t
@@ -241,6 +291,7 @@ static inline int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare, ui
                                                ucp_rkey_h rkey, ucp_worker_h worker)
 {
     opal_common_ucx_store_uint64(value, result, op_size);
+
     return opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
                                         op_size, remote_addr, rkey, worker);
 }
@@ -251,6 +302,7 @@ opal_common_ucx_atomic_cswap_nb(ucp_ep_h ep, uint64_t compare, uint64_t value, v
                                 ucp_send_callback_t req_handler, ucp_worker_h worker)
 {
     opal_common_ucx_store_uint64(value, result, op_size);
+
    return opal_common_ucx_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
                                            op_size, remote_addr, rkey, req_handler, worker);
 }
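
Each wrapper above follows the same blocking-over-non-blocking recipe: issue the _nb call, classify the immediate return (NULL means already done, an error pointer means it failed at issue time), and otherwise progress the worker until the request completes. A standalone sketch of that recipe (hypothetical helper; polling with ucp_request_check_status assumes a UCX version that provides it):

#include <ucp/api/ucp.h>

static void flush_cb(void *request, ucs_status_t status)
{
    (void)request; (void)status;   /* completion is detected by polling */
}

static int flush_ep_blocking(ucp_ep_h ep, ucp_worker_h worker)
{
    ucs_status_ptr_t req = ucp_ep_flush_nb(ep, 0, flush_cb);
    ucs_status_t status;

    if (req == NULL) {
        return 0;                  /* flushed immediately */
    }
    if (UCS_PTR_IS_ERR(req)) {
        return -1;                 /* failed before a request was created */
    }

    do {
        ucp_worker_progress(worker);           /* drive the flush forward */
        status = ucp_request_check_status(req);
    } while (UCS_INPROGRESS == status);

    ucp_request_free(req);
    return (UCS_OK == status) ? 0 : -1;
}
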
diff --git a/opal/mca/common/ucx/common_ucx_wpool.c b/opal/mca/common/ucx/common_ucx_wpool.c
index 1ea7f40932e..a16970c2f7b 100644
--- a/opal/mca/common/ucx/common_ucx_wpool.c
+++ b/opal/mca/common/ucx/common_ucx_wpool.c
@@ -81,7 +81,9 @@ static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool)
 static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
 {
     if (winfo->inflight_req != UCS_OK) {
-        opal_common_ucx_wait_request_mt(winfo->inflight_req, "opal_common_ucx_flush");
+        opal_common_ucx_wait_request_mt(winfo->inflight_req,
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                        "opal_common_ucx_flush");
         winfo->inflight_req = UCS_OK;
     }
 
@@ -783,11 +785,13 @@ OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, in
         req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_empty_complete_cb);
     }
     if (UCS_PTR_IS_PTR(req)) {
-        ((opal_common_ucx_request_t *) req)->winfo = winfo;
+        ((opal_common_ucx_request_t *)req)->winfo = winfo;
     }
 
     if (OPAL_COMMON_UCX_FLUSH_B) {
-        rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb");
+        rc = opal_common_ucx_wait_request_mt(req,
+                                             OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                             "ucp_ep_flush_nb");
     } else {
         *req_ptr = req;
     }
diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h
index 5d9f271bbb3..e0224483073 100644
--- a/opal/mca/common/ucx/common_ucx_wpool.h
+++ b/opal/mca/common/ucx/common_ucx_wpool.h
@@ -254,7 +254,9 @@ OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, in
                                               opal_common_ucx_flush_scope_t scope,
                                               ucs_status_ptr_t *req_ptr);
 
-static inline int opal_common_ucx_wait_request_mt(ucs_status_ptr_t request, const char *msg)
+static inline
+int opal_common_ucx_wait_request_mt(ucs_status_ptr_t request,
+                                    enum opal_common_ucx_req_type type, const char *msg)
 {
     ucs_status_t status;
     int ctr = 0, ret = 0;
@@ -277,7 +279,7 @@ static inline int opal_common_ucx_wait_request_mt(ucs_status_ptr_t request, cons
         opal_mutex_lock(&winfo->mutex);
         do {
             ret = ucp_worker_progress(winfo->worker);
-            status = opal_common_ucx_request_status(request);
+            status = opal_common_ucx_request_status(request, type);
             if (status != UCS_INPROGRESS) {
                 ucp_request_free(request);
                 if (OPAL_UNLIKELY(UCS_OK != status)) {
@@ -311,7 +313,9 @@ static inline int _periodical_flush_nb(opal_common_ucx_wpmem_t *mem, opal_common
     opal_common_ucx_flush_scope_t scope;
 
     if (winfo->inflight_req != UCS_OK) {
-        rc = opal_common_ucx_wait_request_mt(winfo->inflight_req, "opal_common_ucx_flush_nb");
+        rc = opal_common_ucx_wait_request_mt(winfo->inflight_req,
+                                             OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                             "opal_common_ucx_flush_nb");
         if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
             MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_wait_request failed: %d", rc);
             return rc;
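
opal_common_ucx_wait_request_mt differs from the single-threaded wait above in that the worker belongs to a pool and may be shared between threads, so progress and the status check both happen under the owning winfo's mutex. A reduced sketch of that pattern (struct trimmed to the two relevant fields; OBJ_CONSTRUCT of the mutex and the periodic lock release/back-off of the real code are omitted, and the opal mutex header path varies between Open MPI versions):

#include "opal/threads/mutex.h"
#include <ucp/api/ucp.h>

typedef struct {
    ucp_worker_h worker;
    opal_mutex_t mutex;
} winfo_sketch_t;

static ucs_status_t wait_request_locked(winfo_sketch_t *winfo, void *request)
{
    ucs_status_t status;

    opal_mutex_lock(&winfo->mutex);
    do {
        ucp_worker_progress(winfo->worker);        /* drive completions */
        status = ucp_request_check_status(request);
    } while (UCS_INPROGRESS == status);
    ucp_request_free(request);
    opal_mutex_unlock(&winfo->mutex);

    return status;
}
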
diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c
index 45b0ce00692..57854487d0e 100644
--- a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c
+++ b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c
@@ -65,6 +65,7 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx,
     }
 
     return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker[0],
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
 #if HAVE_DECL_UCP_ATOMIC_OP_NBX
                                         "ucp_atomic_op_nbx");
 #else
diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_module.c b/oshmem/mca/atomic/ucx/atomic_ucx_module.c
index 8a9a4a06311..078366c8572 100644
--- a/oshmem/mca/atomic/ucx/atomic_ucx_module.c
+++ b/oshmem/mca/atomic/ucx/atomic_ucx_module.c
@@ -120,6 +120,7 @@ int mca_atomic_ucx_fop(shmem_ctx_t ctx,
     status_ptr = ucp_atomic_op_nbx(ucx_ctx->ucp_peers[pe].ucp_conn,
                                    op, &value, 1, rva, ucx_mkey->rkey, &param);
     return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker[0],
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
                                         "ucp_atomic_op_nbx");
 #else
     status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn,
@@ -127,6 +128,7 @@ int mca_atomic_ucx_fop(shmem_ctx_t ctx,
                                      rva, ucx_mkey->rkey,
                                      opal_common_ucx_empty_complete_cb);
     return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker[0],
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
                                         "ucp_atomic_fetch_nb");
 #endif
 }
diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c
index fba5b2cb806..a959a979b25 100644
--- a/oshmem/mca/spml/ucx/spml_ucx.c
+++ b/oshmem/mca/spml/ucx/spml_ucx.c
@@ -827,11 +827,15 @@ int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_add
 #if HAVE_DECL_UCP_GET_NBX
     request = ucp_get_nbx(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
                           (uint64_t)rva, ucx_mkey->rkey, &mca_spml_ucx_request_param);
-    return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_get_nbx");
+    return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0],
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                        "ucp_get_nbx");
 #elif HAVE_DECL_UCP_GET_NB
     request = ucp_get_nb(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
                          (uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
-    return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_get_nb");
+    return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0],
+                                        OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                        "ucp_get_nb");
 #else
     status = ucp_get(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
                      (uint64_t)rva, ucx_mkey->rkey);
@@ -918,11 +922,15 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
 #if HAVE_DECL_UCP_PUT_NBX
     request = ucp_put_nbx(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
                           (uint64_t)rva, ucx_mkey->rkey, &mca_spml_ucx_request_param);
-    res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_put_nbx");
+    res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0],
+                                       OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                       "ucp_put_nbx");
 #elif HAVE_DECL_UCP_PUT_NB
     request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
                          (uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
-    res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_put_nb");
+    res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0],
+                                       OPAL_COMMON_UCX_REQUEST_TYPE_UCP,
+                                       "ucp_put_nb");
 #else
     status = ucp_put(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
                      (uint64_t)rva, ucx_mkey->rkey);
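
These put/get paths pick the newest UCX entry point detected at configure time: ucp_get_nbx where available, else ucp_get_nb, else the blocking ucp_get. A condensed, hypothetical sketch of the same fallback ladder as a standalone helper (error handling reduced; the HAVE_DECL_* macros are the same configure-time checks used above, and polling assumes ucp_request_check_status is available):

#include <ucp/api/ucp.h>

static void get_cb(void *request, ucs_status_t status)
{
    (void)request; (void)status;   /* completion is detected by polling */
}

static int get_blocking(ucp_ep_h ep, void *buf, size_t len, uint64_t rva,
                        ucp_rkey_h rkey, ucp_worker_h worker)
{
#if HAVE_DECL_UCP_GET_NBX
    ucp_request_param_t param = { .op_attr_mask = 0 };
    ucs_status_ptr_t req = ucp_get_nbx(ep, buf, len, rva, rkey, &param);
#elif HAVE_DECL_UCP_GET_NB
    ucs_status_ptr_t req = ucp_get_nb(ep, buf, len, rva, rkey, get_cb);
#else
    /* oldest API: plain blocking get */
    return (UCS_OK == ucp_get(ep, buf, len, rva, rkey)) ? 0 : -1;
#endif

#if HAVE_DECL_UCP_GET_NBX || HAVE_DECL_UCP_GET_NB
    ucs_status_t status;

    if (req == NULL) {
        return 0;                  /* completed immediately */
    }
    if (UCS_PTR_IS_ERR(req)) {
        return -1;                 /* failed at issue time */
    }
    do {
        ucp_worker_progress(worker);           /* drive the transfer */
        status = ucp_request_check_status(req);
    } while (UCS_INPROGRESS == status);
    ucp_request_free(req);
    return (UCS_OK == status) ? 0 : -1;
#endif
}
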