diff --git a/include/nod/nod.hpp b/include/nod/nod.hpp index 4f0335c..dd8d8d3 100644 --- a/include/nod/nod.hpp +++ b/include/nod/nod.hpp @@ -392,7 +392,7 @@ namespace nod { // Destruct the signal object. ~signal_type() { - invalidate_disconnector(); + invalidate_disconnector_and_wait(); } /// Type that will be used to store the slots for this signal type. @@ -518,10 +518,17 @@ namespace nod { /// Disconnects all slots /// @note This operation invalidates all scoped_connection objects void disconnect_all_slots() { - mutex_lock_type lock{ _mutex }; - _slots.clear(); - _slot_count = 0; - invalidate_disconnector(); + std::weak_ptr weak; + { + mutex_lock_type lock{ _mutex }; + _slots.clear(); + _slot_count = 0; + weak = invalidate_disconnector(); + } + // wait for the destruction of the shared disconnector outside + // of the lock, otherwise a concurrent call to disconnect could + // deadlock + wait_for_disconnector_destruction(weak); } private: @@ -539,7 +546,7 @@ namespace nod { /// This will effectively make all current connection objects to /// to this signal incapable of disconnecting, since they keep a /// weak pointer to the shared disconnector object. - void invalidate_disconnector() { + void invalidate_disconnector_and_wait() { // If we are unlucky, some of the connected slots // might be in the process of disconnecting from other threads. // If this happens, we are risking to destruct the disconnector @@ -550,8 +557,27 @@ namespace nod { // instance. We then stall the destruction until all other weak // pointers have released their "lock" (indicated by the fact that // we will get a nullptr when locking our weak pointer). + wait_for_disconnector_destruction(invalidate_disconnector()); + } + + /// Invalidate the internal disconnector object in a way + /// that is safe according to the current thread policy. + /// + /// This will effectively make all current connection objects to + /// to this signal incapable of disconnecting, since they keep a + /// weak pointer to the shared disconnector object. + /// + /// This returns a weak_ptr to the internal disconnector object, + /// so that the caller may use it to wait until the disconnector + /// has been destroyed. + std::weak_ptr invalidate_disconnector() { std::weak_ptr weak{_shared_disconnector}; _shared_disconnector.reset(); + return weak; + } + + /// Waits for the object referenced by the weak_ptr to be destroyed. + void wait_for_disconnector_destruction(std::weak_ptr weak) { while( weak.lock() != nullptr ) { // we just yield here, allowing the OS to reschedule. We do // this until all threads has released the disconnector object. @@ -591,13 +617,17 @@ namespace nod { /// be disconnected. void disconnect( std::size_t index ) { mutex_lock_type lock( _mutex ); - assert( _slots.size() > index ); - if( _slots[ index ] != nullptr ) { - --_slot_count; - } - _slots[ index ] = slot_type{}; - while( _slots.size()>0 && !_slots.back() ) { - _slots.pop_back(); + // index may have been valid when this function was called, + // but _slots could have been cleared whilst waiting for the lock. + if ( _slots.size() > index ) + { + if( _slots[ index ] != nullptr ) { + --_slot_count; + } + _slots[ index ] = slot_type{}; + while( _slots.size()>0 && !_slots.back() ) { + _slots.pop_back(); + } } } diff --git a/tests/tests/thread_tests.cpp b/tests/tests/thread_tests.cpp new file mode 100644 index 0000000..1566392 --- /dev/null +++ b/tests/tests/thread_tests.cpp @@ -0,0 +1,226 @@ +#include + +#include + +#include +#include +#include +#include +#include + +TEST_CASE("Concurrent disconnection does not deadlock", "[thread_test]") +{ + for (int run = 0; run < 100; run++) + { + INFO("Run #" << run); + + nod::signal sig; + nod::connection conn; + + conn = sig.connect([] {}); + + std::atomic_bool go(false); + std::atomic ready(0); + std::atomic finished(0); + + // Use yield() to keep the threads relatively hot on the CPU + + // t1 performs a disconnect() call from a connection object + std::thread t1([&] { + ready++; + while (!go) + { + std::this_thread::yield(); + } + conn.disconnect(); + finished++; + }); + + // t2 performs a call to disconnect_all_slots() on the signal + std::thread t2([&] { + ready++; + while (!go) + { + std::this_thread::yield(); + } + sig.disconnect_all_slots(); + finished++; + }); + + std::chrono::steady_clock::time_point const startPoint = std::chrono::steady_clock::now(); + + auto waitTimeExpired = [&startPoint] { + return (std::chrono::steady_clock::now() - startPoint) > std::chrono::seconds(1); + }; + + auto threadsReady = [&ready] { return ready == 2; }; + + auto hasFinished = [&finished] { return finished == 2; }; + + // Wait for both threads to be running + while (!waitTimeExpired() && !threadsReady()) + { + std::this_thread::yield(); + } + + REQUIRE(!waitTimeExpired()); + + // Signal that the threads can now execute their logic + go = true; + + // Wait for both threads to complete + while (!waitTimeExpired() && !hasFinished()) + { + std::this_thread::yield(); + } + + REQUIRE(hasFinished()); + + t1.join(); + t2.join(); + } +} + +TEST_CASE("Concurrent disconnection and destruction does not deadlock", "[thread_test]") +{ + for (int run = 0; run < 100; run++) + { + INFO("Run #" << run); + + std::shared_ptr> sig = std::make_shared>(); + nod::connection conn; + + conn = sig->connect([] {}); + + std::atomic_bool go(false); + std::atomic ready(0); + std::atomic finished(0); + + // Use yield() to keep the threads relatively hot on the CPU + + // t1 performs a disconnect() call from a connection object + std::thread t1([&] { + ready++; + while (!go) + { + std::this_thread::yield(); + } + conn.disconnect(); + finished++; + }); + + // t2 performs the destruction of the signal object + std::thread t2([&] { + ready++; + while (!go) + { + std::this_thread::yield(); + } + sig.reset(); + finished++; + }); + + std::chrono::steady_clock::time_point startPoint = std::chrono::steady_clock::now(); + + auto waitTimeExpired = [startPoint = std::chrono::steady_clock::now()] { + return (std::chrono::steady_clock::now() - startPoint) > std::chrono::seconds(1); + }; + + auto threadsReady = [&ready] { return ready == 2; }; + + auto hasFinished = [&finished] { return finished == 2; }; + + // Wait for both threads to be running + while (!waitTimeExpired() && !threadsReady()) + { + std::this_thread::yield(); + } + + REQUIRE(!waitTimeExpired()); + + // Signal that the threads can now execute their logic + go = true; + + // Wait for both threads to complete + while (!waitTimeExpired() && !hasFinished()) + { + std::this_thread::yield(); + } + + REQUIRE(hasFinished()); + + t1.join(); + t2.join(); + } +} + +TEST_CASE("Concurrent disconnection and connection does not deadlock", "[thread_test]") +{ + for (int run = 0; run < 100; run++) + { + INFO("Run #" << run); + + nod::signal sig; + nod::connection conn; + + std::atomic_bool go(false); + std::atomic ready(0); + std::atomic finished(0); + + // Use yield() to keep the threads relatively hot on the CPU + + // t1 performs a connect() call on the signal + std::thread t1([&] { + ready++; + while (!go) + { + std::this_thread::yield(); + } + conn = sig.connect([] {}); + finished++; + }); + + // t2 performs a call to disconnect_all_slots() on the signal + std::thread t2([&] { + ready++; + while (!go) + { + std::this_thread::yield(); + } + sig.disconnect_all_slots(); + finished++; + }); + + auto waitTimeExpired = [startPoint = std::chrono::steady_clock::now()] { + return (std::chrono::steady_clock::now() - startPoint) > std::chrono::seconds(1); + }; + + auto threadsReady = [&ready] { return ready == 2; }; + + auto hasFinished = [&finished] { return finished == 2; }; + + // Wait for both threads to be running + while (!waitTimeExpired() && !threadsReady()) + { + std::this_thread::yield(); + } + + REQUIRE(!waitTimeExpired()); + + // Signal that the threads can now execute their logic + go = true; + + // Wait for both threads to complete + while (!waitTimeExpired() && !hasFinished()) + { + std::this_thread::yield(); + } + + REQUIRE(hasFinished()); + + conn.disconnect(); + + t1.join(); + t2.join(); + } +}