@@ -293,7 +293,7 @@ check_incoming_control_channel(struct context *c)
293293
294294 struct gc_arena gc = gc_new ();
295295 struct buffer buf = alloc_buf_gc (len , & gc );
296- if (tls_rec_payload (c -> c2 .tls_multi , & buf ))
296+ while (tls_rec_payload (c -> c2 .tls_multi , & buf ))
297297 {
298298 while (BLEN (& buf ) > 1 )
299299 {
@@ -305,10 +305,6 @@ check_incoming_control_channel(struct context *c)
305305 }
306306 }
307307 }
308- else
309- {
310- msg (D_PUSH_ERRORS , "WARNING: Receive control message failed" );
311- }
312308
313309 gc_free (& gc );
314310}
@@ -372,20 +368,18 @@ check_connection_established(struct context *c)
372368}
373369
374370bool
375- send_control_channel_string_dowork (struct tls_session * session , const char * str ,
376- msglvl_t msglevel )
371+ send_control_channel_string_dowork (struct tls_multi * multi , struct key_state * ks , const char * str , msglvl_t msglevel )
377372{
378373 struct gc_arena gc = gc_new ();
379374 bool stat ;
380375
381- ASSERT (session );
382- struct key_state * ks = & session -> key [KS_PRIMARY ];
376+ ASSERT (multi );
383377
384378 /* buffered cleartext write onto TLS control channel */
385379 stat = tls_send_payload (ks , (uint8_t * )str , strlen (str ) + 1 );
386380
387381 msg (msglevel , "SENT CONTROL [%s]: '%s' (status=%d)" ,
388- session -> common_name ? session -> common_name : "UNDEF" , sanitize_control_message (str , & gc ),
382+ multi -> common_name ? multi -> common_name : "UNDEF" , sanitize_control_message (str , & gc ),
389383 (int )stat );
390384
391385 gc_free (& gc );
@@ -402,12 +396,12 @@ reschedule_multi_process(struct context *c)
402396bool
403397send_control_channel_string (struct context * c , const char * str , msglvl_t msglevel )
404398{
405- if (c -> c2 .tls_multi )
399+ struct tls_multi * multi = c -> c2 .tls_multi ;
400+ if (multi )
406401 {
407- struct tls_session * session = & c -> c2 . tls_multi -> session [ TM_ACTIVE ] ;
408- bool ret = send_control_channel_string_dowork (session , str , msglevel );
402+ struct key_state * ks = tls_select_encryption_key_init ( multi ) ;
403+ bool ret = send_control_channel_string_dowork (multi , ks , str , msglevel );
409404 reschedule_multi_process (c );
410-
411405 return ret ;
412406 }
413407 return true;
@@ -2398,11 +2392,13 @@ get_io_flags_udp(struct context *c, struct multi_io *multi_io, const unsigned in
23982392}
23992393
24002394void
2401- io_wait_dowork (struct context * c , const unsigned int flags )
2395+ io_wait_dowork (struct context * c , const unsigned int flags , int t )
24022396{
24032397 unsigned int out_socket ;
24042398 unsigned int out_tuntap ;
24052399 struct event_set_return esr [4 ];
2400+ struct event_set * event_set = ((t & THREAD_RTWL ) != 0 ) ? c -> c2 .event_set : c -> c2 .event_set2 ;
2401+ unsigned int * event_set_status = ((t & THREAD_RTWL ) != 0 ) ? & (c -> c2 .event_set_status ) : & (c -> c2 .event_set_status2 );
24062402
24072403 /* These shifts all depend on EVENT_READ and EVENT_WRITE */
24082404 static uintptr_t socket_shift = SOCKET_SHIFT ; /* depends on SOCKET_READ and SOCKET_WRITE */
@@ -2420,29 +2416,29 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24202416 /*
24212417 * Decide what kind of events we want to wait for.
24222418 */
2423- event_reset (c -> c2 . event_set );
2419+ event_reset (event_set );
24242420
2425- multi_io_process_flags (c , c -> c2 . event_set , flags , & out_socket , & out_tuntap );
2421+ multi_io_process_flags (c , event_set , flags , & out_socket , & out_tuntap );
24262422
24272423#if defined(TARGET_LINUX ) || defined(TARGET_FREEBSD )
24282424 if (out_socket & EVENT_READ && c -> c2 .did_open_tun )
24292425 {
2430- dco_event_set (& c -> c1 .tuntap -> dco , c -> c2 . event_set , (void * )dco_shift );
2426+ dco_event_set (& c -> c1 .tuntap -> dco , event_set , (void * )dco_shift );
24312427 }
24322428#endif
24332429
24342430#ifdef ENABLE_MANAGEMENT
24352431 if (management )
24362432 {
2437- management_socket_set (management , c -> c2 . event_set , (void * )management_shift , NULL );
2433+ management_socket_set (management , event_set , (void * )management_shift , NULL );
24382434 }
24392435#endif
24402436
24412437#ifdef ENABLE_ASYNC_PUSH
24422438 /* arm inotify watcher */
24432439 if (c -> options .mode == MODE_SERVER )
24442440 {
2445- event_ctl (c -> c2 . event_set , c -> c2 .inotify_fd , EVENT_READ , (void * )file_shift );
2441+ event_ctl (event_set , c -> c2 .inotify_fd , EVENT_READ , (void * )file_shift );
24462442 }
24472443#endif
24482444
@@ -2456,7 +2452,7 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24562452 * (6) timeout (tv) expired
24572453 */
24582454
2459- c -> c2 . event_set_status = ES_ERROR ;
2455+ * event_set_status = ES_ERROR ;
24602456
24612457 if (!c -> sig -> signal_received )
24622458 {
@@ -2474,14 +2470,14 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24742470 /*
24752471 * Wait for something to happen.
24762472 */
2477- status = event_wait (c -> c2 . event_set , & c -> c2 .timeval , esr , SIZE (esr ));
2473+ status = event_wait (event_set , & c -> c2 .timeval , esr , SIZE (esr ));
24782474
24792475 check_status (status , "event_wait" , NULL , NULL );
24802476
24812477 if (status > 0 )
24822478 {
24832479 int i ;
2484- c -> c2 . event_set_status = 0 ;
2480+ * event_set_status = 0 ;
24852481 for (i = 0 ; i < status ; ++ i )
24862482 {
24872483 const struct event_set_return * e = & esr [i ];
@@ -2492,7 +2488,7 @@ io_wait_dowork(struct context *c, const unsigned int flags)
24922488 struct event_arg * ev_arg = (struct event_arg * )e -> arg ;
24932489 if (ev_arg -> type != EVENT_ARG_LINK_SOCKET )
24942490 {
2495- c -> c2 . event_set_status = ES_ERROR ;
2491+ * event_set_status = ES_ERROR ;
24962492 msg (D_LINK_ERRORS , "io_work: non socket event delivered" );
24972493 return ;
24982494 }
@@ -2504,30 +2500,26 @@ io_wait_dowork(struct context *c, const unsigned int flags)
25042500 shift = (uintptr_t )e -> arg ;
25052501 }
25062502
2507- c -> c2 . event_set_status |= ((e -> rwflags & 3 ) << shift );
2503+ * event_set_status |= ((e -> rwflags & 3 ) << shift );
25082504 }
25092505 }
2510- else if (status == 0 )
2511- {
2512- c -> c2 .event_set_status = ES_TIMEOUT ;
2513- }
25142506 }
2515- if (sockets_read_residual (c ))
2507+ if (sockets_read_residual (c -> c2 . link_sockets , c -> c1 . link_sockets_num ))
25162508 {
2517- c -> c2 . event_set_status |= (SOCKET_READ << SOCKET_SHIFT );
2509+ * event_set_status |= (SOCKET_READ << SOCKET_SHIFT );
25182510 }
25192511 }
25202512
25212513 /* 'now' should always be a reasonably up-to-date timestamp */
25222514 update_time ();
25232515
25242516 /* set signal_received if a signal was received */
2525- if (c -> c2 . event_set_status & ES_ERROR )
2517+ if (* event_set_status & ES_ERROR )
25262518 {
25272519 get_signal (& c -> sig -> signal_received );
25282520 }
25292521
2530- dmsg (D_EVENT_WAIT , "I/O WAIT status=0x%04x" , c -> c2 . event_set_status );
2522+ dmsg (D_EVENT_WAIT , "I/O WAIT status=0x%04x" , * event_set_status );
25312523}
25322524
25332525void threaded_fwd_inp_intf (struct context * c , struct link_socket * sock , struct thread_pointer * b )
@@ -2547,7 +2539,7 @@ void threaded_fwd_inp_intf(struct context *c, struct link_socket *sock, struct t
25472539}
25482540
25492541void
2550- process_io (struct context * c , struct link_socket * sock , struct thread_pointer * b )
2542+ process_io (struct context * c , struct link_socket * sock , struct thread_pointer * b , int t )
25512543{
25522544 const unsigned int status = c -> c2 .event_set_status ;
25532545
@@ -2560,17 +2552,17 @@ process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b
25602552#endif
25612553
25622554 /* TCP/UDP port ready to accept write */
2563- if (status & SOCKET_WRITE )
2555+ if (( status & SOCKET_WRITE ) && (( t & THREAD_RTWL ) != 0 ) )
25642556 {
25652557 process_outgoing_link (c , sock );
25662558 }
25672559 /* TUN device ready to accept write */
2568- else if (status & TUN_WRITE )
2560+ else if (( status & TUN_WRITE ) && (( t & THREAD_RLWT ) != 0 ) )
25692561 {
25702562 process_outgoing_tun (c , sock );
25712563 }
25722564 /* Incoming data on TCP/UDP port */
2573- else if (status & SOCKET_READ )
2565+ else if (( status & SOCKET_READ ) && (( t & THREAD_RLWT ) != 0 ) )
25742566 {
25752567 read_incoming_link (c , sock );
25762568 if (!IS_SIG (c ))
@@ -2579,15 +2571,77 @@ process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b
25792571 }
25802572 }
25812573 /* Incoming data on TUN device */
2582- else if (status & TUN_READ )
2574+ else if (( status & TUN_READ ) && (( t & THREAD_RTWL ) != 0 ) )
25832575 {
25842576 threaded_fwd_inp_intf (c , sock , b );
25852577 }
2586- else if (status & DCO_READ )
2578+ else if (( status & DCO_READ ) && (( t & THREAD_RTWL ) != 0 ) )
25872579 {
25882580 if (!IS_SIG (c ))
25892581 {
25902582 process_incoming_dco (c );
25912583 }
25922584 }
25932585}
2586+
2587+ void * threaded_process_io (void * a )
2588+ {
2589+ /*
2590+ dual mode commit notes:
2591+ - thread1 handles tunn-read->-link-send and thread2 handles link-read->-tunn-send
2592+ - set thread1 to handle the pre_select() call as it falls under the threaded paths
2593+ and the function will eventually overwrite the c2.buf and c2.to_link buffer variables
2594+ which will then cause data conflict and corruption errors if not called from thread1
2595+ - the server is slower to move through the key session states than the client is
2596+ so hold onto old keys longer before rotation and delay using new keys before selection
2597+ - dual mode is based on the organization and separation code implemented in bulk mode
2598+ */
2599+
2600+ struct dual_args * d = (struct dual_args * )a ;
2601+ struct thread_pointer * b = d -> b ;
2602+
2603+ int t = d -> t ;
2604+ unsigned int f = d -> f ;
2605+ uint8_t buff [5 ];
2606+ size_t leng ;
2607+
2608+ fd_set rfds ;
2609+ struct timeval timo ;
2610+
2611+ while (true)
2612+ {
2613+ if (b -> p -> z != 1 ) { break ; }
2614+
2615+ FD_ZERO (& rfds ); FD_SET (d -> w [0 ][0 ], & rfds );
2616+ timo .tv_sec = 1 ; timo .tv_usec = 750000 ;
2617+ select (d -> w [0 ][0 ]+ 1 , & rfds , NULL , NULL , & timo );
2618+
2619+ if (b -> p -> z != 1 ) { break ; }
2620+
2621+ if (FD_ISSET (d -> w [0 ][0 ], & rfds ))
2622+ {
2623+ leng = read (d -> w [0 ][0 ], buff , 1 );
2624+ if (leng < 1 ) { /* no-op */ }
2625+
2626+ if (d -> a == TA_UNDEF )
2627+ {
2628+ multi_io_process_io (b , f , t );
2629+ }
2630+ else if (d -> a == TA_TIMEOUT )
2631+ {
2632+ struct multi_context * m = b -> p -> m [b -> i - 1 ];
2633+ multi_io_action (m , NULL , TA_TIMEOUT , false, f , t );
2634+ }
2635+ else if (d -> a == TA_FORWARD )
2636+ {
2637+ struct context * c = d -> c ;
2638+ process_io (c , c -> c2 .link_sockets [0 ], b , t );
2639+ }
2640+
2641+ d -> z = 0 ;
2642+ leng = write (d -> w [1 ][1 ], buff , 1 );
2643+ }
2644+ }
2645+
2646+ return NULL ;
2647+ }
0 commit comments