Skip to content

Commit 81196d5

Browse files
committed
Add CML style Ch
1 parent a5e27af commit 81196d5

File tree

16 files changed

+575
-149
lines changed

16 files changed

+575
-149
lines changed

bench/bench_ch.ml

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
open Multicore_bench
2+
open Picos
3+
open Picos_std_sync
4+
open Picos_std_structured
5+
6+
let run_one_domain ~budgetf () =
7+
let n_msgs = 200 * Util.iter_factor in
8+
let t = Ch.create () in
9+
let giver () =
10+
for i = 1 to n_msgs do
11+
Ch.give t i
12+
done
13+
and taker () =
14+
for _ = 1 to n_msgs do
15+
Ch.take t |> ignore
16+
done
17+
in
18+
let init _ = () in
19+
let wrap _ () = Scheduler.run in
20+
let work _ () =
21+
Run.all (if Random.bool () then [ taker; giver ] else [ giver; taker ])
22+
in
23+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
24+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
25+
26+
let yielder () =
27+
while true do
28+
Fiber.yield ()
29+
done
30+
31+
let run_one ~budgetf ~n_givers ~n_takers () =
32+
let n_domains = n_givers + n_takers in
33+
34+
let n_msgs = 200 / n_domains * Util.iter_factor in
35+
36+
let t = Ch.create ~padded:true () in
37+
38+
let n_msgs_to_give = Atomic.make 0 |> Multicore_magic.copy_as_padded in
39+
let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
40+
41+
let init _ =
42+
Atomic.set n_msgs_to_give n_msgs;
43+
Atomic.set n_msgs_to_take n_msgs
44+
in
45+
let wrap _ () = Scheduler.run in
46+
let work i () =
47+
Flock.join_after ~on_return:`Terminate @@ fun () ->
48+
Flock.fork yielder;
49+
begin
50+
if i < n_givers then
51+
let rec work () =
52+
let n = Util.alloc n_msgs_to_give in
53+
if 0 < n then begin
54+
for i = 1 to n do
55+
Ch.give t i
56+
done;
57+
work ()
58+
end
59+
in
60+
work ()
61+
else
62+
let rec work () =
63+
let n = Util.alloc n_msgs_to_take in
64+
if 0 < n then begin
65+
for _ = 1 to n do
66+
Ch.take t |> ignore
67+
done;
68+
work ()
69+
end
70+
in
71+
work ()
72+
end
73+
in
74+
75+
let config =
76+
let format role n =
77+
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
78+
in
79+
Printf.sprintf "%s, %s" (format "giver" n_givers) (format "taker" n_takers)
80+
in
81+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
82+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
83+
84+
let run_suite ~budgetf =
85+
run_one_domain ~budgetf ()
86+
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
87+
|> List.concat_map @@ fun (n_givers, n_takers) ->
88+
if Picos_domain.recommended_domain_count () < n_givers + n_takers then []
89+
else run_one ~budgetf ~n_givers ~n_takers ())

bench/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
(run %{test} -brief "Picos_mpscq")
1919
(run %{test} -brief "Picos_htbl")
2020
(run %{test} -brief "Picos_stdio")
21+
(run %{test} -brief "Picos_sync Ch")
2122
(run %{test} -brief "Picos_sync Stream")
2223
(run %{test} -brief "Fib")
2324
(run %{test} -brief "Picos binaries")

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ let benchmarks =
1717
("Picos_mpscq", Bench_mpscq.run_suite);
1818
("Picos_htbl", Bench_htbl.run_suite);
1919
("Picos_stdio", Bench_stdio.run_suite);
20+
("Picos_sync Ch", Bench_ch.run_suite);
2021
("Picos_sync Stream", Bench_stream.run_suite);
2122
("Fib", Bench_fib.run_suite);
2223
("Picos binaries", Bench_binaries.run_suite);

lib/picos/bootstrap/picos_bootstrap.ml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,17 @@ module Computation = struct
291291

292292
let returned value = Atomic.make (make_returned value)
293293
let finished = Atomic.make (make_returned ())
294+
295+
let exited : unit t =
296+
Atomic.make
297+
(S (Canceled { exn = Exit; bt = Printexc.get_callstack 0; tx = Stopped }))
298+
299+
let exited () =
300+
let open struct
301+
external unsafe_generalize : unit t -> 'a t = "%identity"
302+
end in
303+
unsafe_generalize exited
304+
294305
let try_return t value = try_terminate t (make_returned value) Backoff.default
295306
let try_finish t = try_terminate t returned_unit Backoff.default
296307

lib/picos/picos.mli

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,10 @@ module Computation : sig
539539
val finished : unit t
540540
(** [finished] is a constant finished computation. *)
541541

542+
val exited : unit -> 'a t
543+
(** [exited ()] returns a constant computation canceled with the {!Exit}
544+
exception. *)
545+
542546
val try_return : 'a t -> 'a -> bool
543547
(** [try_return computation value] attempts to complete the computation with
544548
the specified [value] and returns [true] on success. Otherwise returns

lib/picos_std.structured/picos_std_structured.mli

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,11 @@ end
456456
Ivar.read ivar
457457
end;
458458
459+
Flock.fork begin fun () ->
460+
let ch = Ch.create () in
461+
Ch.give ch "never"
462+
end;
463+
459464
Flock.fork begin fun () ->
460465
let stream = Stream.create () in
461466
Stream.read (Stream.tap stream)
@@ -514,6 +519,8 @@ end
514519
count of the latch never reaches [0],
515520
- {{!Picos_std_sync.Ivar.read} [Ivar.read]} never returns, because the
516521
incremental variable is never filled,
522+
- {{!Picos_std_sync.Ch.give} [Ch.give]} never returns, because no fiber
523+
{{!Picos_std_sync.Ch.take} takes} the message,
517524
- {{!Picos_std_sync.Stream.read} [Stream.read]} never returns, because the
518525
stream is never pushed to,
519526
- {{!Picos_io.Unix.read} [Unix.read]} never returns, because the socket is

lib/picos_std.sync/ch.ml

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
open Picos_std_event
2+
open Picos
3+
module Atomic = Multicore_magic.Transparent_atomic
4+
module Tx = Computation.Tx
5+
6+
type 'a taker =
7+
| T : {
8+
computation : (unit -> 'r) Computation.t;
9+
result : 'a -> 'r;
10+
}
11+
-> 'a taker
12+
13+
type 'a giver =
14+
| G : {
15+
computation : (unit -> 'r) Computation.t;
16+
result : unit -> 'r;
17+
value : 'a;
18+
}
19+
-> 'a giver
20+
21+
type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t }
22+
23+
let empty = { givers = T Zero; takers = T Zero }
24+
25+
type 'a t = 'a state Atomic.t
26+
27+
let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded
28+
29+
(* *)
30+
31+
let[@inline never] wait computation =
32+
let trigger = Trigger.create () in
33+
if Computation.try_attach computation trigger then
34+
match Trigger.await trigger with
35+
| None -> ()
36+
| Some (exn, bt) ->
37+
if Computation.try_cancel computation Exit bt then
38+
Printexc.raise_with_backtrace exn bt
39+
40+
(* *)
41+
42+
let rec give t value backoff =
43+
let before = Atomic.fenceless_get t in
44+
match before.takers with
45+
| Q.T Zero ->
46+
let computation = Computation.create ~mode:`LIFO () in
47+
let self = G { computation; result = Fun.id; value } in
48+
let givers = Q.add before.givers self in
49+
let after = { givers; takers = T Zero } in
50+
if Atomic.compare_and_set t before after then wait computation
51+
else give t value (Backoff.once backoff)
52+
| Q.T (One _ as takers) ->
53+
let (T { computation; result }) = Q.head takers in
54+
let got = Computation.try_return computation (fun () -> result value) in
55+
let takers = Q.tail takers in
56+
let givers = before.givers in
57+
let after =
58+
if takers == T Zero && givers == T Zero then empty
59+
else { givers; takers }
60+
in
61+
let no_contention = Atomic.compare_and_set t before after in
62+
if not got then
63+
give t value (if no_contention then backoff else Backoff.once backoff)
64+
65+
let rec take t backoff =
66+
let before = Atomic.fenceless_get t in
67+
match before.givers with
68+
| Q.T Zero ->
69+
let computation = Computation.create ~mode:`LIFO () in
70+
let self = T { computation; result = Fun.id } in
71+
let takers = Q.add before.takers self in
72+
let after = { givers = T Zero; takers } in
73+
if Atomic.compare_and_set t before after then begin
74+
wait computation;
75+
Computation.await computation ()
76+
end
77+
else take t (Backoff.once backoff)
78+
| Q.T (One _ as givers) ->
79+
let (G { computation; result; value }) = Q.head givers in
80+
let got = Computation.try_return computation result in
81+
let givers = Q.tail givers in
82+
let takers = before.takers in
83+
let after =
84+
if givers == T Zero && takers == T Zero then empty
85+
else { givers; takers }
86+
in
87+
let no_contention = Atomic.compare_and_set t before after in
88+
if got then value
89+
else take t (if no_contention then backoff else Backoff.once backoff)
90+
91+
(* *)
92+
93+
let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons)
94+
tail =
95+
let (T tr as taker) = head_r.value in
96+
if Tx.same tr.computation gr.computation then
97+
let selfs = S.cons taker selfs in
98+
give_as_advance t self before selfs head tail
99+
else
100+
let tx = Tx.create () in
101+
let result = tr.result in
102+
let value = gr.value in
103+
if not (Tx.try_return tx tr.computation (fun () -> result value)) then
104+
give_as_advance t self before selfs head tail
105+
else if
106+
(not (Tx.try_return tx gr.computation gr.result))
107+
|| not (Tx.try_commit tx)
108+
then
109+
if not (Computation.is_running gr.computation) then ( (* TODO *) )
110+
else if Computation.is_running tr.computation then
111+
give_as t self before selfs head tail
112+
else give_as_advance t self before selfs head tail
113+
else
114+
let takers =
115+
if head == tail then Q.reverse_as_queue selfs
116+
else
117+
let head = S.reverse_to (S.as_cons head_r.next) selfs in
118+
Q.T (One { head; tail; cons = tail })
119+
in
120+
let givers = before.givers in
121+
let after =
122+
if takers == Q.T Zero && givers == Q.T Zero then empty
123+
else { givers; takers }
124+
in
125+
if not (Atomic.compare_and_set t before after) then
126+
( (* TODO: avoid leak *) )
127+
128+
and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
129+
if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail
130+
else
131+
let takers = Q.reverse_as_queue selfs in
132+
let givers = Q.add before.givers self in
133+
let after = { givers; takers } in
134+
if not (Atomic.compare_and_set t before after) then give_as_start t self
135+
136+
and give_as_start t self =
137+
let before = Atomic.get t in
138+
match before.takers with
139+
| Q.T Zero ->
140+
let takers = Q.T Zero in
141+
let givers = Q.singleton self in
142+
let after = { givers; takers } in
143+
if not (Atomic.compare_and_set t before after) then give_as_start t self
144+
| Q.T (One r as o) ->
145+
Q.exec o;
146+
give_as t self before (T Nil) r.head r.cons
147+
148+
let give_evt t value =
149+
let request computation result =
150+
give_as_start t (G { computation; result; value })
151+
in
152+
Event.from_request { request }
153+
154+
(* *)
155+
156+
let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons)
157+
tail =
158+
let (G gr as giver) = head_r.value in
159+
if Tx.same tr.computation gr.computation then
160+
let selfs = S.cons giver selfs in
161+
take_as_advance t self before selfs head tail
162+
else
163+
let tx = Tx.create () in
164+
let result = tr.result in
165+
let value = gr.value in
166+
if not (Tx.try_return tx gr.computation gr.result) then
167+
take_as_advance t self before selfs head tail
168+
else if
169+
(not (Tx.try_return tx tr.computation (fun () -> result value)))
170+
|| not (Tx.try_commit tx)
171+
then
172+
if not (Computation.is_running gr.computation) then ( (* TODO *) )
173+
else if Computation.is_running tr.computation then
174+
take_as t self before selfs head tail
175+
else take_as_advance t self before selfs head tail
176+
else
177+
let takers = before.takers in
178+
let givers =
179+
if head == tail then Q.reverse_as_queue selfs
180+
else
181+
let head = S.reverse_to (S.as_cons head_r.next) selfs in
182+
Q.T (One { head; tail; cons = tail })
183+
in
184+
let after =
185+
if takers == Q.T Zero && givers == Q.T Zero then empty
186+
else { givers; takers }
187+
in
188+
if not (Atomic.compare_and_set t before after) then
189+
( (* TODO: avoid leak *) )
190+
191+
and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
192+
if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail
193+
else
194+
let givers = Q.reverse_as_queue selfs in
195+
let takers = Q.add before.takers self in
196+
let after = { givers; takers } in
197+
if not (Atomic.compare_and_set t before after) then take_as_start t self
198+
199+
and take_as_start t self =
200+
let before = Atomic.get t in
201+
match before.givers with
202+
| Q.T Zero ->
203+
let givers = Q.T Zero in
204+
let takers = Q.singleton self in
205+
let after = { givers; takers } in
206+
if not (Atomic.compare_and_set t before after) then take_as_start t self
207+
| Q.T (One r as o) ->
208+
Q.exec o;
209+
take_as t self before (T Nil) r.head r.cons
210+
211+
let take_evt t =
212+
let request computation result =
213+
take_as_start t (T { computation; result })
214+
in
215+
Event.from_request { request }
216+
217+
(* *)
218+
219+
let[@inline] take t = take t Backoff.default
220+
let[@inline] give t value = give t value Backoff.default

0 commit comments

Comments
 (0)