@@ -267,30 +267,41 @@ static vine_msg_code_t handle_name(struct vine_manager *q, struct vine_worker_in
267
267
}
268
268
269
269
/*
270
- Handle a timeout request from a worker. Check if the worker has any important data before letting it go.
270
+ Handle a request from a worker to shut down because it has been idle for a while.
271
+ However, do not accept the request if the manager has some further information
272
+ indicating that the worker should be kept around.
271
273
*/
272
274
273
- static void handle_worker_timeout (struct vine_manager * q , struct vine_worker_info * w )
275
+ static void handle_idle_disconnect_request (struct vine_manager * q , struct vine_worker_info * w )
274
276
{
275
- // Look at the files and check if any are endangered temps.
276
277
char * cachename ;
277
278
struct vine_file_replica * replica ;
278
- // debug(D_VINE, "Handling timeout request");
279
+
280
+ /* First check to see if this worker has any unique files that should not be lost. */
281
+
279
282
HASH_TABLE_ITERATE (w -> current_files , cachename , replica )
280
283
{
281
284
if (replica -> type == VINE_TEMP ) {
282
285
int c = vine_file_replica_table_count_replicas (q , cachename , VINE_FILE_REPLICA_STATE_READY );
283
286
if (c == 1 ) {
284
- debug (D_VINE , "Rejecting timeout request from worker %s (%s). Has unique file %s" , w -> hostname , w -> addrport , cachename );
287
+ debug (D_VINE , "Rejecting disconnect request from worker %s (%s). Has unique file %s" , w -> hostname , w -> addrport , cachename );
285
288
return ;
286
289
}
287
290
}
288
291
}
289
292
293
+ /*
294
+ Then, if it is not running any tasks, tell it to exit and shut down cleanly.
295
+ We do not disconnect just yet because there may be more messages pending,
296
+ or other information the worker wants to send as it shuts down.
297
+ The worker will disconnect on its own.
298
+ Also, we don't want to invalidate the worker object w in an unexpected location.
299
+ */
300
+
290
301
if (itable_size (w -> current_tasks ) == 0 ) {
291
- debug (D_VINE , "Accepting timeout request from worker %s (%s)." , w -> hostname , w -> addrport );
302
+ debug (D_VINE , "Accepting disconnect request from worker %s (%s)." , w -> hostname , w -> addrport );
292
303
q -> stats -> workers_idled_out ++ ;
293
- vine_manager_shut_down_worker (q , w );
304
+ vine_manager_send (q , w , "exit\n" );
294
305
}
295
306
296
307
return ;
@@ -311,7 +322,7 @@ static vine_msg_code_t handle_info(struct vine_manager *q, struct vine_worker_in
311
322
if (string_prefix_is (field , "tasks_running" )) {
312
323
w -> dynamic_tasks_running = atoi (value );
313
324
} else if (string_prefix_is (field , "idle-disconnect-request" )) {
314
- handle_worker_timeout (q , w );
325
+ handle_idle_disconnect_request (q , w );
315
326
} else if (string_prefix_is (field , "worker-id" )) {
316
327
free (w -> workerid );
317
328
w -> workerid = xxstrdup (value );
@@ -2467,6 +2478,10 @@ static vine_result_code_t handle_worker(struct vine_manager *q, struct link *l)
2467
2478
w = hash_table_lookup (q -> worker_table , key );
2468
2479
free (key );
2469
2480
2481
+ /* This should not happen, but just in case: */
2482
+ if (!w )
2483
+ return VINE_WORKER_FAILURE ;
2484
+
2470
2485
vine_msg_code_t mcode ;
2471
2486
mcode = vine_manager_recv_no_retry (q , w , line , sizeof (line ));
2472
2487
@@ -4776,7 +4791,12 @@ struct vine_task *vine_wait_for_task_id(struct vine_manager *q, int task_id, int
4776
4791
return vine_wait_internal (q , timeout , NULL , task_id );
4777
4792
}
4778
4793
4779
- /* return number of workers that failed */
4794
+ /*
4795
+ Consider all of the connected workers, and act upon each connection
4796
+ that has pending input data, until the input buffer is empty.
4797
+ Return the number of workers that *failed* and disconnected.
4798
+ */
4799
+
4780
4800
static int poll_active_workers (struct vine_manager * q , int stoptime )
4781
4801
{
4782
4802
BEGIN_ACCUM_TIME (q , time_polling );
@@ -4808,9 +4828,14 @@ static int poll_active_workers(struct vine_manager *q, int stoptime)
4808
4828
4809
4829
int i ;
4810
4830
int workers_failed = 0 ;
4811
- // Then consider all existing active workers
4831
+
4832
+ /* Consider all active connections of any kind. */
4812
4833
for (i = 1 ; i < n ; i ++ ) {
4834
+
4835
+ /* If there is pending input data on that connection. */
4813
4836
if (q -> poll_table [i ].revents ) {
4837
+
4838
+ /* Act on the next input message, until there is no more buffered data. */
4814
4839
do {
4815
4840
if (handle_worker (q , q -> poll_table [i ].link ) == VINE_WORKER_FAILURE ) {
4816
4841
workers_failed ++ ;
0 commit comments