Skip to content

Commit 962c21e

Browse files
committed
Add CML style Ch
1 parent e96159e commit 962c21e

File tree

14 files changed

+560
-149
lines changed

14 files changed

+560
-149
lines changed

bench/bench_ch.ml

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
open Multicore_bench
2+
open Picos
3+
open Picos_std_sync
4+
open Picos_std_structured
5+
6+
let run_one_domain ~budgetf () =
7+
let n_msgs = 200 * Util.iter_factor in
8+
let t = Ch.create () in
9+
let giver () =
10+
for i = 1 to n_msgs do
11+
Ch.give t i
12+
done
13+
and taker () =
14+
for _ = 1 to n_msgs do
15+
Ch.take t |> ignore
16+
done
17+
in
18+
let init _ = () in
19+
let wrap _ () = Scheduler.run in
20+
let work _ () =
21+
Run.all (if Random.bool () then [ taker; giver ] else [ giver; taker ])
22+
in
23+
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
24+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
25+
26+
let yielder () =
27+
while true do
28+
Fiber.yield ()
29+
done
30+
31+
let run_one ~budgetf ~n_givers ~n_takers () =
32+
let n_domains = n_givers + n_takers in
33+
34+
let n_msgs = 200 / n_domains * Util.iter_factor in
35+
36+
let t = Ch.create ~padded:true () in
37+
38+
let n_msgs_to_give = Atomic.make 0 |> Multicore_magic.copy_as_padded in
39+
let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
40+
41+
let init _ =
42+
Atomic.set n_msgs_to_give n_msgs;
43+
Atomic.set n_msgs_to_take n_msgs
44+
in
45+
let wrap _ () = Scheduler.run in
46+
let work i () =
47+
Flock.join_after ~on_return:`Terminate @@ fun () ->
48+
Flock.fork yielder;
49+
begin
50+
if i < n_givers then
51+
let rec work () =
52+
let n = Util.alloc n_msgs_to_give in
53+
if 0 < n then begin
54+
for i = 1 to n do
55+
Ch.give t i
56+
done;
57+
work ()
58+
end
59+
in
60+
work ()
61+
else
62+
let rec work () =
63+
let n = Util.alloc n_msgs_to_take in
64+
if 0 < n then begin
65+
for _ = 1 to n do
66+
Ch.take t |> ignore
67+
done;
68+
work ()
69+
end
70+
in
71+
work ()
72+
end
73+
in
74+
75+
let config =
76+
let format role n =
77+
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
78+
in
79+
Printf.sprintf "%s, %s" (format "giver" n_givers) (format "taker" n_takers)
80+
in
81+
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
82+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
83+
84+
let run_suite ~budgetf =
85+
run_one_domain ~budgetf ()
86+
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
87+
|> List.concat_map @@ fun (n_givers, n_takers) ->
88+
if Picos_domain.recommended_domain_count () < n_givers + n_takers then []
89+
else run_one ~budgetf ~n_givers ~n_takers ())

bench/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
(run %{test} -brief "Picos_mpscq")
1919
(run %{test} -brief "Picos_htbl")
2020
(run %{test} -brief "Picos_stdio")
21+
(run %{test} -brief "Picos_sync Ch")
2122
(run %{test} -brief "Picos_sync Stream")
2223
(run %{test} -brief "Fib")
2324
(run %{test} -brief "Picos binaries")

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ let benchmarks =
1717
("Picos_mpscq", Bench_mpscq.run_suite);
1818
("Picos_htbl", Bench_htbl.run_suite);
1919
("Picos_stdio", Bench_stdio.run_suite);
20+
("Picos_sync Ch", Bench_ch.run_suite);
2021
("Picos_sync Stream", Bench_stream.run_suite);
2122
("Fib", Bench_fib.run_suite);
2223
("Picos binaries", Bench_binaries.run_suite);

lib/picos_std.structured/picos_std_structured.mli

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,11 @@ end
456456
Ivar.read ivar
457457
end;
458458
459+
Flock.fork begin fun () ->
460+
let ch = Ch.create () in
461+
Ch.give ch "never"
462+
end;
463+
459464
Flock.fork begin fun () ->
460465
let stream = Stream.create () in
461466
Stream.read (Stream.tap stream)
@@ -514,6 +519,8 @@ end
514519
count of the latch never reaches [0],
515520
- {{!Picos_std_sync.Ivar.read} [Ivar.read]} never returns, because the
516521
incremental variable is never filled,
522+
- {{!Picos_std_sync.Ch.give} [Ch.give]} never returns, because no fiber
523+
{{!Picos_std_sync.Ch.take} takes} the message,
517524
- {{!Picos_std_sync.Stream.read} [Stream.read]} never returns, because the
518525
stream is never pushed to,
519526
- {{!Picos_io.Unix.read} [Unix.read]} never returns, because the socket is

lib/picos_std.sync/ch.ml

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
open Picos_std_event
2+
open Picos
3+
module Atomic = Multicore_magic.Transparent_atomic
4+
module Tx = Computation.Tx
5+
6+
type 'a taker =
7+
| T : {
8+
computation : (unit -> 'r) Computation.t;
9+
result : 'a -> 'r;
10+
}
11+
-> 'a taker
12+
13+
type 'a giver =
14+
| G : {
15+
computation : (unit -> 'r) Computation.t;
16+
result : unit -> 'r;
17+
value : 'a;
18+
}
19+
-> 'a giver
20+
21+
type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t }
22+
23+
let empty = { givers = T Zero; takers = T Zero }
24+
25+
type 'a t = 'a state Atomic.t
26+
27+
let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded
28+
29+
(* *)
30+
31+
let[@inline never] wait computation =
32+
let trigger = Trigger.create () in
33+
if Computation.try_attach computation trigger then
34+
match Trigger.await trigger with
35+
| None -> ()
36+
| Some (exn, bt) ->
37+
if Computation.try_cancel computation Exit bt then
38+
Printexc.raise_with_backtrace exn bt
39+
40+
(* *)
41+
42+
let rec give t value backoff =
43+
let before = Atomic.fenceless_get t in
44+
match before.takers with
45+
| Q.T Zero ->
46+
let computation = Computation.create ~mode:`LIFO () in
47+
let self = G { computation; result = Fun.id; value } in
48+
let givers = Q.add before.givers self in
49+
let after = { givers; takers = T Zero } in
50+
if Atomic.compare_and_set t before after then wait computation
51+
else give t value (Backoff.once backoff)
52+
| Q.T (One _ as takers) ->
53+
let (T { computation; result }) = Q.head takers in
54+
let got = Computation.try_return computation (fun () -> result value) in
55+
let takers = Q.tail takers in
56+
let givers = before.givers in
57+
let after =
58+
if takers == T Zero && givers == T Zero then empty
59+
else { givers; takers }
60+
in
61+
let no_contention = Atomic.compare_and_set t before after in
62+
if not got then
63+
give t value (if no_contention then backoff else Backoff.once backoff)
64+
65+
let rec take t backoff =
66+
let before = Atomic.fenceless_get t in
67+
match before.givers with
68+
| Q.T Zero ->
69+
let computation = Computation.create ~mode:`LIFO () in
70+
let self = T { computation; result = Fun.id } in
71+
let takers = Q.add before.takers self in
72+
let after = { givers = T Zero; takers } in
73+
if Atomic.compare_and_set t before after then begin
74+
wait computation;
75+
Computation.await computation ()
76+
end
77+
else take t (Backoff.once backoff)
78+
| Q.T (One _ as givers) ->
79+
let (G { computation; result; value }) = Q.head givers in
80+
let got = Computation.try_return computation result in
81+
let givers = Q.tail givers in
82+
let takers = before.takers in
83+
let after =
84+
if givers == T Zero && takers == T Zero then empty
85+
else { givers; takers }
86+
in
87+
let no_contention = Atomic.compare_and_set t before after in
88+
if got then value
89+
else take t (if no_contention then backoff else Backoff.once backoff)
90+
91+
(* *)
92+
93+
let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons)
94+
tail =
95+
let (T tr as taker) = head_r.value in
96+
if Tx.same tr.computation gr.computation then
97+
let selfs = S.cons taker selfs in
98+
give_as_advance t self before selfs head tail
99+
else
100+
let tx = Tx.create () in
101+
let result = tr.result in
102+
let value = gr.value in
103+
if not (Tx.try_return tx tr.computation (fun () -> result value)) then
104+
give_as_advance t self before selfs head tail
105+
else if
106+
(not (Tx.try_return tx gr.computation gr.result))
107+
|| not (Tx.try_commit tx)
108+
then
109+
if not (Computation.is_running gr.computation) then ( (* TODO *) )
110+
else if Computation.is_running tr.computation then
111+
give_as t self before selfs head tail
112+
else give_as_advance t self before selfs head tail
113+
else
114+
let takers =
115+
if head == tail then Q.reverse_as_queue selfs
116+
else
117+
let head = S.reverse_to (S.as_cons head_r.next) selfs in
118+
Q.T (One { head; tail; cons = tail })
119+
in
120+
let givers = before.givers in
121+
let after =
122+
if takers == Q.T Zero && givers == Q.T Zero then empty
123+
else { givers; takers }
124+
in
125+
if not (Atomic.compare_and_set t before after) then
126+
( (* TODO: avoid leak *) )
127+
128+
and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
129+
if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail
130+
else
131+
let takers = Q.reverse_as_queue selfs in
132+
let givers = Q.add before.givers self in
133+
let after = { givers; takers } in
134+
if not (Atomic.compare_and_set t before after) then give_as_start t self
135+
136+
and give_as_start t self =
137+
let before = Atomic.get t in
138+
match before.takers with
139+
| Q.T Zero ->
140+
let takers = Q.T Zero in
141+
let givers = Q.singleton self in
142+
let after = { givers; takers } in
143+
if not (Atomic.compare_and_set t before after) then give_as_start t self
144+
| Q.T (One r as o) ->
145+
Q.exec o;
146+
give_as t self before (T Nil) r.head r.cons
147+
148+
let give_evt t value =
149+
let request computation result =
150+
give_as_start t (G { computation; result; value })
151+
in
152+
Event.from_request { request }
153+
154+
(* *)
155+
156+
let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons)
157+
tail =
158+
let (G gr as giver) = head_r.value in
159+
if Tx.same tr.computation gr.computation then
160+
let selfs = S.cons giver selfs in
161+
take_as_advance t self before selfs head tail
162+
else
163+
let tx = Tx.create () in
164+
let result = tr.result in
165+
let value = gr.value in
166+
if not (Tx.try_return tx gr.computation gr.result) then
167+
take_as_advance t self before selfs head tail
168+
else if
169+
(not (Tx.try_return tx tr.computation (fun () -> result value)))
170+
|| not (Tx.try_commit tx)
171+
then
172+
if not (Computation.is_running gr.computation) then ( (* TODO *) )
173+
else if Computation.is_running tr.computation then
174+
take_as t self before selfs head tail
175+
else take_as_advance t self before selfs head tail
176+
else
177+
let takers = before.takers in
178+
let givers =
179+
if head == tail then Q.reverse_as_queue selfs
180+
else
181+
let head = S.reverse_to (S.as_cons head_r.next) selfs in
182+
Q.T (One { head; tail; cons = tail })
183+
in
184+
let after =
185+
if takers == Q.T Zero && givers == Q.T Zero then empty
186+
else { givers; takers }
187+
in
188+
if not (Atomic.compare_and_set t before after) then
189+
( (* TODO: avoid leak *) )
190+
191+
and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
192+
if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail
193+
else
194+
let givers = Q.reverse_as_queue selfs in
195+
let takers = Q.add before.takers self in
196+
let after = { givers; takers } in
197+
if not (Atomic.compare_and_set t before after) then take_as_start t self
198+
199+
and take_as_start t self =
200+
let before = Atomic.get t in
201+
match before.givers with
202+
| Q.T Zero ->
203+
let givers = Q.T Zero in
204+
let takers = Q.singleton self in
205+
let after = { givers; takers } in
206+
if not (Atomic.compare_and_set t before after) then take_as_start t self
207+
| Q.T (One r as o) ->
208+
Q.exec o;
209+
take_as t self before (T Nil) r.head r.cons
210+
211+
let take_evt t =
212+
let request computation result =
213+
take_as_start t (T { computation; result })
214+
in
215+
Event.from_request { request }
216+
217+
(* *)
218+
219+
let[@inline] take t = take t Backoff.default
220+
let[@inline] give t value = give t value Backoff.default

lib/picos_std.sync/condition.ml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ let broadcast (t : t) =
99
if Atomic.get t != T Zero then
1010
match Atomic.exchange t (T Zero) with
1111
| T Zero -> ()
12-
| T (One _ as q) -> Q.iter q Trigger.signal
12+
| T (One _ as q) -> Q.iter Trigger.signal q
1313

1414
(* We try to avoid starvation of signal by making it so that when, at the start
1515
of signal or wait, the head is empty, the tail is reversed into the head.
@@ -38,26 +38,28 @@ let rec cleanup backoff trigger (t : t) =
3838
else if not (Atomic.compare_and_set t before after) then
3939
cleanup (Backoff.once backoff) trigger t
4040

41-
let rec wait (t : t) mutex trigger fiber backoff =
41+
let rec wait (t : t) mutex cons fiber backoff =
4242
let before = Atomic.get t in
43-
let after = Q.add before trigger in
43+
let after = Q.add_cons before cons in
4444
if Atomic.compare_and_set t before after then begin
4545
Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
46+
let trigger = S.value cons in
4647
let result = Trigger.await trigger in
4748
let forbid = Fiber.exchange fiber ~forbid:true in
48-
Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex Nothing Backoff.default;
49+
Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex (T Nil) Backoff.default;
4950
Fiber.set fiber ~forbid;
5051
match result with
5152
| None -> ()
5253
| Some (exn, bt) ->
5354
cleanup Backoff.default trigger t;
5455
Printexc.raise_with_backtrace exn bt
5556
end
56-
else wait t mutex trigger fiber (Backoff.once backoff)
57+
else wait t mutex cons fiber (Backoff.once backoff)
5758

5859
let wait t mutex =
5960
let fiber = Fiber.current () in
6061
let trigger = Trigger.create () in
61-
wait t mutex trigger fiber Backoff.default
62+
let cons = S.Cons { value = trigger; next = T Nil } in
63+
wait t mutex cons fiber Backoff.default
6264

6365
let[@inline] signal t = signal t Backoff.default

0 commit comments

Comments
 (0)