Skip to content

Commit

Permalink
xpmem: Add XPMEM support
Browse files Browse the repository at this point in the history
Add new xpmem files to the repository and include them in the build.

ofi_xpmem.h: includes structures which are needed by providers if they
             intend on using xpmem
xpmem_monitor.c: Needed for caching
xpmem.c: The interface implementation

Add the XPMEM point-to-point infrastructure, which can be used by the shm
provider (or other providers) to execute XPMEM transfers.

Signed-off-by: Amir Shehata <[email protected]>
  • Loading branch information
amirshehataornl committed Aug 9, 2023
1 parent 75d0cb8 commit 50ca016
Show file tree
Hide file tree
Showing 18 changed files with 685 additions and 22 deletions.
4 changes: 4 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ common_srcs = \
src/hmem_neuron.c \
src/hmem_synapseai.c \
src/hmem_ipc_cache.c \
src/xpmem.c \
src/xpmem_cache.c \
src/common.c \
src/enosys.c \
src/rbtree.c \
Expand Down Expand Up @@ -86,6 +88,7 @@ common_srcs = \
prov/util/src/ze_mem_monitor.c \
prov/util/src/cuda_ipc_monitor.c \
prov/util/src/rocr_ipc_monitor.c \
prov/util/src/xpmem_monitor.c \
prov/coll/src/coll_attr.c \
prov/coll/src/coll_av.c \
prov/coll/src/coll_av_set.c \
Expand Down Expand Up @@ -164,6 +167,7 @@ nodist_src_libfabric_la_SOURCES =
src_libfabric_la_SOURCES = \
include/ofi_hmem.h \
include/ofi_cma.h \
include/ofi_xpmem.h \
include/ofi.h \
include/ofi_abi.h \
include/ofi_atom.h \
Expand Down
1 change: 1 addition & 0 deletions include/ofi_mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ extern struct ofi_mem_monitor *rocr_monitor;
extern struct ofi_mem_monitor *rocr_ipc_monitor;
extern struct ofi_mem_monitor *ze_monitor;
extern struct ofi_mem_monitor *import_monitor;
extern struct ofi_mem_monitor *xpmem_monitor;

/*
* Used to store registered memory regions into a lookup map. This
Expand Down
7 changes: 4 additions & 3 deletions include/ofi_shm_p2p.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#endif

#include <ofi_cma.h>
#include <ofi_xpmem.h>
#include <ofi.h>
#include <ofi_iov.h>

Expand Down Expand Up @@ -77,9 +78,9 @@ ofi_shm_p2p_no_copy(struct iovec *local, unsigned long local_cnt,
static struct ofi_shm_p2p_ops p2p_ops[] = {
[FI_SHM_P2P_XPMEM] = {
.initialized = false,
.init = ofi_shm_p2p_no_init,
.cleanup = ofi_shm_p2p_no_cleanup,
.copy = ofi_shm_p2p_no_copy,
.init = xpmem_init,
.cleanup = xpmem_cleanup,
.copy = xpmem_copy,
},
[FI_SHM_P2P_CMA] = {
.initialized = false,
Expand Down
91 changes: 91 additions & 0 deletions include/ofi_xpmem.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* (C) Copyright 2023 UT-Battelle, LLC. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

#ifndef OFI_XPMEM_H
#define OFI_XPMEM_H

#include <ofi_mr.h>
#include <ofi_util.h>

#if HAVE_XPMEM
#include <xpmem.h>
#else
/* define XPMEM specific types to allow the code to
* compile on different platforms and to allow smr_region to compile
* without xpmem available.
*/
typedef int64_t xpmem_apid_t;
typedef int64_t xpmem_segid_t;
#endif /* HAVE_XPMEM */

/* Per-peer XPMEM client state.
* The shm provider keeps one instance in each peer-data slot and passes
* its address as the user_data argument of the p2p copy call. It is also
* the second argument of ofi_xpmem_enable() and the argument of
* ofi_xpmem_release().
*/
struct xpmem_client {
/* capability flag; presumably holds an SMR_VMA_CAP_* value -- TODO confirm */
uint8_t cap;
/* XPMEM access permit id; presumably obtained for the peer's segment
* when the client is enabled -- TODO confirm against xpmem.c */
xpmem_apid_t apid;
/* presumably a copy of the peer's xpmem_pinfo.address_max -- TODO confirm */
uintptr_t addr_max;
};

/* XPMEM information describing one process: its segment id and the
* upper bound on attachable addresses. ofi_xpmem_enable() takes a peer's
* instance of this structure as its first argument.
*/
struct xpmem_pinfo {
/* XPMEM segment id for this process */
xpmem_segid_t seg_id;
/* maximum attachment address for this process. attempts to attach
* past this value may fail. */
uintptr_t address_max;
};

