forked from coq/coq
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworkerPool.ml
130 lines (109 loc) · 4.01 KB
/
workerPool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
(************************************************************************)
(* * The Coq Proof Assistant / The Coq Development Team *)
(* v * Copyright INRIA, CNRS and contributors *)
(* <O___,, * (see version control and CREDITS file for authors & dates) *)
(* \VV/ **************************************************************)
(* // * This file is distributed under the terms of the *)
(* * GNU Lesser General Public License Version 2.1 *)
(* * (see LICENSE file for the text of the license) *)
(************************************************************************)
type worker_id = string
type 'a cpanel = {
exit : unit -> unit; (* called by manager to exit instead of Thread.exit *)
cancelled : unit -> bool; (* manager checks for a request of termination *)
extra : 'a; (* extra stuff to pass to the manager *)
}
module type PoolModel = sig
(* this shall come from a Spawn.* model *)
type process
val spawn : int -> CoqworkmgrApi.priority -> worker_id * process * CThread.thread_ic * out_channel
(* this defines the main loop of the manager *)
type extra
val manager :
extra cpanel -> worker_id * process * CThread.thread_ic * out_channel -> unit
end
module Make(Model : PoolModel) = struct
type worker = {
name : worker_id;
cancel : bool ref;
manager : Thread.t;
process : Model.process;
}
type pre_pool = {
workers : worker list ref;
count : int ref;
extra_arg : Model.extra;
}
type pool = { lock : Mutex.t; pool : pre_pool }
let magic_no = 17
let master_handshake worker_id ic oc =
try
Marshal.to_channel oc magic_no []; flush oc;
let n = (CThread.thread_friendly_input_value ic : int) in
if n <> magic_no then begin
Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id;
exit 1;
end
with e when CErrors.noncritical e ->
Printf.eprintf "Handshake with %s failed: %s\n"
worker_id (Printexc.to_string e);
exit 1
let worker_handshake slave_ic slave_oc =
try
let v = (CThread.thread_friendly_input_value slave_ic : int) in
if v <> magic_no then begin
prerr_endline "Handshake failed: protocol mismatch\n";
exit 1;
end;
Marshal.to_channel slave_oc v []; flush slave_oc;
with e when CErrors.noncritical e ->
prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
exit 1
let locking { lock; pool = p } f =
try
Mutex.lock lock;
let x = f p in
Mutex.unlock lock;
x
with e -> Mutex.unlock lock; raise e
let rec create_worker extra pool priority id =
let cancel = ref false in
let name, process, ic, oc as worker = Model.spawn id priority in
master_handshake name ic oc;
let exit () = cancel := true; cleanup pool priority; Thread.exit () in
let cancelled () = !cancel in
let cpanel = { exit; cancelled; extra } in
let manager = CThread.create (Model.manager cpanel) worker in
{ name; cancel; manager; process }
and cleanup x priority = locking x begin fun { workers; count; extra_arg } ->
workers := List.map (function
| { cancel } as w when !cancel = false -> w
| _ -> let n = !count in incr count; create_worker extra_arg x priority n)
!workers
end
let n_workers x = locking x begin fun { workers } ->
List.length !workers
end
let is_empty x = locking x begin fun { workers } -> !workers = [] end
let create extra_arg ~size priority = let x = {
lock = Mutex.create ();
pool = {
extra_arg;
workers = ref [];
count = ref size;
}} in
locking x begin fun { workers } ->
workers := CList.init size (create_worker extra_arg x priority)
end;
x
let cancel n x = locking x begin fun { workers } ->
List.iter (fun { name; cancel } -> if n = name then cancel := true) !workers
end
let cancel_all x = locking x begin fun { workers } ->
List.iter (fun { cancel } -> cancel := true) !workers
end
let destroy x = locking x begin fun { workers } ->
List.iter (fun { cancel } -> cancel := true) !workers;
workers := []
end
end