Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stateful allocator support for concurrent_queue and concurrent_bounde… #1520

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 47 additions & 35 deletions include/oneapi/tbb/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,29 @@ class concurrent_queue {
}

concurrent_queue& operator=( const concurrent_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation == other.my_queue_representation)
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = other.my_allocator;
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
return *this;
YexuanXiao marked this conversation as resolved.
Show resolved Hide resolved
}

concurrent_queue& operator=( concurrent_queue&& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation == other.my_queue_representation)
YexuanXiao marked this conversation as resolved.
Show resolved Hide resolved
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = std::move(other.my_allocator);
internal_swap(other);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid that with the full support for stateful allocators, propagation and uses-allocator construction, we cannot use internal_swap as an implementation for move semantics anymore:

Consider this->my_allocator to be stateful allocator.
After internal_swap, the memory allocated by *this would be transferred to other but the allocator with the correct state would not be transfered to *this (and it cannot be since the standard requires us to move-construct allocators.
Because of these, we will need a "fair" move semantics for both constructor and the assignment operator

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my_queue_representation is allocated by r1::cache_aligned_allocate instead of my_allocator and all memory allocated by my_allocator has been deallocated at line 153, so I don’t think this is an issue.

} else {
if (my_allocator == other.my_allocator) {
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
my_queue_representation->assign(*other.my_queue_representation, my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
}
}
return *this;
Expand All @@ -178,8 +182,12 @@ class concurrent_queue {
}

void swap ( concurrent_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_swap
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
if (queue_allocator_traits::propagate_on_container_swap::value) {
YexuanXiao marked this conversation as resolved.
Show resolved Hide resolved
using std::swap;
swap(my_allocator, other.my_allocator);
} else {
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets move it inside internal_swap

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed all the containers’ internal_swap, which typically have two overloads. I think they can be combined into a single function using if:

void internal_swap(concurrent_queue& src) {
   if (!allocator_traits_type::queue_allocator_traits::propagate_on_container)
        __TBB_ASSERT(my_allocator == src.my_allocator, "Swapping with unequal allocators is not allowed");
    using std::swap;
    swap(my_queue_representation, src.my_queue_representation);
}

The compiler will optimize it and eliminate the unreachable branch.
Allocator helpers have the same pattern. Do we need to perform these cleanups, or should we keep things as they are?

}
internal_swap(other);
}

Expand Down Expand Up @@ -253,15 +261,13 @@ class concurrent_queue {
template <typename Container, typename Value, typename A>
friend class concurrent_queue_iterator;

static void copy_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for copy construction
new (location) value_type(*static_cast<const value_type*>(src));
// queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));

static void copy_construct_item(queue_allocator_type& allocator, T* location, const void* src) {
queue_allocator_traits::construct(allocator, location, *static_cast<const T*>(src));
}

static void move_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for move construction
new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
static void move_construct_item(queue_allocator_type& allocator, T* location, const void* src) {
queue_allocator_traits::construct(allocator, location, std::move(*static_cast<value_type*>(const_cast<void*>(src))));
}

queue_allocator_type my_allocator;
Expand Down Expand Up @@ -416,25 +422,29 @@ class concurrent_bounded_queue {
}

concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation == other.my_queue_representation)
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = other.my_allocator;
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
return *this;
}

concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
if (my_queue_representation != other.my_queue_representation) {
clear();
if (my_queue_representation == other.my_queue_representation)
return *this;
clear();
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
my_allocator = std::move(other.my_allocator);
internal_swap(other);
} else {
if (my_allocator == other.my_allocator) {
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
my_queue_representation->assign(*other.my_queue_representation, my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
}
}
return *this;
Expand All @@ -457,8 +467,12 @@ class concurrent_bounded_queue {
}

void swap ( concurrent_bounded_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_swap
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
if (queue_allocator_traits::propagate_on_container_swap::value) {
using std::swap;
swap(my_allocator, other.my_allocator);
} else {
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
}
internal_swap(other);
}

Expand Down Expand Up @@ -641,14 +655,12 @@ class concurrent_bounded_queue {
r1::abort_bounded_queue_monitors(my_monitors);
}

static void copy_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for copy construction
new (location) value_type(*static_cast<const value_type*>(src));
static void copy_construct_item(queue_allocator_type& ator, T* location, const void* src) {
queue_allocator_traits::construct(ator, location, *static_cast<const T*>(src));
}

static void move_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for move construction
new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
static void move_construct_item(queue_allocator_type& ator, T* location, const void* src) {
queue_allocator_traits::construct(ator, location, std::move(*static_cast<value_type*>(const_cast<void*>(src))));
}

template <typename Container, typename Value, typename A>
Expand Down
8 changes: 4 additions & 4 deletions include/oneapi/tbb/detail/_concurrent_queue_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class micro_queue {
using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;

public:
using item_constructor_type = void (*)(value_type* location, const void* src);
using item_constructor_type = void (*)(queue_allocator_type&, value_type* location, const void* src);
micro_queue() = default;
micro_queue( const micro_queue& ) = delete;
micro_queue& operator=( const micro_queue& ) = delete;
Expand Down Expand Up @@ -254,7 +254,7 @@ class micro_queue {
new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
copy_item(allocator, *new_page, begin_in_page, *src_page, begin_in_page, construct_item);
}
}
return new_page;
Expand Down Expand Up @@ -324,11 +324,11 @@ class micro_queue {
~destroyer() {my_value.~T();}
}; // class destroyer

void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
void copy_item( queue_allocator_type& allocator, padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
item_constructor_type construct_item )
{
auto& src_item = src[sindex];
construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
construct_item( allocator, &dst[dindex], static_cast<const void*>(&src_item) );
}

void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
Expand Down
Loading