Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ompi/communicator/comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2417,6 +2417,11 @@ int ompi_comm_get_rprocs (ompi_communicator_t *local_comm, ompi_communicator_t *
goto err_exit;
}

/* When a process gets spawned, every local_comm process needs to create
* an intercomm with the spawnees to communicate. These spawned procs needs
* to be remembered for cleaning later on */
ompi_proc_retain_spawned_jobids(rprocs, rsize);

err_exit:
/* rprocs isn't freed unless we have an error,
since it is used in the communicator */
Expand Down
6 changes: 5 additions & 1 deletion ompi/dpm/dpm.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* reserved.
* Copyright (c) 2022 IBM Corporation. All rights reserved.
* Copyright (c) 2023 Jeffrey M. Squyres. All rights reserved.
* Copyright (c) 2025 BULL S.A.S. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -473,7 +474,10 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
} while (!opal_list_is_empty(&ilist));

/* call add_procs on the new ones */
rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
rc = MCA_PML_CALL(add_procs(new_proc_list, i));
/* Register spawned procs names to clean them up after */
ompi_proc_retain_spawned_jobids(new_proc_list, i);

free(new_proc_list);
new_proc_list = NULL;
if (OMPI_SUCCESS != rc) {
Expand Down
83 changes: 81 additions & 2 deletions ompi/instance/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* reserved.
* Copyright (c) 2023 Jeffrey M. Squyres. All rights reserved.
* Copyright (c) 2024 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2025 Bull SAS. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand All @@ -19,6 +20,7 @@
#include "instance.h"

#include "opal/util/arch.h"
#include "opal/util/proc.h"

#include "opal/util/show_help.h"
#include "opal/util/argv.h"
Expand All @@ -39,6 +41,7 @@
#include "ompi/dpm/dpm.h"
#include "ompi/file/file.h"
#include "ompi/mpiext/mpiext.h"
#include "ompi/runtime/ompi_rte.h"

#include "ompi/mca/hook/base/base.h"
#include "ompi/mca/op/base/base.h"
Expand Down Expand Up @@ -110,13 +113,17 @@ static void ompi_instance_construct (ompi_instance_t *instance)
instance->i_name[0] = '\0';
instance->i_flags = 0;
instance->i_keyhash = NULL;
OBJ_CONSTRUCT(&instance->i_spawned_proc_namelists, opal_list_t);
OBJ_CONSTRUCT(&instance->i_spawned_proc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&instance->s_lock, opal_mutex_t);
instance->errhandler_type = OMPI_ERRHANDLER_TYPE_INSTANCE;
instance->bsend_buffer = NULL;
}

static void ompi_instance_destruct(ompi_instance_t *instance)
{
OBJ_DESTRUCT(&instance->i_spawned_proc_namelists);
OBJ_DESTRUCT(&instance->i_spawned_proc_lock);
OBJ_DESTRUCT(&instance->s_lock);
}

Expand Down Expand Up @@ -177,18 +184,90 @@ static int ompi_instance_print_error (const char *error, int ret)
return ret;
}

/* This function is only needed for the world paradigm because it's the only one
* we can spawn processes in it for now */
void ompi_proc_retain_spawned_jobids(ompi_proc_t **spawned_procs, size_t list_size) {
const ompi_proc_t *spawned_proc;
opal_namelist_t *registered_proc;
ompi_process_name_t name;
ompi_rte_cmp_bitmask_t mask;

/* NULL if session paradigm, not NULL if world paradigm */
if (ompi_mpi_instance_default == NULL) {
return;
}

/* return the proc-struct which matches this jobid */
mask = OMPI_RTE_CMP_JOBID;

for (size_t i = 0; i < list_size; i++) {
/* The idea is to filter the procs that have the same jobid,
* aka the jobs in the same instance.
* After that we lookup if the jobid is already present, meaning this
* instance is already registered via the jobid of its procs.
* If the jobid is not present we add it */

int found = 0;
spawned_proc = spawned_procs[i];
if (OMPI_PROC_MY_NAME->jobid == spawned_proc->super.proc_name.jobid) {
continue;
}

name.jobid = spawned_proc->super.proc_name.jobid;
name.vpid = spawned_proc->super.proc_name.vpid;

opal_mutex_lock(&ompi_mpi_instance_default->i_spawned_proc_lock);
OPAL_LIST_FOREACH(registered_proc,
&ompi_mpi_instance_default->i_spawned_proc_namelists,
opal_namelist_t) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask,
&registered_proc->name, &name)) {
found = 1;
break;
}
}

