11
11
#include " reactor-cpp/assert.hh"
12
12
#include " reactor-cpp/dependency_graph.hh"
13
13
#include " reactor-cpp/logging.hh"
14
+ #include " reactor-cpp/reaction.hh"
14
15
#include " reactor-cpp/scheduler.hh"
15
16
16
17
#include < algorithm>
17
18
#include < atomic>
19
+ #include < cstddef>
18
20
#include < iterator>
19
21
#include < vector>
20
22
@@ -52,7 +54,7 @@ GroupedSchedulingPolicy::GroupedSchedulingPolicy(Scheduler<GroupedSchedulingPoli
52
54
: scheduler_(scheduler)
53
55
, environment_(env) {}
54
56
55
- void GroupedSchedulingPolicy::init () {
57
+ void GroupedSchedulingPolicy::init () { // NOLINT
56
58
ReactionDependencyGraph graph{environment_.top_level_reactors ()};
57
59
ReactionDependencyGraph reduced_graph = graph.transitive_reduction ();
58
60
GroupedDependencyGraph grouped_graph{reduced_graph};
@@ -64,11 +66,13 @@ void GroupedSchedulingPolicy::init() {
64
66
auto g = reduced_grouped_graph.get_graph ();
65
67
66
68
std::map<GroupedDependencyGraph::GroupGraph::vertex_descriptor, std::shared_ptr<ReactionGroup>> vertex_to_group;
69
+ std::vector<std::shared_ptr<ReactionGroup>> all_groups;
67
70
68
71
// create a reaction group for each vertex and keep track of all groups without dependencies in initial_groups_
69
72
for (auto * vertex : boost::make_iterator_range (vertices (g))) {
70
73
auto group = std::make_shared<ReactionGroup>();
71
74
vertex_to_group[vertex] = group;
75
+ all_groups.emplace_back (group);
72
76
73
77
if (boost::in_degree (vertex, g) == 0 ) {
74
78
initial_groups_.push_back (group.get ());
@@ -108,12 +112,69 @@ void GroupedSchedulingPolicy::init() {
108
112
num_groups_ = vertex_to_group.size ();
109
113
group_queue_.init (std::max (num_groups_, environment_.num_workers () + 1 ));
110
114
111
- log ::Debug () << " Identified reaction groups: " ;
112
115
for (const auto & [_, group] : vertex_to_group) {
113
- log ::Debug () << " * Group " << group->id << ' :' ;
114
- log ::Debug () << " + reactions:" ;
115
- for (auto [_, reaction] : group->reactions ) {
116
- log ::Debug () << " - " << reaction->fqn ();
116
+ auto & successors = group->successors ;
117
+ std::set<ReactionGroup*> in_super_group;
118
+ if (successors.size () > 1 ) {
119
+ for (auto * successor_a : successors) {
120
+ if (in_super_group.count (successor_a) == 0 && successor_a->num_predecessors <= 1 ) {
121
+ std::vector<ReactionGroup*> same_successors;
122
+ std::copy_if (successors.begin (), successors.end (), std::back_insert_iterator (same_successors),
123
+ [successor_a](ReactionGroup* successor_b) {
124
+ return successor_b->num_predecessors <= 1 &&
125
+ successor_a->successors == successor_b->successors ;
126
+ });
127
+ if (same_successors.size () > 1 ) {
128
+ auto super_group = std::make_shared<ReactionGroup>();
129
+ all_groups.emplace_back (super_group);
130
+
131
+ super_group->id = id_counter++;
132
+ super_group->successors = successor_a->successors ;
133
+ super_group->num_predecessors = successor_a->num_predecessors ;
134
+ super_group->waiting_for .store (successor_a->num_predecessors , std::memory_order_release);
135
+ super_group->triggered_sub_groups .resize (same_successors.size ());
136
+
137
+ for (auto * sub_group : same_successors) {
138
+ super_group->sub_groups .emplace_back (sub_group);
139
+ sub_group->super_group = super_group;
140
+
141
+ // remove sub_group from the successor list of our starting group
142
+ auto it = std::find (group->successors .begin (), group->successors .end (), sub_group);
143
+ reactor_assert (it != group->successors .end ());
144
+ group->successors .erase (it);
145
+ }
146
+ // add the newly creates super_group as a successor to the starting group
147
+ group->successors .emplace_back (super_group.get ());
148
+
149
+ log ::Debug () << " Super Group for Group " << successor_a->id << ' :' ;
150
+ for (auto * elem : same_successors) {
151
+ in_super_group.insert (elem);
152
+ log ::Debug () << " - Group " << elem->id ;
153
+ }
154
+ }
155
+ }
156
+ }
157
+ }
158
+ }
159
+
160
+ log ::Debug () << " Identified reaction groups: " ;
161
+ for (const auto & group : all_groups) {
162
+ if (group->sub_groups .empty ()) {
163
+ log ::Debug () << " * Group " << group->id << ' :' ;
164
+ log ::Debug () << " + reactions:" ;
165
+ for (auto [_, reaction] : group->reactions ) {
166
+ log ::Debug () << " - " << reaction->fqn ();
167
+ }
168
+ if (group->super_group != nullptr ) {
169
+ log ::Debug () << " + super group: " << group->super_group ->id ;
170
+ }
171
+ } else {
172
+ log ::Debug () << " * Super Group " << group->id << ' :' ;
173
+ reactor_assert (group->reactions .empty ());
174
+ log ::Debug () << " + sub groups:" ;
175
+ for (auto * sub_group : group->sub_groups ) {
176
+ log ::Debug () << " - " << sub_group->id ;
177
+ }
117
178
}
118
179
log ::Debug () << " + successors:" ;
119
180
for (const auto * successor : group->successors ) {
@@ -146,17 +207,53 @@ auto GroupedSchedulingPolicy::finalize_group_and_notify_successors(ReactionGroup
146
207
return 1 == groups_to_process_.fetch_sub (1 , std::memory_order_acq_rel);
147
208
}
148
209
210
+ void GroupedSchedulingPolicy::notify_super_group (ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups) {
211
+ // the group is a super group with triggered sub groups
212
+ // -> extract all the triggered subgroups
213
+ std::atomic_thread_fence (std::memory_order_release);
214
+ auto num_triggered = group->triggered_sub_groups_write_pos .load (std::memory_order_relaxed);
215
+ for (std::size_t i{0 }; i < num_triggered; i++) {
216
+ out_ready_groups.emplace_back (group->triggered_sub_groups [i]);
217
+ group->triggered_sub_groups [i]->triggered .store (false , std::memory_order_relaxed);
218
+ }
219
+ group->waiting_for .store (group->num_predecessors , std::memory_order_relaxed);
220
+ group->triggered_sub_groups_write_pos .store (0 , std::memory_order_relaxed);
221
+
222
+ // we do not need to process the untriggered subgroups and can directly decrement the counter
223
+ const auto num_untriggered = group->sub_groups .size () - num_triggered;
224
+ if (num_untriggered > 0 ) {
225
+ groups_to_process_.fetch_sub (num_untriggered, std::memory_order_acq_rel);
226
+ }
227
+
228
+ // update successor if there is any
229
+ if (!group->successors .empty ()) {
230
+ reactor_assert (group->successors .size () == 1 );
231
+ reactor_assert (group->successors [0 ]->num_predecessors == 1 );
232
+ reactor_assert (group->successors [0 ]->sub_groups .empty ());
233
+
234
+ group->successors [0 ]->waiting_for .fetch_sub (num_untriggered, std::memory_order_acq_rel);
235
+ // if none of the groups was triggered, then we can directly notify the successor
236
+ if (num_triggered == 0 ) {
237
+ notify_groups (group->successors , out_ready_groups);
238
+ }
239
+ }
240
+ }
241
+
149
242
void GroupedSchedulingPolicy::notify_groups (const std::vector<ReactionGroup*>& groups,
150
243
std::vector<ReactionGroup*>& out_ready_groups) {
151
244
for (auto * group : groups) {
152
245
// decrement the waiting for counter
153
246
auto old = group->waiting_for .fetch_sub (1 , std::memory_order_relaxed);
247
+
154
248
// If the old value was 1 (or 0), then all dependencies are fulfilled and the group is ready for execution
155
249
if (old <= 1 ) {
156
250
// If the group was triggered, then add it to the ready queue. Otherwise, we skip the group and check its
157
251
// successors.
252
+ log ::Debug () << " Group " << group->id << " is ready" ;
158
253
if (group->triggered .exchange (false , std::memory_order_relaxed)) {
159
254
out_ready_groups.emplace_back (group);
255
+ } else if (!group->sub_groups .empty ()) {
256
+ notify_super_group (group, out_ready_groups);
160
257
} else {
161
258
finalize_group_and_notify_successors (group, out_ready_groups);
162
259
}
@@ -241,7 +338,15 @@ void GroupedSchedulingPolicy::trigger_reaction(Reaction* reaction) {
241
338
log ::Debug () << " (GroupedSchedulingPolicy) trigger reaction " << reaction->fqn () << " in Group " << group->id ;
242
339
auto & triggered_reaction_pair = group->reactions [reaction->index ()];
243
340
triggered_reaction_pair.first = true ;
244
- group->triggered .store (true , std::memory_order_release);
341
+ std::atomic_thread_fence (std::memory_order_release);
342
+ bool old = group->triggered .exchange (true , std::memory_order_relaxed);
343
+ // Also notify the super group if there is one and if we did not notify it yet.
344
+ if (!old && group->super_group != nullptr ) {
345
+ log ::Debug () << " (GroupedSchedulingPolicy) trigger group " << group->id << " in super group "
346
+ << group->super_group ->id ;
347
+ auto pos = group->super_group ->triggered_sub_groups_write_pos .fetch_add (1 , std::memory_order_relaxed);
348
+ group->super_group ->triggered_sub_groups [pos] = group.get ();
349
+ }
245
350
}
246
351
247
352
void GroupedSchedulingPolicy::process_group (const Worker<GroupedSchedulingPolicy>& worker, ReactionGroup* group) {
0 commit comments