diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index 3d25e786cb..0efeb14019 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -68,6 +68,22 @@ struct observe_on
             , destination(std::move(d))
         {
         }
+
+        void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
+            if (!guard.owns_lock()) {
+                abort();
+            }
+            if (current == mode::Errored || current == mode::Disposed) {return;}
+            current = end;
+            queue_type fill_expired;
+            swap(fill_expired, fill_queue);
+            queue_type drain_expired;
+            swap(drain_expired, drain_queue);
+            RXCPP_UNWIND_AUTO([&](){guard.lock();});
+            guard.unlock();
+            lifetime.unsubscribe();
+            destination.unsubscribe();
+        }
 
         void ensure_processing(std::unique_lock<std::mutex>& guard) const {
             if (!guard.owns_lock()) {
@@ -75,42 +91,43 @@ struct observe_on
             }
             if (current == mode::Empty) {
                 current = mode::Processing;
+
+                if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
+                    finish(guard, mode::Disposed);
+                }
 
                 auto keepAlive = this->shared_from_this();
-
+
                 auto drain = [keepAlive, this](const rxsc::schedulable& self){
                     using std::swap;
                     try {
-                        if (drain_queue.empty() || !destination.is_subscribed()) {
-                            std::unique_lock<std::mutex> guard(lock);
-                            if (!destination.is_subscribed() ||
-                                (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
-                                current = mode::Disposed;
-                                queue_type expired;
-                                swap(expired, fill_queue);
-                                guard.unlock();
-                                lifetime.unsubscribe();
-                                destination.unsubscribe();
-                                return;
-                            }
-                            if (drain_queue.empty()) {
-                                if (fill_queue.empty()) {
-                                    current = mode::Empty;
+                        for (;;) {
+                            if (drain_queue.empty() || !destination.is_subscribed()) {
+                                std::unique_lock<std::mutex> guard(lock);
+                                if (!destination.is_subscribed() ||
+                                    (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
+                                    finish(guard, mode::Disposed);
                                     return;
                                 }
-                                swap(fill_queue, drain_queue);
+                                if (drain_queue.empty()) {
+                                    if (fill_queue.empty()) {
+                                        current = mode::Empty;
+                                        return;
+                                    }
+                                    swap(fill_queue, drain_queue);
+                                }
                             }
+                            auto notification = std::move(drain_queue.front());
+                            drain_queue.pop_front();
+                            notification->accept(destination);
+                            std::unique_lock<std::mutex> guard(lock);
+                            self();
+                            if (lifetime.is_subscribed()) break;
                         }
-                        auto notification = std::move(drain_queue.front());
-                        drain_queue.pop_front();
-                        notification->accept(destination);
-                        self();
                     }
                     catch(...) {
                         destination.on_error(std::current_exception());
                         std::unique_lock<std::mutex> guard(lock);
-                        current = mode::Errored;
-                        queue_type expired;
-                        swap(expired, fill_queue);
+                        finish(guard, mode::Errored);
                     }
                 };
@@ -118,15 +135,12 @@ struct observe_on
                     [&](){return coordinator.act(drain);},
                     destination);
                 if (selectedDrain.empty()) {
-                    current = mode::Errored;
-                    using std::swap;
-                    queue_type expired;
-                    swap(expired, fill_queue);
+                    finish(guard, mode::Errored);
                     return;
                 }
 
                 auto processor = coordinator.get_worker();
-
+
                 RXCPP_UNWIND_AUTO([&](){guard.lock();});
                 guard.unlock();
@@ -143,16 +157,19 @@ struct observe_on
     void on_next(source_value_type v) const {
         std::unique_lock<std::mutex> guard(state->lock);
+        if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
         state->fill_queue.push_back(notification_type::on_next(std::move(v)));
         state->ensure_processing(guard);
     }
     void on_error(std::exception_ptr e) const {
         std::unique_lock<std::mutex> guard(state->lock);
+        if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
         state->fill_queue.push_back(notification_type::on_error(e));
         state->ensure_processing(guard);
     }
     void on_completed() const {
         std::unique_lock<std::mutex> guard(state->lock);
+        if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
         state->fill_queue.push_back(notification_type::on_completed());
         state->ensure_processing(guard);
     }
@@ -163,7 +180,7 @@ struct observe_on
         this_type o(d, std::move(coor), cs);
         auto keepAlive = o.state;
-        cs.add([keepAlive](){
+        cs.add([=](){
             std::unique_lock<std::mutex> guard(keepAlive->lock);
             keepAlive->ensure_processing(guard);
         });
@@ -262,6 +279,11 @@ class observe_on_one_worker : public coordination_base
     }
 };
 
+inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
+    static observe_on_one_worker r(rxsc::make_run_loop(rl));
+    return r;
+}
+
 inline observe_on_one_worker observe_on_event_loop() {
     static observe_on_one_worker r(rxsc::make_event_loop());
     return r;
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
index 332d2da2b4..438f4610fe 100644
--- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
@@ -53,12 +53,16 @@ struct repeat : public operator_base
         }
         composite_subscription source_lifetime;
         output_type out;
+        composite_subscription::weak_subscription lifetime_token;
 
         void do_subscribe() {
             auto state = this->shared_from_this();
+
+            state->out.remove(state->lifetime_token);
+            state->source_lifetime.unsubscribe();
 
             state->source_lifetime = composite_subscription();
-            state->out.add(state->source_lifetime);
+            state->lifetime_token = state->out.add(state->source_lifetime);
 
             state->source.subscribe(
                 state->out,
diff --git a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
index 17fe35a629..fb38902e20 100644
--- a/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
@@ -52,14 +52,12 @@ struct subscribe_on : public operator_base
         : public std::enable_shared_from_this<subscribe_on_state_type>
         , public subscribe_on_values
     {
-        subscribe_on_state_type(const subscribe_on_values& i, coordinator_type coor, const output_type& oarg)
+        subscribe_on_state_type(const subscribe_on_values& i, const output_type& oarg)
            : subscribe_on_values(i)
-            , coordinator(std::move(coor))
            , out(oarg)
        {
        }
        composite_subscription source_lifetime;
-        coordinator_type coordinator;
        output_type out;
    private:
        subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
@@ -72,33 +70,39 @@ struct subscribe_on : public operator_base
         auto controller = coordinator.get_worker();
 
         // take a copy of the values for each subscription
-        auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(coordinator), std::move(s));
+        auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s));
+
+        auto sl = state->source_lifetime;
+        auto ol = state->out.get_subscription();
 
         auto disposer = [=](const rxsc::schedulable&){
-            state->source_lifetime.unsubscribe();
-            state->out.unsubscribe();
+            sl.unsubscribe();
+            ol.unsubscribe();
             coordinator_lifetime.unsubscribe();
         };
         auto selectedDisposer = on_exception(
-            [&](){return state->coordinator.act(disposer);},
+            [&](){return coordinator.act(disposer);},
             state->out);
         if (selectedDisposer.empty()) {
             return;
         }
-
-        state->out.add([=](){
-            controller.schedule(selectedDisposer.get());
-        });
+        state->source_lifetime.add([=](){ controller.schedule(selectedDisposer.get()); });
+        state->out.add([=](){
+            sl.unsubscribe();
+            ol.unsubscribe();
+            coordinator_lifetime.unsubscribe();
+        });
+
         auto producer = [=](const rxsc::schedulable&){
             state->source.subscribe(state->source_lifetime, state->out);
         };
 
         auto selectedProducer = on_exception(
-            [&](){return state->coordinator.act(producer);},
+            [&](){return coordinator.act(producer);},
             state->out);
         if (selectedProducer.empty()) {
             return;
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index b84277523b..a9e029fd18 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -329,7 +329,7 @@ inline bool operator==(const worker& lhs, const worker& rhs) {
 inline bool operator!=(const worker& lhs, const worker& rhs) {
     return !(lhs == rhs);
 }
-
+
 class weak_worker
 {
     detail::worker_interface_weak_ptr inner;
@@ -344,7 +344,7 @@ class weak_worker
         , lifetime(owner.lifetime)
     {
     }
-
+
     worker lock() const {
         return worker(lifetime, inner.lock());
     }
@@ -419,6 +419,9 @@ inline scheduler make_scheduler(ArgN&&... an) {
     return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
 }
 
+inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
+    return scheduler(si);
+}
 
 class schedulable : public schedulable_base
 {
@@ -912,6 +915,7 @@ namespace rxsc=schedulers;
 }
 
 #include "schedulers/rx-currentthread.hpp"
+#include "schedulers/rx-runloop.hpp"
 #include "schedulers/rx-newthread.hpp"
 #include "schedulers/rx-eventloop.hpp"
 #include "schedulers/rx-immediate.hpp"
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
new file mode 100644
index 0000000000..db3c0ea5f2
--- /dev/null
+++ b/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
@@ -0,0 +1,202 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP)
+#define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace schedulers {
+
+namespace detail {
+
+struct run_loop_state : public std::enable_shared_from_this<run_loop_state>
+{
+    typedef scheduler::clock_type clock_type;
+
+    typedef detail::schedulable_queue<
+        clock_type::time_point> queue_item_time;
+
+    typedef queue_item_time::item_type item_type;
+    typedef queue_item_time::const_reference const_reference_item_type;
+
+    virtual ~run_loop_state()
+    {
+    }
+
+    run_loop_state()
+    {
+    }
+
+    composite_subscription lifetime;
+    mutable std::mutex lock;
+    mutable queue_item_time q;
+    recursion r;
+};
+
+}
+
+
+struct run_loop_scheduler : public scheduler_interface
+{
+private:
+    typedef run_loop_scheduler this_type;
+    run_loop_scheduler(const this_type&);
+
+    struct run_loop_worker : public worker_interface
+    {
+    private:
+        typedef run_loop_worker this_type;
+
+        run_loop_worker(const this_type&);
+
+    public:
+        std::weak_ptr<detail::run_loop_state> state;
+
+        virtual ~run_loop_worker()
+        {
+        }
+
+        explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws)
+            : state(ws)
+        {
+        }
+
+        virtual clock_type::time_point now() const {
+            return clock_type::now();
+        }
+
+        virtual void schedule(const schedulable& scbl) const {
+            schedule(now(), scbl);
+        }
+
+        virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
+            if (scbl.is_subscribed()) {
+                auto st = state.lock();
+                std::unique_lock<std::mutex> guard(st->lock);
+                st->q.push(detail::run_loop_state::item_type(when, scbl));
+                st->r.reset(false);
+            }
+        }
+    };
+
+    std::weak_ptr<detail::run_loop_state> state;
+
+public:
+    explicit run_loop_scheduler(std::weak_ptr<detail::run_loop_state> ws)
+        : state(ws)
+    {
+    }
+    virtual ~run_loop_scheduler()
+    {
+    }
+
+    virtual clock_type::time_point now() const {
+        return clock_type::now();
+    }
+
+    virtual worker create_worker(composite_subscription cs) const {
+        auto lifetime = state.lock()->lifetime;
+        auto token = lifetime.add(cs);
+        cs.add([=](){lifetime.remove(token);});
+        return worker(cs, create_worker_interface());
+    }
+
+    std::shared_ptr<worker_interface> create_worker_interface() const {
+        return std::make_shared<run_loop_worker>(state);
+    }
+};
+
+class run_loop
+{
+private:
+    typedef run_loop this_type;
+    // don't allow this instance to copy/move since it owns current_thread queue
+    // for the thread it is constructed on.
+    run_loop(const this_type&);
+    run_loop(this_type&&);
+
+    typedef scheduler::clock_type clock_type;
+    typedef detail::action_queue queue_type;
+
+    typedef detail::run_loop_state::item_type item_type;
+    typedef detail::run_loop_state::const_reference_item_type const_reference_item_type;
+
+    std::shared_ptr<detail::run_loop_state> state;
+    std::shared_ptr<run_loop_scheduler> sc;
+
+public:
+    run_loop()
+        : state(std::make_shared<detail::run_loop_state>())
+        , sc(std::make_shared<run_loop_scheduler>(state))
+    {
+        // take ownership so that the current_thread scheduler
+        // uses the same queue on this thread
+        queue_type::ensure(sc->create_worker_interface());
+    }
+    ~run_loop()
+    {
+        state->lifetime.unsubscribe();
+
+        std::unique_lock<std::mutex> guard(state->lock);
+
+        // release ownership
+        queue_type::destroy();
+
+        auto expired = std::move(state->q);
+        if (!state->q.empty()) abort();
+    }
+
+    clock_type::time_point now() const {
+        return clock_type::now();
+    }
+
+    composite_subscription get_subscription() const {
+        return state->lifetime;
+    }
+
+    bool empty() const {
+        return state->q.empty();
+    }
+
+    const_reference_item_type peek() const {
+        return state->q.top();
+    }
+
+    void dispatch() const {
+        std::unique_lock<std::mutex> guard(state->lock);
+        if (state->q.empty()) {
+            return;
+        }
+        auto& peek = state->q.top();
+        if (!peek.what.is_subscribed()) {
+            state->q.pop();
+            return;
+        }
+        if (clock_type::now() < peek.when) {
+            return;
+        }
+        auto what = peek.what;
+        state->q.pop();
+        state->r.reset(state->q.empty());
+        guard.unlock();
+        what(state->r.get_recurse());
+    }
+
+    scheduler get_scheduler() const {
+        return make_scheduler(sc);
+    }
+};
+
+inline scheduler make_run_loop(const run_loop& r) {
+    return r.get_scheduler();
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index 9c4339cb97..96b721318f 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -26,6 +26,7 @@ class multicast_observer
         enum type {
             Invalid = 0,
             Casting,
+            Disposed,
             Completed,
             Errored
         };
@@ -102,6 +103,15 @@ class multicast_observer
     explicit multicast_observer(composite_subscription cs)
         : b(std::make_shared<binder_type>(cs))
     {
+        auto keepAlive = b;
+        b->state->lifetime.add([keepAlive](){
+            if (keepAlive->state->current == mode::Casting){
+                keepAlive->state->current = mode::Disposed;
+                keepAlive->current_completer.reset();
+                keepAlive->completer.reset();
+                ++keepAlive->state->generation;
+            }
+        });
     }
     trace_id get_id() const {
         return b->id;
     }
@@ -144,6 +154,13 @@ class multicast_observer
                 return;
             }
             break;
+        case mode::Disposed:
+            {
+                guard.unlock();
+                o.unsubscribe();
+                return;
+            }
+            break;
         default:
             abort();
         }
diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp
index fd0a0f84ee..ce409bfa6d 100644
--- a/Rx/v2/test/operators/observe_on.cpp
+++ b/Rx/v2/test/operators/observe_on.cpp
@@ -10,7 +10,7 @@ namespace rxsub=rxcpp::subjects;
 
 const int static_onnextcalls = 100000;
 
-SCENARIO("range observed on current_thread", "[hide][range][observe_on_debug][observe_on][long][perf]"){
+SCENARIO("range observed on new_thread", "[hide][range][observe_on_debug][observe_on][long][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a range"){
         WHEN("multicasting a million ints"){
@@ -35,6 +35,7 @@ SCENARIO("range observed on current_thread", "[hide][range][observe_on_debug][ob
             rxs::range(1)
                 .take(onnextcalls)
                 .observe_on(el)
+                .as_blocking()
                 .subscribe(
                     cs,
                     [c](int){
@@ -43,12 +44,11 @@ SCENARIO("range observed on current_thread", "[hide][range][observe_on_debug][ob
                     [&](){
                         done = true;
                     });
-            while(!done || !disposed);
             auto expected = onnextcalls;
             REQUIRE(*c == expected);
             auto finish = clock::now();
             auto msElapsed = duration_cast<milliseconds>(finish-start);
-            std::cout << "range -> observe_on current_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl;
+            std::cout << "range -> observe_on new_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl;
         }
     }
 }
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp
index e1e91a0601..921b7d0c69 100644
--- a/Rx/v2/test/subjects/subject.cpp
+++ b/Rx/v2/test/subjects/subject.cpp
@@ -13,7 +13,7 @@ namespace rxsub=rxcpp::subjects;
 #include <future>
 
-const int static_onnextcalls = 100000000;
+const int static_onnextcalls = 10000000;
 
 static int aliased = 0;
 
 SCENARIO("for loop locks mutex", "[hide][for][mutex][long][perf]"){
@@ -33,7 +33,7 @@ SCENARIO("for loop locks mutex", "[hide][for][mutex][long][perf]"){
         }
         auto finish = clock::now();
         auto msElapsed = duration_cast<milliseconds>(finish-start);
-        std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+        std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     }
 }
@@ -76,7 +76,7 @@ SCENARIO("for loop calls void on_next(int)", "[hide][for][asyncobserver][baselin
         }
         auto finish = clock::now();
         auto msElapsed = duration_cast<milliseconds>(finish-start);
-        std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+        std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     }
 }
@@ -170,7 +170,7 @@ SCENARIO("for loop calls ready on_next(int)", "[hide][for][asyncobserver][ready]
         chunk();
         auto finish = clock::now();
         auto msElapsed = duration_cast<milliseconds>(finish-start);
-        std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+        std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     }
 }
@@ -218,7 +218,7 @@ SCENARIO("for loop calls std::future on_next(int)", "[hide][for][asyncobse
         }
         auto finish = clock::now();
         auto msElapsed = duration_cast<milliseconds>(finish-start);
-        std::cout << "loop future : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+        std::cout << "loop future : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
     }
 }
@@ -245,7 +245,7 @@ SCENARIO("for loop calls observer", "[hide][for][observer][perf]"){
             o.on_completed();
             auto finish = clock::now();
             auto msElapsed = duration_cast<milliseconds>(finish-start);
-            std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+            std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
         }
     }
 }
@@ -271,7 +271,7 @@ SCENARIO("for loop calls subscriber", "[hide][for][subscriber][perf]"){
             o.on_completed();
             auto finish = clock::now();
             auto msElapsed = duration_cast<milliseconds>(finish-start);
-            std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+            std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
         }
     }
 }
@@ -297,7 +297,7 @@ SCENARIO("range calls subscriber", "[hide][range][subscriber][perf]"){
 
             auto finish = clock::now();
             auto msElapsed = duration_cast<milliseconds>(finish-start);
-            std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+            std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
         }
     }
 }
@@ -362,7 +362,7 @@ SCENARIO("for loop calls subject", "[hide][for][subject][subjects][long][perf]")
             o.on_completed();
             auto finish = clock::now();
             auto msElapsed = duration_cast<milliseconds>(finish-start);
-            std::cout << "loop -> subject : " << n << " subscribed, " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+            std::cout << "loop -> subject : " << n << " subscribed, " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
         }
     }
 }
@@ -426,7 +426,7 @@ SCENARIO("range calls subject", "[hide][range][subject][subjects][long][perf]"){
                 .subscribe(o);
             auto finish = clock::now();
             auto msElapsed = duration_cast<milliseconds>(finish-start);
-            std::cout << "range -> subject : " << n << " subscribed, " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed " << std::endl;
+            std::cout << "range -> subject : " << n << " subscribed, " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
         }
     }
 }
diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp
index dd74fff793..a8f269d324 100644
--- a/Rx/v2/test/subscriptions/subscription.cpp
+++ b/Rx/v2/test/subscriptions/subscription.cpp
@@ -21,7 +21,7 @@ SCENARIO("observe subscription", "[hide]"){
     }
 }
 
-static const int static_subscriptions = 100000;
+static const int static_subscriptions = 10000;
 
 SCENARIO("for loop subscribes to map", "[hide][for][just][subscribe][long][perf]"){
     const int& subscriptions = static_subscriptions;
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index b52265c784..db6dc940ce 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -85,6 +85,7 @@ set(RX_SOURCES
     ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp
     ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-immediate.hpp
     ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+    ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-sameworker.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
@@ -107,4 +108,4 @@ set(RX_SOURCES
 
 source_group("src" FILES ${RX_SOURCES})
 add_library(RxCpp SHARED ${RX_SOURCES})
-SET_TARGET_PROPERTIES(RxCpp PROPERTIES LINKER_LANGUAGE CXX)
\ No newline at end of file
+SET_TARGET_PROPERTIES(RxCpp PROPERTIES LINKER_LANGUAGE CXX)
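
Usage sketch (not part of the patch above): one way the run_loop scheduler introduced in rx-runloop.hpp, together with the observe_on_run_loop() coordination added in rx-observe_on.hpp, might be driven from a main/UI thread. Only the members the patch itself adds (empty(), peek(), now(), dispatch(), observe_on_run_loop) are relied on; the observable pipeline and the pump loop are illustrative assumptions, not code from this commit.

// minimal sketch, assuming the run_loop scheduler from this patch is available
#include "rxcpp/rx.hpp"
#include <iostream>

int main() {
    rxcpp::schedulers::run_loop rl;                   // owns the current_thread queue for this thread
    auto mainthread = rxcpp::observe_on_run_loop(rl); // coordination drained by rl.dispatch()

    bool done = false;
    rxcpp::sources::range(1, 5)
        .observe_on(mainthread)
        .subscribe(
            [](int v){ std::cout << v << std::endl; },
            [&](){ done = true; });

    // the "message pump": run every item that is due, loop until on_completed fires
    while (!done) {
        while (!rl.empty() && rl.peek().when < rl.now()) {
            rl.dispatch();
        }
    }
    return 0;
}

Because the run_loop constructor takes ownership of the current_thread queue via queue_type::ensure(), work scheduled on the subscribing thread and work scheduled through the coordination both end up in the same queue and are only executed when dispatch() is called.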