if (0 == found) {
opal_namelist_t *namelist = OBJ_NEW(opal_namelist_t);
namelist->name.jobid = name.jobid;
namelist->name.vpid = 0; /* not needed for lookup */
opal_list_append(&ompi_mpi_instance_default->i_spawned_proc_namelists,
&namelist->super);
}
opal_mutex_unlock(&ompi_mpi_instance_default->i_spawned_proc_lock);
}
return;
}

static int ompi_mpi_instance_cleanup_pml (void)
{
/* call del_procs on all allocated procs even though some may not be known
* to the pml layer. the pml layer is expected to be resilient and ignore
* any unknown procs. */
size_t nprocs = 0;
ompi_proc_t **procs;
opal_namelist_t *registered_name;
opal_namelist_t *next;

procs = ompi_proc_get_allocated (&nprocs);
MCA_PML_CALL(del_procs(procs, nprocs));
free(procs);

/* If we are in a world paradigm and spawned processes we need to clean */
if (ompi_mpi_instance_default != NULL) {

/* Let's loop on all spawned jobids and del_proc the concerned procs */
OPAL_LIST_FOREACH_SAFE(registered_name, next,
&ompi_mpi_instance_default->i_spawned_proc_namelists,
opal_namelist_t) {

procs = ompi_proc_get_by_name(&registered_name->name, &nprocs);
MCA_PML_CALL(del_procs(procs, nprocs));
opal_list_remove_item(&ompi_mpi_instance_default->i_spawned_proc_namelists,
&registered_name->super);
}
}

return OMPI_SUCCESS;
}

