Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minimal changes to allow single move only values to be pushed and poped (version 2) #41

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
75 changes: 75 additions & 0 deletions include/boost/lockfree/detail/move_payload.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// boost lockfree: move_payload helper
//
// Copyright (C) 2011 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_DETAIL_MOVE_PAYLOAD_HPP_INCLUDED
#define BOOST_LOCKFREE_DETAIL_MOVE_PAYLOAD_HPP_INCLUDED

#include <boost/mpl/if.hpp>
#include <boost/type_traits/is_convertible.hpp>

#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4512) // assignment operator could not be generated
#endif

namespace boost {
namespace lockfree {
namespace detail {

struct move_convertible
{
template <typename T, typename U>
static void move(T & t, U & u)
{
u = std::move(t);
}
};

struct move_constructible_and_assignable
{
template <typename T, typename U>
static void move(T & t, U & u)
{
u = U(std::move(t));
}
};

template <typename T, typename U>
void move_payload(T & t, U & u)
{
typedef typename boost::mpl::if_<typename boost::is_convertible<T, U>::type,
move_convertible,
move_constructible_and_assignable
>::type move_type;
move_type::move(t, u);
}

template <typename T>
struct consume_via_move
{
consume_via_move(T & out):
out_(out)
{}

template <typename U>
void operator()(U & element)
{
move_payload(element, out_);
}

T & out_;
};


}}}

#if defined(_MSC_VER)
#pragma warning(pop)
#endif

#endif /* BOOST_LOCKFREE_DETAIL_COPY_PAYLOAD_HPP_INCLUDED */
129 changes: 107 additions & 22 deletions include/boost/lockfree/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,25 @@
#include <algorithm>
#include <memory>

#include <boost/config.hpp> // for BOOST_NO_CXX11_

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>

#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY

#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/type_traits/is_convertible.hpp>
#include <boost/type_traits/is_copy_constructible.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
#include <boost/lockfree/detail/move_payload.hpp>
#include <boost/move/move.hpp>
#endif
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>

Expand Down Expand Up @@ -98,7 +105,26 @@ class ringbuffer_base
return write_available(write_index, read_index, max_size);
}

bool push(T const & t, T * buffer, size_t max_size)
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
bool push(T && t, T * buffer, size_t max_size )
{
const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
const size_t next = next_index( write_index, max_size );

if( next == read_index_.load( memory_order_acquire ) )
return false; /* ringbuffer is full */

new (buffer + write_index) T(boost::move(t)); // move-construct

write_index_.store( next, memory_order_release );

return true;
}
#endif

template<typename U>
typename boost::enable_if<boost::integral_constant<bool, boost::is_copy_constructible<U>::value && boost::is_same<T,U>::value>, bool>::type
push(U const & t, U * buffer, size_t max_size)
{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index, max_size);
Expand Down Expand Up @@ -382,21 +408,30 @@ class ringbuffer_base
return write_index == read_index;
}

template< class OutputIterator >
OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )

