Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions include/nod/nod.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ namespace nod {

// Destruct the signal object.
~signal_type() {
invalidate_disconnector();
invalidate_disconnector_and_wait();
}

/// Type that will be used to store the slots for this signal type.
Expand Down Expand Up @@ -518,10 +518,17 @@ namespace nod {
/// Disconnects all slots
/// @note This operation invalidates all scoped_connection objects
void disconnect_all_slots() {
mutex_lock_type lock{ _mutex };
_slots.clear();
_slot_count = 0;
invalidate_disconnector();
std::weak_ptr<detail::disconnector> weak;
{
mutex_lock_type lock{ _mutex };
_slots.clear();
_slot_count = 0;
weak = invalidate_disconnector();
}
// wait for the destruction of the shared disconnector outside
// of the lock, otherwise a concurrent call to disconnect could
// deadlock
wait_for_disconnector_destruction(weak);
}

private:
Expand All @@ -539,7 +546,7 @@ namespace nod {
/// This will effectively make all current connection objects to
/// to this signal incapable of disconnecting, since they keep a
/// weak pointer to the shared disconnector object.
void invalidate_disconnector() {
void invalidate_disconnector_and_wait() {
// If we are unlucky, some of the connected slots
// might be in the process of disconnecting from other threads.
// If this happens, we are risking to destruct the disconnector
Expand All @@ -550,8 +557,27 @@ namespace nod {
// instance. We then stall the destruction until all other weak
// pointers have released their "lock" (indicated by the fact that
// we will get a nullptr when locking our weak pointer).
wait_for_disconnector_destruction(invalidate_disconnector());
}

/// Invalidate the internal disconnector object in a way
/// that is safe according to the current thread policy.
///
/// This will effectively make all current connection objects to
/// to this signal incapable of disconnecting, since they keep a
/// weak pointer to the shared disconnector object.
///
/// This returns a weak_ptr to the internal disconnector object,
/// so that the caller may use it to wait until the disconnector
/// has been destroyed.
std::weak_ptr<detail::disconnector> invalidate_disconnector() {
std::weak_ptr<detail::disconnector> weak{_shared_disconnector};
_shared_disconnector.reset();
return weak;
}

/// Waits for the object referenced by the weak_ptr to be destroyed.
void wait_for_disconnector_destruction(std::weak_ptr<detail::disconnector> weak) {
while( weak.lock() != nullptr ) {
// we just yield here, allowing the OS to reschedule. We do
// this until all threads has released the disconnector object.
Expand Down Expand Up @@ -591,13 +617,17 @@ namespace nod {
/// be disconnected.
void disconnect( std::size_t index ) {
mutex_lock_type lock( _mutex );
assert( _slots.size() > index );
if( _slots[ index ] != nullptr ) {
--_slot_count;
}
_slots[ index ] = slot_type{};
while( _slots.size()>0 && !_slots.back() ) {
_slots.pop_back();
// index may have been valid when this function was called,
// but _slots could have been cleared whilst waiting for the lock.
if ( _slots.size() > index )
{
if( _slots[ index ] != nullptr ) {
--_slot_count;
}
_slots[ index ] = slot_type{};
while( _slots.size()>0 && !_slots.back() ) {
_slots.pop_back();
}
}
}

Expand Down
226 changes: 226 additions & 0 deletions tests/tests/thread_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
#include <catch.hpp>

#include <nod/nod.hpp>

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

TEST_CASE("Concurrent disconnection does not deadlock", "[thread_test]")
{
for (int run = 0; run < 100; run++)
{
INFO("Run #" << run);

nod::signal<void()> sig;
nod::connection conn;

conn = sig.connect([] {});

std::atomic_bool go(false);
std::atomic<int> ready(0);
std::atomic<int> finished(0);

// Use yield() to keep the threads relatively hot on the CPU

// t1 performs a disconnect() call from a connection object
std::thread t1([&] {
ready++;
while (!go)
{
std::this_thread::yield();
}
conn.disconnect();
finished++;
});

// t2 performs a call to disconnect_all_slots() on the signal
std::thread t2([&] {
ready++;
while (!go)
{
std::this_thread::yield();
}
sig.disconnect_all_slots();
finished++;
});

std::chrono::steady_clock::time_point const startPoint = std::chrono::steady_clock::now();

auto waitTimeExpired = [&startPoint] {
return (std::chrono::steady_clock::now() - startPoint) > std::chrono::seconds(1);
};

auto threadsReady = [&ready] { return ready == 2; };

auto hasFinished = [&finished] { return finished == 2; };

// Wait for both threads to be running
while (!waitTimeExpired() && !threadsReady())
{
std::this_thread::yield();
}

REQUIRE(!waitTimeExpired());

// Signal that the threads can now execute their logic
go = true;

// Wait for both threads to complete
while (!waitTimeExpired() && !hasFinished())
{
std::this_thread::yield();
}

REQUIRE(hasFinished());

t1.join();
t2.join();
}
}

TEST_CASE("Concurrent disconnection and destruction does not deadlock", "[thread_test]")
{
for (int run = 0; run < 100; run++)
{
INFO("Run #" << run);

std::shared_ptr<nod::signal<void()>> sig = std::make_shared<nod::signal<void()>>();
nod::connection conn;

conn = sig->connect([] {});

std::atomic_bool go(false);
std::atomic<int> ready(0);
std::atomic<int> finished(0);

// Use yield() to keep the threads relatively hot on the CPU

// t1 performs a disconnect() call from a connection object
std::thread t1([&] {
ready++;
while (!go)
{
std::this_thread::yield();
}
conn.disconnect();
finished++;
});

// t2 performs the destruction of the signal object
std::thread t2([&] {
ready++;
while (!go)
{
std::this_thread::yield();
}
sig.reset();
finished++;
});

std::chrono::steady_clock::time_point startPoint = std::chrono::steady_clock::now();

auto waitTimeExpired = [startPoint = std::chrono::steady_clock::now()] {
return (std::chrono::steady_clock::now() - startPoint) > std::chrono::seconds(1);
};

auto threadsReady = [&ready] { return ready == 2; };

auto hasFinished = [&finished] { return finished == 2; };

// Wait for both threads to be running
while (!waitTimeExpired() && !threadsReady())
{
std::this_thread::yield();
}

REQUIRE(!waitTimeExpired());

// Signal that the threads can now execute their logic
go = true;

// Wait for both threads to complete
while (!waitTimeExpired() && !hasFinished())
{
std::this_thread::yield();
}

REQUIRE(hasFinished());

t1.join();
t2.join();
}
}

TEST_CASE("Concurrent disconnection and connection does not deadlock", "[thread_test]")
{
for (int run = 0; run < 100; run++)
{
INFO("Run #" << run);

nod::signal<void()> sig;
nod::connection conn;

std::atomic_bool go(false);
std::atomic<int> ready(0);
std::atomic<int> finished(0);

// Use yield() to keep the threads relatively hot on the CPU

// t1 performs a connect() call on the signal
std::thread t1([&] {
ready++;
while (!go)
{
std::this_thread::yield();
}
conn = sig.connect([] {});
finished++;
});

// t2 performs a call to disconnect_all_slots() on the signal
std::thread t2([&] {
ready++;
while (!go)
{
std::this_thread::yield();
}
sig.disconnect_all_slots();
finished++;
});

auto waitTimeExpired = [startPoint = std::chrono::steady_clock::now()] {
return (std::chrono::steady_clock::now() - startPoint) > std::chrono::seconds(1);
};

auto threadsReady = [&ready] { return ready == 2; };

auto hasFinished = [&finished] { return finished == 2; };

// Wait for both threads to be running
while (!waitTimeExpired() && !threadsReady())
{
std::this_thread::yield();
}

REQUIRE(!waitTimeExpired());

// Signal that the threads can now execute their logic
go = true;

// Wait for both threads to complete
while (!waitTimeExpired() && !hasFinished())
{
std::this_thread::yield();
}

REQUIRE(hasFinished());

conn.disconnect();

t1.join();
t2.join();
}
}