Expand Down Expand Up @@ -989,14 +1068,14 @@ int ompi_mpi_instance_finalize (ompi_instance_t **instance)
{
int ret = OMPI_SUCCESS;

OBJ_RELEASE(*instance);

opal_mutex_lock (&instance_lock);
if (0 == opal_atomic_add_fetch_32 (&ompi_instance_count, -1)) {
ret = ompi_mpi_instance_finalize_common ();
}
opal_mutex_unlock (&instance_lock);

OBJ_RELEASE(*instance);

*instance = &ompi_mpi_instance_null.instance;

return ret;
Expand Down
13 changes: 12 additions & 1 deletion ompi/instance/instance.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2018-2025 Triad National Security, LLC. All rights reserved.
* Copyright (c) 2025 BULL S.A.S. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -34,6 +35,8 @@ struct ompi_instance_t {

/* Attributes */
opal_hash_table_t *i_keyhash;
opal_mutex_t i_spawned_proc_lock;
opal_list_t i_spawned_proc_namelists;

/* index in Fortran <-> C translation array (for when I get around
* to implementing fortran support-- UGH) */
Expand Down Expand Up @@ -88,7 +91,7 @@ OBJ_CLASS_DECLARATION(ompi_instance_t);
* the PREDEFINED_COMMUNICATOR_PAD macro?
* A: Most likely not, but it would be good to check.
*/
#define PREDEFINED_INSTANCE_PAD 512
#define PREDEFINED_INSTANCE_PAD 1024

struct ompi_predefined_instance_t {
ompi_instance_t instance;
Expand Down Expand Up @@ -120,6 +123,14 @@ int ompi_mpi_instance_retain (void);
*/
void ompi_mpi_instance_release (void);

/**
* @brief Saves jobid of spawned procs to cleanup upon finalize
*
* @param[in] spawned_proc_list list of procs that were spawned
* @param[in] list_size size of the list of procs that were spawned
*/
void ompi_proc_retain_spawned_jobids(ompi_proc_t **spawned_proc_list, size_t list_size);

/**
* @brief Create a new MPI instance
*
Expand Down
12 changes: 10 additions & 2 deletions ompi/mca/pml/ubcl/pml_ubcl_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,18 @@ int mca_pml_ubcl_endpoint_release(ompi_proc_t *proc)
ubcl_error_t ret = UBCL_SUCCESS;
int ompi_error = OMPI_SUCCESS;
mca_common_ubcl_endpoint_t *endpoint = NULL;
assert(NULL != proc);

if (NULL == proc) {
OPAL_OUTPUT_VERBOSE((90, mca_pml_ubcl_component.output, "pml_ubcl_endpoint release : proc is NULL"));
return OMPI_SUCCESS;
}

endpoint = (proc)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
assert(NULL != endpoint);

if (NULL == endpoint) {
OPAL_OUTPUT_VERBOSE((50, mca_pml_ubcl_component.output, "pml_ubcl_endpoint release : endpoint is NULL"));
return OMPI_SUCCESS;
}

endpoint_refcount = opal_atomic_sub_fetch_32(&endpoint->refcount, 1);
if (0 == endpoint_refcount) {
Expand Down
26 changes: 17 additions & 9 deletions ompi/proc/proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Copyright (c) 2015-2017 Mellanox Technologies. All rights reserved.
*
* Copyright (c) 2021 Nanook Consulting. All rights reserved.
* Copyright (c) 2025 Bull SAS. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -417,25 +418,19 @@ int ompi_proc_world_size (void)
return ompi_process_info.num_procs;
}

ompi_proc_t **ompi_proc_get_allocated (size_t *size)
ompi_proc_t **ompi_proc_get_by_name(const ompi_process_name_t *name, size_t *size)
{
ompi_proc_t **procs;
ompi_proc_t *proc;
size_t count = 0;
ompi_rte_cmp_bitmask_t mask;
ompi_process_name_t my_name;

/* check bozo case */
if (NULL == ompi_proc_local_proc) {
return NULL;
}
mask = OMPI_RTE_CMP_JOBID;
my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);

/* First count how many match this jobid */
opal_mutex_lock (&ompi_proc_lock);
OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), &my_name)) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), name)) {
++count;
}
}
Expand All @@ -450,7 +445,7 @@ ompi_proc_t **ompi_proc_get_allocated (size_t *size)
/* now save only the procs that match this jobid */
count = 0;
OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, &my_name)) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
/* DO NOT RETAIN THIS OBJECT - the reference count on this
* object will be adjusted by external callers. The intent
* here is to allow the reference count to drop to zero if
Expand All @@ -474,6 +469,19 @@ ompi_proc_t **ompi_proc_get_allocated (size_t *size)
return procs;
}

ompi_proc_t **ompi_proc_get_allocated (size_t *size)
{
ompi_process_name_t my_name;

/* check bozo case */
if (NULL == ompi_proc_local_proc) {
return NULL;
}
my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);

return ompi_proc_get_by_name(&my_name, size);
}

ompi_proc_t **ompi_proc_world (size_t *size)
{
ompi_proc_t **procs;
Expand Down
19 changes: 19 additions & 0 deletions ompi/proc/proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* Copyright (c) 2015-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021 Nanook Consulting. All rights reserved.
* Copyright (c) 2025 BULL S.A.S. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -192,6 +193,24 @@ OMPI_DECLSPEC ompi_proc_t** ompi_proc_world(size_t* size);

OMPI_DECLSPEC int ompi_proc_world_size (void);

/**
* Returns the list of proc with the given name
* Returns the list of proc associated with the jobid of the given
* name. If at least one proc with the jobid, then the name is known and we
* return the procs.
*
* @note The reference count of each process in the array is
* NOT incremented.
*
* @param[in] name Name containing the jobid of wanted processes
* @param[in] size Number of processes in the ompi_proc_t array
*
* @return Array of pointers to proc instances under the same name in the current
* MPI_COMM_WORLD, or NULL if there is an internal failure.
*/
OMPI_DECLSPEC ompi_proc_t **ompi_proc_get_by_name(const ompi_process_name_t *name,
size_t *size);

/**
* Returns the list of proc instances associated with this job.
*
Expand Down