/* XPMEM state for the local process: its own xpmem_pinfo plus the copy
* tuning parameter. This is the type of the global `xpmem` pointer.
*/
struct xpmem {
/* segment id and maximum attach address of this process */
struct xpmem_pinfo pinfo;
/* maximum size that will be used with a single memcpy call.
* On some systems we see better performance if we chunk the
* copy into multiple memcpy calls.
* NOTE(review): presumably initialised from the
* FI_XPMEM_MEMCPY_CHUNKSIZE environment variable -- confirm in xpmem.c */
uint64_t memcpy_chunk_size;
};

/* Global XPMEM state; declared here, defined outside this header.
* NOTE(review): several prototypes below name a parameter `xpmem` as
* well; in the corresponding definitions that parameter hides this
* global.
*/
extern struct xpmem *xpmem;

/* Search `cache` for an entry matching `iov` and `peer_id`; the result
* is reported through `mr_entry`.
* NOTE(review): presumably creates the mapping on a miss using the
* supplied xpmem_client, and returns 0 or a negative fi_errno --
* confirm against xpmem_cache.c.
*/
int ofi_xpmem_cache_search(struct ofi_mr_cache *cache,
struct iovec *iov, uint64_t peer_id,
struct ofi_mr_entry **mr_entry,
struct xpmem_client *xpmem);

/* Create / tear down the XPMEM mapping cache. ofi_xpmem_cache_open()
* returns the new cache through `cache`.
*/
int ofi_xpmem_cache_open(struct ofi_mr_cache **cache);
void ofi_xpmem_cache_destroy(struct ofi_mr_cache *cache);

/* Entry points installed as the .init, .cleanup and .copy operations of
* the FI_SHM_P2P_XPMEM slot of the p2p ops table in ofi_shm_p2p.h.
*
* xpmem_copy(): the shm provider calls it through ofi_shm_p2p_copy()
* with `user_data` pointing at the peer's struct xpmem_client and
* `write` derived from the operation type.
* NOTE(review): return convention is presumably 0 on success and a
* negative fi_errno on failure -- confirm against xpmem.c.
*/
int xpmem_init(void);
int xpmem_cleanup(void);
int xpmem_copy(struct iovec *local, unsigned long local_cnt,
struct iovec *remote, unsigned long remote_cnt,
size_t total, pid_t pid, bool write, void *user_data);
/* Set up / release the per-peer client state.
* NOTE(review): ofi_xpmem_enable() presumably fills `xpmem` from the
* peer's xpmem_pinfo -- confirm against xpmem.c.
*/
int ofi_xpmem_enable(struct xpmem_pinfo *peer,
struct xpmem_client *xpmem);
void ofi_xpmem_release(struct xpmem_client *xpmem);

#endif /* OFI_XPMEM_H */
3 changes: 3 additions & 0 deletions libfabric.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@
<ClCompile Include="prov\util\src\ze_mem_monitor.c" />
<ClCompile Include="prov\util\src\cuda_ipc_monitor.c" />
<ClCompile Include="prov\util\src\rocr_ipc_monitor.c" />
<ClCompile Include="prov\util\src\xpmem_monitor.c" />
<ClCompile Include="prov\coll\src\coll_attr.c" />
<ClCompile Include="prov\coll\src\coll_av.c" />
<ClCompile Include="prov\coll\src\coll_av_set.c" />
Expand All @@ -795,6 +796,8 @@
<ClCompile Include="src\hmem_neuron.c" />
<ClCompile Include="src\hmem_synapseai.c" />
<ClCompile Include="src\hmem_ipc_cache.c" />
<ClCompile Include="src\xpmem_cache.c" />
<ClCompile Include="src\xpmem.c" />
<ClCompile Include="src\indexer.c" />
<ClCompile Include="src\iov.c" />
<ClCompile Include="src\ofi_str.c" />
Expand Down
13 changes: 13 additions & 0 deletions man/fi_shm.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ The *shm* provider checks for the following environment variables:
page fault is reported, so that there is valid address translation for the
remaining addresses in the command. This minimizes DSA page faults. Default
false

*FI_SHM_USE_XPMEM*
: SHM can use SAR, CMA or XPMEM for host memory transfer. If
FI_SHM_USE_XPMEM is set to 1, the provider will select XPMEM over CMA if
XPMEM is available. Otherwise, if neither CMA nor XPMEM is available,
SHM shall default to the SAR protocol. Default 0

*FI_XPMEM_MEMCPY_CHUNKSIZE*
: The maximum size which will be used with a single memcpy call. XPMEM
copy performance improves when buffers are divided into smaller
chunks. This environment variable is provided to fine tune performance
on different systems. Default 262144

# SEE ALSO

