@@ -232,12 +232,18 @@ MPI_Datatype *dts;
232232char * msgbody ;
233233pthread_mutex_t lock_am ;
234234int done_am = 0 ;
/* Handle of the communication thread.  It is created with pthread_create in
   PREFIX (init) and joined with pthread_join in finalize_internal.  */
235+ pthread_t commthread ;
/* Loop condition of communication_thread.  finalize_internal sets it to false
   right before it sends the empty wake-up message to the local thread.
   NOTE(review): this is a plain bool written by one thread and read by
   another -- confirm whether it needs to be atomic.  */
236+ bool commthread_running = true;
/* MPI tag on which communication_thread probes for requests.  Replies are
   sent on CAF_CT_TAG + 1.  */
237+ static const int CAF_CT_TAG = 13 ;
235238
236239char err_buffer [MPI_MAX_ERROR_STRING ];
237240
238241/* All CAF runtime calls should use this comm instead of MPI_COMM_WORLD for
239242 * interoperability purposes. */
240243MPI_Comm CAF_COMM_WORLD ;
/* Communicator used for all traffic to and from the communication thread.
   It is obtained with MPI_Comm_dup from MPI_COMM_WORLD in PREFIX (init) and
   released with MPI_Comm_free in finalize_internal.  */
244+ MPI_Comm ct_COMM ;
245+
/* NOTE(review): not referenced in the code visible here.  Presumably meant as
   a status value for a termination request to the communication thread --
   confirm or remove.  */
