Skip to content

Commit f64ea76

Browse files
author
Rafał Hibner
committed
Merge branch 'BackpressureCombiner' into combined2
2 parents a49f504 + 5ce4c6e commit f64ea76

3 files changed

Lines changed: 40 additions & 46 deletions

File tree

cpp/src/arrow/acero/backpressure.cc

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,13 @@ void BackpressureController::Resume() {
3030
}
3131

3232
BackpressureCombiner::BackpressureCombiner(
33-
std::unique_ptr<BackpressureControl> backpressure_control)
34-
: backpressure_control_(std::move(backpressure_control)) {}
33+
std::unique_ptr<BackpressureControl> backpressure_control, bool pause_on_any)
34+
: pause_on_any_(pause_on_any),
35+
backpressure_control_(std::move(backpressure_control)) {}
3536

3637
// Called from Source nodes
37-
void BackpressureCombiner::Pause(Source* output, bool strong_connection) {
38+
void BackpressureCombiner::Pause(Source* output) {
3839
std::lock_guard<std::mutex> lg(mutex_);
39-
auto& paused_ = strong_connection ? strong_paused_ : weak_paused_;
40-
auto& paused_count_ = strong_connection ? strong_paused_count_ : weak_paused_count_;
41-
4240
if (!paused_[output]) {
4341
paused_[output] = true;
4442
paused_count_++;
@@ -47,25 +45,23 @@ void BackpressureCombiner::Pause(Source* output, bool strong_connection) {
4745
}
4846

4947
// Called from Source nodes
50-
void BackpressureCombiner::Resume(Source* output, bool strong_connection) {
48+
void BackpressureCombiner::Resume(Source* output) {
5149
std::lock_guard<std::mutex> lg(mutex_);
52-
auto& paused_ = strong_connection ? strong_paused_ : weak_paused_;
53-
auto& paused_count_ = strong_connection ? strong_paused_count_ : weak_paused_count_;
5450
if (paused_.find(output) == paused_.end()) {
5551
paused_[output] = false;
5652
UpdatePauseStateUnlocked();
57-
}
58-
if (paused_[output]) {
53+
} else if (paused_[output]) {
5954
paused_[output] = false;
6055
paused_count_--;
6156
UpdatePauseStateUnlocked();
6257
}
6358
}
6459

6560
void BackpressureCombiner::UpdatePauseStateUnlocked() {
66-
bool should_be_paused =
67-
strong_paused_count_ > 0 ||
68-
(weak_paused_count_ > 0 && weak_paused_count_ == weak_paused_.size());
61+
bool should_be_paused = (paused_count_ > 0);
62+
if (!pause_on_any_) {
63+
should_be_paused = should_be_paused && (paused_count_ == paused_.size());
64+
}
6965
if (should_be_paused) {
7066
if (!paused) {
7167
backpressure_control_->Pause();
@@ -79,25 +75,24 @@ void BackpressureCombiner::UpdatePauseStateUnlocked() {
7975
}
8076
}
8177

82-
BackpressureCombiner::Source::Source(BackpressureCombiner* ctrl, bool strong_connection) {
78+
BackpressureCombiner::Source::Source(BackpressureCombiner* ctrl) {
8379
if (ctrl) {
84-
AddController(ctrl, strong_connection);
80+
AddController(ctrl);
8581
}
8682
}
8783

88-
void BackpressureCombiner::Source::AddController(BackpressureCombiner* ctrl,
89-
bool strong_connection) {
90-
ctrl->Resume(this, strong_connection); // populate map in controller
91-
connections_.push_back(Connection{ctrl, strong_connection});
84+
void BackpressureCombiner::Source::AddController(BackpressureCombiner* ctrl) {
85+
ctrl->Resume(this); // populate map in controller
86+
connections_.push_back(ctrl);
9287
}
9388
void BackpressureCombiner::Source::Pause() {
9489
for (auto& conn_ : connections_) {
95-
conn_.ctrl->Pause(this, conn_.strong);
90+
conn_->Pause(this);
9691
}
9792
}
9893
void BackpressureCombiner::Source::Resume() {
9994
for (auto& conn_ : connections_) {
100-
conn_.ctrl->Resume(this, conn_.strong);
95+
conn_->Resume(this);
10196
}
10297
}
10398

cpp/src/arrow/acero/backpressure.h

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,40 +53,35 @@ class BackpressureControlWrapper : public BackpressureControl {
5353
// strong Source within controller
5454
class ARROW_ACERO_EXPORT BackpressureCombiner {
5555
public:
56-
explicit BackpressureCombiner(
57-
std::unique_ptr<BackpressureControl> backpressure_control);
56+
explicit BackpressureCombiner(std::unique_ptr<BackpressureControl> backpressure_control,
57+
bool pause_on_any = true);
5858

5959
// Instances of Source can be used as usual BackpresureControl.
6060
// Source can be connected with one or more BackpressureCombiner
6161
class ARROW_ACERO_EXPORT Source : public BackpressureControl {
6262
public:
6363
// strong - strong_connection=true
6464
// weak - strong_connection=false
65-
explicit Source(BackpressureCombiner* ctrl = nullptr, bool strong_connection = true);
66-
void AddController(BackpressureCombiner* ctrl, bool strong_connection = true);
65+
explicit Source(BackpressureCombiner* ctrl = nullptr);
66+
void AddController(BackpressureCombiner* ctrl);
6767
void Pause() override;
6868
void Resume() override;
6969

7070
private:
71-
struct Connection {
72-
BackpressureCombiner* ctrl;
73-
bool strong;
74-
};
75-
std::vector<Connection> connections_;
71+
std::vector<BackpressureCombiner*> connections_;
7672
};
7773

7874
private:
7975
friend class Source;
80-
void Pause(Source* output, bool strong_connection);
81-
void Resume(Source* output, bool strong_connection);
76+
void Pause(Source* output);
77+
void Resume(Source* output);
8278

8379
void UpdatePauseStateUnlocked();
80+
bool pause_on_any_;
8481
std::unique_ptr<BackpressureControl> backpressure_control_;
8582
std::mutex mutex_;
86-
std::unordered_map<Source*, bool> strong_paused_;
87-
std::unordered_map<Source*, bool> weak_paused_;
88-
size_t strong_paused_count_{0};
89-
size_t weak_paused_count_{0};
83+
std::unordered_map<Source*, bool> paused_;
84+
size_t paused_count_{0};
9085
bool paused{false};
9186
};
9287

cpp/src/arrow/acero/backpressure_test.cc

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,17 @@ class MonitorBackpressureControl : public acero::BackpressureControl {
3232

3333
TEST(BackpressureCombiner, Basic) {
3434
std::atomic<bool> paused{false};
35-
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused));
35+
BackpressureCombiner or_combiner(std::make_unique<MonitorBackpressureControl>(paused));
36+
BackpressureCombiner::Source strong_source1(&or_combiner);
37+
BackpressureCombiner::Source strong_source2;
38+
strong_source2.AddController(&or_combiner);
3639

37-
BackpressureCombiner::Source weak_source1(&combiner, false);
40+
BackpressureCombiner and_combiner(
41+
std::make_unique<BackpressureCombiner::Source>(&or_combiner),
42+
/*pause_on_any=*/false);
43+
BackpressureCombiner::Source weak_source1(&and_combiner);
3844
BackpressureCombiner::Source weak_source2;
39-
weak_source2.AddController(&combiner, false);
40-
BackpressureCombiner::Source strong_source1(&combiner);
41-
BackpressureCombiner::Source strong_source2;
42-
strong_source2.AddController(&combiner);
45+
weak_source2.AddController(&and_combiner);
4346

4447
// Any strong causes pause
4548
ASSERT_FALSE(paused);
@@ -102,11 +105,12 @@ TEST(BackpressureCombiner, OnlyStrong) {
102105

103106
TEST(BackpressureCombiner, OnlyWeak) {
104107
std::atomic<bool> paused{false};
105-
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused));
108+
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused),
109+
false);
106110

107-
BackpressureCombiner::Source weak_source1(&combiner, false);
111+
BackpressureCombiner::Source weak_source1(&combiner);
108112
BackpressureCombiner::Source weak_source2;
109-
weak_source2.AddController(&combiner, false);
113+
weak_source2.AddController(&combiner);
110114

111115
// All weak cause pause
112116
ASSERT_FALSE(paused);

0 commit comments

Comments
 (0)