template<class OutputIterator>
typename boost::enable_if<boost::has_trivial_destructor<T>, OutputIterator>::type
copy_and_delete(T * first, T * last, OutputIterator out)
{
if (boost::has_trivial_destructor<T>::value) {
return std::copy(first, last, out); // will use memcpy if possible
} else {
for (; first != last; ++first, ++out) {
*out = *first;
first->~T();
}
return out;
return std::copy(first, last, out); // will use memcpy if possible
}

template<class OutputIterator>
typename boost::disable_if<boost::has_trivial_destructor<T>, OutputIterator>::type
copy_and_delete(T * first, T * last, OutputIterator out)
{
for (; first != last; ++first, ++out) {
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
*out = boost::move(*first);
#else
*out = *first;
#endif
first->~T();
}
return out;
}

template< class Functor >
template<class Functor>
void run_functor_and_delete( T * first, T * last, Functor & functor )
{
for (; first != last; ++first) {
Expand Down Expand Up @@ -445,11 +480,21 @@ class compile_time_sized_ringbuffer:
}

public:
bool push(T const & t)

template<typename U>
typename boost::enable_if<boost::integral_constant<bool, boost::is_copy_constructible<U>::value && boost::is_same<T, U>::value>,bool>::type
push(U const & t)
{
return ringbuffer_base<T>::push(t, data(), max_size);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
bool push(T && t)
{
return ringbuffer_base<T>::push(boost::move(t), data(), max_size);
}
#endif

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -558,11 +603,20 @@ class runtime_sized_ringbuffer:
Alloc::deallocate(array_, max_elements_);
}

bool push(T const & t)
template< typename U>
typename boost::enable_if< boost::integral_constant<bool, boost::is_copy_constructible<U>::value && boost::is_same<T,U>::value>, bool >::type
push(U const & t)
{
return ringbuffer_base<T>::push(t, &*array_, max_elements_);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
bool push(T && t)
{
return ringbuffer_base<T>::push(boost::move(t), &*array_, max_elements_);
}
#endif

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -756,10 +810,26 @@ class spsc_queue:
*
* \note Thread-safe and wait-free
* */
bool push(T const & t)
bool
push(T const & u)
{
return base_type::push(u);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
/** Pushes object t to the ringbuffer.
*
* \pre only one thread is allowed to push data to the spsc_queue
* \post object will be pushed to the spsc_queue, unless it is full.
* \return true, if the push operation is successful.
*
* \note Thread-safe and wait-free
* */
bool push(T && t)
{
return base_type::push(t);
return base_type::push(boost::move(t));
}
#endif

/** Pops one object from ringbuffer.
*
Expand All @@ -772,25 +842,40 @@ class spsc_queue:
bool pop ()
{
detail::consume_noop consume_functor;
return consume_one( consume_functor );
return consume_one(consume_functor);
}

/** Pops one object from ringbuffer.
/** Pops one object from ringbuffer. If it is move assignable it will be moved,
* otherwise it will be copied.
*
* \pre only one thread is allowed to pop data to the spsc_queue
* \post if ringbuffer is not empty, object will be copied to ret.
* \post if ringbuffer is not empty, object will be assigned to ret.
* \return true, if the pop operation is successful, false if ringbuffer was empty.
*
* \note Thread-safe and wait-free
*/
#ifdef BOOST_NO_CXX11_RVALUE_REFERENCES

template <typename U>
typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
pop (U & ret)
pop(U & ret)
{
detail::consume_via_copy<U> consume_functor(ret);
return consume_one( consume_functor );
return consume_one(consume_functor);
}

#else

template <typename U>
typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
pop(U & ret)
{
detail::consume_via_move<U> consume_functor( ret );
return consume_one(consume_functor);
}

#endif

/** Pushes as many objects from the array t as there is space.
*
* \pre only one thread is allowed to push data to the spsc_queue
Expand Down
32 changes: 31 additions & 1 deletion test/spsc_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <iostream>
#include <memory>

#include "test_helpers.hpp"
#include "test_common.hpp"
#include "test_helpers.hpp"

using namespace boost;
using namespace boost::lockfree;
Expand Down Expand Up @@ -405,3 +405,33 @@ BOOST_AUTO_TEST_CASE( spsc_queue_reset_test )

BOOST_REQUIRE(f.empty());
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES

BOOST_AUTO_TEST_CASE( spsc_queue_unique_ptr_push_pop_test )
{
spsc_queue<std::unique_ptr<int[]>, capacity<64> > f;

BOOST_REQUIRE(f.empty());

unique_ptr<int[]> in;
unique_ptr<int[]> out;

const int fortytwo = 42;

in.reset( new int[1] );
in[0] = fortytwo;
int* data = in.get();

BOOST_REQUIRE( f.push( std::move(in) ) );
BOOST_REQUIRE( f.pop(out) );

BOOST_REQUIRE( out.get() == data );
BOOST_REQUIRE( out[0] == fortytwo );

f.reset();

BOOST_REQUIRE(f.empty());
}

#endif