Skip to content

Commit

Permalink
fix window_with_time_or_count
Browse files Browse the repository at this point in the history
reported in #277
  • Loading branch information
Kirk Shoop committed Nov 25, 2016
1 parent 4ab756b commit 8290f92
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 2 deletions.
4 changes: 2 additions & 2 deletions Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ struct window_with_time_or_count

void on_next(T v) const {
auto localState = state;
auto work = [v, localState](const rxsc::schedulable&){
auto work = [v, localState](const rxsc::schedulable& self){
localState->subj.get_subscriber().on_next(v);
if (++localState->cursor == localState->count) {
release_window(localState->subj_id, localState->worker.now(), localState);
release_window(localState->subj_id, localState->worker.now(), localState)(self);
}
};
auto selectedWork = on_exception(
Expand Down
50 changes: 50 additions & 0 deletions Rx/v2/test/operators/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1172,3 +1172,53 @@ SCENARIO("buffer with time or count, only time triggered", "[buffer_with_time_or
}
}
}

SCENARIO("buffer with time or count, only count triggered", "[buffer_with_time_or_count][operators]"){
GIVEN("1 hot observable of ints."){
auto sc = rxsc::make_test();
auto so = rx::synchronize_in_one_worker(sc);
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
const rxsc::test::messages<std::vector<int>> v_on;

auto xs = sc.make_hot_observable({
on.next(205, 1),
on.next(305, 2),
on.next(505, 3),
on.next(605, 4),
on.next(610, 5),
on.completed(850)
});
WHEN("group ints on intervals"){
using namespace std::chrono;

auto res = w.start(
[&]() {
return xs
.buffer_with_time_or_count(milliseconds(370), 2, so)
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);

THEN("the output contains groups of ints"){
auto required = rxu::to_vector({
v_on.next(306, rxu::to_vector({ 1, 2 })),
v_on.next(606, rxu::to_vector({ 3, 4 })),
v_on.next(851, rxu::to_vector({ 5 })),
v_on.completed(851)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}

THEN("there was one subscription and one unsubscription to the xs"){
auto required = rxu::to_vector({
on.subscribe(200, 850)
});
auto actual = xs.subscriptions();
REQUIRE(required == actual);
}
}
}
}
57 changes: 57 additions & 0 deletions Rx/v2/test/operators/window.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "../test.h"

#include <rxcpp/operators/rx-reduce.hpp>

SCENARIO("window count, basic", "[window][operators]"){
GIVEN("1 hot observable of ints."){
auto sc = rxsc::make_test();
Expand Down Expand Up @@ -979,3 +981,58 @@ SCENARIO("window with time or count, only time triggered", "[window_with_time_or
}
}
}

SCENARIO("window with time or count, only count triggered", "[window_with_time_or_count][operators]"){
GIVEN("1 hot observable of ints."){
auto sc = rxsc::make_test();
auto so = rx::synchronize_in_one_worker(sc);
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
const rxsc::test::messages<rx::observable<int>> o_on;

auto xs = sc.make_hot_observable({
on.next(205, 1),
on.next(305, 2),
on.next(505, 3),
on.next(605, 4),
on.next(610, 5),
on.completed(850)
});

WHEN("group each int with the next 2 ints"){
using namespace std::chrono;

auto res = w.start(
[&]() {
return xs
.window_with_time_or_count(milliseconds(370), 2, so)
.map([](rx::observable<int> w){
return w.count();
})
.merge()
// forget type to workaround lambda deduction bug on msvc 2013
.as_dynamic();
}
);

THEN("the output contains merged groups of ints"){
auto required = rxu::to_vector({
on.next(306, 2),
on.next(606, 2),
on.next(851, 1),
on.completed(851)
});
auto actual = res.get_observer().messages();
REQUIRE(required == actual);
}

THEN("there was one subscription and one unsubscription to the observable"){
auto required = rxu::to_vector({
o_on.subscribe(200, 850)
});
auto actual = xs.subscriptions();
REQUIRE(required == actual);
}
}
}
}

0 comments on commit 8290f92

Please sign in to comment.