[`fabric`(7)](fabric.7.html),
Expand Down
9 changes: 7 additions & 2 deletions prov/shm/src/smr.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

#include <ofi.h>
#include <ofi_enosys.h>
#include <ofi_shm_p2p.h>
#include <ofi_rbuf.h>
#include <ofi_list.h>
#include <ofi_signal.h>
Expand All @@ -75,6 +76,7 @@ struct smr_env {
int disable_cma;
int use_dsa_sar;
size_t max_gdrcopy_size;
int use_xpmem;
};

extern struct smr_env smr_env;
Expand Down Expand Up @@ -228,6 +230,7 @@ struct smr_ep {
struct dlist_entry ipc_cpy_pend_list;

int ep_idx;
enum ofi_shm_p2p_type p2p_type;
struct smr_sock_info *sock_info;
void *dsa_context;
void (*smr_progress_ipc_list)(struct smr_ep *ep);
Expand Down Expand Up @@ -308,9 +311,11 @@ static inline bool smr_vma_enabled(struct smr_ep *ep,
struct smr_region *peer_smr)
{
if (ep->region == peer_smr)
return ep->region->cma_cap_self == SMR_VMA_CAP_ON;
return (ep->region->cma_cap_self == SMR_VMA_CAP_ON ||
ep->region->xpmem_cap_self == SMR_VMA_CAP_ON);
else
return ep->region->cma_cap_peer == SMR_VMA_CAP_ON;
return (ep->region->cma_cap_peer == SMR_VMA_CAP_ON ||
peer_smr->xpmem_cap_self == SMR_VMA_CAP_ON);
}

static inline bool smr_ze_ipc_enabled(struct smr_region *smr,
Expand Down
16 changes: 16 additions & 0 deletions prov/shm/src/smr_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "smr_signal.h"
#include "smr.h"
#include "smr_dsa.h"
#include "ofi_xpmem.h"

extern struct fi_ops_msg smr_msg_ops, smr_no_recv_msg_ops;
extern struct fi_ops_tagged smr_tag_ops, smr_no_recv_tag_ops;
Expand Down Expand Up @@ -1354,6 +1355,11 @@ static int smr_ep_ctrl(struct fid *fid, int command, void *arg)
else
ep->smr_progress_ipc_list = smr_progress_ipc_list_noop;

if (smr_env.use_xpmem)
ep->region->xpmem_cap_self = SMR_VMA_CAP_ON;
else
ep->region->xpmem_cap_self = SMR_VMA_CAP_OFF;

if (!ep->srx) {
domain = container_of(ep->util_ep.domain,
struct smr_domain,
Expand Down Expand Up @@ -1387,6 +1393,13 @@ static int smr_ep_ctrl(struct fid *fid, int command, void *arg)
if (smr_env.use_dsa_sar)
smr_dsa_context_init(ep);

/* if XPMEM is on after exchanging peer info, then set the
* endpoint p2p to XPMEM so it can be used on the fast
* path
*/
if (ep->region->xpmem_cap_self == SMR_VMA_CAP_ON)
ep->p2p_type = FI_SHM_P2P_XPMEM;

break;
default:
return -FI_ENOSYS;
Expand Down Expand Up @@ -1506,6 +1519,9 @@ int smr_endpoint(struct fid_domain *domain, struct fi_info *info,
ep->util_ep.ep_fid.atomic = &smr_atomic_ops;

*ep_fid = &ep->util_ep.ep_fid;

/* default to CMA for p2p */
ep->p2p_type = FI_SHM_P2P_CMA;
return 0;
ep:
ofi_endpoint_close(&ep->util_ep);
Expand Down
5 changes: 5 additions & 0 deletions prov/shm/src/smr_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct smr_env smr_env = {
.disable_cma = false,
.use_dsa_sar = false,
.max_gdrcopy_size = 3072,
.use_xpmem = false,
};

static void smr_init_env(void)
Expand All @@ -54,6 +55,7 @@ static void smr_init_env(void)
fi_param_get_size_t(&smr_prov, "rx_size", &smr_info.rx_attr->size);
fi_param_get_bool(&smr_prov, "disable_cma", &smr_env.disable_cma);
fi_param_get_bool(&smr_prov, "use_dsa_sar", &smr_env.use_dsa_sar);
fi_param_get_bool(&smr_prov, "use_xpmem", &smr_env.use_xpmem);
}

static void smr_resolve_addr(const char *node, const char *service,
Expand Down Expand Up @@ -220,6 +222,9 @@ SHM_INI
"Enable CPU touching of memory pages in DSA command \
descriptor when page fault is reported. \
Default: false");
fi_param_define(&smr_prov, "use_xpmem", FI_PARAM_BOOL,
"Enable XPMEM over CMA when possible "
"(default: false)");

smr_init_env();

Expand Down
8 changes: 5 additions & 3 deletions prov/shm/src/smr_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ static int smr_progress_iov(struct smr_cmd *cmd, struct iovec *iov,
size_t iov_count, size_t *total_len,
struct smr_ep *ep, int err)
{
enum ofi_shm_p2p_type p2p_type = FI_SHM_P2P_CMA;
struct smr_region *peer_smr;
struct xpmem_client *xpmem;
struct smr_resp *resp;
int ret;

Expand All @@ -321,10 +321,12 @@ static int smr_progress_iov(struct smr_cmd *cmd, struct iovec *iov,
goto out;
}

ret = ofi_shm_p2p_copy(p2p_type, iov, iov_count, cmd->msg.data.iov,
xpmem = &smr_peer_data(ep->region)[cmd->msg.hdr.id].xpmem;

ret = ofi_shm_p2p_copy(ep->p2p_type, iov, iov_count, cmd->msg.data.iov,
cmd->msg.data.iov_count, cmd->msg.hdr.size,
peer_smr->pid, cmd->msg.hdr.op == ofi_op_read_req,
NULL);
xpmem);
if (!ret)
*total_len = cmd->msg.hdr.size;

Expand Down
27 changes: 15 additions & 12 deletions prov/shm/src/smr_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ static void smr_format_rma_resp(struct smr_cmd *cmd, fi_addr_t peer_id,
cmd->msg.hdr.size = total_len;
}

static ssize_t smr_rma_fast(struct smr_region *peer_smr, const struct iovec *iov,
size_t iov_count, const struct fi_rma_iov *rma_iov,
size_t rma_count, void **desc, int peer_id, void *context,
static ssize_t smr_rma_fast(struct smr_ep *ep, struct smr_region *peer_smr,
const struct iovec *iov, size_t iov_count,
const struct fi_rma_iov *rma_iov, size_t rma_count,
void **desc, int peer_id, int peer_idx, void *context,
uint32_t op, uint64_t op_flags)
{
struct iovec cma_iovec[SMR_IOV_LIMIT], rma_iovec[SMR_IOV_LIMIT];
enum ofi_shm_p2p_type p2p_type = FI_SHM_P2P_CMA;
struct iovec vma_iovec[SMR_IOV_LIMIT], rma_iovec[SMR_IOV_LIMIT];
struct xpmem_client *xpmem;
struct smr_cmd_entry *ce;
size_t total_len;
int ret, i;
Expand All @@ -71,17 +72,19 @@ static ssize_t smr_rma_fast(struct smr_region *peer_smr, const struct iovec *iov
if (ret == -FI_ENOENT)
return -FI_EAGAIN;

memcpy(cma_iovec, iov, sizeof(*iov) * iov_count);
memcpy(vma_iovec, iov, sizeof(*iov) * iov_count);
for (i = 0; i < rma_count; i++) {
rma_iovec[i].iov_base = (void *) rma_iov[i].addr;
rma_iovec[i].iov_len = rma_iov[i].len;
}

total_len = ofi_total_iov_len(iov, iov_count);

ret = ofi_shm_p2p_copy(p2p_type, rma_iovec, iov_count,
xpmem = &smr_peer_data(ep->region)[peer_idx].xpmem;

ret = ofi_shm_p2p_copy(ep->p2p_type, vma_iovec, iov_count,
rma_iovec, rma_count, total_len, peer_smr->pid,
op == ofi_op_write, NULL);
op == ofi_op_write, xpmem);

if (ret) {
smr_cmd_queue_discard(ce, pos);
Expand Down Expand Up @@ -133,8 +136,8 @@ static ssize_t smr_generic_rma(struct smr_ep *ep, const struct iovec *iov,
ofi_spin_lock(&ep->tx_lock);

if (cmds == 1) {
err = smr_rma_fast(peer_smr, iov, iov_count, rma_iov,
rma_count, desc, peer_id, context, op,
err = smr_rma_fast(ep, peer_smr, iov, iov_count, rma_iov,
rma_count, desc, peer_id, id, context, op,
op_flags);
if (err) {
FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
Expand Down Expand Up @@ -341,8 +344,8 @@ static ssize_t smr_generic_rma_inject(struct fid_ep *ep_fid, const void *buf,
rma_iov.key = key;

if (cmds == 1) {
ret = smr_rma_fast(peer_smr, &iov, 1, &rma_iov, 1, NULL,
peer_id, NULL, ofi_op_write, flags);
ret = smr_rma_fast(ep, peer_smr, &iov, 1, &rma_iov, 1, NULL,
peer_id, id, NULL, ofi_op_write, flags);
goto out;
}

Expand Down
Loading

0 comments on commit 50ca016

Please sign in to comment.