246+ static const int CT_STATUS_TERM_REQ = -1 ;
241247
242248static caf_teams_list * teams_list = NULL ;
243249static caf_used_teams_list * used_teams = NULL ;
@@ -405,6 +411,69 @@ helperFunction()
405411}
406412#endif
407413
414+ void *
415+ communication_thread (void * )
416+ {
417+ int ierr = 0 ;
418+ int cnt ;
419+ MPI_Status status ;
420+
421+ dprint ("ct: Started.\n" );
422+
423+ do
424+ {
425+ dprint ("ct: Waiting for request.\n" );
426+ ierr = MPI_Probe (MPI_ANY_SOURCE , CAF_CT_TAG , ct_COMM , & status );
427+ dprint ("ct: Woke up.\n" );
428+ if (status .MPI_ERROR == MPI_SUCCESS )
429+ {
430+ MPI_Get_count (& status , MPI_BYTE , & cnt );
431+
432+ struct
433+ {
434+ MPI_Win win ;
435+ size_t sz ;
436+ } msg ;
437+ if (cnt >= sizeof (msg ))
438+ {
439+ ierr = MPI_Recv (& msg , cnt , MPI_BYTE , status .MPI_SOURCE , status .MPI_TAG ,
440+ ct_COMM , & status );
441+ chk_err (ierr );
442+ dprint ("ct: Received request of size %ld.\n" , cnt );
443+
444+ void * bptr ;
445+ int flag ;
446+ ierr = MPI_Win_get_attr (msg .win , MPI_WIN_BASE , & bptr , & flag );
447+ chk_err (ierr );
448+ dprint ("ct: Local base for win %ld is %p (set: %b).\n" , msg .win , bptr ,
449+ flag );
450+ if (!flag )
451+ {
452+ dprint ("ct: Error: Window %p memory is not allocated.\n" , msg .win );
453+ }
454+ ierr = MPI_Send (bptr , msg .sz , MPI_BYTE , status .MPI_SOURCE ,
455+ status .MPI_TAG + 1 , ct_COMM );
456+ chk_err (ierr );
457+ }
458+ else if (!commthread_running )
459+ {
460+ /* Pickup empty message. */
461+ MPI_Recv (& msg , cnt , MPI_BYTE , status .MPI_SOURCE , status .MPI_TAG ,
462+ ct_COMM , & status );
463+ }
464+ else
465+ {
466+ dprint ("ct: Error: message to small, ignoring (got: %ld, exp: %ld).\n" ,
467+ cnt , sizeof (msg ));
468+ }
469+ }
470+ else
471+ chk_err (ierr );
472+ } while (commthread_running );
473+ dprint ("ct: Ended.\n" );
474+ return NULL ;
475+ }
476+
408477/* Keep in sync with single.c. */
409478
410479static void
@@ -841,7 +910,7 @@ PREFIX(init)(int *argc, char ***argv)
841910 if (caf_num_images == 0 )
842911 {
843912 int ierr = 0 , i = 0 , j = 0 , rc , prov_lev = 0 ;
844- int is_init = 0 , prior_thread_level = MPI_THREAD_FUNNELED ;
913+ int is_init = 0 , prior_thread_level = MPI_THREAD_MULTIPLE ;
845914 ierr = MPI_Initialized (& is_init );
846915 chk_err (ierr );
847916
@@ -850,6 +919,7 @@ PREFIX(init)(int *argc, char ***argv)
850919 ierr = MPI_Query_thread (& prior_thread_level );
851920 chk_err (ierr );
852921 }
922+ dprint ("Main thread: thread level: %d\n" , prior_thread_level );
853923#ifdef HELPER
854924 if (is_init )
855925 {
@@ -990,6 +1060,11 @@ PREFIX(init)(int *argc, char ***argv)
9901060 * win_model , flag );
9911061 }
9921062#endif
1063+
1064+ ierr = MPI_Comm_dup (MPI_COMM_WORLD , & ct_COMM );
1065+ chk_err (ierr );
1066+ ierr = pthread_create (& commthread , NULL , & communication_thread , NULL );
1067+ chk_err (ierr );
9931068 }
9941069}
9951070
@@ -1059,9 +1134,12 @@ finalize_internal(int status_code)
10591134 /* Add a conventional barrier to prevent images from quitting too early. */
10601135 if (status_code == 0 )
10611136 {
1062- dprint ("In barrier for finalize..." );
1063- ierr = MPI_Barrier (CAF_COMM_WORLD );
1064- chk_err (ierr );
1137+ if (caf_num_images > 1 )
1138+ {
1139+ dprint ("In barrier for finalize..." );
1140+ ierr = MPI_Barrier (CAF_COMM_WORLD );
1141+ chk_err (ierr );
1142+ }
10651143 }
10661144 else
10671145 /* Without failed images support, but a given status_code, we need to
@@ -1126,6 +1204,17 @@ finalize_internal(int status_code)
11261204 chk_err (ierr );
11271205#endif // MPI_VERSION
11281206
1207+ dprint ("Sending termination signal to communication thread.\n" );
1208+ commthread_running = false;
1209+ ierr = MPI_Send (NULL , 0 , MPI_BYTE , caf_this_image - 1 , CAF_CT_TAG , ct_COMM );
1210+ chk_err (ierr );
1211+ dprint ("Termination signal send, waiting for thread join.\n" );
1212+ ierr = pthread_join (commthread , NULL );
1213+ dprint ("Communication thread terminated with rc = %d.\n" , ierr );
1214+ dprint ("Freeing ct_COMM.\n" );
1215+ MPI_Comm_free (& ct_COMM );
1216+ dprint ("Freeed ct_COMM.\n" );
1217+
11291218 /* Free the global dynamic window. */
11301219 ierr = MPI_Win_free (& global_dynamic_win );
11311220 chk_err (ierr );
@@ -1200,6 +1289,7 @@ finalize_internal(int status_code)
12001289 caf_is_finalized = 1 ;
12011290#endif
12021291 free (sync_handles );
1292+
12031293 dprint ("Finalisation done!!!\n" );
12041294}
12051295
@@ -1348,9 +1438,15 @@ void PREFIX(register)(size_t size, caf_register_t type, caf_token_t *token,
13481438 p = TOKEN (mpi_token );
13491439
13501440#if MPI_VERSION >= 3
1351- ierr = MPI_Win_allocate (actual_size , 1 , MPI_INFO_NULL , CAF_COMM_WORLD ,
1352- & mem , p );
1441+ void * flavor ;
1442+ int flag = -1 ;
1443+ ierr = MPI_Win_allocate /*_shared*/ (actual_size , 1 , MPI_INFO_NULL ,
1444+ CAF_COMM_WORLD , & mem , p );
1445+ chk_err (ierr );
1446+ ierr = MPI_Win_get_attr (* p , MPI_WIN_CREATE_FLAVOR , & flavor , & flag );
13531447 chk_err (ierr );
1448+ dprint ("win %d has create flavor: %x, flag: %d.\n" , * p , * (int * )flavor ,
1449+ flag );
13541450 CAF_Win_lock_all (* p );
13551451#else
13561452 ierr = MPI_Alloc_mem (actual_size , MPI_INFO_NULL , & mem );
@@ -3683,11 +3779,23 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index,
36833779 {
36843780 const size_t trans_size
36853781 = ((dst_size > src_size ) ? src_size : dst_size ) * size ;
3686- CAF_Win_lock (MPI_LOCK_SHARED , remote_image , * p );
3687- ierr = MPI_Get (dest -> base_addr , trans_size , MPI_BYTE , remote_image ,
3688- offset , trans_size , MPI_BYTE , * p );
3782+ struct
3783+ {
3784+ MPI_Win win ;
3785+ size_t sz ;
3786+ } buf = {* p , trans_size };
3787+ int tag = CAF_CT_TAG ; // + caf_this_image) % 0xffff;
3788+ ierr
3789+ = MPI_Sendrecv (& buf , sizeof (buf ), MPI_BYTE , remote_image , tag ,
3790+ dest -> base_addr , trans_size , MPI_BYTE ,
3791+ remote_image , tag + 1 , ct_COMM , MPI_STATUS_IGNORE );
36893792 chk_err (ierr );
3690- CAF_Win_unlock (remote_image , * p );
3793+
3794+ // CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);
3795+ // ierr = MPI_Get(dest->base_addr, trans_size, MPI_BYTE, remote_image,
3796+ // offset, trans_size, MPI_BYTE, *p);
3797+ // chk_err(ierr);
3798+ // CAF_Win_unlock(remote_image, *p);
36913799 }
36923800 else
36933801 {
0 commit comments