@@ -269,30 +269,41 @@ static vine_msg_code_t handle_name(struct vine_manager *q, struct vine_worker_in
269
269
}
270
270
271
271
/*
272
- Handle a timeout request from a worker. Check if the worker has any important data before letting it go.
272
+ Handle a request from a worker to shutdown because it has been idle for a while.
273
+ However, do not accept the request if the manager has some further information
274
+ indicating that the worker should be kept around.
273
275
*/
274
276
275
- static void handle_worker_timeout (struct vine_manager * q , struct vine_worker_info * w )
277
+ static void handle_idle_disconnect_request (struct vine_manager * q , struct vine_worker_info * w )
276
278
{
277
- // Look at the files and check if any are endangered temps.
278
279
char * cachename ;
279
280
struct vine_file_replica * replica ;
280
- // debug(D_VINE, "Handling timeout request");
281
+
282
+ /* First check to see if this worker has any unique files that should not be lost. */
283
+
281
284
HASH_TABLE_ITERATE (w -> current_files , cachename , replica )
282
285
{
283
286
if (replica -> type == VINE_TEMP ) {
284
287
int c = vine_file_replica_table_count_replicas (q , cachename , VINE_FILE_REPLICA_STATE_READY );
285
288
if (c == 1 ) {
286
- debug (D_VINE , "Rejecting timeout request from worker %s (%s). Has unique file %s" , w -> hostname , w -> addrport , cachename );
289
+ debug (D_VINE , "Rejecting disconnect request from worker %s (%s). Has unique file %s" , w -> hostname , w -> addrport , cachename );
287
290
return ;
288
291
}
289
292
}
290
293
}
291
294
295
+ /*
296
+ Then, if it is not running any tasks, tell it to exit and shut down cleanly.
297
+ We do not disconnect just yet because there may be more messages pending,
298
+ or other information the worker wants to send as it shuts down.
299
+ The worker will disconnect on its own.
300
+ Also, we don't want to invalidate the worker object w in an unexpected location.
301
+ */
302
+
292
303
if (itable_size (w -> current_tasks ) == 0 ) {
293
- debug (D_VINE , "Accepting timeout request from worker %s (%s)." , w -> hostname , w -> addrport );
304
+ debug (D_VINE , "Accepting disconnect request from worker %s (%s)." , w -> hostname , w -> addrport );
294
305
q -> stats -> workers_idled_out ++ ;
295
- vine_manager_shut_down_worker (q , w );
306
+ vine_manager_send (q , w , "exit\n" );
296
307
}
297
308
298
309
return ;
@@ -313,7 +324,7 @@ static vine_msg_code_t handle_info(struct vine_manager *q, struct vine_worker_in
313
324
if (string_prefix_is (field , "tasks_running" )) {
314
325
w -> dynamic_tasks_running = atoi (value );
315
326
} else if (string_prefix_is (field , "idle-disconnect-request" )) {
316
- handle_worker_timeout (q , w );
327
+ handle_idle_disconnect_request (q , w );
317
328
} else if (string_prefix_is (field , "worker-id" )) {
318
329
free (w -> workerid );
319
330
w -> workerid = xxstrdup (value );
@@ -2507,6 +2518,10 @@ static vine_result_code_t handle_worker(struct vine_manager *q, struct link *l)
2507
2518
w = hash_table_lookup (q -> worker_table , key );
2508
2519
free (key );
2509
2520
2521
+ /* This should not happen, but just in case: */
2522
+ if (!w )
2523
+ return VINE_WORKER_FAILURE ;
2524
+
2510
2525
vine_msg_code_t mcode ;
2511
2526
mcode = vine_manager_recv_no_retry (q , w , line , sizeof (line ));
2512
2527
@@ -4961,7 +4976,12 @@ struct vine_task *vine_wait_for_task_id(struct vine_manager *q, int task_id, int
4961
4976
return vine_wait_internal (q , timeout , NULL , task_id );
4962
4977
}
4963
4978
4964
- /* return number of workers that failed */
4979
+ /*
4980
+ Consider all of the connected workers, and act open each connection
4981
+ that has pending input data, until the input buffer is empty.
4982
+ Return the number of workers that *failed* and disconnected.
4983
+ */
4984
+
4965
4985
static int poll_active_workers (struct vine_manager * q , int stoptime )
4966
4986
{
4967
4987
BEGIN_ACCUM_TIME (q , time_polling );
@@ -4993,9 +5013,14 @@ static int poll_active_workers(struct vine_manager *q, int stoptime)
4993
5013
4994
5014
int i ;
4995
5015
int workers_failed = 0 ;
4996
- // Then consider all existing active workers
5016
+
5017
+ /* Consider all active connections of any kind. */
4997
5018
for (i = 1 ; i < n ; i ++ ) {
5019
+
5020
+ /* If there is pending input data on that connection. */
4998
5021
if (q -> poll_table [i ].revents ) {
5022
+
5023
+ /* Act on the next input message, until there is no more buffered data. */
4999
5024
do {
5000
5025
if (handle_worker (q , q -> poll_table [i ].link ) == VINE_WORKER_FAILURE ) {
5001
5026
workers_failed ++ ;
0 commit comments