diff --git a/Makefile.am b/Makefile.am
index 62af4ca8..51accd3b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -61,9 +61,18 @@ endif

 if USE_ONLINE
 AM_CPPFLAGS += ${ARGOBOTS_CFLAGS} ${SWM_CFLAGS} -DUSE_ONLINE=1
-LDADD += ${SWM_LIBS} ${ARGOBOTS_LIBS}
+LDADD += ${ARGOBOTS_LIBS}
+if USE_SWM
+AM_CPPFLAGS += -DUSE_SWM=1
+LDADD += ${SWM_LIBS}
 src_libcodes_la_SOURCES += src/workload/methods/codes-online-comm-wrkld.C
 endif
+if USE_CONC
+src_libcodes_la_SOURCES += src/workload/methods/codes-conc-online-comm-wrkld.C
+AM_CPPFLAGS += ${CONCEPTUAL_CFLAGS} -DUSE_CONC=1
+LDADD += ${CONCEPTUAL_LIBS}
+endif
+endif

 if USE_DUMPI
 AM_CPPFLAGS += ${DUMPI_CFLAGS} -DUSE_DUMPI=1
diff --git a/codes/codes-conc-addon.h b/codes/codes-conc-addon.h
new file mode 100644
index 00000000..1a921b40
--- /dev/null
+++ b/codes/codes-conc-addon.h
@@ -0,0 +1,131 @@
+/*
+ * Copyright (C) 2017 University of Chicago.
+ * See COPYRIGHT notice in top-level directory.
+ *
+ */
+
+#ifndef CODES_CONC_ADDON_H
+#define CODES_CONC_ADDON_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifdef USE_CONC
+#include <ncptl/ncptl.h>
+#endif
+#include <mpi.h>
+
+#define MAX_CONC_ARGV 128
+
+typedef struct conc_bench_param conc_bench_param;
+
+
+struct conc_bench_param {
+    char conc_program[MAX_CONC_ARGV];
+    int conc_argc;
+    char config_in[MAX_CONC_ARGV][MAX_CONC_ARGV];
+    char *conc_argv[MAX_CONC_ARGV];
+};
+
+
+int codes_conc_bench_load(
+        const char* program,
+        int argc,
+        char *argv[]);
+
+void CODES_MPI_Comm_size (MPI_Comm comm, int *size);
+void CODES_MPI_Comm_rank( MPI_Comm comm, int *rank );
+void CODES_MPI_Finalize();
+void CODES_MPI_Send(const void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int dest,
+        int tag,
+        MPI_Comm comm);
+void CODES_MPI_Recv(void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int source,
+        int tag,
+        MPI_Comm comm,
+        MPI_Status *status);
+void CODES_MPI_Sendrecv(const void *sendbuf,
+        int sendcount,
+        MPI_Datatype sendtype,
+        int dest,
+        int sendtag,
+        void *recvbuf,
+        int recvcount,
+        MPI_Datatype recvtype,
+        int source,
+        int recvtag,
+        MPI_Comm comm,
+        MPI_Status *status);
+void CODES_MPI_Barrier(MPI_Comm comm);
+void CODES_MPI_Isend(const void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int dest,
+        int tag,
+        MPI_Comm comm,
+        MPI_Request *request);
+void CODES_MPI_Irecv(void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int source,
+        int tag,
+        MPI_Comm comm,
+        MPI_Request *request);
+void CODES_MPI_Waitall(int count,
+        MPI_Request array_of_requests[],
+        MPI_Status array_of_statuses[]);
+void CODES_MPI_Reduce(const void *sendbuf,
+        void *recvbuf,
+        int count,
+        MPI_Datatype datatype,
+        MPI_Op op,
+        int root,
+        MPI_Comm comm);
+void CODES_MPI_Allreduce(const void *sendbuf,
+        void *recvbuf,
+        int count,
+        MPI_Datatype datatype,
+        MPI_Op op,
+        MPI_Comm comm);
+void CODES_MPI_Bcast(void *buffer,
+        int count,
+        MPI_Datatype datatype,
+        int root,
+        MPI_Comm comm);
+void CODES_MPI_Alltoall(const void *sendbuf,
+        int sendcount,
+        MPI_Datatype sendtype,
+        void *recvbuf,
+        int recvcount,
+        MPI_Datatype recvtype,
+        MPI_Comm comm);
+void CODES_MPI_Alltoallv(const void *sendbuf,
+        const int *sendcounts,
+        const int *sdispls,
+        MPI_Datatype sendtype,
+        void *recvbuf,
+        const int *recvcounts,
+        const int *rdispls,
+        MPI_Datatype recvtype,
+        MPI_Comm comm);
+
+/* implementation structure */
+struct codes_conceptual_bench {
+    char *program_name; /* name of the conceptual program */
+    int (*conceptual_main)(int argc, char *argv[]);
+};
+
+
+void codes_conceptual_add_bench(struct codes_conceptual_bench const * method);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* CODES_CONC_ADDON_H */
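For orientation, this is roughly what a benchmark generated against this header looks like once scripts/conceptual_benchmarks/translate_conc_src.py (added below) has processed the ncptl output. The program name "latency" is hypothetical; real skeletons are emitted under src/workload/conceputal-skeleton-apps/:

    #include "codes/codes-conc-addon.h"

    /* ncptl-generated entry point, demoted from "int main" to a
     * static function by the translator */
    static int latency_main(int argc, char *argv[])
    {
        int rank, size;
        /* MPI_* calls are rewritten to CODES_MPI_* so they enqueue
         * simulated operations instead of touching a real network */
        CODES_MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        CODES_MPI_Comm_size(MPI_COMM_WORLD, &size);
        CODES_MPI_Barrier(MPI_COMM_WORLD);
        CODES_MPI_Finalize();
        return 0;
    }

    /* fill in function pointers for this method */
    struct codes_conceptual_bench latency_bench =
    {
        .program_name = "latency",
        .conceptual_main = latency_main,
    };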
diff --git a/configure.ac b/configure.ac
index de842d59..62ddb567 100755
--- a/configure.ac
+++ b/configure.ac
@@ -121,22 +121,48 @@ AM_CONDITIONAL(USE_DARSHAN, [test "x${use_darshan}" = xyes])

 # check for Argobots
 AC_ARG_WITH([online],[AS_HELP_STRING([--with-online@<:@=DIR@:>@],
-                    [Build with the online workloads and argobots support])],
-                    [use_online=yes],[use_online=no])
-if test "x${use_online}" != "xno" ; then
+                    [Build with the online workloads and argobots support])])
+if test "x${with_online}" != "x" ; then
     AM_CONDITIONAL(USE_ONLINE, true)
     PKG_CHECK_MODULES_STATIC([ARGOBOTS], [argobots], [],
       [AC_MSG_ERROR([Could not find working argobots installation via pkg-config])])
+    AC_DEFINE_UNQUOTED([ONLINE_CONFIGDIR], ["$with_online"], [if using json data files,
+                       specify config directory])
+else
+    AM_CONDITIONAL(USE_ONLINE, false)
+fi
+
+# check for SWM
+AC_ARG_WITH([swm],[AS_HELP_STRING([--with-swm@<:@=DIR@:>@],
+                    [location of SWM installation])])
+if test "x${with_swm}" != "x" ; then
+    AM_CONDITIONAL(USE_SWM, true)
     PKG_CHECK_MODULES_STATIC([SWM], [swm], [],
       [AC_MSG_ERROR([Could not find working swm installation via pkg-config])])
     PKG_CHECK_VAR([SWM_DATAROOTDIR], [swm], [datarootdir], [],
       [AC_MSG_ERROR[Could not find shared directory in SWM]])
     AC_DEFINE_UNQUOTED([SWM_DATAROOTDIR], ["$SWM_DATAROOTDIR"], [if using json
-                       data files])
+                       data files])
 else
-    AM_CONDITIONAL(USE_ONLINE, false)
+    AM_CONDITIONAL(USE_SWM, false)
 fi

+# check for Conceptual
+AC_ARG_WITH([conceptual],[AS_HELP_STRING([--with-conceptual@<:@=DIR@:>@],
+                    [location of Conceptual installation])])
+if test "x${with_conceptual}" != "x" ; then
+    AC_CHECK_FILES([${with_conceptual}/lib/libncptl.a],
+                   AM_CONDITIONAL(USE_CONC, true),
+                   AC_MSG_ERROR(Could not find Conceptual libraries libncptl.a))
+    CONCEPTUAL_CFLAGS="-I${with_conceptual}/include"
+    CONCEPTUAL_LIBS="-L${with_conceptual}/lib/ -lncptl"
+    AC_SUBST(CONCEPTUAL_LIBS)
+    AC_SUBST(CONCEPTUAL_CFLAGS)
+else
+    AM_CONDITIONAL(USE_CONC, false)
+fi
+
+
 # check for Recorder
 AM_CONDITIONAL(USE_RECORDER, true)
 RECORDER_CPPFLAGS="-DUSE_RECORDER=1"
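A build enabling both online generators might then be configured as follows; the paths are illustrative. --with-online takes the directory holding the JSON workload configs (it becomes ONLINE_CONFIGDIR), --with-swm relies on pkg-config locating swm, and --with-conceptual must point at an installation containing lib/libncptl.a:

    ./configure --with-online=/path/to/online-configs \
                --with-swm \
                --with-conceptual=/opt/conceptual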
diff --git a/maint/codes.pc.in b/maint/codes.pc.in
index 4775e63f..1d182dec 100644
--- a/maint/codes.pc.in
+++ b/maint/codes.pc.in
@@ -19,6 +19,8 @@ argobots_cflags=@ARGOBOTS_CFLAGS@
 swm_libs=@SWM_LIBS@
 swm_cflags=@SWM_CFLAGS@
 swm_datarootdir=@SWM_DATAROOTDIR@
+conceptual_libs=@CONCEPTUAL_LIBS@
+conceptual_cflags=@CONCEPTUAL_CFLAGS@

 Name: codes-base
 Description: Base functionality for CODES storage simulation
diff --git a/scripts/conceptual_benchmarks/translate_conc_src.py b/scripts/conceptual_benchmarks/translate_conc_src.py
new file mode 100644
index 00000000..ccb44fc1
--- /dev/null
+++ b/scripts/conceptual_benchmarks/translate_conc_src.py
@@ -0,0 +1,181 @@
+import string
+import sys
+import os
+
+MPI_OPS = [ 'MPI_Send', 'MPI_Recv', 'MPI_Barrier', 'MPI_Isend', 'MPI_Irecv', 'MPI_Waitall',
+            'MPI_Reduce', 'MPI_Allreduce', 'MPI_Bcast', 'MPI_Alltoall', 'MPI_Alltoallv',
+            'MPI_Comm_size', 'MPI_Comm_rank']
+
+LOG = [ 'logfiletmpl_default', 'ncptl_log_write', 'ncptl_log_compute_aggregates', 'ncptl_log_commit_data']
+
+def eliminate_logging(inLines):
+    # comment out the log-file machinery that ncptl generates
+    for idx, line in enumerate(inLines):
+        if 'Generate and broadcast a UUID' in line:
+            for i in range(1, 3):
+                inLines[idx+i] = "//"+inLines[idx+i]
+        elif 'ncptl_free (logfile_uuid)' in line:
+            for i in range(0, 12):
+                inLines[idx-i] = "//"+inLines[idx-i]
+        elif 'int mpiresult' in line:
+            for i in range(0,30):
+                inLines[idx+i] = "//"+inLines[idx+i]
+        else:
+            for elem in LOG:
+                if elem in line:
+                    inLines[idx] = "//"+line
+
+def eliminate_conc_init(inLines):
+    # comment out ncptl run-time initialization that the simulator replaces
+    for idx, line in enumerate(inLines):
+        if 'NCPTL_RUN_TIME_VERSION' in line:
+            inLines[idx] = "//"+line
+        if 'atexit (conc_exit_handler)' in line:
+            inLines[idx] = "//"+line
+        if 'Inform the run-time library' in line:
+            for i in range(1, 4):
+                inLines[idx+i] = "//"+inLines[idx+i]
+
+def make_static_var(inLines):
+    # mark the program's globals static so several benchmarks can be linked together
+    for idx, line in enumerate(inLines):
+        if 'Dummy variable to help mark other variables as used' in line:
+            inLines[idx+1] = "static " + inLines[idx+1]
+        if 'void conc_mark_variables_used' in line:
+            inLines[idx] = "static " + line
+        if '/* Program-specific variables */' in line:
+            start = idx+1
+        if '* Function declarations *' in line:
+            end = idx-2
+
+    for i in range(start, end):
+        inLines[i] = "static "+inLines[i]
+
+
+def manipulate_mpi_ops(inLines, program_name):
+    for idx, line in enumerate(inLines):
+        # subcomm
+        if 'MPI_' not in line:    # not MPI
+            if "int main" in line:
+                # inLines[idx] = "static int "+program_name+"_main(int* argc, char *argv[])"
+                inLines[idx] = line.replace("int main", "static int "+program_name+"_main")
+            else:
+                continue
+        else:    # MPI
+            if 'MPI_Init' in line:
+                inLines[idx] = "//"+line
+            elif 'MPI_Errhandler_' in line:    # error handling ignored
+                inLines[idx] = "//"+line
+            elif 'mpiresult = MPI_Finalize();' in line:
+                inLines[idx] = "CODES_MPI_Finalize();"
+                inLines[idx+2] = "exitcode = 0;"
+            elif 'MPI_Comm_get_attr' in line:
+                inLines[idx] = "//"+line
+            else:
+                for ops in MPI_OPS:
+                    if ops in line:
+                        inLines[idx] = line.replace(ops, "CODES_"+ops)
+
+def adding_struct(inLines, program_name):
+    new_struct = [ '/* fill in function pointers for this method */',
+                   'struct codes_conceptual_bench '+program_name+'_bench = ',
+                   '{',
+                   '.program_name = "'+program_name+'",',
+                   '.conceptual_main = '+program_name+'_main,',
+                   '};' ]
+
+    codes_include = '#include "codes/codes-conc-addon.h"'
+    for idx, line in enumerate(inLines):
+        if "* Include files *" in line:
+            inLines.insert(idx-1, codes_include)
+            break
+
+    # adding struct at the end
+    for i in range(0, len(new_struct)):
+        inLines.append(new_struct[i])
+
+
+def insert_if_not_exist(content, idx, hls):
+    exist = False
+    for i in range(idx[0], idx[1]):
+        if hls[i] in content:
+            exist = True
+            break
+
+    if not exist:
+        hls.insert(idx[0], content)
+
+def translate_conc_to_codes(filepath, codespath):
+    # get program name
+    program_name = filepath.split("/")[-1].replace(".c","")
+
+    with open(filepath, 'r') as infile:
+        content = infile.read()
+    inLines = content.split('\n')
+
+    eliminate_logging(inLines)
+    eliminate_conc_init(inLines)
+    make_static_var(inLines)
+    manipulate_mpi_ops(inLines, program_name)
+    adding_struct(inLines, program_name)
+
+    # output program file
+    with open(codespath+"src/workload/conceputal-skeleton-apps/conc-"+program_name+".c","w+") as outFile:
+        outFile.writelines(["%s\n" % item for item in inLines])
+
+    # modify interface file
+    program_struct = "extern struct codes_conceptual_bench "+program_name+"_bench;\n"
+    program_struct_idx=[]
+    program_definition = "    &"+program_name+"_bench,\n"
+    program_definition_idx=[]
+    with open(codespath+"src/workload/codes-conc-addon.c","r+") as header:
+        hls = header.readlines()
+        for idx, line in enumerate(hls):
+            if '/* list of available benchmarks begin */' in line:
+                program_struct_idx.append(idx+1)
+            elif '/* list of available benchmarks end */' in line:
+                program_struct_idx.append(idx)
+        insert_if_not_exist(program_struct, program_struct_idx, hls)
+
+        for idx, line in enumerate(hls):
+            if '/* default benchmarks begin */' in line:
+                program_definition_idx.append(idx+1)
+            elif '/* default benchmarks end */' in line:
+                program_definition_idx.append(idx)
+        insert_if_not_exist(program_definition, program_definition_idx, hls)
+
+        header.seek(0)
+        header.writelines(hls)
+
+    # modify makefile
+    program_compile = "src_libcodes_la_SOURCES += src/workload/conceputal-skeleton-apps/conc-"+program_name+".c\n"
+    program_compile_idx = []
+    with open(codespath+"Makefile.am","r+") as makefile:
+        mfls = makefile.readlines()
+        for idx, line in enumerate(mfls):
+            if "CONCEPTUAL_LIBS" in line:
+                program_compile_idx.append(idx+1)
+                break
+        for i in range(program_compile_idx[0], len(mfls)):
+            if 'endif' in mfls[i]:
+                program_compile_idx.append(i)
+                break
+        insert_if_not_exist(program_compile, program_compile_idx, mfls)
+        makefile.seek(0)
+        makefile.writelines(mfls)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 4:
+        print 'Need 3 arguments: 1. path to files to be converted \t2. path to CODES directory\t3. path to ncptl executable'
+        sys.exit(1)
+
+    os.chdir(sys.argv[1])
+    for benchfile in next(os.walk(sys.argv[1]))[2]:    # for all files
+        if benchfile.lower().endswith('.ncptl'):
+            cfile = benchfile.replace('.ncptl','.c')
+            cfile = cfile.replace("-","")
+            os.system(sys.argv[3]+' --backend=c_mpi --no-compile '+benchfile+' --output '+cfile)
+            print "adding bench file: %s" % cfile
+            translate_conc_to_codes(sys.argv[1]+cfile, sys.argv[2])
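Per the argument check above, the translator is driven with three arguments; the paths here are illustrative (note the script concatenates sys.argv[1] and the generated file name directly, so the source directory should end with a slash):

    python translate_conc_src.py /path/to/ncptl-sources/ /path/to/codes/ /usr/local/bin/ncptl

For each *.ncptl file it runs ncptl's c_mpi backend with --no-compile, strips dashes from the generated file name, rewrites the C source as described above, and registers the benchmark in src/workload/codes-conc-addon.c and Makefile.am.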
diff --git a/src/Makefile.subdir b/src/Makefile.subdir
index 69624255..c559a51d 100644
--- a/src/Makefile.subdir
+++ b/src/Makefile.subdir
@@ -105,7 +105,8 @@ nobase_include_HEADERS = \
 	codes/net/express-mesh.h \
 	codes/net/torus.h \
 	codes/codes-mpi-replay.h \
-	codes/configfile.h
+	codes/configfile.h \
+	codes/codes-conc-addon.h

 #codes/codes-nw-workload.h

@@ -151,6 +152,7 @@ src_libcodes_la_SOURCES = \
 	src/util/codes-mapping-context.c \
 	src/util/codes-comm.c \
 	src/workload/codes-workload.c \
+	src/workload/codes-conc-addon.c \
 	src/workload/methods/codes-iolang-wrkld.c \
 	src/workload/methods/codes-checkpoint-wrkld.c \
 	src/workload/methods/test-workload-method.c \
diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c
index c7cee68f..5f34b5c8 100644
--- a/src/network-workloads/model-net-mpi-replay.c
+++ b/src/network-workloads/model-net-mpi-replay.c
@@ -106,6 +106,7 @@ static float noise = 1.0;
 static int num_nw_lps = 0, num_mpi_lps = 0;

 static int num_syn_clients;
+static int finished_syn_clients;
 static int syn_type = 0;

 FILE * workload_log = NULL;
@@ -522,8 +523,11 @@ static void notify_background_traffic(

     int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx);

-    for(int other_id = 0; other_id < num_jobs; other_id++)
+    // Assumption: synthetic jobs are listed at the bottom of the workload configuration file
+    for(int other_id = num_jobs - 1; other_id >= 0; other_id--)
     {
+        if(finished_syn_clients == num_syn_clients)
+            break;
         if(other_id == jid.job)
             continue;
@@ -548,6 +552,7 @@ static void notify_background_traffic(
             m_new = (struct nw_message*)tw_event_data(e);
             m_new->msg_type = CLI_BCKGND_FIN;
             tw_event_send(e);
+            finished_syn_clients += 1;
         }
     }
     return;
@@ -575,15 +580,25 @@ static void notify_neighbor(
     tw_bf * bf,
     struct nw_message * m)
 {
-    if(ns->local_rank == num_dumpi_traces - 1
+    int num_ranks = codes_jobmap_get_num_ranks(ns->app_id, jobmap_ctx);
+
+    // if all application workloads have finished, notify background traffic to stop
+    if(ns->local_rank == num_ranks - 1
         && ns->is_finished == 1
         && ns->neighbor_completed == 1)
     {
-//        printf("\n All workloads completed, notifying background traffic ");
-        bf->c0 = 1;
-        notify_background_traffic(ns, lp, bf, m);
+        // printf("\n All ranks completed");
+        num_dumpi_traces -= num_ranks;
+
+        if(num_dumpi_traces == 0) {
+            // printf("\n All workloads completed, notifying background traffic ");
+            bf->c0 = 1;
+            notify_background_traffic(ns, lp, bf, m);
+        }
         return;
     }
+
+
     struct codes_jobmap_id nbr_jid;
     nbr_jid.job = ns->app_id;
@@ -2132,7 +2147,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
            strcpy(params_d.cortex_gen, cortex_gen);
 #endif
     }
-    else if(strcmp(workload_type, "online") == 0){
+    else if(strcmp(workload_type, "swm-online") == 0){

         online_comm_params oc_params;

@@ -2151,7 +2166,25 @@ void nw_test_init(nw_state* s, tw_lp* lp)
         * online, it is the number of ranks to be simulated. */
         oc_params.nprocs = num_traces_of_job[lid.job];
         params = (char*)&oc_params;
-        strcpy(type_name, "online_comm_workload");
+        strcpy(type_name, "swm_online_comm_workload");
+    }
+    else if(strcmp(workload_type, "conc-online") == 0){
+
+        online_comm_params oc_params;
+
+        if(strlen(workload_name) > 0)
+        {
+            strcpy(oc_params.workload_name, workload_name);
+        }
+        else if(strlen(workloads_conf_file) > 0)
+        {
+            strcpy(oc_params.workload_name, file_name_of_job[lid.job]);
+        }
+        /* TODO: nprocs is different for dumpi and online workloads. For
+         * online, it is the number of ranks to be simulated. */
+        oc_params.nprocs = num_traces_of_job[lid.job];
+        params = (char*)&oc_params;
+        strcpy(type_name, "conc_online_comm_workload");
     }

     int rc = configuration_get_value_int(&config, "PARAMS", "num_qos_levels", NULL, &num_qos_levels);
@@ -2622,16 +2655,20 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
             return;
         if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0)
             avg_msg_time = (s->send_time / s->num_recvs);
-        else if(strcmp(workload_type, "online") == 0)
-            codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
+        else if(strcmp(workload_type, "swm-online") == 0)
+            codes_workload_finalize("swm_online_comm_workload", params, s->app_id, s->local_rank);
+        else if(strcmp(workload_type, "conc-online") == 0)
+            codes_workload_finalize("conc_online_comm_workload", params, s->app_id, s->local_rank);
     }
     else
     {
         if(s->nw_id >= (tw_lpid)num_net_traces)
             return;
-        if(strcmp(workload_type, "online") == 0)
-            codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
+        if(strcmp(workload_type, "swm-online") == 0)
+            codes_workload_finalize("swm_online_comm_workload", params, s->app_id, s->local_rank);
+        if(strcmp(workload_type, "conc-online") == 0)
+            codes_workload_finalize("conc_online_comm_workload", params, s->app_id, s->local_rank);
     }

     struct msg_size_info * tmp_msg = NULL;
@@ -2942,7 +2979,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
 #endif

     codes_comm_update();

-    if(strcmp(workload_type, "dumpi") != 0 && strcmp(workload_type, "online") != 0)
+    if(strcmp(workload_type, "dumpi") != 0 && strcmp(workload_type, "swm-online") != 0 && strcmp(workload_type, "conc-online") != 0)
     {
        if(tw_ismaster())
            printf("Usage: mpirun -np n ./modelnet-mpi-replay --sync=1/3"
@@ -2960,6 +2997,10 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
        tw_end();
        return -1;
     }
+    /* Currently the rendezvous protocol cannot work with Conceptual workloads */
+    if(strcmp(workload_type, "conc-online") == 0) {
+        EAGER_THRESHOLD = INT64_MAX;
+    }

     jobmap_ctx = NULL; // make sure it's NULL if it's not used

@@ -2982,7 +3023,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )

         if(ref != EOF && strncmp(file_name_of_job[i], "synthetic", 9) == 0)
         {
-            num_syn_clients = num_traces_of_job[i];
+            num_syn_clients += num_traces_of_job[i];
             num_net_traces += num_traces_of_job[i];
         }
         else if(ref!=EOF)
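Putting the replay changes together: conc-online becomes a third accepted workload type, and with the reversed job iteration above, synthetic background-traffic jobs must be listed last in the workload configuration file. A sketch of a two-job setup and invocation; file names, rank counts, and the exact option spellings are illustrative rather than authoritative:

    # workloads.conf -- synthetic job(s) at the bottom, per the assumption above
    64 conceptual-latency
    16 synthetic1

    mpirun -np 4 ./modelnet-mpi-replay --sync=3 \
        --workload_type=conc-online \
        --workload_conf_file=workloads.conf \
        --alloc_file=allocation.conf -- network-config.conf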
diff --git a/src/networks/model-net/dragonfly-custom.C b/src/networks/model-net/dragonfly-custom.C
index 49c2c513..c051bb46 100644
--- a/src/networks/model-net/dragonfly-custom.C
+++ b/src/networks/model-net/dragonfly-custom.C
@@ -2402,8 +2402,8 @@ void dragonfly_custom_router_final(router_state * s,
     written = 0;
     if(!s->router_id)
     {
-        written = sprintf(s->output_buf, "# Format ");
-        written += sprintf(s->output_buf + written, "# Router ports in the order: %d green links, %d black links %d global channels \n",
+        written = sprintf(s->output_buf2, "# Format ");
+        written += sprintf(s->output_buf2 + written, "# Router ports in the order: %d green links, %d black links %d global channels \n",
             p->num_router_cols * p->num_row_chans, p->num_router_rows * p->num_col_chans, p->num_global_channels);
     }
     written += sprintf(s->output_buf2 + written, "\n %llu %d %d",
diff --git a/src/workload/codes-conc-addon.c b/src/workload/codes-conc-addon.c
new file mode 100644
index 00000000..ef5843b2
--- /dev/null
+++ b/src/workload/codes-conc-addon.c
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2013 University of Chicago.
+ * See COPYRIGHT notice in top-level directory.
+ *
+ */
+#include <assert.h>
+#include <string.h>
+#include <ross.h>
+#include <codes/codes-conc-addon.h>
+
+#ifdef USE_CONC
+/* list of available benchmarks begin */
+/* list of available benchmarks end */
+#endif
+
+static struct codes_conceptual_bench const * bench_array_default[] =
+{
+#ifdef USE_CONC
+    /* default benchmarks begin */
+    /* default benchmarks end */
+#endif
+    NULL
+};
+
+// once initialized, adding a bench generator is an error
+static int is_bench_init = 0;
+static int num_user_benchs = 0;
+static struct codes_conceptual_bench const ** bench_array = NULL;
+
+
+
+// only call this once
+static void init_bench_methods(void)
+{
+    if (is_bench_init)
+        return;
+    if (bench_array == NULL)
+        bench_array = bench_array_default;
+    else {
+        // note - count includes the terminating NULL entry
+        int num_default_benchs =
+            (sizeof(bench_array_default) / sizeof(bench_array_default[0]));
+        printf("\n Num default methods %d ", num_default_benchs);
+        bench_array = realloc(bench_array,
+                (num_default_benchs + num_user_benchs + 1) *
+                sizeof(*bench_array));
+        memcpy(bench_array+num_user_benchs, bench_array_default,
+                num_default_benchs * sizeof(*bench_array_default));
+    }
+    is_bench_init = 1;
+}
+
+
+int codes_conc_bench_load(
+        const char *program,
+        int argc,
+        char *argv[])
+{
+    init_bench_methods();
+
+    int i;
+    int ret;
+
+    for(i=0; bench_array[i] != NULL; i++)
+    {
+        if(strcmp(bench_array[i]->program_name, program) == 0)
+        {
+            /* load appropriate workload generator */
+            ret = bench_array[i]->conceptual_main(argc, argv);
+            if(ret < 0)
+            {
+                return(-1);
+            }
+            return(i);
+        }
+    }
+    fprintf(stderr, "Error: failed to find benchmark program %s\n", program);
+    return(-1);
+}
+
+void codes_conceptual_add_bench(struct codes_conceptual_bench const * bench)
+{
+    static int bench_array_cap = 10;
+    if (is_bench_init)
+        tw_error(TW_LOC,
+                "adding a conceptual benchmark method after initialization is forbidden");
+    else if (bench_array == NULL){
+        bench_array = malloc(bench_array_cap * sizeof(*bench_array));
+        assert(bench_array);
+    }
+
+    if (num_user_benchs == bench_array_cap) {
+        bench_array_cap *= 2;
+        bench_array = realloc(bench_array,
+                bench_array_cap * sizeof(*bench_array));
+        assert(bench_array);
+    }
+
+    bench_array[num_user_benchs++] = bench;
+}
+
+
+
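The empty begin/end marker comments above are the anchors translate_conc_src.py writes into. After translating a hypothetical latency benchmark, the top of this file would read:

    #ifdef USE_CONC
    /* list of available benchmarks begin */
    extern struct codes_conceptual_bench latency_bench;
    /* list of available benchmarks end */
    #endif

    static struct codes_conceptual_bench const * bench_array_default[] =
    {
    #ifdef USE_CONC
        /* default benchmarks begin */
        &latency_bench,
        /* default benchmarks end */
    #endif
        NULL
    };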
diff --git a/src/workload/codes-workload-dump.c b/src/workload/codes-workload-dump.c
index 66d410ab..5a9c94a8 100644
--- a/src/workload/codes-workload-dump.c
+++ b/src/workload/codes-workload-dump.c
@@ -215,7 +215,7 @@ int main(int argc, char *argv[])
             wparams = (char*)&d_params;
         }
     }
-    else if(strcmp(type, "online_comm_workload") == 0){
+    else if(strcmp(type, "swm_online_comm_workload") == 0 || strcmp(type, "conc_online_comm_workload") == 0){
         if (n == -1){
             fprintf(stderr,
                     "Expected \"--num-ranks\" argument for online workload\n");
@@ -448,7 +448,7 @@ int main(int argc, char *argv[])
             }
         }
     } while (op.op_type != CODES_WK_END);

-    if(strcmp(type, "online_comm_workload") == 0)
+    if(strcmp(type, "swm_online_comm_workload") == 0 || strcmp(type, "conc_online_comm_workload") == 0)
     {
         codes_workload_finalize(type, wparams, 0, i);
     }
diff --git a/src/workload/codes-workload.c b/src/workload/codes-workload.c
index 934066ce..e21d4fad 100644
--- a/src/workload/codes-workload.c
+++ b/src/workload/codes-workload.c
@@ -34,9 +34,14 @@ extern struct codes_workload_method darshan_mpi_io_workload_method;
 #ifdef USE_RECORDER
 extern struct codes_workload_method recorder_io_workload_method;
 #endif
-#ifdef USE_ONLINE
-extern struct codes_workload_method online_comm_workload_method;
+
+#ifdef USE_SWM
+extern struct codes_workload_method swm_online_comm_workload_method;
+#endif
+#ifdef USE_CONC
+extern struct codes_workload_method conc_online_comm_workload_method;
 #endif
+
 extern struct codes_workload_method checkpoint_workload_method;
 extern struct codes_workload_method iomock_workload_method;

@@ -58,8 +63,11 @@ static struct codes_workload_method const * method_array_default[] =
 #endif
 #endif
-#ifdef USE_ONLINE
-    &online_comm_workload_method,
+#ifdef USE_SWM
+    &swm_online_comm_workload_method,
+#endif
+#ifdef USE_CONC
+    &conc_online_comm_workload_method,
 #endif
 #ifdef USE_RECORDER
     &recorder_io_workload_method,
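With the methods renamed, a consumer selects the Conceptual generator by name through the regular workload API, as codes-workload-dump does above. A minimal sketch of that flow (the rank and app ids, program name, and rank count are illustrative; online_comm_params is the same struct the replay tool fills in):

    #include <string.h>
    #include "codes/codes-workload.h"

    static void drain_one_rank(void)
    {
        online_comm_params oc_params;
        strcpy(oc_params.workload_name, "conceptual-latency"); /* hypothetical */
        oc_params.nprocs = 64;   /* ranks to simulate */

        /* load returns a workload id for subsequent calls */
        int wid = codes_workload_load("conc_online_comm_workload",
                (char*)&oc_params, 0 /* app id */, 0 /* rank */);

        struct codes_workload_op op;
        do {
            codes_workload_get_next(wid, 0, 0, &op);
            /* hand op off to the network model here */
        } while (op.op_type != CODES_WK_END);

        codes_workload_finalize("conc_online_comm_workload",
                (char*)&oc_params, 0, 0);
    }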
diff --git a/src/workload/methods/codes-conc-online-comm-wrkld.C b/src/workload/methods/codes-conc-online-comm-wrkld.C
new file mode 100644
index 00000000..c3caba0a
--- /dev/null
+++ b/src/workload/methods/codes-conc-online-comm-wrkld.C
@@ -0,0 +1,845 @@
+/*
+ * Copyright (C) 2014 University of Chicago
+ * See COPYRIGHT notice in top-level directory.
+ *
+ */
+
+#include <iostream>
+#include <fstream>
+#include <boost/property_tree/json_parser.hpp>
+#include <boost/property_tree/ptree.hpp>
+#include <boost/foreach.hpp>
+#include <boost/lexical_cast.hpp>
+#include <map>
+#include <deque>
+#include <string>
+#include <mpi.h>
+#include <abt.h>
+#include <ross.h>
+#include <assert.h>
+#include "codes/codes-workload.h"
+#include "codes/quickhash.h"
+#include "codes/codes-jobmap.h"
+#include "codes_config.h"
+#include "codes/codes-conc-addon.h"
+
+#define ALLREDUCE_SHORT_MSG_SIZE 2048
+
+//#define DBG_COMM 0
+
+using namespace std;
+
+static struct qhash_table *rank_tbl = NULL;
+static int rank_tbl_pop = 0;
+static int total_rank_cnt = 0;
+ABT_thread global_prod_thread = NULL;
+ABT_xstream self_es;
+long cpu_freq = 1.0;
+long num_allreduce = 0;
+long num_isends = 0;
+long num_irecvs = 0;
+long num_barriers = 0;
+long num_sends = 0;
+long num_recvs = 0;
+long num_sendrecv = 0;
+long num_waitalls = 0;
+
+//std::map<int64_t, int> send_count;
+//std::map<int64_t, int> isend_count;
+//std::map<int64_t, int> allreduce_count;
+
+struct shared_context {
+    int my_rank;
+    uint32_t wait_id;
+    int num_ranks;
+    char workload_name[MAX_NAME_LENGTH_WKLD];
+    void * swm_obj;
+    void * conc_params;
+    ABT_thread producer;
+    std::deque<struct codes_workload_op*> fifo;
+};
+
+struct rank_mpi_context {
+    struct qhash_head hash_link;
+    int app_id;
+    struct shared_context sctx;
+};
+
+typedef struct rank_mpi_compare {
+    int app_id;
+    int rank;
+} rank_mpi_compare;
+
+
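Everything below leans on one pattern: each simulated rank runs its benchmark inside an Argobots user-level thread, pushes codes_workload_op entries into its shared_context fifo, and yields back to the simulator thread, which pops ops and yields to the producer when the queue runs dry. A stripped-down, self-contained sketch of that handoff (all names here are invented for illustration):

    #include <abt.h>
    #include <stdio.h>

    static ABT_thread consumer;   /* simulator-side thread */

    /* producer: "enqueue" an op, then hand control back */
    static void producer_fn(void *arg)
    {
        int *queued = (int*)arg;
        for (int i = 0; i < 3; i++) {
            (*queued)++;                    /* push one op */
            ABT_thread_yield_to(consumer);  /* let the consumer drain it */
        }
    }

    int main(int argc, char **argv)
    {
        ABT_init(argc, argv);
        ABT_xstream es;
        ABT_xstream_self(&es);
        ABT_thread_self(&consumer);   /* the main ULT plays the consumer */

        int queued = 0;
        ABT_thread prod;
        ABT_thread_create_on_xstream(es, producer_fn, &queued,
                ABT_THREAD_ATTR_NULL, &prod);

        for (int drained = 0; drained < 3; ) {
            while (queued == 0)                /* queue empty: run producer */
                ABT_thread_yield_to(prod);
            queued--; drained++;               /* pop one op */
            printf("drained op %d\n", drained);
        }

        ABT_thread_join(prod);
        ABT_thread_free(&prod);
        ABT_finalize();
        return 0;
    }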
+
+/* Conceptual online workload implementations */
+void CODES_MPI_Comm_size (MPI_Comm comm, int *size)
+{
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err;
+
+    err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+
+    *size = sctx->num_ranks;
+}
+
+void CODES_MPI_Comm_rank( MPI_Comm comm, int *rank )
+{
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err;
+
+    err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+
+    *rank = sctx->my_rank;
+}
+
+void CODES_MPI_Finalize()
+{
+    /* Add an event in the shared queue and then yield */
+    struct codes_workload_op wrkld_per_rank;
+
+    wrkld_per_rank.op_type = CODES_WK_END;
+
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+    sctx->fifo.push_back(&wrkld_per_rank);
+
+    // printf("\n rank %d finalize workload: num_sends %ld num_recvs %ld num_isends %ld num_irecvs %ld num_allreduce %ld num_barrier %ld num_waitalls %ld\n",
+    //        sctx->my_rank, num_sends, num_recvs, num_isends, num_irecvs, num_allreduce, num_barriers, num_waitalls);
+    // printf("Rank %d yield to CODES thread: %p\n", sctx->my_rank, global_prod_thread);
+    ABT_thread_yield_to(global_prod_thread);
+}
+
+void CODES_MPI_Send(const void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int dest,
+        int tag,
+        MPI_Comm comm)
+{
+    /* add an event in the shared queue and then yield */
+    // printf("\n Sending to rank %d ", comm_id);
+    struct codes_workload_op wrkld_per_rank;
+
+    int datatypesize;
+    MPI_Type_size(datatype, &datatypesize);
+
+    wrkld_per_rank.op_type = CODES_WK_SEND;
+    wrkld_per_rank.u.send.tag = tag;
+    wrkld_per_rank.u.send.count = count;
+    wrkld_per_rank.u.send.data_type = datatype;
+    wrkld_per_rank.u.send.num_bytes = count * datatypesize;
+    wrkld_per_rank.u.send.dest_rank = dest;
+
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+    wrkld_per_rank.u.send.source_rank = sctx->my_rank;
+    sctx->fifo.push_back(&wrkld_per_rank);
+    // printf("Rank %d Send Event to dest %d: %lld, fifo size: %lu\n", sctx->my_rank, dest,
+    //        wrkld_per_rank.u.send.num_bytes, sctx->fifo.size());
+
+    // printf("Rank %d yield to CODES thread: %p\n", sctx->my_rank, global_prod_thread);
+    int rc = ABT_thread_yield_to(global_prod_thread);
+    num_sends++;
+}
+
+void CODES_MPI_Recv(void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int source,
+        int tag,
+        MPI_Comm comm,
+        MPI_Status *status)
+{
+    /* Add an event in the shared queue and then yield */
+    struct codes_workload_op wrkld_per_rank;
+
+    int datatypesize;
+    MPI_Type_size(datatype, &datatypesize);
+
+    wrkld_per_rank.op_type = CODES_WK_RECV;
+    wrkld_per_rank.u.recv.tag = tag;
+    wrkld_per_rank.u.recv.source_rank = source;
+    wrkld_per_rank.u.recv.data_type = datatype;
+    wrkld_per_rank.u.recv.count = count;
+    wrkld_per_rank.u.recv.num_bytes = count * datatypesize;
+
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+    wrkld_per_rank.u.recv.dest_rank = sctx->my_rank;
+    sctx->fifo.push_back(&wrkld_per_rank);
+    // printf("Rank %d Recv event from %d bytes %d fifo size %lu\n", sctx->my_rank, source, wrkld_per_rank.u.recv.num_bytes, sctx->fifo.size());
+
+    // printf("Rank %d yield to CODES thread: %p\n", sctx->my_rank, global_prod_thread);
+    ABT_thread_yield_to(global_prod_thread);
+    num_recvs++;
+}
+
+void CODES_MPI_Sendrecv(const void *sendbuf,
+        int sendcount,
+        MPI_Datatype sendtype,
+        int dest,
+        int sendtag,
+        void *recvbuf,
+        int recvcount,
+        MPI_Datatype recvtype,
+        int source,
+        int recvtag,
+        MPI_Comm comm,
+        MPI_Status *status)
+{
+    /* sendrecv events */
+    struct codes_workload_op send_op;
+
+    int datatypesize1, datatypesize2;
+    MPI_Type_size(sendtype, &datatypesize1);
+    MPI_Type_size(recvtype, &datatypesize2);
+
+    send_op.op_type = CODES_WK_SEND;
+    send_op.u.send.tag = sendtag;
+    send_op.u.send.count = sendcount;
+    send_op.u.send.data_type = sendtype;
+    send_op.u.send.num_bytes = sendcount * datatypesize1;
+    send_op.u.send.dest_rank = dest;
+
+    struct codes_workload_op recv_op;
+
+    recv_op.op_type = CODES_WK_RECV;
+    recv_op.u.recv.tag = recvtag;
+    recv_op.u.recv.source_rank = source;
+    recv_op.u.recv.count = recvcount;
+    recv_op.u.recv.data_type = recvtype;
+    recv_op.u.recv.num_bytes = recvcount * datatypesize2;
+
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+
+    /* Add an event in the shared queue and then yield */
+    recv_op.u.recv.dest_rank = sctx->my_rank;
+    send_op.u.send.source_rank = sctx->my_rank;
+    sctx->fifo.push_back(&send_op);
+    sctx->fifo.push_back(&recv_op);
+
+    ABT_thread_yield_to(global_prod_thread);
+    num_sendrecv++;
+}
+
+
+void CODES_MPI_Barrier(MPI_Comm comm)
+{
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err;
+    int rank, size, src, dest, mask;
+
+    err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+
+    rank = sctx->my_rank;
+    size = sctx->num_ranks;
+    mask = 0x1;
+
+    /* dissemination barrier: exchange zero-byte messages with ranks at
+     * power-of-two distances until every rank has been reached */
+    while(mask < size) {
+        dest = (rank + mask) % size;
+        src = (rank - mask + size) % size;
+
+        CODES_MPI_Sendrecv(NULL, 0, MPI_INT, dest, 1234, NULL, 0, MPI_INT, src, 1234,
+                comm, NULL);
+
+        mask <<= 1;
+    }
+    num_barriers++;
+}
+
+void CODES_MPI_Isend(const void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int dest,
+        int tag,
+        MPI_Comm comm,
+        MPI_Request *request)
+{
+    /* add an event in the shared queue and then yield */
+    // printf("\n Sending to rank %d ", comm_id);
+    struct codes_workload_op wrkld_per_rank;
+
+    int datatypesize;
+    MPI_Type_size(datatype, &datatypesize);
+
+    wrkld_per_rank.op_type = CODES_WK_ISEND;
+    wrkld_per_rank.u.send.tag = tag;
+    wrkld_per_rank.u.send.count = count;
+    wrkld_per_rank.u.send.data_type = datatype;
+    wrkld_per_rank.u.send.num_bytes = count * datatypesize;
+    wrkld_per_rank.u.send.dest_rank = dest;
+
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+    wrkld_per_rank.u.send.source_rank = sctx->my_rank;
+    /* the fifo stores a pointer, so the req_id set below is still
+     * visible to the consumer */
+    sctx->fifo.push_back(&wrkld_per_rank);
+
+    *request = sctx->wait_id;
+    wrkld_per_rank.u.send.req_id = *request;
+    sctx->wait_id++;
+
+    ABT_thread_yield_to(global_prod_thread);
+    num_isends++;
+}
+
+void CODES_MPI_Irecv(void *buf,
+        int count,
+        MPI_Datatype datatype,
+        int source,
+        int tag,
+        MPI_Comm comm,
+        MPI_Request *request)
+{
+    /* Add an event in the shared queue and then yield */
+    struct codes_workload_op wrkld_per_rank;
+
+    int datatypesize;
+    MPI_Type_size(datatype, &datatypesize);
+
+    wrkld_per_rank.op_type = CODES_WK_IRECV;
+    wrkld_per_rank.u.recv.tag = tag;
+    wrkld_per_rank.u.recv.source_rank = source;
+    wrkld_per_rank.u.recv.count = count;
+    wrkld_per_rank.u.recv.data_type = datatype;
+    wrkld_per_rank.u.recv.num_bytes = count * datatypesize;
+
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+    wrkld_per_rank.u.recv.dest_rank = sctx->my_rank;
+    sctx->fifo.push_back(&wrkld_per_rank);
+
+    *request = sctx->wait_id;
+    wrkld_per_rank.u.recv.req_id = *request;
+    sctx->wait_id++;
+
+    ABT_thread_yield_to(global_prod_thread);
+    num_irecvs++;
+}
+
+void CODES_MPI_Waitall(int count,
+        MPI_Request array_of_requests[],
+        MPI_Status array_of_statuses[])
+{
+    num_waitalls++;
+    /* Add an event in the shared queue and then yield */
+    struct codes_workload_op wrkld_per_rank;
+
+    wrkld_per_rank.op_type = CODES_WK_WAITALL;
+    /* TODO: Check how to convert cycle count into delay? */
+    wrkld_per_rank.u.waits.count = count;
+    wrkld_per_rank.u.waits.req_ids = (unsigned int*)calloc(count, sizeof(int));
+
+    for(int i = 0; i < count; i++)
+        wrkld_per_rank.u.waits.req_ids[i] = array_of_requests[i];
+
+    /* Retrieve the shared context state */
+    ABT_thread prod;
+    void * arg;
+    int err = ABT_thread_self(&prod);
+    assert(err == ABT_SUCCESS);
+    err = ABT_thread_get_arg(prod, &arg);
+    assert(err == ABT_SUCCESS);
+    struct shared_context * sctx = static_cast<struct shared_context*>(arg);
+    sctx->fifo.push_back(&wrkld_per_rank);
+
+    ABT_thread_yield_to(global_prod_thread);
+}
+
+void CODES_MPI_Reduce(const void *sendbuf,
+        void *recvbuf,
+        int count,
+        MPI_Datatype datatype,
+        MPI_Op op,
+        int root,
+        MPI_Comm comm)
+{
+    //todo
+}
+
+void CODES_MPI_Allreduce(const void *sendbuf,
+        void *recvbuf,
+        int count,
+        MPI_Datatype datatype,
+        MPI_Op op,
+        MPI_Comm comm)
+{
+    int comm_size, rank, type_size, i, send_idx, recv_idx, last_idx, send_cnt, recv_cnt;
+    int pof2, mask, rem, newrank, newdst, dst, *cnts, *disps;
+
+    CODES_MPI_Comm_size(comm, &comm_size);
+    CODES_MPI_Comm_rank(comm, &rank);
+    MPI_Type_size(datatype, &type_size);
+
+    cnts = disps = NULL;
+
+    /* nearest power of two less than or equal to comm_size */
+    pof2 = 1;
+    while (pof2 <= comm_size) pof2 <<= 1;
+    pof2 >>= 1;
+
+    rem = comm_size - pof2;
+
+    /* In the non-power-of-two case, all even-numbered
+       processes of rank < 2*rem send their data to
+       (rank+1). These even-numbered processes no longer
+       participate in the algorithm until the very end. The
+       remaining processes form a nice power-of-two. */
+    if (rank < 2*rem) {
+        if (rank % 2 == 0) { /* even */
+            CODES_MPI_Send(NULL, count, datatype, rank+1, -1002, comm);
+            newrank = -1;
+        } else { /* odd */
+            CODES_MPI_Recv(NULL, count, datatype, rank-1, -1002, comm, NULL);
+            newrank = rank / 2;
+        }
+    } else {
+        newrank = rank - rem;
+    }
+
+    /* If op is user-defined or count is less than pof2, use
+       recursive doubling algorithm. Otherwise do a reduce-scatter
+       followed by allgather. (If op is user-defined,
+       derived datatypes are allowed and the user could pass basic
+       datatypes on one process and derived on another as long as
+       the type maps are the same. Breaking up derived
+       datatypes to do the reduce-scatter is tricky, therefore
+       using recursive doubling in that case.) */
+    if (newrank != -1) {
+        if ((count*type_size <= 81920) || (count < pof2)) {
+            /* recursive doubling */
+            mask = 0x1;
+            while (mask < pof2) {
+                newdst = newrank ^ mask;
+                dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
+
+                CODES_MPI_Sendrecv(NULL, count, datatype, dst, -1002,
+                        NULL, count, datatype, dst, -1002, comm, NULL);
+                mask <<= 1;
+            }
+        } else {
+            /* do a reduce-scatter followed by allgather */
+            /* for the reduce-scatter, calculate the count that
+               each process receives and the displacement within
+               the buffer */
+
+            cnts = (int*)malloc(pof2*sizeof(int));
+            disps = (int*)malloc(pof2*sizeof(int));
+
+            for (i=0; i<(pof2-1); i++)
+                cnts[i] = count/pof2;
+            cnts[pof2-1] = count - (count/pof2)*(pof2-1);
+
+            disps[0] = 0;
+            for (i=1; i<pof2; i++)
+                disps[i] = disps[i-1] + cnts[i-1];
+
+            /* reduce-scatter: exchange and fold halves of the buffer */
+            mask = 0x1;
+            send_idx = recv_idx = 0;
+            last_idx = pof2;
+            while (mask < pof2) {
+                newdst = newrank ^ mask;
+                dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
+
+                send_cnt = recv_cnt = 0;
+                if (newrank < newdst) {
+                    send_idx = recv_idx + pof2/(mask*2);
+                    for (i=send_idx; i<last_idx; i++)
+                        send_cnt += cnts[i];
+                    for (i=recv_idx; i<send_idx; i++)
+                        recv_cnt += cnts[i];
+                } else {
+                    recv_idx = send_idx + pof2/(mask*2);
+                    for (i=send_idx; i<recv_idx; i++)
+                        send_cnt += cnts[i];
+                    for (i=recv_idx; i<last_idx; i++)
+                        recv_cnt += cnts[i];
+                }
+
+                CODES_MPI_Sendrecv(NULL, send_cnt, datatype, dst, -1002,
+                        NULL, recv_cnt, datatype, dst, -1002, comm, NULL);
+
+                send_idx = recv_idx;
+                mask <<= 1;
+
+                if (mask < pof2)
+                    last_idx = recv_idx + pof2/mask;
+            }
+
+            /* now do the allgather */
+            mask >>= 1;
+            while (mask > 0) {
+                newdst = newrank ^ mask;
+                /* find real rank of dest */
+                dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
+
+                send_cnt = recv_cnt = 0;
+                if (newrank < newdst) {
+                    if (mask != pof2/2)
+                        last_idx = last_idx + pof2/(mask*2);
+
+                    recv_idx = send_idx + pof2/(mask*2);
+                    for (i=send_idx; i<recv_idx; i++)
+                        send_cnt += cnts[i];
+                    for (i=recv_idx; i<last_idx; i++)
+                        recv_cnt += cnts[i];
+                } else {
+                    recv_idx = send_idx - pof2/(mask*2);
+                    for (i=send_idx; i<last_idx; i++)
+                        send_cnt += cnts[i];
+                    for (i=recv_idx; i<send_idx; i++)
+                        recv_cnt += cnts[i];
+                }
+
+                CODES_MPI_Sendrecv(NULL, send_cnt, datatype, dst, -1002,
+                        NULL, recv_cnt, datatype, dst, -1002, comm, NULL);
+
+                if (newrank > newdst) send_idx = recv_idx;
+                mask >>= 1;
+            }
+        }
+    }
+
+    if(rank < 2*rem) {
+        if(rank % 2) { /* odd */
+            CODES_MPI_Send(NULL, count, datatype, rank-1, -1002, comm);
+        } else {
+            CODES_MPI_Recv(NULL, count, datatype, rank+1, -1002, comm, NULL);
+        }
+    }
+
+    if(cnts) free(cnts);
+    if(disps) free(disps);
+}
+
+void CODES_MPI_Bcast(void *buffer,
+        int count,
+        MPI_Datatype datatype,
+        int root,
+        MPI_Comm comm)
+{
+    //todo
+}
+
+void CODES_MPI_Alltoall(const void *sendbuf,
+        int sendcount,
+        MPI_Datatype sendtype,
+        void *recvbuf,
+        int recvcount,
+        MPI_Datatype recvtype,
+        MPI_Comm comm)
+{
+    int *sendcounts, *sdispls, *recvcounts, *rdispls;
+    int i, comm_size;
+
+    CODES_MPI_Comm_size(comm, &comm_size);
+
+    /* exchange fixed-size blocks with every rank via sendrecv */
+    for (i=0; i<comm_size; i++) {
+        CODES_MPI_Sendrecv(NULL, sendcount, sendtype, i, -1003,
+                NULL, recvcount, recvtype, i, -1003, comm, NULL);
+    }
+}
+
+void CODES_MPI_Alltoallv(const void *sendbuf,
+        const int *sendcounts,
+        const int *sdispls,
+        MPI_Datatype sendtype,
+        void *recvbuf,
+        const int *recvcounts,
+        const int *rdispls,
+        MPI_Datatype recvtype,
+        MPI_Comm comm)
+{
+    int i, comm_size;
+
+    CODES_MPI_Comm_size(comm, &comm_size);
+
+    /* exchange the per-rank counts with every rank via sendrecv */
+    for (i=0; i<comm_size; i++) {
+        CODES_MPI_Sendrecv(NULL, sendcounts[i], sendtype, i, -1003,
+                NULL, recvcounts[i], recvtype, i, -1003, comm, NULL);
+    }
+}
+
+static int hash_rank_compare(void *key, struct qhash_head *link)
+{
+    rank_mpi_compare *in = (rank_mpi_compare*)key;
+    rank_mpi_context *tmp;
+
+    tmp = qhash_entry(link, rank_mpi_context, hash_link);
+    if (tmp->sctx.my_rank == in->rank && tmp->app_id == in->app_id)
+        return 1;
+    return 0;
+}
+static void workload_caller(void * arg)
+{
+    shared_context* sctx = static_cast<shared_context*>(arg);
+
+    //printf("\n workload name %s ", sctx->workload_name);
+    if(strncmp(sctx->workload_name, "conceptual", 10) == 0)
+    {
+        conc_bench_param * conc_params = static_cast<conc_bench_param*>(sctx->conc_params);
+        // printf("program: %s\n", conc_params->conc_program);
+        // printf("argc: %d\n", conc_params->conc_argc);
+        int i;
+        for (i=0; i<conc_params->conc_argc; i++){
+            conc_params->conc_argv[i] = conc_params->config_in[i];
+        }
+        // conc_params->argv = &conc_params->conc_argv;
+        codes_conc_bench_load(conc_params->conc_program,
+                conc_params->conc_argc,
+                conc_params->conc_argv);
+    }
+}
+
+static int comm_online_workload_load(const char * params, int app_id, int rank)
+{
+    /* LOAD parameters from JSON file */
+    online_comm_params * o_params = (online_comm_params*)params;
+    int nprocs = o_params->nprocs;
+
+    rank_mpi_context *my_ctx = new rank_mpi_context;
+    //my_ctx = (rank_mpi_context*)calloc(1, sizeof(rank_mpi_context));
+    assert(my_ctx);
+    my_ctx->sctx.my_rank = rank;
+    my_ctx->sctx.num_ranks = nprocs;
+    my_ctx->sctx.wait_id = 0;
+    my_ctx->app_id = app_id;
+
+    void** generic_ptrs;
+    int array_len = 1;
+    generic_ptrs = (void**)calloc(array_len, sizeof(void*));
+    generic_ptrs[0] = (void*)&rank;
+
+    strcpy(my_ctx->sctx.workload_name, o_params->workload_name);
+    boost::property_tree::ptree root, child;
+    string swm_path, conc_path;
+
+    if(strncmp(o_params->workload_name, "conceptual", 10) == 0)
+    {
+        conc_path.append(ONLINE_CONFIGDIR);
+        conc_path.append("/conceptual.json");
+    }
+    else
+        tw_error(TW_LOC, "\n Undefined workload type %s ", o_params->workload_name);
+
+    // printf("\npath %s\n", conc_path.c_str());
+    try {
+        std::ifstream jsonFile(conc_path.c_str());
+        boost::property_tree::json_parser::read_json(jsonFile, root);
+
+        // printf("workload_name: %s\n", o_params->workload_name);
+        conc_bench_param *tmp_params = (conc_bench_param *) calloc(1, sizeof(conc_bench_param));
+        strcpy(tmp_params->conc_program, &o_params->workload_name[11]);
+        child = root.get_child(tmp_params->conc_program);
+        tmp_params->conc_argc = child.get<int>("argc");
+        int i = 0;
+        BOOST_FOREACH(boost::property_tree::ptree::value_type &v, child.get_child("argv"))
+        {
+            assert(v.first.empty()); // array elements have no names
+            // tmp_params->conc_argv[i] = (char *) v.second.data().c_str();
+            strcpy(tmp_params->config_in[i], v.second.data().c_str());
+            i += 1;
+        }
+        my_ctx->sctx.conc_params = (void*) tmp_params;
+    }
+    catch(std::exception & e)
+    {
+        printf("%s \n", e.what());
+        return -1;
+    }
+
+    if(global_prod_thread == NULL)
+    {
+        ABT_xstream_self(&self_es);
+        ABT_thread_self(&global_prod_thread);
+    }
+    ABT_thread_create_on_xstream(self_es,
+            &workload_caller, (void*)&(my_ctx->sctx),
+            ABT_THREAD_ATTR_NULL, &(my_ctx->sctx.producer));
+
+    // printf("Rank %d create app thread %p\n", rank, my_ctx->sctx.producer);
+    rank_mpi_compare cmp;
+    cmp.app_id = app_id;
+    cmp.rank = rank;
+
+    if(!rank_tbl)
+    {
+        rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, nprocs);
+        if(!rank_tbl)
+            return -1;
+    }
+    qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
+    rank_tbl_pop++;
+
+    return 0;
+}
+
+static void comm_online_workload_get_next(int app_id, int rank, struct codes_workload_op * op)
+{
+    /* At this point, we will use the "call" function. The send/receive/wait
+     * definitions will be replaced by our own function definitions that will do a
+     * yield to argobots if an event is not available. */
+    /* if shared queue is empty then yield */
+
+    rank_mpi_context * temp_data;
+    struct qhash_head * hash_link = NULL;
+    rank_mpi_compare cmp;
+    cmp.rank = rank;
+    cmp.app_id = app_id;
+    hash_link = qhash_search(rank_tbl, &cmp);
+    if(!hash_link)
+    {
+        printf("\n not found for rank id %d , %d", rank, app_id);
+        op->op_type = CODES_WK_END;
+        return;
+    }
+    temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
+    assert(temp_data);
+    while(temp_data->sctx.fifo.empty())
+    {
+        // printf("Rank %d fifo empty, yield to app %p\n", rank, temp_data->sctx.producer);
+        int rc = ABT_thread_yield_to(temp_data->sctx.producer);
+    }
+    struct codes_workload_op * front_op = temp_data->sctx.fifo.front();
+    assert(front_op);
+    // printf("Pop op %d to CODES\n", front_op->op_type);
+    *op = *front_op;
+    temp_data->sctx.fifo.pop_front();
+    return;
+}
+static int comm_online_workload_get_rank_cnt(const char *params, int app_id)
+{
+    online_comm_params * o_params = (online_comm_params*)params;
+    int nprocs = o_params->nprocs;
+    return nprocs;
+}
+
+static int comm_online_workload_finalize(const char* params, int app_id, int rank)
+{
+    rank_mpi_context * temp_data;
+    struct qhash_head * hash_link = NULL;
+    rank_mpi_compare cmp;
+    cmp.rank = rank;
+    cmp.app_id = app_id;
+    hash_link = qhash_search(rank_tbl, &cmp);
+    if(!hash_link)
+    {
+        printf("\n not found for rank id %d , %d", rank, app_id);
+        return -1;
+    }
+    temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
+    assert(temp_data);
+
+    int rc;
+    rc = ABT_thread_join(temp_data->sctx.producer);
+    // printf("thread terminate rc=%d\n", rc);
+    rc = ABT_thread_free(&(temp_data->sctx.producer));
+    // printf("thread free rc=%d\n", rc);
+    free(temp_data->sctx.conc_params);
+    return 0;
+}
+extern "C" {
+/* workload method name and function pointers for the CODES workload API */
+struct codes_workload_method conc_online_comm_workload_method =
+{
+    //.method_name =
+    (char*)"conc_online_comm_workload",
+    //.codes_workload_read_config =
+    NULL,
+    //.codes_workload_load =
+    comm_online_workload_load,
+    //.codes_workload_get_next =
+    comm_online_workload_get_next,
+    //.codes_workload_get_next_rc2 =
+    NULL,
+    //.codes_workload_get_rank_cnt =
+    comm_online_workload_get_rank_cnt,
+    //.codes_workload_finalize =
+    comm_online_workload_finalize
+};
+} // closing brace for extern "C"
+
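comm_online_workload_load above reads ONLINE_CONFIGDIR/conceptual.json and looks the program up under the workload name with its leading "conceptual-" prefix stripped (the &o_params->workload_name[11]); each entry carries an integer "argc" and an "argv" array that is copied into conc_bench_param. A hypothetical entry for a latency benchmark, with option strings illustrative (they are whatever the generated ncptl program accepts):

    {
        "latency": {
            "argc": 2,
            "argv": ["latency", "--reps=1000"]
        }
    }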
diff --git a/src/workload/methods/codes-online-comm-wrkld.C b/src/workload/methods/codes-online-comm-wrkld.C
index 1ded7a92..7e012552 100644
--- a/src/workload/methods/codes-online-comm-wrkld.C
+++ b/src/workload/methods/codes-online-comm-wrkld.C
@@ -950,10 +950,10 @@ static int comm_online_workload_finalize(const char* params, int app_id, int ran
 }
 extern "C" {
 /* workload method name and function pointers for the CODES workload API */
-struct codes_workload_method online_comm_workload_method =
+struct codes_workload_method swm_online_comm_workload_method =
 {
     //.method_name =
-    (char*)"online_comm_workload",
+    (char*)"swm_online_comm_workload",
     //.codes_workload_read_config =
     NULL,
     //.codes_workload_load =