diff --git a/CHANGELOG.md b/CHANGELOG.md index b024e277..abba639e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,24 @@ +v0.6.0 +====== +Better support for asynchronous RPC and event handlers. + +Breaking Changes: +- `Session` and `CoroSession` now take an extra `boost::asio::io_service` + argument in their `create()` functions. This IO service is now used for + executing user-provided handlers. It can be the same one used by the + transport connectors. +- Support for non-handshaking raw socket transports has been + removed (closes #92). + +Enhancements: +- Added `basicCoroRpc()`, `basicCoroEvent()`, `unpackedCoroRpc()`, and + `unpackedCoroEvent()` wrappers, which execute a call/event slot within the + context of a coroutine. This should make it easier to implement RPC/event + handlers that need to run asynchronously themselves (closes #91). +- `Invocation` and `Event` now have an `iosvc()` getter, which returns the + user-provided `asio::io_service` (closes #91). +- Added `Variant` conversion facilities for `std::set` and `set::unordered_set`. + v0.5.3 ====== Fixes and enhancements. diff --git a/CMakeLists.txt b/CMakeLists.txt index 78c5d5b3..71c2238a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,12 +37,6 @@ set(PATH_INCLUDE_MSGPACK ${PROJECT_SOURCE_DIR}/ext/msgpack-c/include CACHE PATH set(PATH_INCLUDE_CATCH ${PROJECT_SOURCE_DIR}/ext/Catch/include CACHE PATH "Catch include path") -# Add GUI variables that let the user specify that legacy connectors should be -# used for tests and examples. -option(CPPWAMP_USE_NON_HANDSHAKING_TRANSPORTS - "Use non-handshaking raw socket transports in tests and examples" - OFF) - # Confirm that the user's choices for the Boost library paths are valid. unset(BOOST_ROOT) set(BOOST_INCLUDEDIR ${PATH_INCLUDE_BOOST}) diff --git a/README.md b/README.md index dddef666..c56c6f48 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { // Specify a TCP transport and JSON serialization auto tcp = connector(iosvc, TcpHost("localhost", 8001)); - auto session = wamp::CoroSession<>::create(tcp); + auto session = wamp::CoroSession<>::create(iosvc, tcp); session->connect(yield); auto sessionInfo = session->join(Realm("myrealm"), yield); std::cout << "Client joined. Session ID = " diff --git a/cppwamp/CMakeLists.txt b/cppwamp/CMakeLists.txt index 62fc8768..9dfb044e 100644 --- a/cppwamp/CMakeLists.txt +++ b/cppwamp/CMakeLists.txt @@ -15,13 +15,14 @@ set(HEADERS include/cppwamp/connector.hpp include/cppwamp/conversion.hpp include/cppwamp/corosession.hpp - include/cppwamp/dialoguedata.hpp + include/cppwamp/corounpacker.hpp include/cppwamp/error.hpp include/cppwamp/json.hpp include/cppwamp/msgpack.hpp include/cppwamp/null.hpp include/cppwamp/options.hpp include/cppwamp/payload.hpp + include/cppwamp/peerdata.hpp include/cppwamp/rawsockoptions.hpp include/cppwamp/registration.hpp include/cppwamp/session.hpp @@ -37,20 +38,21 @@ set(HEADERS include/cppwamp/version.hpp include/cppwamp/visitor.hpp include/cppwamp/wampdefs.hpp + include/cppwamp/internal/asynctask.hpp include/cppwamp/internal/asioconnector.hpp include/cppwamp/internal/asioendpoint.hpp include/cppwamp/internal/asiolistener.hpp include/cppwamp/internal/asiotransport.hpp + include/cppwamp/internal/base64.hpp include/cppwamp/internal/callee.hpp include/cppwamp/internal/client.hpp include/cppwamp/internal/clientinterface.hpp include/cppwamp/internal/config.hpp - include/cppwamp/internal/dialogue.hpp include/cppwamp/internal/endian.hpp include/cppwamp/internal/integersequence.hpp - include/cppwamp/internal/legacyasioendpoint.hpp - include/cppwamp/internal/legacyasiotransport.hpp include/cppwamp/internal/messagetraits.hpp + include/cppwamp/internal/passkey.hpp + include/cppwamp/internal/peer.hpp include/cppwamp/internal/precompiled.hpp include/cppwamp/internal/rawsockconnector.hpp include/cppwamp/internal/rawsockhandshake.hpp @@ -65,18 +67,19 @@ set(HEADERS include/cppwamp/internal/varianttraitsfwd.hpp include/cppwamp/internal/variantvisitors.hpp include/cppwamp/internal/wampmessage.hpp + include/cppwamp/types/set.hpp include/cppwamp/types/tuple.hpp include/cppwamp/types/boostoptional.hpp include/cppwamp/types/unorderedmap.hpp + include/cppwamp/types/unorderedset.hpp ) set(INLINES include/cppwamp/internal/asyncresult.ipp - include/cppwamp/internal/base64.hpp include/cppwamp/internal/blob.ipp include/cppwamp/internal/conversion.ipp include/cppwamp/internal/corosession.ipp - include/cppwamp/internal/dialoguedata.ipp + include/cppwamp/internal/corounpacker.ipp include/cppwamp/internal/endian.ipp include/cppwamp/internal/error.ipp include/cppwamp/internal/json.ipp @@ -84,8 +87,8 @@ set(INLINES include/cppwamp/internal/msgpack.ipp include/cppwamp/internal/null.ipp include/cppwamp/internal/options.ipp - include/cppwamp/internal/passkey.hpp include/cppwamp/internal/payload.ipp + include/cppwamp/internal/peerdata.ipp include/cppwamp/internal/rawsockoptions.ipp include/cppwamp/internal/registration.ipp include/cppwamp/internal/session.ipp diff --git a/cppwamp/cppwamp.pro b/cppwamp/cppwamp.pro index 126fed6c..8c215734 100644 --- a/cppwamp/cppwamp.pro +++ b/cppwamp/cppwamp.pro @@ -20,13 +20,14 @@ HEADERS += \ include/cppwamp/connector.hpp \ include/cppwamp/conversion.hpp \ include/cppwamp/corosession.hpp \ - include/cppwamp/dialoguedata.hpp \ + include/cppwamp/corounpacker.hpp \ include/cppwamp/error.hpp \ include/cppwamp/json.hpp \ include/cppwamp/msgpack.hpp \ include/cppwamp/null.hpp \ include/cppwamp/options.hpp \ include/cppwamp/payload.hpp \ + include/cppwamp/peerdata.hpp \ include/cppwamp/rawsockoptions.hpp \ include/cppwamp/registration.hpp \ include/cppwamp/session.hpp \ @@ -42,20 +43,21 @@ HEADERS += \ include/cppwamp/version.hpp \ include/cppwamp/visitor.hpp \ include/cppwamp/wampdefs.hpp \ + include/cppwamp/internal/asynctask.hpp \ include/cppwamp/internal/asioconnector.hpp \ include/cppwamp/internal/asioendpoint.hpp \ include/cppwamp/internal/asiolistener.hpp \ include/cppwamp/internal/asiotransport.hpp \ + include/cppwamp/internal/base64.hpp \ include/cppwamp/internal/callee.hpp \ include/cppwamp/internal/client.hpp \ include/cppwamp/internal/clientinterface.hpp \ include/cppwamp/internal/config.hpp \ - include/cppwamp/internal/dialogue.hpp \ include/cppwamp/internal/endian.hpp \ include/cppwamp/internal/integersequence.hpp \ - include/cppwamp/internal/legacyasioendpoint.hpp \ - include/cppwamp/internal/legacyasiotransport.hpp \ include/cppwamp/internal/messagetraits.hpp \ + include/cppwamp/internal/passkey.hpp \ + include/cppwamp/internal/peer.hpp \ include/cppwamp/internal/precompiled.hpp \ include/cppwamp/internal/rawsockconnector.hpp \ include/cppwamp/internal/rawsockhandshake.hpp \ @@ -70,16 +72,17 @@ HEADERS += \ include/cppwamp/internal/varianttraitsfwd.hpp \ include/cppwamp/internal/variantvisitors.hpp \ include/cppwamp/internal/wampmessage.hpp \ + include/cppwamp/types/set.hpp \ include/cppwamp/types/tuple.hpp \ include/cppwamp/types/boostoptional.hpp \ include/cppwamp/types/unorderedmap.hpp \ + include/cppwamp/types/unorderedset.hpp \ \ include/cppwamp/internal/asyncresult.ipp \ - include/cppwamp/internal/base64.hpp \ include/cppwamp/internal/blob.ipp \ include/cppwamp/internal/conversion.ipp \ include/cppwamp/internal/corosession.ipp \ - include/cppwamp/internal/dialoguedata.ipp \ + include/cppwamp/internal/corounpacker.ipp \ include/cppwamp/internal/endian.ipp \ include/cppwamp/internal/error.ipp \ include/cppwamp/internal/json.ipp \ @@ -87,8 +90,8 @@ HEADERS += \ include/cppwamp/internal/msgpack.ipp \ include/cppwamp/internal/null.ipp \ include/cppwamp/internal/options.ipp \ - include/cppwamp/internal/passkey.hpp \ include/cppwamp/internal/payload.ipp \ + include/cppwamp/internal/peerdata.ipp \ include/cppwamp/internal/rawsockoptions.ipp \ include/cppwamp/internal/registration.ipp \ include/cppwamp/internal/session.ipp \ diff --git a/cppwamp/include/cppwamp/asyncresult.hpp b/cppwamp/include/cppwamp/asyncresult.hpp index 2a45996a..55491920 100644 --- a/cppwamp/include/cppwamp/asyncresult.hpp +++ b/cppwamp/include/cppwamp/asyncresult.hpp @@ -87,6 +87,20 @@ class AsyncResult //------------------------------------------------------------------------------ template using AsyncHandler = std::function)>; + +//------------------------------------------------------------------------------ +/** Type traits template used to obtain the result type of an asynchronous + handler. */ +//------------------------------------------------------------------------------ +template +struct ResultTypeOfHandler {}; + +//------------------------------------------------------------------------------ +/** ResultTypeOfHandler specialization for AsyncHandler */ +//------------------------------------------------------------------------------ +template +struct ResultTypeOfHandler> {using Type = AsyncResult;}; + } // namespace wamp #include "internal/asyncresult.ipp" diff --git a/cppwamp/include/cppwamp/connector.hpp b/cppwamp/include/cppwamp/connector.hpp index 3cb20176..3d8dce6d 100644 --- a/cppwamp/include/cppwamp/connector.hpp +++ b/cppwamp/include/cppwamp/connector.hpp @@ -32,7 +32,7 @@ namespace internal {class ClientInterface;} The Session class uses these Connector objects when attempting to establish a connection to the router. - @see connector, legacyConnector */ + @see connector */ //------------------------------------------------------------------------------ class Connector : public std::enable_shared_from_this { diff --git a/cppwamp/include/cppwamp/corosession.hpp b/cppwamp/include/cppwamp/corosession.hpp index 621a5c3c..cc98b403 100644 --- a/cppwamp/include/cppwamp/corosession.hpp +++ b/cppwamp/include/cppwamp/corosession.hpp @@ -113,10 +113,10 @@ class CoroSession : public TBase using YieldContext = boost::asio::basic_yield_context; /** Creates a new CoroSession instance. */ - static Ptr create(const Connector::Ptr& connector); + static Ptr create(AsioService& userIosvc, const Connector::Ptr& connector); /** Creates a new CoroSession instance. */ - static Ptr create(const ConnectorList& connectors); + static Ptr create(AsioService& userIosvc, const ConnectorList& connectors); using Base::connect; using Base::join; diff --git a/cppwamp/include/cppwamp/corounpacker.hpp b/cppwamp/include/cppwamp/corounpacker.hpp new file mode 100644 index 00000000..60f86e41 --- /dev/null +++ b/cppwamp/include/cppwamp/corounpacker.hpp @@ -0,0 +1,265 @@ +/*------------------------------------------------------------------------------ + Copyright Butterfly Energy Systems 2014-2015. + Distributed under the Boost Software License, Version 1.0. + (See accompanying file LICENSE_1_0.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +------------------------------------------------------------------------------*/ + +#ifndef CPPWAMP_COROUNPACKER_HPP +#define CPPWAMP_COROUNPACKER_HPP + +//------------------------------------------------------------------------------ +/** @file + Contains utilities for unpacking positional arguments passed to + event slots and call slots that spawn coroutines. */ +//------------------------------------------------------------------------------ + +#include +#include +#include "unpacker.hpp" + +namespace wamp +{ + +//------------------------------------------------------------------------------ +/** Wrapper around an event coroutine slot which automatically unpacks + positional payload arguments. + The [wamp::unpackedCoroEvent](@ref CoroEventUnpacker::unpackedCoroEvent) + convenience function should be used to construct instances of + CoroEventUnpacker. + @see [wamp::unpackedCoroEvent](@ref CoroEventUnpacker::unpackedCoroEvent) + @see @ref UnpackedCoroutineEventSlots + @tparam TSlot Function type to be wrapped. Must have the signature + `void function(Event, TArgs..., boost::asio::yield_context)`. + @tparam TArgs List of static types the event slot expects following the + Event parameter and preceding the `boost::asio::yield_context` + parameter. */ +//------------------------------------------------------------------------------ +template +class CoroEventUnpacker +{ +public: + /// The function type to be wrapped. + using Slot = TSlot; + + /** Constructor taking a callable target. */ + explicit CoroEventUnpacker(Slot slot); + + /** Spawns a new coroutine and executes the stored event slot. + The coroutine will be spawned using `event.iosvc()`. + The `event.args()` positional arguments will be unpacked and passed + to the stored event slot as additional parameters. */ + void operator()(Event&& event); + +private: + using Yield = boost::asio::yield_context; + + template + void invoke(Event&& event, internal::IntegerSequence); + + std::shared_ptr slot_; +}; + +//------------------------------------------------------------------------------ +/** @relates CoroEventUnpacker + Converts an unpacked event slot into a regular slot than can be passed + to Session::subscribe. + The slot will be executed within the context of a coroutine and will be + given a `boost::asio::yield_context` as the last call argument. + @see @ref UnpackedCoroutineEventSlots + @returns An CoroEventUnpacker that wraps the the given slot. + @tparam TArgs List of static types the event slot expects following the + Event parameter, and preceding the + `boost::asio::yield_context` parameter. + @tparam TSlot (deduced) Function type to be converted. Must have the signature + `void function(Event, TArgs..., boost::asio::yield_context)`. */ +//------------------------------------------------------------------------------ +template +CoroEventUnpacker, TArgs...> unpackedCoroEvent(TSlot&& slot); + + +//------------------------------------------------------------------------------ +/** Wrapper around an event slot which automatically unpacks positional + payload arguments. + The [wamp::basicCoroEvent](@ref BasicCoroEventUnpacker::basicCoroEvent) + convenience function should be used to construct instances of + BasicCoroEventUnpacker. + This class differs from CoroEventUnpacker in that the slot type is not + expected to take an Event as the first parameter. + @see [wamp::basicCoroEvent](@ref BasicCoroEventUnpacker::basicCoroEvent) + @see @ref BasicCoroutineEventSlots + @tparam TSlot Function type to be wrapped. Must have the signature + `void function(TArgs..., boost::asio::yield_context)`. + @tparam TArgs List of static types the event slot expects as arguments + preceding the `boost::asio::yield_context` parameter. */ +//------------------------------------------------------------------------------ +template +class BasicCoroEventUnpacker +{ +public: + /// The function type to be wrapped. + using Slot = TSlot; + + /** Constructor taking a callable target. */ + explicit BasicCoroEventUnpacker(Slot slot); + + /** Spawns a new coroutine and executes the stored event slot. + The coroutine will be spawned using `event.iosvc()`. + The `event.args()` positional arguments will be unpacked and passed + to the stored event slot as parameters. */ + void operator()(Event&& event); + +private: + using Yield = boost::asio::yield_context; + + template + void invoke(Event&& event, internal::IntegerSequence); + + std::shared_ptr slot_; +}; + +//------------------------------------------------------------------------------ +/** @relates BasicCoroEventUnpacker + Converts an unpacked event slot into a regular slot than can be passed + to Session::subscribe. + This function differs from `unpackedCoroEvent` in that the slot type is not + expected to take an Event as the first parameter. + @see @ref BasicCoroutineEventSlots + @returns An BasicCoroEventUnpacker that wraps the the given slot. + @tparam TArgs List of static types the event slot expects as arguments, + preceding the `boost::asio::yield_context` parameter. + @tparam TSlot (deduced) Function type to be converted. Must have the + signature `void function(TArgs..., boost::asio::yield_context)`.*/ +//------------------------------------------------------------------------------ +template +BasicCoroEventUnpacker, TArgs...> +basicCoroEvent(TSlot&& slot); + + +//------------------------------------------------------------------------------ +/** Wrapper around a call coroutine slot which automatically unpacks positional + payload arguments. + The [wamp::unpackedCoroRpc](@ref CoroInvocationUnpacker::unpackedCoroRpc) + convenience function should be used to construct instances of + CoroInvocationUnpacker. + @see [wamp::unpackedCoroRpc](@ref CoroInvocationUnpacker::unpackedCoroRpc) + @see @ref UnpackedCoroutineCallSlots + @tparam TSlot Function type to be wrapped. Must have the signature + `void function(Invocation, TArgs..., boost::asio::yield_context)`. + @tparam TArgs List of static types the call slot expects following the + Invocation parameter, and preceding the boost::asio::yield_context + parameter. */ +//------------------------------------------------------------------------------ +template +class CoroInvocationUnpacker +{ +public: + /// The function type to be wrapped. + using Slot = TSlot; + + /** Constructor taking a callable target. */ + explicit CoroInvocationUnpacker(Slot slot); + + /** Spawns a new coroutine and executes the stored call slot. + The coroutine will be spawned using `inv.iosvc()`. + The `inv.args()` positional arguments will be unpacked and passed + to the stored call slot as additional parameters. */ + Outcome operator()(Invocation&& inv); + +private: + using Yield = boost::asio::yield_context; + + template + void invoke(Invocation&& inv, internal::IntegerSequence); + + std::shared_ptr slot_; +}; + +//------------------------------------------------------------------------------ +/** @relates CoroInvocationUnpacker + Converts an unpacked call slot into a regular slot than can be passed + to Session::enroll. + @see @ref UnpackedCoroutineCallSlots + @returns A CoroInvocationUnpacker that wraps the the given slot. + @tparam TArgs List of static types the call slot expects following the + Invocation parameter, and preceding the + `boost::asio::yield_context` parameter. + @tparam TSlot (deduced) Function type to be converted. Must have the signature + `Outcome function(Invocation, TArgs..., boost::asio::yield_context)`.*/ +//------------------------------------------------------------------------------ +template +CoroInvocationUnpacker, TArgs...> +unpackedCoroRpc(TSlot&& slot); + + +//------------------------------------------------------------------------------ +/** Wrapper around a call slot which automatically unpacks positional payload + arguments. + The [wamp::basicCoroRpc](@ref BasicCoroInvocationUnpacker::basicCoroRpc) + convenience function should be used to construct instances of + BasicCoroInvocationUnpacker. + This class differs from CoroInvocationUnpacker in that the slot type returns + `TResult` and is not expected to take an Invocation as the first parameter. + @see [wamp::basicCoroRpc](@ref BasicCoroInvocationUnpacker::basicCoroRpc) + @see @ref BasicCoroutineCallSlots + @tparam TSlot Function type to be wrapped. Must have the signature + `TResult function(TArgs..., boost::asio::yield_context)`. + @tparam TResult The static result type returned by the slot (may be `void`). + @tparam TArgs List of static types the call slot expects as arguments, + preceding the `boost::asio::yield_context` argument. */ +//------------------------------------------------------------------------------ +template +class BasicCoroInvocationUnpacker +{ +public: + /// The function type to be wrapped. + using Slot = TSlot; + + /// The static result type returned by the slot. + using ResultType = TResult; + + /** Constructor taking a callable target. */ + explicit BasicCoroInvocationUnpacker(Slot slot); + + /** Spawns a new coroutine and executes the stored call slot. + The coroutine will be spawned using `inv.iosvc()`. + The `inv.args()` positional arguments will be unpacked and passed + to the stored call slot as additional parameters. */ + Outcome operator()(Invocation&& inv); + +private: + using Yield = boost::asio::yield_context; + + template + void invoke(TrueType, Invocation&& inv, internal::IntegerSequence); + + template + void invoke(FalseType, Invocation&& inv, internal::IntegerSequence); + + std::shared_ptr slot_; +}; + +//------------------------------------------------------------------------------ +/** @relates BasicCoroInvocationUnpacker + Converts an unpacked call slot into a regular slot than can be passed + to Session::enroll. + This function differs from `unpackedCoroRpc` in that the slot type returns + TResult and is not expected to take an Invocation as the first parameter. + @see @ref BasicCoroutineCallSlots + @returns A BasicCoroInvocationUnpacker that wraps the the given slot. + @tparam TArgs List of static types the call slot expects as arguments, + preceding the `boost::asio::yield_context` argument. + @tparam TResult The static result type returned by the slot (may be `void`). + @tparam TSlot (deduced) Function type to be converted. Must have the signature + `TResult function(TArgs..., boost::asio::yield_context)`.*/ +//------------------------------------------------------------------------------ +template +BasicCoroInvocationUnpacker, TResult, TArgs...> +basicCoroRpc(TSlot&& slot); + + +} // namespace wamp + +#include "./internal/corounpacker.ipp" + +#endif // CPPWAMP_COROUNPACKER_HPP diff --git a/cppwamp/include/cppwamp/internal/asynctask.hpp b/cppwamp/include/cppwamp/internal/asynctask.hpp new file mode 100644 index 00000000..d89cc8ff --- /dev/null +++ b/cppwamp/include/cppwamp/internal/asynctask.hpp @@ -0,0 +1,86 @@ +/*------------------------------------------------------------------------------ + Copyright Butterfly Energy Systems 2014-2015. + Distributed under the Boost Software License, Version 1.0. + (See accompanying file LICENSE_1_0.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +------------------------------------------------------------------------------*/ + +#ifndef CPPWAMP_ASYNCTASK_HPP +#define CPPWAMP_ASYNCTASK_HPP + +#include +#include +#include "../asiodefs.hpp" +#include "../asyncresult.hpp" + +namespace wamp +{ + +//------------------------------------------------------------------------------ +// Bundles an AsyncHandler along with the AsioService in which the handler +// is to be posted. +//------------------------------------------------------------------------------ +template +class AsyncTask +{ +public: + using ValueType = TResult; + + AsyncTask() : iosvc_(nullptr) {} + + AsyncTask(AsioService& iosvc, AsyncHandler handler) + : iosvc_(&iosvc), + handler_(std::move(handler)) + {} + + AsyncTask(const AsyncTask& other) = default; + + AsyncTask(AsyncTask&& other) noexcept + : iosvc_(other.iosvc_), + handler_(std::move(other.handler_)) + { + other.iosvc_ = nullptr; + } + + AsyncTask& operator=(const AsyncTask& other) = default; + + AsyncTask& operator=(AsyncTask&& other) noexcept + { + iosvc_ = other.iosvc_; + handler_ = std::move(other.handler_); + other.iosvc_ = nullptr; + return *this; + } + + explicit operator bool() const {return iosvc_ != nullptr;} + + AsioService& iosvc() const {return *iosvc_;} + + const AsyncHandler& handler() const {return handler_;} + + void operator()(AsyncResult result) const & + { + assert(iosvc_ && "Invoking uninitialized AsyncTask"); + iosvc_->post(std::bind(handler_, std::move(result))); + } + + void operator()(AsyncResult result) && + { + assert(iosvc_ && "Invoking uninitialized AsyncTask"); + iosvc_->post(std::bind(std::move(handler_), std::move(result))); + } + +private: + AsioService* iosvc_; + AsyncHandler handler_; +}; + +//------------------------------------------------------------------------------ +/** ResultTypeOfHandler specialization for AsyncTask */ +//------------------------------------------------------------------------------ +template +struct ResultTypeOfHandler> {using Type = AsyncResult;}; + +} // namespace wamp + +#endif // CPPWAMP_ASYNCTASK_HPP diff --git a/cppwamp/include/cppwamp/internal/callee.hpp b/cppwamp/include/cppwamp/internal/callee.hpp index 1dbee0f3..8cc1896a 100644 --- a/cppwamp/include/cppwamp/internal/callee.hpp +++ b/cppwamp/include/cppwamp/internal/callee.hpp @@ -11,10 +11,10 @@ #include #include #include -#include "../asyncresult.hpp" -#include "../dialoguedata.hpp" +#include "../peerdata.hpp" #include "../sessiondata.hpp" #include "../wampdefs.hpp" +#include "asynctask.hpp" namespace wamp { @@ -32,10 +32,10 @@ class Callee virtual ~Callee() {} - virtual void unregister(const Registration& handle) = 0; + virtual void unregister(const Registration& reg) = 0; - virtual void unregister(const Registration& handle, - AsyncHandler handler) = 0; + virtual void unregister(const Registration& reg, + AsyncTask&& handler) = 0; virtual void yield(RequestId reqId, wamp::Result&& result) = 0; diff --git a/cppwamp/include/cppwamp/internal/client.hpp b/cppwamp/include/cppwamp/internal/client.hpp index 66f7881d..cf9ff449 100644 --- a/cppwamp/include/cppwamp/internal/client.hpp +++ b/cppwamp/include/cppwamp/internal/client.hpp @@ -9,7 +9,6 @@ #define CPPWAMP_INTERNAL_CLIENT_HPP #include -#include #include #include #include @@ -17,6 +16,7 @@ #include #include #include +#include #include #include #include "../registration.hpp" @@ -24,7 +24,7 @@ #include "../unpacker.hpp" #include "../version.hpp" #include "clientinterface.hpp" -#include "dialogue.hpp" +#include "peer.hpp" namespace wamp { @@ -36,7 +36,7 @@ namespace internal // Provides the implementation of the wamp::Session class. //------------------------------------------------------------------------------ template -class Client : public ClientInterface, public Dialogue +class Client : public ClientInterface, public Peer { public: using Ptr = std::shared_ptr; @@ -53,7 +53,7 @@ class Client : public ClientInterface, public Dialogue virtual State state() const override {return Base::state();} - virtual void join(Realm&& realm, AsyncHandler handler) override + virtual void join(Realm&& realm, AsyncTask&& handler) override { using std::move; @@ -73,11 +73,10 @@ class Client : public ClientInterface, public Dialogue { if (reply.type == WampMsgType::welcome) { - this->post(handler, - SessionInfo({}, - move(realmUri), - move(reply.as(1)), - move(reply.as(2)))); + move(handler)(SessionInfo({}, + move(realmUri), + move(reply.as(1)), + move(reply.as(2)))); } else { @@ -93,14 +92,13 @@ class Client : public ClientInterface, public Dialogue AsyncResult result(make_error_code(errc), oss.str()); - this->post(handler, move(result)); + move(handler)(move(result)); } } }); } - virtual void leave(Reason&& reason, - AsyncHandler&& handler) override + virtual void leave(Reason&& reason, AsyncTask&& handler) override { using std::move; if (reason.uri().empty()) @@ -113,7 +111,7 @@ class Client : public ClientInterface, public Dialogue { auto reason = Reason(move(reply.as(2))) .withOptions(move(reply.as(1))); - this->post(handler, move(reason)); + move(handler)(move(reason)); } readership_.clear(); registry_.clear(); @@ -127,34 +125,35 @@ class Client : public ClientInterface, public Dialogue virtual void terminate() override { - setLogHandlers(nullptr, nullptr); + using Handler = AsyncTask; + setLogHandlers(Handler(), Handler()); this->close(true); } virtual void subscribe(Topic&& topic, EventSlot&& slot, - AsyncHandler handler) override + AsyncTask&& handler) override { using std::move; - SubscriptionRecord sub = {move(topic), move(slot)}; + SubscriptionRecord rec = {move(topic), move(slot), handler.iosvc()}; - auto kv = topics_.find(sub.topic.uri()); + auto kv = topics_.find(rec.topic.uri()); if (kv == topics_.end()) { - subscribeMsg_.at(2) = sub.topic.options(); - subscribeMsg_.at(3) = sub.topic.uri(); + subscribeMsg_.at(2) = rec.topic.options(); + subscribeMsg_.at(3) = rec.topic.uri(); auto self = this->shared_from_this(); this->request(subscribeMsg_, - [this, self, sub, handler](std::error_code ec, Message reply) + [this, self, rec, handler](std::error_code ec, Message reply) { if (checkReply(WampMsgType::subscribed, ec, reply, SessionErrc::subscribeError, handler)) { auto subId = reply.to(2); auto slotId = nextSlotId(); - Subscription handle(self, subId, slotId, {}); - topics_.emplace(sub.topic.uri(), subId); - readership_[subId][slotId] = move(sub); - this->post(handler, move(handle)); + Subscription sub(self, subId, slotId, {}); + topics_.emplace(rec.topic.uri(), subId); + readership_[subId][slotId] = move(rec); + std::move(handler)(std::move(sub)); } }); } @@ -162,21 +161,21 @@ class Client : public ClientInterface, public Dialogue { auto subId = kv->second; auto slotId = nextSlotId(); - Subscription handle{this->shared_from_this(), subId, slotId, {}}; - readership_[subId][slotId] = move(sub); - this->post(handler, move(handle)); + Subscription sub{this->shared_from_this(), subId, slotId, {}}; + readership_[subId][slotId] = move(rec); + std::move(handler)(move(sub)); } } - virtual void unsubscribe(const Subscription& handle) override + virtual void unsubscribe(const Subscription& sub) override { - auto kv = readership_.find(handle.id()); + auto kv = readership_.find(sub.id()); if (kv != readership_.end()) { auto& subMap = kv->second; if (!subMap.empty()) { - auto subKv = subMap.find(handle.slotId({})); + auto subKv = subMap.find(sub.slotId({})); if (subKv != subMap.end()) { if (subMap.size() == 1u) @@ -184,23 +183,23 @@ class Client : public ClientInterface, public Dialogue subMap.erase(subKv); if (subMap.empty()) - sendUnsubscribe(handle.id()); + sendUnsubscribe(sub.id()); } } } } - virtual void unsubscribe(const Subscription& handle, - AsyncHandler handler) override + virtual void unsubscribe(const Subscription& sub, + AsyncTask&& handler) override { bool unsubscribed = false; - auto kv = readership_.find(handle.id()); + auto kv = readership_.find(sub.id()); if (kv != readership_.end()) { auto& subMap = kv->second; if (!subMap.empty()) { - auto subKv = subMap.find(handle.slotId({})); + auto subKv = subMap.find(sub.slotId({})); if (subKv != subMap.end()) { unsubscribed = true; @@ -210,15 +209,15 @@ class Client : public ClientInterface, public Dialogue subMap.erase(subKv); if (subMap.empty()) { - sendUnsubscribe(handle.id(), std::move(handler)); - handler = nullptr; + sendUnsubscribe(sub.id(), std::move(handler)); + handler = AsyncTask(); } } } } if (handler) - this->post(handler, unsubscribed); + std::move(handler)(unsubscribed); } virtual void publish(Pub&& pub) override @@ -226,53 +225,52 @@ class Client : public ClientInterface, public Dialogue this->send(marshallPublish(std::move(pub))); } - virtual void publish(Pub&& pub, - AsyncHandler&& handler) override + virtual void publish(Pub&& pub, AsyncTask&& handler) override { pub.options({}).emplace("acknowledge", true); auto self = this->shared_from_this(); this->request(marshallPublish(std::move(pub)), [this, self, handler](std::error_code ec, Message reply) { - if (checkReply(WampMsgType::published, ec, reply, - SessionErrc::publishError, handler)) + if (checkReply(WampMsgType::published, ec, reply, + SessionErrc::publishError, handler)) { - this->post(handler, reply.to(2)); + std::move(handler)(reply.to(2)); } }); } virtual void enroll(Procedure&& procedure, CallSlot&& slot, - AsyncHandler&& handler) override + AsyncTask&& handler) override { using std::move; - RegistrationRecord reg{move(procedure), move(slot)}; - enrollMsg_.at(2) = reg.procedure.options(); - enrollMsg_.at(3) = reg.procedure.uri(); + RegistrationRecord rec{move(procedure), move(slot), handler.iosvc()}; + enrollMsg_.at(2) = rec.procedure.options(); + enrollMsg_.at(3) = rec.procedure.uri(); auto self = this->shared_from_this(); this->request(enrollMsg_, - [this, self, reg, handler](std::error_code ec, Message reply) + [this, self, rec, handler](std::error_code ec, Message reply) { - if (checkReply(WampMsgType::registered, ec, - reply, SessionErrc::registerError, handler)) + if (checkReply(WampMsgType::registered, ec, reply, + SessionErrc::registerError, handler)) { auto regId = reply.to(2); - Registration handle(self, regId, {}); - registry_[regId] = move(reg); - this->post(handler, move(handle)); + Registration reg(self, regId, {}); + registry_[regId] = move(rec); + move(handler)(move(reg)); } }); } - virtual void unregister(const Registration& handle) override + virtual void unregister(const Registration& reg) override { - auto kv = registry_.find(handle.id()); + auto kv = registry_.find(reg.id()); if (kv != registry_.end()) { registry_.erase(kv); if (state() == State::established) { - unregisterMsg_.at(2) = handle.id(); + unregisterMsg_.at(2) = reg.id(); auto self = this->shared_from_this(); this->request( unregisterMsg_, [this, self](std::error_code ec, Message reply) @@ -286,30 +284,30 @@ class Client : public ClientInterface, public Dialogue } } - virtual void unregister(const Registration& handle, - AsyncHandler handler) override + virtual void unregister(const Registration& reg, + AsyncTask&& handler) override { CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); - auto kv = registry_.find(handle.id()); + auto kv = registry_.find(reg.id()); if (kv != registry_.end()) { registry_.erase(kv); - unregisterMsg_.at(2) = handle.id(); + unregisterMsg_.at(2) = reg.id(); auto self = this->shared_from_this(); this->request( unregisterMsg_, [this, self, handler](std::error_code ec, Message reply) { if (checkReply(WampMsgType::unregistered, ec, reply, SessionErrc::unregisterError, handler)) - this->post(handler, true); + std::move(handler)(true); }); } else - this->post(handler, false); + std::move(handler)(false); } - virtual void call(Rpc&& rpc, AsyncHandler&& handler) override + virtual void call(Rpc&& rpc, AsyncTask&& handler) override { using std::move; Error* errorPtr = rpc.error({}); @@ -369,48 +367,48 @@ class Client : public ClientInterface, public Dialogue this->sendError(WampMsgType::invocation, reqId, move(failure)); } - virtual void setLogHandlers(LogHandler warningHandler, - LogHandler traceHandler) override + virtual void setLogHandlers(AsyncTask warningHandler, + AsyncTask traceHandler) override { warningHandler_ = std::move(warningHandler); this->setTraceHandler(std::move(traceHandler)); } - virtual void postpone(std::function functor) override - { - this->post(functor); - } - private: struct SubscriptionRecord { using Slot = std::function; - SubscriptionRecord() : topic("") {} + SubscriptionRecord() : topic(""), iosvc(nullptr) {} - SubscriptionRecord(Topic&& topic, Slot&& slot) - : topic(std::move(topic)), slot(std::move(slot)) + SubscriptionRecord(Topic&& topic, Slot&& slot, AsioService& iosvc) + : topic(std::move(topic)), slot(std::move(slot)), iosvc(&iosvc) {} Topic topic; Slot slot; + AsioService* iosvc; }; struct RegistrationRecord { using Slot = std::function; - RegistrationRecord() : procedure("") {} + RegistrationRecord() : procedure(""), iosvc(nullptr) {} - RegistrationRecord(Procedure&& procedure, Slot&& slot) - : procedure(std::move(procedure)), slot(std::move(slot)) + RegistrationRecord(Procedure&& procedure, Slot&& slot, + AsioService& iosvc) + : procedure(std::move(procedure)), + slot(std::move(slot)), + iosvc(&iosvc) {} Procedure procedure; Slot slot; + AsioService* iosvc = nullptr; }; - using Base = Dialogue; + using Base = Peer; using WampMsgType = internal::WampMsgType; using Message = internal::WampMessage; using SlotId = uint64_t; @@ -422,11 +420,6 @@ class Client : public ClientInterface, public Dialogue Client(TransportPtr transport) : Base(std::move(transport)) { - warningHandler_ = [](const std::string& log) - { - std::cerr << "[CppWAMP] Warning: " << log << "\n"; - }; - initMessages(); } @@ -452,7 +445,7 @@ class Client : public ClientInterface, public Dialogue } } - void sendUnsubscribe(SubscriptionId subId, AsyncHandler&& handler) + void sendUnsubscribe(SubscriptionId subId, AsyncTask&& handler) { CPPWAMP_LOGIC_CHECK((this->state() == State::established), "Session is not established"); @@ -463,7 +456,7 @@ class Client : public ClientInterface, public Dialogue { if (checkReply(WampMsgType::unsubscribed, ec, reply, SessionErrc::unsubscribeError, handler)) - this->post(handler, true); + std::move(handler)(true); }); } @@ -494,7 +487,7 @@ class Client : public ClientInterface, public Dialogue } void callProcedure(Message& msg, Error* errorPtr, - AsyncHandler&& handler) + AsyncTask&& handler) { auto self = this->shared_from_this(); this->request(msg, @@ -509,8 +502,8 @@ class Client : public ClientInterface, public Dialogue errorPtr->withKwargs(reply.as(6)); } - if (checkReply(WampMsgType::result, ec, reply, - SessionErrc::callError, handler)) + if (checkReply(WampMsgType::result, ec, reply, + SessionErrc::callError, handler)) { using std::move; Result result({}, reply.to(1), @@ -520,7 +513,7 @@ class Client : public ClientInterface, public Dialogue result.withArgList(move(reply.as(3))); if (reply.size() >= 5) result.withKwargs(move(reply.as(4))); - this->post(handler, std::move(result)); + move(handler)(move(result)); } }); } @@ -551,21 +544,26 @@ class Client : public ClientInterface, public Dialogue { using std::move; - Event event({}, - msg.to(1), - msg.to(2), - move(msg.as(3))); + auto subId = msg.to(1); + auto pubId = msg.to(2); - auto kv = readership_.find(event.subId()); + auto kv = readership_.find(subId); if (kv != readership_.end()) { + const auto& localSubs = kv->second; + assert(!localSubs.empty()); + Event event({}, + msg.to(1), + msg.to(2), + localSubs.begin()->second.iosvc, + move(msg.as(3))); + if (msg.fields.size() >= 5) event.args({}) = move(msg.as(4)); if (msg.fields.size() >= 6) event.kwargs({}) = move(msg.as(5)); auto self = this->shared_from_this(); - const auto& localSubs = kv->second; for (const auto& subKv: localSubs) dispatchEvent(subKv.second, event); } @@ -573,8 +571,7 @@ class Client : public ClientInterface, public Dialogue { std::ostringstream oss; oss << "Received an EVENT that is not subscribed to " - "(with subId=" << event.subId() - << " pubId=" << event.pubId() << ")"; + "(with subId=" << subId << " pubId=" << pubId << ")"; warn(oss.str()); } } @@ -583,7 +580,7 @@ class Client : public ClientInterface, public Dialogue { auto self = this->shared_from_this(); const auto& slot = sub.slot; - this->post([this, self, slot, event]() + sub.iosvc->post([this, self, slot, event]() { // Copy the subscription and publication IDs before the Event // object gets moved away. @@ -619,7 +616,8 @@ class Client : public ClientInterface, public Dialogue if (kv != registry_.end()) { auto self = this->shared_from_this(); - Invocation inv({}, self, requestId, move(msg.as(3))); + Invocation inv({}, self, requestId, kv->second.iosvc, + move(msg.as(3))); if (msg.fields.size() >= 5) inv.args({}) = move(msg.as(4)); if (msg.fields.size() >= 6) @@ -640,7 +638,7 @@ class Client : public ClientInterface, public Dialogue { auto self = this->shared_from_this(); const auto& slot = reg.slot; - this->post([this, self, slot, invocation]() + reg.iosvc->post([this, self, slot, invocation]() { // Copy the request ID before the Invocation object gets moved away. auto reqId = invocation.requestId(); @@ -655,11 +653,11 @@ class Client : public ClientInterface, public Dialogue break; case Outcome::Type::result: - yield(reqId, std::move(outcome.result({}))); + yield(reqId, std::move(outcome).asResult()); break; case Outcome::Type::error: - yield(reqId, std::move(outcome.error({}))); + yield(reqId, std::move(outcome).asError()); break; default: @@ -680,16 +678,16 @@ class Client : public ClientInterface, public Dialogue } template - bool checkError(std::error_code ec, THandler& handler) + bool checkError(std::error_code ec, THandler&& handler) { if (ec) - this->post(handler, ec); + std::forward(handler)(ec); return !ec; } - template + template bool checkReply(WampMsgType type, std::error_code ec, const Message& reply, - SessionErrc defaultErrc, const AsyncHandler& handler) + SessionErrc defaultErrc, THandler&& handler) { bool success = checkError(ec, handler); if (success) @@ -707,8 +705,10 @@ class Client : public ClientInterface, public Dialogue if (reply.size() >= 7 && !reply.as(6).empty()) oss << ", ArgsKv=" << reply.at(6); - AsyncResult result(make_error_code(errc), oss.str()); - this->post(handler, result); + using ResultType = typename ResultTypeOfHandler< + typename std::decay::type >::Type; + ResultType result(make_error_code(errc), oss.str()); + std::forward(handler)(std::move(result)); } else assert((reply.type == type) && "Unexpected WAMP message type"); @@ -720,13 +720,13 @@ class Client : public ClientInterface, public Dialogue SessionErrc defaultErrc) { auto self = this->shared_from_this(); - checkReply(type, ec, reply, defaultErrc, + checkReply(type, ec, reply, defaultErrc, AsyncHandler( [this, self](AsyncResult result) { if (!result) warn(error::Failure::makeMessage(result.errorCode(), result.errorInfo())); - }); + })); } void warn(const std::string& log) @@ -766,7 +766,7 @@ class Client : public ClientInterface, public Dialogue TopicMap topics_; Readership readership_; Registry registry_; - LogHandler warningHandler_; + AsyncTask warningHandler_; Message publishMsg_; Message publishArgsMsg_; diff --git a/cppwamp/include/cppwamp/internal/clientinterface.hpp b/cppwamp/include/cppwamp/internal/clientinterface.hpp index 349ab6ef..43858040 100644 --- a/cppwamp/include/cppwamp/internal/clientinterface.hpp +++ b/cppwamp/include/cppwamp/internal/clientinterface.hpp @@ -11,14 +11,14 @@ #include #include #include -#include "../asyncresult.hpp" -#include "../dialoguedata.hpp" +#include "../peerdata.hpp" #include "../error.hpp" #include "../registration.hpp" #include "../sessiondata.hpp" #include "../subscription.hpp" #include "../variant.hpp" #include "../wampdefs.hpp" +#include "asynctask.hpp" #include "callee.hpp" #include "subscriber.hpp" @@ -41,7 +41,6 @@ class ClientInterface : public Callee, public Subscriber using WeakPtr = std::weak_ptr; using EventSlot = std::function; using CallSlot = std::function; - using LogHandler = std::function; static const Object& roles(); @@ -49,30 +48,28 @@ class ClientInterface : public Callee, public Subscriber virtual SessionState state() const = 0; - virtual void join(Realm&& realm, AsyncHandler handler) = 0; + virtual void join(Realm&& realm, AsyncTask&& hander) = 0; - virtual void leave(Reason&& reason, AsyncHandler&& handler) = 0; + virtual void leave(Reason&& reason, AsyncTask&& handler) = 0; virtual void disconnect() = 0; virtual void terminate() = 0; virtual void subscribe(Topic&& topic, EventSlot&& slot, - AsyncHandler handler) = 0; + AsyncTask&& handler) = 0; virtual void publish(Pub&& pub) = 0; - virtual void publish(Pub&& pub, AsyncHandler&& handler) = 0; + virtual void publish(Pub&& pub, AsyncTask&& handler) = 0; virtual void enroll(Procedure&& procedure, CallSlot&& slot, - AsyncHandler&& handler) = 0; + AsyncTask&& handler) = 0; - virtual void call(Rpc&& rpc, AsyncHandler&& handler) = 0; + virtual void call(Rpc&& rpc, AsyncTask&& handler) = 0; - virtual void setLogHandlers(LogHandler warningHandler, - LogHandler traceHandler) = 0; - - virtual void postpone(std::function functor) = 0; + virtual void setLogHandlers(AsyncTask warningHandler, + AsyncTask traceHandler) = 0; }; inline const Object& ClientInterface::roles() diff --git a/cppwamp/include/cppwamp/internal/corosession.ipp b/cppwamp/include/cppwamp/internal/corosession.ipp index 3b1e68dc..e1403560 100644 --- a/cppwamp/include/cppwamp/internal/corosession.ipp +++ b/cppwamp/include/cppwamp/internal/corosession.ipp @@ -13,11 +13,13 @@ namespace wamp //------------------------------------------------------------------------------ template typename CoroSession::Ptr CoroSession::create( + AsioService& userIosvc, /**< IO service used for executing + user handlers. */ const Connector::Ptr& connector /**< Connection details for the transport to use. */ ) { - return Ptr(new CoroSession(connector)); + return Ptr(new CoroSession(userIosvc, {connector})); } //------------------------------------------------------------------------------ @@ -25,11 +27,13 @@ typename CoroSession::Ptr CoroSession::create( //------------------------------------------------------------------------------ template typename CoroSession::Ptr CoroSession::create( + AsioService& userIosvc, /**< IO service used for executing + user handlers. */ const ConnectorList& connectors /**< A list of connection details for the transports to use. */ ) { - return Ptr(new CoroSession(connectors)); + return Ptr(new CoroSession(userIosvc, connectors)); } //------------------------------------------------------------------------------ @@ -245,32 +249,22 @@ Result CoroSession::call( //------------------------------------------------------------------------------ /** @details Has the same effect as - ~~~~~~~~~~~~~~~~~~ - iosvc.post(yield); - ~~~~~~~~~~~~~~~~~~ - where `iosvc` is the asynchronous I/O service used by the client's - underlying transport. - - @pre The client must have already established a transport connection. */ + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + this->userIosvc().post(yield); + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ //------------------------------------------------------------------------------ template template void CoroSession::suspend(YieldContext yield) { - CPPWAMP_LOGIC_CHECK(!!this->impl(), "Session is not connected"); - using boost::asio::handler_type; - using Handler = typename handler_type, void()>::type; - Handler handler(yield); - boost::asio::async_result result(handler); - this->postpone(handler); - return result.get(); + this->userIosvc().post(yield); } //------------------------------------------------------------------------------ template template TResult CoroSession::run(TYieldContext&& yield, std::error_code* ec, - TDelegate&& delegate) + TDelegate&& delegate) { using boost::asio::handler_type; using Handler = typename handler_type +#include "varianttraits.hpp" + +namespace wamp +{ + +namespace internal +{ + +//------------------------------------------------------------------------------ +template +struct UnpackedCoroArgGetter +{ + template + static NthTypeOf get(const Array& args) + { + using TargetType = NthTypeOf; + try + { + return args.at(N).to(); + } + catch(const error::Conversion&) + { + std::ostringstream oss; + oss << "Expected type " << ArgTraits::typeName() + << " for arg index " << N + << ", but got type " << typeNameOf(args.at(N)); + throw UnpackError(oss.str()); + } + } +}; + +} // namespace internal + + +//------------------------------------------------------------------------------ +template +CoroEventUnpacker::CoroEventUnpacker(Slot slot) + : slot_(std::make_shared(std::move(slot))) +{} + +template +void CoroEventUnpacker::operator()(Event&& event) +{ + if (event.args().size() < sizeof...(A)) + { + std::ostringstream oss; + oss << "Expected " << sizeof...(A) + << " args, but only got " << event.args().size(); + throw internal::UnpackError(oss.str()); + } + + // Use the integer parameter pack technique shown in + // http://stackoverflow.com/a/7858971/245265 + using Seq = typename internal::GenIntegerSequence::type; + invoke(std::move(event), Seq()); +} + +template +template +void CoroEventUnpacker::invoke(Event&& event, + internal::IntegerSequence) +{ + auto slot = slot_; + boost::asio::spawn(event.iosvc(), [slot, event](Yield yield) + { + Array args = event.args(); + using Getter = internal::UnpackedCoroArgGetter; + (*slot)(std::move(event), Getter::template get(args)..., yield); + }); +} + +template +CoroEventUnpacker, TArgs...> unpackedCoroEvent(TSlot&& slot) +{ + return CoroEventUnpacker, TArgs...>( + std::forward(slot)); +} + + +//------------------------------------------------------------------------------ +template +BasicCoroEventUnpacker::BasicCoroEventUnpacker(Slot slot) + : slot_(std::make_shared(std::move(slot))) +{} + +template +void BasicCoroEventUnpacker::operator()(Event&& event) +{ + if (event.args().size() < sizeof...(A)) + { + std::ostringstream oss; + oss << "Expected " << sizeof...(A) + << " args, but only got " << event.args().size(); + throw internal::UnpackError(oss.str()); + } + + // Use the integer parameter pack technique shown in + // http://stackoverflow.com/a/7858971/245265 + using Seq = typename internal::GenIntegerSequence::type; + invoke(std::move(event), Seq()); +} + +template +template +void BasicCoroEventUnpacker::invoke(Event&& event, + internal::IntegerSequence) +{ + auto slot = slot_; + Array args = std::move(event).args(); + + boost::asio::spawn(event.iosvc(), [slot, args](Yield yield) + { + using Getter = internal::UnpackedCoroArgGetter; + (*slot)(Getter::template get(args)..., yield); + }); +} + +template +BasicCoroEventUnpacker, TArgs...> +basicCoroEvent(TSlot&& slot) +{ + return BasicCoroEventUnpacker, TArgs...>( + std::forward(slot)); +} + + +//------------------------------------------------------------------------------ +template +CoroInvocationUnpacker::CoroInvocationUnpacker(Slot slot) + : slot_(std::make_shared(std::move(slot))) +{} + +template +Outcome CoroInvocationUnpacker::operator()(Invocation&& inv) +{ + if (inv.args().size() < sizeof...(A)) + { + std::ostringstream oss; + oss << "Expected " << sizeof...(A) + << " args, but only got " << inv.args().size(); + throw internal::UnpackError(oss.str()); + } + + // Use the integer parameter pack technique shown in + // http://stackoverflow.com/a/7858971/245265 + using Seq = typename internal::GenIntegerSequence::type; + invoke(std::move(inv), Seq()); + + return Outcome::deferred(); +} + +template +template +void CoroInvocationUnpacker::invoke(Invocation&& inv, + internal::IntegerSequence) +{ + auto slot = slot_; + boost::asio::spawn(inv.iosvc(), [slot, inv](Yield yield) + { + try + { + Array args = inv.args(); + using Getter = internal::UnpackedCoroArgGetter; + Outcome outcome = (*slot)(std::move(inv), + Getter::template get(args)..., + yield); + + switch (outcome.type()) + { + case Outcome::Type::deferred: + // Do nothing + break; + + case Outcome::Type::result: + inv.yield(std::move(outcome).asResult()); + break; + + case Outcome::Type::error: + inv.yield(std::move(outcome).asError()); + break; + + default: + assert(false && "Unexpected wamp::Outcome::Type enumerator"); + } + + } + catch (const Error& e) + { + inv.yield(e); + } + }); +} + +template +CoroInvocationUnpacker, TArgs...> +unpackedCoroRpc(TSlot&& slot) +{ + return CoroInvocationUnpacker, TArgs...>( + std::forward(slot) ); +} + +//------------------------------------------------------------------------------ +template +BasicCoroInvocationUnpacker::BasicCoroInvocationUnpacker(Slot slot) + : slot_(std::make_shared(std::move(slot))) +{} + +template +Outcome BasicCoroInvocationUnpacker::operator()(Invocation&& inv) +{ + if (inv.args().size() < sizeof...(A)) + { + std::ostringstream oss; + oss << "Expected " << sizeof...(A) + << " args, but only got " << inv.args().size(); + throw internal::UnpackError(oss.str()); + } + + // Use the integer parameter pack technique shown in + // http://stackoverflow.com/a/7858971/245265 + using Seq = typename internal::GenIntegerSequence::type; + using IsVoidResult = BoolConstant::value>; + invoke(IsVoidResult{}, std::move(inv), Seq()); + + return Outcome::deferred(); +} + +template +template +void BasicCoroInvocationUnpacker::invoke(TrueType, Invocation&& inv, + internal::IntegerSequence) +{ + auto slot = slot_; + boost::asio::spawn(inv.iosvc(), [slot, inv](Yield yield) + { + try + { + Array args = std::move(inv).args(); + using Getter = internal::UnpackedCoroArgGetter; + (*slot)(Getter::template get(args)..., yield); + inv.yield(); + } + catch (const Error& e) + { + inv.yield(e); + } + }); +} + +template +template +void BasicCoroInvocationUnpacker::invoke(FalseType, Invocation&& inv, + internal::IntegerSequence) +{ + auto slot = slot_; + boost::asio::spawn(inv.iosvc(), [slot, inv](Yield yield) + { + try + { + Array args = std::move(inv).args(); + using Getter = internal::UnpackedCoroArgGetter; + ResultType result = (*slot)(Getter::template get(args)..., + yield); + inv.yield(Result().withArgs(std::move(result))); + } + catch (const Error& e) + { + inv.yield(e); + } + }); +} + +template +BasicCoroInvocationUnpacker, TResult, TArgs...> +basicCoroRpc(TSlot&& slot) +{ + return BasicCoroInvocationUnpacker, TResult, TArgs...>( + std::forward(slot) ); +} + +} // namespace wamp diff --git a/cppwamp/include/cppwamp/internal/legacyasioendpoint.hpp b/cppwamp/include/cppwamp/internal/legacyasioendpoint.hpp deleted file mode 100644 index 9351f311..00000000 --- a/cppwamp/include/cppwamp/internal/legacyasioendpoint.hpp +++ /dev/null @@ -1,64 +0,0 @@ -/*------------------------------------------------------------------------------ - Copyright Butterfly Energy Systems 2014-2015. - Distributed under the Boost Software License, Version 1.0. - (See accompanying file LICENSE_1_0.txt or copy at - http://www.boost.org/LICENSE_1_0.txt) -------------------------------------------------------------------------------*/ - -#ifndef CPPWAMP_INTERNAL_LEGACYASIOENDPOINT_HPP -#define CPPWAMP_INTERNAL_LEGACYASIOENDPOINT_HPP - -#include -#include "../asiodefs.hpp" -#include "../rawsockoptions.hpp" -#include "asioendpoint.hpp" -#include "legacyasiotransport.hpp" -#include "rawsockhandshake.hpp" - -namespace wamp -{ - -namespace internal -{ - -//------------------------------------------------------------------------------ -template -class LegacyAsioEndpoint : - public AsioEndpoint -{ -public: - using Establisher = TEstablisher; - - LegacyAsioEndpoint(Establisher&& est, int codecId, - RawsockMaxLength maxLength) - : Base(std::move(est)), - codecId_(codecId), - maxLength_(maxLength) - {} - -protected: - using Handshake = internal::RawsockHandshake; - - virtual void onEstablished() override - { - Base::complete(codecId_, - RawsockHandshake::byteLengthOf(maxLength_), - RawsockHandshake::byteLengthOf(maxLength_)); - } - - virtual void onHandshakeReceived(Handshake) override {} - - virtual void onHandshakeSent(Handshake) override {} - -private: - using Base = AsioEndpoint; - - int codecId_; - RawsockMaxLength maxLength_; -}; - -} // namespace internal - -} // namespace wamp - -#endif // CPPWAMP_INTERNAL_LEGACYASIOENDPOINT_HPP diff --git a/cppwamp/include/cppwamp/internal/legacyasiotransport.hpp b/cppwamp/include/cppwamp/internal/legacyasiotransport.hpp deleted file mode 100644 index 6d80f818..00000000 --- a/cppwamp/include/cppwamp/internal/legacyasiotransport.hpp +++ /dev/null @@ -1,84 +0,0 @@ -/*------------------------------------------------------------------------------ - Copyright Butterfly Energy Systems 2014-2015. - Distributed under the Boost Software License, Version 1.0. - (See accompanying file LICENSE_1_0.txt or copy at - http://www.boost.org/LICENSE_1_0.txt) -------------------------------------------------------------------------------*/ - -#ifndef CPPWAMP_LEGACYASIOTRANSPORT_HPP -#define CPPWAMP_LEGACYASIOTRANSPORT_HPP - -#include "../error.hpp" -#include "asiotransport.hpp" -#include "endian.hpp" - -namespace wamp -{ - -namespace internal -{ - -//------------------------------------------------------------------------------ -template -class LegacyAsioTransport : public AsioTransport -{ -private: - using Base = AsioTransport; - -public: - using Ptr = std::shared_ptr; - using Socket = TSocket; - using SocketPtr = typename Base::SocketPtr; - using Buffer = typename Base::Buffer; - using PingHandler = typename Base::PingHandler; - - static Ptr create(SocketPtr&& socket, size_t maxTxLength, - size_t maxRxLength) - { - return Ptr(new LegacyAsioTransport(std::move(socket), maxTxLength, - maxRxLength)); - } - - void ping(Buffer, PingHandler) - { - CPPWAMP_LOGIC_ERROR("Ping messages are not supported " - "on LegacyAsioTransport"); - } - - using Base::post; - -protected: - LegacyAsioTransport(SocketPtr&& socket, size_t maxTxLength, - size_t maxRxLength) - : Base(std::move(socket), maxTxLength, maxRxLength) - {} - - void sendMessage(RawsockMsgType type, Buffer&& message) - { - assert(this->isOpen() && "Attempting to send on bad transport"); - assert((message->length() <= this->maxSendLength()) && - "Outgoing message is longer than allowed by transport"); - - message->header_ = endian::nativeToBig32(message->length()); - if (this->txQueue_.empty()) - transmit(std::move(message)); - else - this->txQueue_.push(std::move(message)); - } - - void processHeader() - { - size_t length = endian::bigToNative32(this->rxBuffer_->header_); - if ( this->check(length <= this->maxReceiveLength(), - TransportErrc::badRxLength) ) - { - this->receivePayload(RawsockMsgType::wamp, length); - } - } -}; - -} // namespace internal - -} // namespace wamp - -#endif // CPPWAMP_LEGACYASIOTRANSPORT_HPP diff --git a/cppwamp/include/cppwamp/internal/passkey.hpp b/cppwamp/include/cppwamp/internal/passkey.hpp index dbc769cc..266e6fce 100644 --- a/cppwamp/include/cppwamp/internal/passkey.hpp +++ b/cppwamp/include/cppwamp/internal/passkey.hpp @@ -19,7 +19,7 @@ namespace internal { PassKey() {} - template friend class Dialogue; + template friend class Peer; template friend class Client; friend class wamp::Session; }; diff --git a/cppwamp/include/cppwamp/internal/dialogue.hpp b/cppwamp/include/cppwamp/internal/peer.hpp similarity index 95% rename from cppwamp/include/cppwamp/internal/dialogue.hpp rename to cppwamp/include/cppwamp/internal/peer.hpp index 13a2ff32..8c2e070f 100644 --- a/cppwamp/include/cppwamp/internal/dialogue.hpp +++ b/cppwamp/include/cppwamp/internal/peer.hpp @@ -16,10 +16,11 @@ #include #include #include "../codec.hpp" -#include "../dialoguedata.hpp" +#include "../peerdata.hpp" #include "../error.hpp" #include "../variant.hpp" #include "../wampdefs.hpp" +#include "asynctask.hpp" #include "wampmessage.hpp" namespace wamp @@ -30,11 +31,10 @@ namespace internal //------------------------------------------------------------------------------ // Base class providing session functionality common to both clients and -// routers. This class is extended by Client to implement a client session. +// router peers. This class is extended by Client to implement a client session. //------------------------------------------------------------------------------ template -class Dialogue : - public std::enable_shared_from_this> +class Peer : public std::enable_shared_from_this> { public: using Codec = TCodec; @@ -50,7 +50,7 @@ class Dialogue : using RxHandler = std::function; using LogHandler = std::function; - Dialogue(TransportPtr&& transport) + Peer(TransportPtr&& transport) : transport_(std::move(transport)) { initMessages(); @@ -67,7 +67,7 @@ class Dialogue : if (!transport_->isStarted()) { - std::weak_ptr self(this->shared_from_this()); + std::weak_ptr self(this->shared_from_this()); transport_->start( [self](Buffer buf) @@ -169,7 +169,8 @@ class Dialogue : fail(make_error_code(errc)); } - void setTraceHandler(LogHandler handler) {traceHandler_ = handler;} + void setTraceHandler(AsyncTask handler) + {traceHandler_ = std::move(handler);} template void post(TFunctor&& fn) {transport_->post(std::forward(fn));} @@ -292,7 +293,7 @@ class Dialogue : if (state_ != State::shuttingDown) { auto self = this->shared_from_this(); - post(std::bind(&Dialogue::onInbound, self, std::move(msg))); + post(std::bind(&Peer::onInbound, self, std::move(msg))); } break; } @@ -315,7 +316,7 @@ class Dialogue : assert(state_ == State::establishing); state_ = State::established; auto self = this->shared_from_this(); - post(std::bind(&Dialogue::onInbound, self, std::move(msg))); + post(std::bind(&Peer::onInbound, self, std::move(msg))); } void processWelcome(Message&& msg) @@ -441,7 +442,7 @@ class Dialogue : } TransportPtr transport_; - LogHandler traceHandler_; + AsyncTask traceHandler_; State state_ = State::closed; RequestMap requestMap_; RequestId nextRequestId_ = 0; diff --git a/cppwamp/include/cppwamp/internal/dialoguedata.ipp b/cppwamp/include/cppwamp/internal/peerdata.ipp similarity index 100% rename from cppwamp/include/cppwamp/internal/dialoguedata.ipp rename to cppwamp/include/cppwamp/internal/peerdata.ipp diff --git a/cppwamp/include/cppwamp/internal/session.ipp b/cppwamp/include/cppwamp/internal/session.ipp index 4d959afd..9c4e2e24 100644 --- a/cppwamp/include/cppwamp/internal/session.ipp +++ b/cppwamp/include/cppwamp/internal/session.ipp @@ -6,6 +6,7 @@ ------------------------------------------------------------------------------*/ #include +#include #include "config.hpp" namespace wamp @@ -16,11 +17,13 @@ namespace wamp @return A shared pointer to the created session object. */ //------------------------------------------------------------------------------ CPPWAMP_INLINE Session::Ptr Session::create( + AsioService& userIosvc, /**< IO service in which to post all + user-provided handlers. */ const Connector::Ptr& connector /**< Connection details for the transport to use. */ ) { - return Ptr(new Session(connector)); + return Ptr(new Session(userIosvc, {connector})); } //------------------------------------------------------------------------------ @@ -30,11 +33,13 @@ CPPWAMP_INLINE Session::Ptr Session::create( @throws error::Logic if `connectors.empty() == true` */ //------------------------------------------------------------------------------ CPPWAMP_INLINE Session::Ptr Session::create( + AsioService& userIosvc, /**< IO service in which to post all + user-provided handlers. */ const ConnectorList& connectors /**< A list of connection details for the transports to use. */ ) { - return Ptr(new Session(connectors)); + return Ptr(new Session(userIosvc, connectors)); } //------------------------------------------------------------------------------ @@ -60,6 +65,12 @@ CPPWAMP_INLINE Session::~Session() reset(); } +//------------------------------------------------------------------------------ +CPPWAMP_INLINE AsioService& Session::userIosvc() const +{ + return userIosvc_; +} + //------------------------------------------------------------------------------ CPPWAMP_INLINE SessionState Session::state() const { @@ -82,7 +93,11 @@ CPPWAMP_INLINE void Session::setWarningHandler( LogHandler handler /**< Function called to handle warnings. */ ) { - warningHandler_ = std::move(handler); + warningHandler_ = AsyncTask + { + userIosvc_, + [handler](AsyncResult warning) {handler(warning.get());} + }; } //------------------------------------------------------------------------------ @@ -96,7 +111,11 @@ CPPWAMP_INLINE void Session::setTraceHandler( LogHandler handler /**< Function called to handle log traces. */ ) { - traceHandler_ = std::move(handler); + traceHandler_ = AsyncTask + { + userIosvc_, + [handler](AsyncResult trace) {handler(trace.get());} + }; } //------------------------------------------------------------------------------ @@ -127,7 +146,7 @@ CPPWAMP_INLINE void Session::connect( state_ = State::connecting; isTerminating_ = false; currentConnector_ = nullptr; - doConnect(0, handler); + doConnect(0, {userIosvc_, handler}); } //------------------------------------------------------------------------------ @@ -151,7 +170,7 @@ CPPWAMP_INLINE void Session::join( ) { CPPWAMP_LOGIC_CHECK(state() == State::closed, "Session is not closed"); - impl_->join(std::move(realm), std::move(handler)); + impl_->join(std::move(realm), {userIosvc_, std::move(handler)}); } //------------------------------------------------------------------------------ @@ -174,7 +193,7 @@ CPPWAMP_INLINE void Session::leave( { CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); - impl_->leave(std::move(reason), std::move(handler)); + impl_->leave(std::move(reason), {userIosvc_, std::move(handler)}); } //------------------------------------------------------------------------------ @@ -246,7 +265,7 @@ CPPWAMP_INLINE void Session::subscribe( CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); using std::move; - impl_->subscribe(move(topic), move(slot), move(handler)); + impl_->subscribe(move(topic), move(slot), {userIosvc_, move(handler)}); } //------------------------------------------------------------------------------ @@ -300,7 +319,7 @@ CPPWAMP_INLINE void Session::unsubscribe( CPPWAMP_LOGIC_CHECK(!!sub, "The subscription is empty"); CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); - impl_->unsubscribe(sub, std::move(handler)); + impl_->unsubscribe(sub, {userIosvc_, std::move(handler)}); } //------------------------------------------------------------------------------ @@ -335,7 +354,7 @@ CPPWAMP_INLINE void Session::publish( { CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); - impl_->publish(std::move(pub), std::move(handler)); + impl_->publish(std::move(pub), {userIosvc_, std::move(handler)}); } //------------------------------------------------------------------------------ @@ -353,13 +372,17 @@ CPPWAMP_INLINE void Session::publish( - Some other `std::error_code` for protocol and transport errors. @throws error::Logic if `this->state() != SessionState::established` */ //------------------------------------------------------------------------------ -CPPWAMP_INLINE void Session::enroll(Procedure procedure, CallSlot slot, - AsyncHandler handler) +CPPWAMP_INLINE void Session::enroll( + Procedure procedure, /**< The procedure to register. */ + CallSlot slot, /**< The handler to execute when the RPC is invoked. */ + AsyncHandler handler /**< Handler to invoke when + the enroll operation completes. */ +) { CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); using std::move; - impl_->enroll(move(procedure), move(slot), move(handler)); + impl_->enroll(move(procedure), move(slot), {userIosvc_, move(handler)}); } //------------------------------------------------------------------------------ @@ -410,7 +433,7 @@ CPPWAMP_INLINE void Session::unregister( CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); if (impl_) - impl_->unregister(reg, std::move(handler)); + impl_->unregister(reg, {userIosvc_, std::move(handler)}); } //------------------------------------------------------------------------------ @@ -435,24 +458,26 @@ CPPWAMP_INLINE void Session::call( { CPPWAMP_LOGIC_CHECK(state() == State::established, "Session is not established"); - impl_->call(std::move(rpc), std::move(handler)); + impl_->call(std::move(rpc), {userIosvc_, std::move(handler)}); } //------------------------------------------------------------------------------ -CPPWAMP_INLINE Session::Session(const Connector::Ptr& connector) - : connectors_({connector->clone()}) {} - -//------------------------------------------------------------------------------ -CPPWAMP_INLINE Session::Session(const ConnectorList& connectors) +CPPWAMP_INLINE Session::Session(AsioService& userIosvc, + const ConnectorList& connectors) + : userIosvc_(userIosvc) { CPPWAMP_LOGIC_CHECK(!connectors.empty(), "Connector list is empty"); for (const auto& cnct: connectors) connectors_.push_back(cnct->clone()); + + setWarningHandler( [](std::string warning) + { + std::cerr << "[CppWAMP] Warning: " << warning << "\n"; + }); } //------------------------------------------------------------------------------ -CPPWAMP_INLINE void Session::doConnect(size_t index, - AsyncHandler handler) +CPPWAMP_INLINE void Session::doConnect(size_t index, AsyncTask handler) { currentConnector_ = connectors_.at(index); std::weak_ptr self(shared_from_this()); @@ -479,7 +504,7 @@ CPPWAMP_INLINE void Session::doConnect(size_t index, ec = make_error_code( SessionErrc::allTransportsFailed); } - handler(ec); + std::move(handler)(ec); } } } @@ -489,7 +514,7 @@ CPPWAMP_INLINE void Session::doConnect(size_t index, state_ = State::closed; impl_ = impl; impl_->setLogHandlers(warningHandler_, traceHandler_); - handler(index); + std::move(handler)(index); } } }); @@ -499,9 +524,5 @@ CPPWAMP_INLINE void Session::doConnect(size_t index, CPPWAMP_INLINE std::shared_ptr Session::impl() {return impl_;} -//------------------------------------------------------------------------------ -CPPWAMP_INLINE void Session::postpone(std::function functor) - {impl_->postpone(functor);} - } // namespace wamp diff --git a/cppwamp/include/cppwamp/internal/sessiondata.ipp b/cppwamp/include/cppwamp/internal/sessiondata.ipp index 609f0f44..e6df7757 100644 --- a/cppwamp/include/cppwamp/internal/sessiondata.ipp +++ b/cppwamp/include/cppwamp/internal/sessiondata.ipp @@ -293,12 +293,23 @@ CPPWAMP_INLINE String&Pub::topic(internal::PassKey) {return topic_;} // Event //****************************************************************************** +/** @post `this->empty() == true` */ CPPWAMP_INLINE Event::Event() {} +CPPWAMP_INLINE bool Event::empty() const {return iosvc_ == nullptr;} + CPPWAMP_INLINE SubscriptionId Event::subId() const {return subId_;} CPPWAMP_INLINE PublicationId Event::pubId() const {return pubId_;} +/** @returns the same object as Session::userIosvc(). + @pre `this->empty() == false` */ +CPPWAMP_INLINE AsioService& Event::iosvc() const +{ + CPPWAMP_LOGIC_CHECK(!empty(), "Event is empty"); + return *iosvc_; +} + /** @details This function checks the value of the `EVENT.Details.publisher|integer` detail. See [Publisher Identification][pub_ident] in the advanced WAMP spec. @@ -336,10 +347,11 @@ CPPWAMP_INLINE Variant Event::topic() const } CPPWAMP_INLINE Event::Event(internal::PassKey, SubscriptionId subId, - PublicationId pubId, Object&& details) + PublicationId pubId, AsioService* iosvc, Object&& details) : Options(std::move(details)), subId_(subId), - pubId_(pubId) + pubId_(pubId), + iosvc_(iosvc) {} CPPWAMP_INLINE std::ostream& operator<<(std::ostream& out, const Event& event) @@ -539,6 +551,34 @@ CPPWAMP_INLINE Outcome::~Outcome() CPPWAMP_INLINE Outcome::Type Outcome::type() const {return type_;} +/** @pre this->type() == Type::result */ +CPPWAMP_INLINE const Result& Outcome::asResult() const & +{ + assert(type_ == Type::result); + return value_.result; +} + +/** @pre this->type() == Type::result */ +CPPWAMP_INLINE Result&& Outcome::asResult() && +{ + assert(type_ == Type::result); + return std::move(value_.result); +} + +/** @pre this->type() == Type::error */ +CPPWAMP_INLINE const Error& Outcome::asError() const & +{ + assert(type_ == Type::error); + return value_.error; +} + +/** @pre this->type() == Type::error */ +CPPWAMP_INLINE Error&& Outcome::asError() && +{ + assert(type_ == Type::error); + return std::move(value_.error); +} + /** @post `this->type() == other.type()` */ CPPWAMP_INLINE Outcome& Outcome::operator=(const Outcome& other) { @@ -653,25 +693,16 @@ CPPWAMP_INLINE void Outcome::destruct() } } -CPPWAMP_INLINE Result& Outcome::result(internal::PassKey) -{ - assert(type_ == Type::result); - return value_.result; -} - -CPPWAMP_INLINE Error& Outcome::error(internal::PassKey) -{ - assert(type_ == Type::error); - return value_.error; -} - //****************************************************************************** // Invocation //****************************************************************************** +/** @post `this->empty() == true` */ CPPWAMP_INLINE Invocation::Invocation() {} +CPPWAMP_INLINE bool Invocation::empty() const {return iosvc_ == nullptr;} + CPPWAMP_INLINE bool Invocation::calleeHasExpired() const { return callee_.expired(); @@ -679,6 +710,14 @@ CPPWAMP_INLINE bool Invocation::calleeHasExpired() const CPPWAMP_INLINE RequestId Invocation::requestId() const {return id_;} +/** @returns the same object as Session::userIosvc(). + @pre `this->empty() == false` */ +CPPWAMP_INLINE AsioService& Invocation::iosvc() const +{ + CPPWAMP_LOGIC_CHECK(!empty(), "Invocation is empty"); + return *iosvc_; +} + /** @details This function checks if the `INVOCATION.Details.receive_progress|bool` detail is `true`. See [Progressive Call Results][prog_calls] in the advanced @@ -744,10 +783,11 @@ CPPWAMP_INLINE void Invocation::yield(Error error) const } CPPWAMP_INLINE Invocation::Invocation(internal::PassKey, CalleePtr callee, - RequestId id, Object&& details) + RequestId id, AsioService* iosvc, Object&& details) : Options(std::move(details)), callee_(callee), - id_(id) + id_(id), + iosvc_(iosvc) {} CPPWAMP_INLINE std::ostream& operator<<(std::ostream& out, diff --git a/cppwamp/include/cppwamp/internal/subscriber.hpp b/cppwamp/include/cppwamp/internal/subscriber.hpp index 9c9de02f..5a9a5dec 100644 --- a/cppwamp/include/cppwamp/internal/subscriber.hpp +++ b/cppwamp/include/cppwamp/internal/subscriber.hpp @@ -10,8 +10,8 @@ #include #include -#include "../asyncresult.hpp" #include "../wampdefs.hpp" +#include "asynctask.hpp" namespace wamp { @@ -29,10 +29,10 @@ class Subscriber virtual ~Subscriber() {} - virtual void unsubscribe(const Subscription& handle) = 0; + virtual void unsubscribe(const Subscription& sub) = 0; - virtual void unsubscribe(const Subscription& handle, - AsyncHandler handler) = 0; + virtual void unsubscribe(const Subscription& sub, + AsyncTask&& handler) = 0; }; } // namespace internal diff --git a/cppwamp/include/cppwamp/internal/tcp.ipp b/cppwamp/include/cppwamp/internal/tcp.ipp index 7f21312a..0b7b8716 100644 --- a/cppwamp/include/cppwamp/internal/tcp.ipp +++ b/cppwamp/include/cppwamp/internal/tcp.ipp @@ -7,7 +7,6 @@ #include #include "asioconnector.hpp" -#include "legacyasioendpoint.hpp" #include "rawsockconnector.hpp" #include "tcpopener.hpp" @@ -23,14 +22,4 @@ Connector::Ptr connector(AsioService& iosvc, TcpHost host) return ConcreteConnector::create(iosvc, std::move(host)); } - -//------------------------------------------------------------------------------ -template -Connector::Ptr legacyConnector(AsioService& iosvc, TcpHost host) -{ - using Endpoint = internal::LegacyAsioEndpoint; - using ConcreteConnector = internal::RawsockConnector; - return ConcreteConnector::create(iosvc, std::move(host)); -} - } // namespace wamp diff --git a/cppwamp/include/cppwamp/internal/uds.ipp b/cppwamp/include/cppwamp/internal/uds.ipp index 16ea1da4..d426b251 100644 --- a/cppwamp/include/cppwamp/internal/uds.ipp +++ b/cppwamp/include/cppwamp/internal/uds.ipp @@ -7,7 +7,6 @@ #include #include "asioconnector.hpp" -#include "legacyasioendpoint.hpp" #include "rawsockconnector.hpp" #include "udsopener.hpp" @@ -23,14 +22,4 @@ Connector::Ptr connector(AsioService& iosvc, UdsPath path) return ConcreteConnector::create(iosvc, std::move(path)); } - -//------------------------------------------------------------------------------ -template -Connector::Ptr legacyConnector(AsioService& iosvc, UdsPath path) -{ - using Endpoint = internal::LegacyAsioEndpoint; - using ConcreteConnector = internal::RawsockConnector; - return ConcreteConnector::create(iosvc, std::move(path)); -} - } // namespace wamp diff --git a/cppwamp/include/cppwamp/internal/unpacker.ipp b/cppwamp/include/cppwamp/internal/unpacker.ipp index 8375b8af..36122fc0 100644 --- a/cppwamp/include/cppwamp/internal/unpacker.ipp +++ b/cppwamp/include/cppwamp/internal/unpacker.ipp @@ -107,9 +107,9 @@ void BasicEventUnpacker::operator()(Event&& event) template template void BasicEventUnpacker::invoke(Event&& event, - internal::IntegerSequence) + internal::IntegerSequence) { - Array args = event.args(); + Array args = std::move(event).args(); using Getter = internal::UnpackedArgGetter; slot_(Getter::template get(args)...); } @@ -162,6 +162,7 @@ InvocationUnpacker, TArgs...> unpackedRpc(TSlot&& slot) std::forward(slot) ); } + //------------------------------------------------------------------------------ template BasicInvocationUnpacker::BasicInvocationUnpacker(Slot slot) @@ -191,7 +192,7 @@ template Outcome BasicInvocationUnpacker::invoke(TrueType, Invocation&& inv, internal::IntegerSequence) { - Array args = inv.args(); + Array args = std::move(inv).args(); using Getter = internal::UnpackedArgGetter; slot_(Getter::template get(args)...); return {}; @@ -202,10 +203,10 @@ template Outcome BasicInvocationUnpacker::invoke(FalseType, Invocation&& inv, internal::IntegerSequence) { - Array args = inv.args(); + Array args = std::move(inv).args(); using Getter = internal::UnpackedArgGetter; ResultType result = slot_(Getter::template get(args)...); - return wamp::Result().withArgs(std::move(result)); + return Result().withArgs(std::move(result)); } template diff --git a/cppwamp/include/cppwamp/dialoguedata.hpp b/cppwamp/include/cppwamp/peerdata.hpp similarity index 94% rename from cppwamp/include/cppwamp/dialoguedata.hpp rename to cppwamp/include/cppwamp/peerdata.hpp index 0748dbe0..41167973 100644 --- a/cppwamp/include/cppwamp/dialoguedata.hpp +++ b/cppwamp/include/cppwamp/peerdata.hpp @@ -5,8 +5,8 @@ http://www.boost.org/LICENSE_1_0.txt) ------------------------------------------------------------------------------*/ -#ifndef CPPWAMP_DIALOGUEDATA_HPP -#define CPPWAMP_DIALOGUEDATA_HPP +#ifndef CPPWAMP_PEERDATA_HPP +#define CPPWAMP_PEERDATA_HPP #include "options.hpp" #include "payload.hpp" @@ -74,7 +74,7 @@ class Error : public Options, public Payload } // namespace wamp #ifndef CPPWAMP_COMPILED_LIB -#include "./internal/dialoguedata.ipp" +#include "./internal/peerdata.ipp" #endif -#endif // CPPWAMP_DIALOGUEDATA_HPP +#endif // CPPWAMP_PEERDATA_HPP diff --git a/cppwamp/include/cppwamp/session.hpp b/cppwamp/include/cppwamp/session.hpp index 3a21ac12..0e09897e 100644 --- a/cppwamp/include/cppwamp/session.hpp +++ b/cppwamp/include/cppwamp/session.hpp @@ -19,14 +19,16 @@ #include #include #include +#include "asiodefs.hpp" #include "asyncresult.hpp" -#include "dialoguedata.hpp" +#include "peerdata.hpp" #include "connector.hpp" #include "error.hpp" #include "registration.hpp" #include "sessiondata.hpp" #include "subscription.hpp" #include "wampdefs.hpp" +#include "internal/asynctask.hpp" #include "internal/clientinterface.hpp" namespace wamp @@ -54,6 +56,10 @@ namespace wamp listed under **Returns** refer to results that are returned via AsyncResult. + The `boost::asio::io_service` passed via `create()` is used when executing + handler functions passed-in by the user. This can be the same, or different + than the `io_service` passed to the `Connector` creation functions. + @par Aborting Asynchronous Operations All pending asynchronous operations can be _aborted_ by dropping the client connection via Session::disconnect. Pending post-join operations can be also @@ -93,10 +99,10 @@ class Session : public std::enable_shared_from_this using CallSlot = std::function; /** Creates a new Session instance. */ - static Ptr create(const Connector::Ptr& connector); + static Ptr create(AsioService& userIosvc, const Connector::Ptr& connector); /** Creates a new Session instance. */ - static Ptr create(const ConnectorList& connectors); + static Ptr create(AsioService& userIosvc, const ConnectorList& connectors); /** Obtains a dictionary of roles and features supported on the client side. */ @@ -113,6 +119,10 @@ class Session : public std::enable_shared_from_this /// @name Observers /// @{ + + /** Obtains the IO service used to execute user-provided handlers. */ + AsioService& userIosvc() const; + /** Returns the current state of the session. */ SessionState state() const; /// @} @@ -182,27 +192,23 @@ class Session : public std::enable_shared_from_this /// @} protected: - explicit Session(const Connector::Ptr& connector); + explicit Session(AsioService& userIosvc, const ConnectorList& connectors); - explicit Session(const ConnectorList& connectors); - - void doConnect(size_t index, AsyncHandler handler); + void doConnect(size_t index, AsyncTask handler); std::shared_ptr impl(); - void postpone(std::function functor); - private: + AsioService& userIosvc_; ConnectorList connectors_; Connector::Ptr currentConnector_; - LogHandler warningHandler_; - LogHandler traceHandler_; + AsyncTask warningHandler_; + AsyncTask traceHandler_; SessionState state_ = State::disconnected; bool isTerminating_ = false; std::shared_ptr impl_; }; - } // namespace wamp #ifndef CPPWAMP_COMPILED_LIB diff --git a/cppwamp/include/cppwamp/sessiondata.hpp b/cppwamp/include/cppwamp/sessiondata.hpp index 3449b258..57fa0e55 100644 --- a/cppwamp/include/cppwamp/sessiondata.hpp +++ b/cppwamp/include/cppwamp/sessiondata.hpp @@ -13,7 +13,8 @@ #include #include #include -#include "dialoguedata.hpp" +#include "asiodefs.hpp" +#include "peerdata.hpp" #include "options.hpp" #include "payload.hpp" #include "variant.hpp" @@ -185,12 +186,19 @@ class Event : public Options, public Payload /** Default constructor. */ Event(); + /** Returns `false` if the Event has been initialized and is ready + for use. */ + bool empty() const; + /** Obtains the subscription ID associated with this event. */ PublicationId subId() const; /** Obtains the publication ID associated with this event. */ PublicationId pubId() const; + /** Obtains the IO service used to execute user-provided handlers. */ + AsioService& iosvc() const; + /** Obtains an optional publisher ID integer. */ Variant publisher() const; @@ -203,12 +211,13 @@ class Event : public Options, public Payload private: SubscriptionId subId_ = -1; - PublicationId pubId_ = -1; + PublicationId pubId_ = -1; + AsioService* iosvc_ = nullptr; public: // Internal use only Event(internal::PassKey, SubscriptionId subId, PublicationId pubId, - Object&& details); + AsioService* iosvc, Object&& details); }; std::ostream& operator<<(std::ostream& out, const Event& event); @@ -370,6 +379,18 @@ class Outcome /** Obtains the object type being contained. */ Type type() const; + /** Accesses the stored Result object. */ + const Result& asResult() const &; + + /** Steals the stored Result object. */ + Result&& asResult() &&; + + /** Accesses the stored Error object. */ + const Error& asError() const &; + + /** Steals the stored Error object. */ + Error&& asError() &&; + /** Copy-assignment operator. */ Outcome& operator=(const Outcome& other); @@ -392,11 +413,6 @@ class Outcome Result result; Error error; } value_; - -public: - // Internal use only - Result& result(internal::PassKey); - Error& error(internal::PassKey); }; @@ -412,6 +428,10 @@ class Invocation : public Options, public Payload /** Default constructor */ Invocation(); + /** Returns `false` if the Invocation has been initialized and is ready + for use. */ + bool empty() const; + /** Determines if the Session object that dispatched this invocation still exists or has expired. */ bool calleeHasExpired() const; @@ -419,6 +439,9 @@ class Invocation : public Options, public Payload /** Returns the request ID associated with this RPC invocation. */ RequestId requestId() const; + /** Obtains the IO service used to execute user-provided handlers. */ + AsioService& iosvc() const; + /** Checks if the caller requested progressive results. */ bool isProgressive() const; @@ -442,11 +465,12 @@ class Invocation : public Options, public Payload // Internal use only using CalleePtr = std::weak_ptr; Invocation(internal::PassKey, CalleePtr callee, RequestId id, - Object&& details); + AsioService* iosvc, Object&& details); private: CalleePtr callee_; RequestId id_ = -1; + AsioService* iosvc_ = nullptr; }; std::ostream& operator<<(std::ostream& out, const Invocation& inv); diff --git a/cppwamp/include/cppwamp/tcp.hpp b/cppwamp/include/cppwamp/tcp.hpp index 40ff0bbb..2e9fbe0c 100644 --- a/cppwamp/include/cppwamp/tcp.hpp +++ b/cppwamp/include/cppwamp/tcp.hpp @@ -35,25 +35,6 @@ Connector::Ptr connector( TcpHost host ///< TCP host address and other socket options. ); - -//------------------------------------------------------------------------------ -/** Creates a Connector that can establish a TCP raw socket transport on - non-conformant routers. - This is an interim Connector for connecting to routers that do not yet - support handshaking on their raw socket transports. Handshaking was - introduced in [version e2c4e57][e2c4e57] of the advanced WAMP specification. - [e2c4e57]: https://github.com/tavendo/WAMP/commit/e2c4e5775d89fa6d991eb2e138e2f42ca2469fa8 - @relates TcpHost - @returns a `std::shared_ptr` to a Connector - @tparam TCodec The serialization to use over this transport. - @see Connector, Json, Msgpack, makeUds, makeLegacyTcp */ -//------------------------------------------------------------------------------ -template -Connector::Ptr legacyConnector( - AsioService& iosvc, ///< The I/O service to be used by the transport. - TcpHost host ///< TCP host address and other socket options. -); - } // namespace wamp #ifndef CPPWAMP_COMPILED_LIB diff --git a/cppwamp/include/cppwamp/tcphost.hpp b/cppwamp/include/cppwamp/tcphost.hpp index 309fd7a5..cfef6ccb 100644 --- a/cppwamp/include/cppwamp/tcphost.hpp +++ b/cppwamp/include/cppwamp/tcphost.hpp @@ -27,7 +27,7 @@ namespace wamp //------------------------------------------------------------------------------ /** Contains TCP host address information, as well as other socket options. - @see IpOptions, RawsockOptions, connector, legacyConnector */ + @see IpOptions, RawsockOptions, connector */ //------------------------------------------------------------------------------ class TcpHost : public IpOptions { diff --git a/cppwamp/include/cppwamp/types/set.hpp b/cppwamp/include/cppwamp/types/set.hpp new file mode 100644 index 00000000..6be0300c --- /dev/null +++ b/cppwamp/include/cppwamp/types/set.hpp @@ -0,0 +1,72 @@ +/*------------------------------------------------------------------------------ + Copyright Butterfly Energy Systems 2014-2015. + Distributed under the Boost Software License, Version 1.0. + (See accompanying file LICENSE_1_0.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +------------------------------------------------------------------------------*/ + +#ifndef CPPWAMP_TYPES_SET_HPP +#define CPPWAMP_TYPES_SET_HPP + +//------------------------------------------------------------------------------ +/** @file + Provides facilities allowing Variant to interoperate with std::set. */ +//------------------------------------------------------------------------------ + +#include +#include "../conversion.hpp" +#include "../error.hpp" + +namespace wamp +{ + +//------------------------------------------------------------------------------ +/** Performs the conversion from an array variant to a `std::set`. + Users should not use this function directly. Use Variant::to instead. */ +//------------------------------------------------------------------------------ +template +void convert(FromVariantConverter& conv, std::set& set) +{ + const auto& variant = conv.variant(); + if (!variant.is()) + { + throw error::Conversion("Attempting to convert non-array variant " + "to std::set"); + } + + std::set newSet; + const auto& array = variant.as(); + for (Array::size_type i=0; i()); + } + catch (const error::Conversion& e) + { + std::string msg = e.what(); + msg += " (for element #" + std::to_string(i) + ")"; + throw error::Conversion(msg); + } + } + set = std::move(newSet); +} + +//------------------------------------------------------------------------------ +/** Performs the conversion from a `std::set` to an array variant. + Users should not use this function directly. Use Variant::from instead. */ +//------------------------------------------------------------------------------ +template +void convert(ToVariantConverter& conv, std::set& set) +{ + Array array; + for (const auto& elem: set) + { + array.emplace_back(Variant::from(elem)); + } + conv.variant() = std::move(array); +} + +} // namespace wamp + +#endif // CPPWAMP_TYPES_SET_HPP diff --git a/cppwamp/include/cppwamp/types/unorderedmap.hpp b/cppwamp/include/cppwamp/types/unorderedmap.hpp index 52797f70..b2decd37 100644 --- a/cppwamp/include/cppwamp/types/unorderedmap.hpp +++ b/cppwamp/include/cppwamp/types/unorderedmap.hpp @@ -14,9 +14,10 @@ std::unordered_map. */ //------------------------------------------------------------------------------ -#include #include +#include #include "../conversion.hpp" +#include "../error.hpp" namespace wamp { diff --git a/cppwamp/include/cppwamp/types/unorderedset.hpp b/cppwamp/include/cppwamp/types/unorderedset.hpp new file mode 100644 index 00000000..fd4bcaeb --- /dev/null +++ b/cppwamp/include/cppwamp/types/unorderedset.hpp @@ -0,0 +1,73 @@ +/*------------------------------------------------------------------------------ + Copyright Butterfly Energy Systems 2014-2015. + Distributed under the Boost Software License, Version 1.0. + (See accompanying file LICENSE_1_0.txt or copy at + http://www.boost.org/LICENSE_1_0.txt) +------------------------------------------------------------------------------*/ + +#ifndef CPPWAMP_TYPES_UNORDEREDSET_HPP +#define CPPWAMP_TYPES_UNORDEREDSET_HPP + +//------------------------------------------------------------------------------ +/** @file + Provides facilities allowing Variant to interoperate + with std::unordered_set. */ +//------------------------------------------------------------------------------ + +#include +#include "../conversion.hpp" +#include "../error.hpp" + +namespace wamp +{ + +//------------------------------------------------------------------------------ +/** Performs the conversion from an array variant to a `std::unordered_set`. + Users should not use this function directly. Use Variant::to instead. */ +//------------------------------------------------------------------------------ +template +void convert(FromVariantConverter& conv, std::unordered_set& set) +{ + const auto& variant = conv.variant(); + if (!variant.is()) + { + throw error::Conversion("Attempting to convert non-array variant " + "to std::unordered_set"); + } + + std::unordered_set newSet; + const auto& array = variant.as(); + for (Array::size_type i=0; i()); + } + catch (const error::Conversion& e) + { + std::string msg = e.what(); + msg += " (for element #" + std::to_string(i) + ")"; + throw error::Conversion(msg); + } + } + set = std::move(newSet); +} + +//------------------------------------------------------------------------------ +/** Performs the conversion from a `std::unordered_set` to an array variant. + Users should not use this function directly. Use Variant::from instead. */ +//------------------------------------------------------------------------------ +template +void convert(ToVariantConverter& conv, std::unordered_set& set) +{ + Array array; + for (const auto& elem: set) + { + array.emplace_back(Variant::from(elem)); + } + conv.variant() = std::move(array); +} + +} // namespace wamp + +#endif // CPPWAMP_TYPES_UNORDEREDSET_HPP diff --git a/cppwamp/include/cppwamp/uds.hpp b/cppwamp/include/cppwamp/uds.hpp index 2f6f46d5..ec7b4721 100644 --- a/cppwamp/include/cppwamp/uds.hpp +++ b/cppwamp/include/cppwamp/uds.hpp @@ -35,25 +35,6 @@ Connector::Ptr connector( UdsPath host ///< Unix domain socket path and other socket options. ); - -//------------------------------------------------------------------------------ -/** Creates a Connector that can establish a Unix domain socket transport on - non-conformant routers. - This is an interim Connector for connecting to routers that do not yet - support handshaking on their raw socket transports. Handshaking was - introduced in [version e2c4e57][e2c4e57] of the advanced WAMP specification. - [e2c4e57]: https://github.com/tavendo/WAMP/commit/e2c4e5775d89fa6d991eb2e138e2f42ca2469fa8 - @relates UdsPath - @returns a `std::shared_ptr` to a Connector - @tparam TCodec The serialization to use over this transport. - @see Connector, Json, Msgpack */ -//------------------------------------------------------------------------------ -template -Connector::Ptr legacyConnector( - AsioService& iosvc, ///< The I/O service to be used by the transport. - UdsPath host ///< Unix domain socket path and other socket options. -); - } // namespace wamp #ifndef CPPWAMP_COMPILED_LIB diff --git a/cppwamp/include/cppwamp/udspath.hpp b/cppwamp/include/cppwamp/udspath.hpp index 064e2e33..1840248c 100644 --- a/cppwamp/include/cppwamp/udspath.hpp +++ b/cppwamp/include/cppwamp/udspath.hpp @@ -27,7 +27,7 @@ namespace wamp //------------------------------------------------------------------------------ /** Contains a Unix domain socket path, as well as other socket options. - @see RawsockOptions, connector, legacyConnector */ + @see RawsockOptions, connector */ //------------------------------------------------------------------------------ class UdsPath : public RawsockOptions diff --git a/cppwamp/include/cppwamp/unpacker.hpp b/cppwamp/include/cppwamp/unpacker.hpp index b9670d29..ce7c0d3f 100644 --- a/cppwamp/include/cppwamp/unpacker.hpp +++ b/cppwamp/include/cppwamp/unpacker.hpp @@ -49,7 +49,7 @@ using DecayedSlot = typename std::decay::type; The [wamp::unpackedEvent](@ref EventUnpacker::unpackedEvent) convenience function should be used to construct instances of EventUnpacker. @see [wamp::unpackedEvent](@ref EventUnpacker::unpackedEvent) - @see @ref UnpackedCallSlots + @see @ref UnpackedEventSlots @tparam TSlot Function type to be wrapped. @tparam TArgs List of static types the event slot expects following the Event parameter. */ @@ -99,7 +99,7 @@ EventUnpacker, TArgs...> unpackedEvent(TSlot&& slot); This class differs from EventUnpacker in that the slot type is not expected to take an Event as the first parameter. @see [wamp::basicEvent](@ref BasicEventUnpacker::basicEvent) - @see @ref UnpackedCallSlots + @see @ref UnpackedEventSlots @tparam TSlot Function type to be wrapped. @tparam TArgs List of static types the event slot expects as arguments. */ //------------------------------------------------------------------------------ @@ -193,7 +193,7 @@ InvocationUnpacker, TArgs...> unpackedRpc(TSlot&& slot); The [wamp::basicRpc](@ref BasicInvocationUnpacker::basicRpc) convenience function should be used to construct instances of InvocationUnpacker. This class differs from InvocationUnpacker in that the slot type returns - void and is not expected to take an Invocation as the first parameter. + `TResult` and is not expected to take an Invocation as the first parameter. @see [wamp::basicRpc](@ref BasicInvocationUnpacker::basicRpc) @see @ref UnpackedCallSlots @tparam TSlot Function type to be wrapped. diff --git a/cppwamp/include/cppwamp/version.hpp b/cppwamp/include/cppwamp/version.hpp index c26bb5a9..6fc17b56 100644 --- a/cppwamp/include/cppwamp/version.hpp +++ b/cppwamp/include/cppwamp/version.hpp @@ -17,13 +17,13 @@ #define CPPWAMP_MAJOR_VERSION 0 /// Minor version with functionality added in a backwards-compatible manner. -#define CPPWAMP_MINOR_VERSION 5 +#define CPPWAMP_MINOR_VERSION 6 /// Patch version for backwards-compatible bug fixes. -#define CPPWAMP_PATCH_VERSION 3 +#define CPPWAMP_PATCH_VERSION 6 /// Integer version number, computed as `(major*10000) + (minor*100) + patch` -#define CPPWAMP_VERSION 503 +#define CPPWAMP_VERSION 600 namespace wamp { diff --git a/cppwamp/src/cppwamp.cpp b/cppwamp/src/cppwamp.cpp index 97a192f4..4d6927c1 100644 --- a/cppwamp/src/cppwamp.cpp +++ b/cppwamp/src/cppwamp.cpp @@ -10,7 +10,7 @@ #ifdef CPPWAMP_COMPILED_LIB #include -#include +#include #include #include #include @@ -30,7 +30,7 @@ #endif #include -#include +#include #include #include #include @@ -53,8 +53,6 @@ namespace wamp template Connector::Ptr connector(AsioService& iosvc, TcpHost host); template Connector::Ptr connector(AsioService& iosvc, TcpHost host); -template Connector::Ptr legacyConnector(AsioService& iosvc, TcpHost host); -template Connector::Ptr legacyConnector(AsioService& iosvc, TcpHost host); namespace internal { @@ -66,8 +64,6 @@ namespace internal #if CPPWAMP_HAS_UNIX_DOMAIN_SOCKETS template Connector::Ptr connector(AsioService& iosvc, UdsPath path); template Connector::Ptr connector(AsioService& iosvc, UdsPath path); -template Connector::Ptr legacyConnector(AsioService& iosvc, UdsPath path); -template Connector::Ptr legacyConnector(AsioService& iosvc, UdsPath path); namespace internal { diff --git a/doc/architecture.vpp b/doc/architecture.vpp index ca644b50..30dc152b 100644 Binary files a/doc/architecture.vpp and b/doc/architecture.vpp differ diff --git a/doc/images/client_api.svg b/doc/images/client_api.svg index 34c752f2..40318e04 100644 --- a/doc/images/client_api.svg +++ b/doc/images/client_api.svg @@ -69,7 +69,7 @@ >DialogueDataPeerDataDialogueDataPeerDataT0..11..*0..1state_10..1DialoguePeerDialogueDataPeerDataDialogueDataPeerDataUnpackerJsonMsgpackTSocketJsonMsgpackDialoguePeerVariant<<Concept>>CodecJsonMsgpackTCodecTTransportTSocket*1txQueue_0..*+type1txQueue_0..*+fields1*rxBuffer_1<<Concept>>CodecJsonMsgpack<<use>>TTransportslot_callee_10..1registry_0..*callee_0..1impl_0..1slot_1TTransportreadership_slot_0..*10..1slot_1subscriber_1readership_0..*DialoguePeerEstablisherTCodecTTransportTCodecTTransportTSocketTEstablisherTEstablisher1..*1impl_0..1RawsockConnectorTCodecTEndpoint0..1<<Concept>>CodecUdsPathTCodecTTransportTCodecTTransportTSocketTEstablisherTEstablisherTCodecTEndpointimpl_0..111..*0..111` | `TResult function(TArgs...)` | +| [Basic Coroutine Call Slot](@ref BasicCoroutineCallSlots) | `wamp::basicCoroRpc` | `TResult function(TArgs..., Yield)` | +| [Unpacked Call Slot](@ref UnpackedCallSlots) | `wamp::unpackedRpc` | `Outcome function(Invocation, TArgs...)` | +| [Unpacked Coroutine Call Slot](@ref UnpackedCoroutineCallSlots) | `wamp::unpackedCoroRpc` | `Outcome function(Invocation, TArgs..., Yield)` | + +where `Yield` represents the type `boost::asio::yield_context`. + + @section BasicCallSlots Basic Call Slots + A _basic call slot_ represents an RPC handler that expects one or more payload arguments having specific, static types. The [wamp::basicRpc] (@ref wamp::BasicInvocationUnpacker::basicRpc) function can be used when @@ -93,7 +109,69 @@ passes them to the slot's argument list. If `Session` cannot convert the invocation payload arguments to their target types, it automatically sends an `ERROR` reply back to the router. + +@section BasicCoroutineCallSlots Basic Coroutine Call Slots + +A _basic coroutine call slot_ is like a regular _basic call slot_, except +that it is executed within the context of a coroutine. This is useful for +RPC handlers that need to perform asynchronous operations themselves. The +[wamp::basicCoroRpc](@ref wamp::BasicCoroInvocationUnpacker::basicCoroRpc) +function can be used when registering such call slots. It takes a basic +coroutine call slot, and converts it into a regular call slot that can be +passed to wamp::Session::enroll. + +`wamp::basicCoroRpc` expects a call slot with the following signature: + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +TResult function(TArgs..., boost::asio::yield_context) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +where +- `TResult` is a static return type that will be automatically converted + to a single wamp::Result payload argument and then returned as a + wamp::Outcome. This return type may be `void` if the RPC is not expected + to return a value. +- `TArgs...` matches the template parameter pack that was passed to + `wamp::basicCoroRpc`. +- `boost::asio::yield_context` represents the RPC's coroutine context. + +Examples of basic coroutine call slots are: +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +using Yield = boost::asio::yield_context; + +void setSpeed(float speed, Yield yield) { ... } +// ^ ^ +// TResult TArgs + +int purchase(std::string item, int cost, int qty, Yield yield) { ... } +//^ ^ ^ ^ +// \ \----------|---------/ +// TResult TArgs +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The above slots can be registered as follows: +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +session->enroll(Procedure("setSpeed"), + basicCoroRpc(&setSpeed), + handler); + +session->enroll(Procedure("purchase"), + basicCoroRpc(&purchase), + handler); +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +where `handler` is the asynchronous handler (or coroutine yield context) +for the **enroll operation itself**. + +Whenever a wamp::Session dispatches an RPC invocation to the above slots, it +spawns a new coroutine to be executed on wamp::Session::userIosvc(). It then +automatically unpacks the invocation payload positional arguments, and passes +them to the slot's argument list. If `Session` cannot convert the invocation +payload arguments to their target types, it automatically sends an `ERROR` +reply back to the router. + + @section UnpackedCallSlots Unpacked Call Slots + An _unpacked call slot_ represents an RPC handler that expects one or more payload arguments having specific, static types. The [wamp::unpackedRpc] (@ref wamp::InvocationUnpacker::unpackedRpc) function can be used when @@ -155,6 +233,30 @@ passes them to the slot's argument list. If `Session` cannot convert the invocation payload arguments to their target types, it automatically sends an `ERROR` reply back to the router. + +@section UnpackedCoroutineCallSlots Unpacked Coroutine Call Slots + +An _unpacked coroutine call slot_ is like an +[unpacked call slot](@ref UnpackedCallSlots), except that the slot is +executed within the context of a coroutine. The [wamp::unpackedCoroRpc] +(@ref wamp::CoroInvocationUnpacker::unpackedCoroRpc) function can be used when +registering such call slots. + +`wamp::unpackedCoroRpc` expects a call slot with the following signature: + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +wamp::Outcome function(wamp::Invocation, TArgs..., boost::asio::yield_context) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +where +- wamp::Outcome contains a wamp::Result or a wamp::Error object to be sent back + to the caller, +- wamp::Invocation is an object containing information and payload arguments + related to the RPC invocation, and, +- `TArgs...` matches the template parameter pack that was passed to + `wamp::unpackedRpc`. +- `boost::asio::yield_context` represents the RPC's coroutine context. + + @section RpcOutcomes RPC Outcomes RPC call slots are required to return a wamp::Outcome object. `Outcome` is a @@ -243,14 +345,15 @@ Outcome Client::addPerson(Invocation inv, Object person) { // We need to issue another RPC to the database service before we can // fulfill this RPC. - boost::asio::spawn(iosvc_, [this, inv](boost::asio::yield_context yield) - { - auto dbResult = session_->call( Rpc("db.add").withArgs(person), - yield ); - // Manually send the result back to the caller - auto personId = dbResult[0].to(); - inv.yield({personId}); - }); + boost::asio::spawn(inv.iosvc(), + [this, inv](boost::asio::yield_context yield) + { + auto dbResult = session_->call( Rpc("db.add").withArgs(person), + yield ); + // Manually send the result back to the caller + auto personId = dbResult[0].to(); + inv.yield({personId}); + }); // We don't know the result yet as this point. Return a deferred outcome // to indicate that we'll send the result manually in a different @@ -259,6 +362,7 @@ Outcome Client::addPerson(Invocation inv, Object person) } ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + @section ScopedRegistrations Scoped Registrations A wamp::ScopedRegistration object can be used to limit a registration's diff --git a/doc/subscriptions.dox b/doc/subscriptions.dox index d3bf8095..f62e6c0c 100644 --- a/doc/subscriptions.dox +++ b/doc/subscriptions.dox @@ -11,7 +11,9 @@ @tableofcontents + @section EventSlots Event Slots + A _slot_ is a _callable target_ that is called in response to a _signal_ (the signal being the event topic in this case). The term _slot_, borrowed from [Qt's signals and slots][qt_sig], is used to distinguish an event handler @@ -38,8 +40,21 @@ void function(wamp::Event) where wamp::Event is an object containing information and payload arguments related to the publication. +The following table summarizes the different types of event slots that can +be used with the library: + +| Slot Type | Wrapper Function | Slot Signature | +|-------------------------------------------------------------------|-------------------------------------|-----------------------------------------| +| [Basic Event Slot](@ref BasicEventSlots) | `wamp::basicEvent` | `void function(TArgs...)` | +| [Basic Coroutine Event Slot](@ref BasicCoroutineEventSlots) | `wamp::basicCoroEvent` | `void function(TArgs..., Yield)` | +| [Unpacked Event Slot](@ref UnpackedEventSlots) | `wamp::unpackedEvent` | `void function(Event, TArgs...)` | +| [Unpacked Coroutine Event Slot](@ref UnpackedCoroutineEventSlots) | `wamp::unpackedCoroEvent` | `void function(Event, TArgs..., Yield)` | + +where `Yield` represents the type `boost::asio::yield_context`. + @section BasicEventSlots Basic Event Slots -An _basic event slot_ represents an event handler that expects one or more + +A _basic event slot_ represents an event handler that expects one or more payload arguments having specific, static types. The [wamp::basicEvent] (@ref wamp::BasicEventUnpacker::basicEvent) function can be used when registering such event slots. It takes a basic event slot, and converts it @@ -85,7 +100,65 @@ them to the slot's argument list. If `Session` cannot convert the event payload arguments to their target types, it issues a warning that can be captured via wamp::Session::setWarningHandler. + +@section BasicCoroutineEventSlots Basic Coroutine Event Slots + +A _basic coroutine event slot_ is like a regular _basic event slot_, except +that it is executed within the context of a coroutine. This is useful for +event handlers that need to perform asynchronous operations themselves. The +[wamp::basicCoroEvent](@ref wamp::BasicCoroEventUnpacker::basicCoroEvent) +function can be used when registering such event slots. It takes a basic +coroutine event slot, and converts it into a regular event slot that can be +passed to wamp::Session::subscribe. + +`wamp::basicCoroEvent` expects an event slot with the following signature: + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +void function(TArgs..., boost::asio::yield_context) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +where +- `TArgs...` matches the template parameter pack that was passed to + `wamp::basicEvent`. +- `boost::asio::yield_context` represents the event handler's coroutine context. + +Examples of basic coroutine event slots are: +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +using Yield = boost::asio::yield_context; + +void onSensorSampled(float value, Yield yield) { ... } +// ^ +// TArgs + +void onPurchase(std::string item, int cost, int qty, Yield yield) { ... } +// ^ ^ ^ +// \----------|----------/ +// TArgs +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The above slots can be registered as follows: +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +session->subscribe(Topic("sensorSampled"), + basicCoroEvent(&onSensorSampled), + handler); + +session->subscribe(Topic("itemPurchased"), + basicCoroEvent(&onPurchase), + handler); +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +where `handler` is the asynchronous handler (or coroutine yield context) +for the **subscribe operation itself**. + +Whenever a wamp::Session dispatches an event to the above slots, it +spawns a new coroutine to be executed on wamp::Session::userIosvc(). It then +automatically unpacks the event payload positional arguments, and passes +them to the slot's argument list. If `Session` cannot convert the event payload +arguments to their target types, it issues a warning that can be captured via +wamp::Session::setWarningHandler. + + @section UnpackedEventSlots Unpacked Event Slots + An _unpacked event slot_ represents an event handler that expects one or more payload arguments having specific, static types. The [wamp::unpackedEvent] (@ref wamp::EventUnpacker::unpackedEvent) function can be used when @@ -139,6 +212,28 @@ them to the slot's argument list. If `Session` cannot convert the event payload arguments to their target types, it issues a warning that can be captured via wamp::Session::setWarningHandler. + +@section UnpackedCoroutineEventSlots Unpacked Coroutine Event Slots + +An _unpacked coroutine event slot_ is like an [unpacked event slot] +(@ref UnpackedEventSlots), except that the slot is +executed within the context of a coroutine. The [wamp::unpackedCoroEvent] +(@ref wamp::CoroEventUnpacker::unpackedCoroEvent) function can be used when +registering such event slots. + +`wamp::unpackedCoroEvent` expects an event slot with the following signature: + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +void function(wamp::Event, TArgs..., boost::asio::yield_context) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +where +- wamp::Event is an object containing information and payload arguments related + to the publication, and, +- `TArgs...` matches the template parameter pack that was passed to + `wamp::unpackedEvent`. +- `boost::asio::yield_context` represents the event handler's coroutine context. + + @section ScopedSubscriptions Scoped Subscriptions A wamp::ScopedSubscription object can be used to limit a subscription's diff --git a/examples/chat/main.cpp b/examples/chat/main.cpp index 7dbe8fdc..50219837 100644 --- a/examples/chat/main.cpp +++ b/examples/chat/main.cpp @@ -21,10 +21,14 @@ class ChatService public: using Yield = boost::asio::yield_context; + explicit ChatService(wamp::AsioService& iosvc) + : iosvc_(iosvc) + {} + void start(wamp::ConnectorList connectors, Yield yield) { using namespace wamp; - session_ = CoroSession<>::create(connectors); + session_ = CoroSession<>::create(iosvc_, connectors); auto index = session_->connect(yield); std::cout << "Chat service connected on transport #" @@ -57,6 +61,7 @@ class ChatService session_->publish( wamp::Pub("said").withArgs(user, message) ); } + wamp::AsioService& iosvc_; wamp::CoroSession<>::Ptr session_; wamp::ScopedRegistration registration_; }; @@ -68,12 +73,15 @@ class ChatClient public: using Yield = boost::asio::yield_context; - explicit ChatClient(std::string user) : user_(std::move(user)) {} + ChatClient(wamp::AsioService& iosvc, std::string user) + : iosvc_(iosvc), + user_(std::move(user)) + {} void join(wamp::ConnectorList connectors, Yield yield) { using namespace wamp; - session_ = CoroSession<>::create(connectors); + session_ = CoroSession<>::create(iosvc_, connectors); auto index = session_->connect(yield); std::cout << user_ << " connected on transport #" << index << "\n"; @@ -109,6 +117,7 @@ class ChatClient << message << "\"\n"; } + wamp::AsioService& iosvc_; std::string user_; wamp::CoroSession<>::Ptr session_; wamp::ScopedSubscription subscription_; @@ -120,19 +129,14 @@ int main() { wamp::AsioService iosvc; -#ifdef CPPWAMP_USE_LEGACY_CONNECTORS - auto tcp = wamp::legacyConnector( iosvc, - wamp::TcpHost("localhost", 12345) ); -#else auto tcp = wamp::connector( iosvc, wamp::TcpHost("localhost", 12345) ); -#endif // Normally, the service and client instances would be in separate programs. // We run them all here in the same coroutine for demonstration purposes. - ChatService chat; - ChatClient alice("Alice"); - ChatClient bob("Bob"); + ChatService chat(iosvc); + ChatClient alice(iosvc, "Alice"); + ChatClient bob(iosvc, "Bob"); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { diff --git a/examples/timeclient/main.cpp b/examples/timeclient/main.cpp index a8b44d2c..9da31644 100644 --- a/examples/timeclient/main.cpp +++ b/examples/timeclient/main.cpp @@ -45,17 +45,11 @@ void onTimeTick(std::tm time) //------------------------------------------------------------------------------ int main() { - wamp::AsioService iosvc; - -#ifdef CPPWAMP_USE_LEGACY_CONNECTORS - auto tcp = wamp::legacyConnector(iosvc, - wamp::TcpHost(address, port)); -#else - auto tcp = wamp::connector(iosvc, wamp::TcpHost(address, port)); -#endif - using namespace wamp; - auto session = CoroSession<>::create(tcp); + + AsioService iosvc; + auto tcp = connector(iosvc, TcpHost(address, port)); + auto session = CoroSession<>::create(iosvc, tcp); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { diff --git a/examples/timeservice/main.cpp b/examples/timeservice/main.cpp index 7a659e26..e4ba13e9 100644 --- a/examples/timeservice/main.cpp +++ b/examples/timeservice/main.cpp @@ -47,19 +47,11 @@ std::tm getTime() //------------------------------------------------------------------------------ int main() { - wamp::AsioService iosvc; - -#ifdef CPPWAMP_USE_LEGACY_CONNECTORS - auto uds = wamp::legacyConnector(iosvc, - wamp::UdsPath(udsPath)); -#else - auto uds1 = wamp::connector(iosvc, wamp::UdsPath(udsPath1)); - auto uds2 = wamp::connector(iosvc, wamp::UdsPath(udsPath2)); -#endif - using namespace wamp; - auto session = CoroSession<>::create({uds1, uds2}); - + AsioService iosvc; + auto uds1 = connector(iosvc, UdsPath(udsPath1)); + auto uds2 = connector(iosvc, UdsPath(udsPath2)); + auto session = CoroSession<>::create(iosvc, {uds1, uds2}); boost::asio::steady_timer timer(iosvc); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0e60d2d8..2db1a64d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -8,7 +8,6 @@ set(SOURCES codectestjson.cpp codectestmsgpack.cpp - legacytransporttest.cpp payloadtest.cpp transporttest.cpp varianttestassign.cpp diff --git a/test/legacytransporttest.cpp b/test/legacytransporttest.cpp deleted file mode 100644 index 0fc4609c..00000000 --- a/test/legacytransporttest.cpp +++ /dev/null @@ -1,325 +0,0 @@ -/*------------------------------------------------------------------------------ - Copyright Butterfly Energy Systems 2014-2015. - Distributed under the Boost Software License, Version 1.0. - (See accompanying file LICENSE_1_0.txt or copy at - http://www.boost.org/LICENSE_1_0.txt) -------------------------------------------------------------------------------*/ - -#if CPPWAMP_TESTING_TRANSPORT - -#include -#include -#include -#include -#include -#include -#include -#include "transporttest.hpp" - -using namespace wamp; - -using TcpAsioConnector = internal::LegacyAsioEndpoint; -using TcpAsioListener = internal::LegacyAsioEndpoint; -using UdsAsioConnector = internal::LegacyAsioEndpoint; -using UdsAsioListener = internal::LegacyAsioEndpoint; -using TcpTransport = TcpAsioConnector::Transport; -using UdsTransport = UdsAsioConnector::Transport; - -namespace -{ - -//------------------------------------------------------------------------------ -struct TcpLoopbackFixture : - protected LoopbackFixtureBase, - public LoopbackFixture -{ - TcpLoopbackFixture( - bool connected = true, - int codec = Json::id(), - RawsockMaxLength clientMaxRxLength = RawsockMaxLength::kB_64, - RawsockMaxLength serverMaxRxLength = RawsockMaxLength::kB_64 ) - : LoopbackFixture( - clientService, - serverService, - internal::TcpOpener(clientService, {tcpLoopbackAddr, tcpTestPort}), - codec, - clientMaxRxLength, - internal::TcpAcceptor(serverService, tcpTestPort), - codec, - serverMaxRxLength, - connected ) - {} -}; - -//------------------------------------------------------------------------------ -struct UdsLoopbackFixture : - protected LoopbackFixtureBase, - public LoopbackFixture -{ - UdsLoopbackFixture( - bool connected = true, - int codec = Json::id(), - RawsockMaxLength clientMaxRxLength = RawsockMaxLength::kB_64, - RawsockMaxLength serverMaxRxLength = RawsockMaxLength::kB_64 ) - : LoopbackFixture( - clientService, - serverService, - internal::UdsOpener(clientService, {udsTestPath}), - codec, - clientMaxRxLength, - internal::UdsAcceptor(serverService, udsTestPath, true), - codec, - serverMaxRxLength, - connected ) - {} -}; - -//------------------------------------------------------------------------------ -template -void checkReceiveTooLong(TFixture& f, - typename TFixture::TransportPtr sender, - typename TFixture::TransportPtr receiver) -{ - using Transport = typename TFixture::Transport; - using Buffer = typename Transport::Buffer; - - bool receiveFailed = false; - receiver->start( - [&](Buffer) - { - FAIL( "unexpected receive" ); - }, - [&](std::error_code ec) - { - CHECK( ec == TransportErrc::badRxLength ); - receiveFailed = true; - }); - - sender->start( - [&](Buffer) - { - FAIL( "unexpected receive" ); - }, - [&](std::error_code) {} ); - - auto sendBuf = sender->getBuffer(); - std::string tooLong(receiver->maxReceiveLength() + 1, 'a'); - sendBuf->write(tooLong.data(), tooLong.size()); - sender->send(std::move(sendBuf)); - - REQUIRE_NOTHROW( f.run() ); - CHECK( receiveFailed ); -} - -} // anonymous namespace - - -//------------------------------------------------------------------------------ -SCENARIO( "Normal legacy connection", "[Transport]" ) -{ -GIVEN( "an unconnected TCP connector/listener pair" ) -{ - WHEN( "the client and server use JSON" ) - { - TcpLoopbackFixture f(false, Json::id()); - checkConnection(f, Json::id()); - } - WHEN( "the client and server use Msgpack" ) - { - TcpLoopbackFixture f(false, Msgpack::id()); - checkConnection(f, Msgpack::id()); - } -} -GIVEN( "an unconnected UDS connector/listener pair" ) -{ - WHEN( "the client and server use JSON" ) - { - UdsLoopbackFixture f(false, Json::id()); - checkConnection(f, Json::id()); - } - WHEN( "the client and server use Msgpack" ) - { - UdsLoopbackFixture f(false, Msgpack::id()); - checkConnection(f, Msgpack::id()); - } -} -} - -//------------------------------------------------------------------------------ -SCENARIO( "Normal legacy communications", "[Transport,Legacy]" ) -{ -GIVEN( "a connected client/server TCP transport pair" ) -{ - TcpLoopbackFixture f; - checkCommunications(f); -} -GIVEN( "a connected client/server UDS transport pair" ) -{ - UdsLoopbackFixture f; - checkCommunications(f); -} -} - -//------------------------------------------------------------------------------ -SCENARIO( "Consecutive legacy send/receive", "[Transport,Legacy]" ) -{ -GIVEN( "a connected client/server TCP transport pair" ) -{ - { - TcpLoopbackFixture f; - checkConsecutiveSendReceive(f, f.client, f.server); - } - { - TcpLoopbackFixture f; - checkConsecutiveSendReceive(f, f.server, f.client); - } -} -GIVEN( "a connected client/server UDS transport pair" ) -{ - { - UdsLoopbackFixture f; - checkConsecutiveSendReceive(f, f.client, f.server); - } - { - UdsLoopbackFixture f; - checkConsecutiveSendReceive(f, f.server, f.client); - } -} -} - -//------------------------------------------------------------------------------ -SCENARIO( "Maximum length legacy messages", "[Transport]" ) -{ -GIVEN( "a connected client/server TCP transport pair" ) -{ - TcpLoopbackFixture f; - const std::string message(f.client->maxReceiveLength(), 'm'); - const std::string reply(f.server->maxReceiveLength(), 'r');; - checkSendReply(f, message, reply); -} -GIVEN( "a connected client/server UDS transport pair" ) -{ - UdsLoopbackFixture f; - const std::string message(f.client->maxReceiveLength(), 'm'); - const std::string reply(f.server->maxReceiveLength(), 'r');; - checkSendReply(f, message, reply); -} -} - -//------------------------------------------------------------------------------ -SCENARIO( "Zero length legacy messages", "[Transport,Legacy]" ) -{ -const std::string message(""); -const std::string reply(""); - -GIVEN( "a connected client/server TCP transport pair" ) -{ - TcpLoopbackFixture f; - checkSendReply(f, message, reply); -} -GIVEN( "a connected client/server UDS transport pair" ) -{ - UdsLoopbackFixture f; - checkSendReply(f, message, reply); -} -} - -//------------------------------------------------------------------------------ -SCENARIO( "Cancel legacy listen", "[Transport,Legacy]" ) -{ - GIVEN( "an unconnected TCP listener/connector pair" ) - { - TcpLoopbackFixture f(false); - checkCancelListen(f); - checkConnection(f, Json::id()); - checkSendReply(f, "Hello", "World"); - } - GIVEN( "an unconnected UDS listener/connector pair" ) - { - UdsLoopbackFixture f(false); - checkCancelListen(f); - checkConnection(f, Json::id()); - checkSendReply(f, "Hello", "World"); - } -} - -//------------------------------------------------------------------------------ -SCENARIO( "Cancel legacy connect", "[Transport,Legacy]" ) -{ - GIVEN( "an unconnected TCP listener/connector pair" ) - { - TcpLoopbackFixture f(false); - checkCancelConnect(f); - } - GIVEN( "an unconnected UDS listener/connector pair" ) - { - UdsLoopbackFixture f(false); - checkCancelConnect(f); - } -} - -//------------------------------------------------------------------------------ -SCENARIO( "Cancel legacy receive", "[Transport,Legacy]" ) -{ - GIVEN( "a connected TCP listener/connector pair" ) - { - TcpLoopbackFixture f; - checkCancelReceive(f); - } - GIVEN( "a connected UDS listener/connector pair" ) - { - UdsLoopbackFixture f; - checkCancelReceive(f); - } -} - -//------------------------------------------------------------------------------ -SCENARIO( "Cancel legacy send", "[Transport]" ) -{ - // The size of transmission is set to maximum to increase the likelyhood - // of the operation being aborted, rather than completed. - - GIVEN( "a connected TCP listener/connector pair" ) - { - TcpLoopbackFixture f(false, Json::id(), RawsockMaxLength::MB_16, - RawsockMaxLength::MB_16); - checkCancelSend(f); - } - GIVEN( "a connected UDS listener/connector pair" ) - { - UdsLoopbackFixture f(false, Json::id(), RawsockMaxLength::MB_16, - RawsockMaxLength::MB_16); - checkCancelSend(f); - } -} - -//------------------------------------------------------------------------------ -SCENARIO( "Receiving legacy messages longer than maximum", "[Transport]" ) -{ -GIVEN ( "A TCP client that sends an overly long message" ) -{ - TcpLoopbackFixture f(true, Json::id(), RawsockMaxLength::kB_64, - RawsockMaxLength::kB_32); - checkReceiveTooLong(f, f.client, f.server); -} -GIVEN ( "A TCP server that sends an overly long message" ) -{ - TcpLoopbackFixture f(true, Json::id(), RawsockMaxLength::kB_32, - RawsockMaxLength::kB_64); - checkReceiveTooLong(f, f.server, f.client); -} -GIVEN ( "A UDS client that sends an overly long message" ) -{ - UdsLoopbackFixture f(true, Json::id(), RawsockMaxLength::kB_64, - RawsockMaxLength::kB_32); - checkReceiveTooLong(f, f.client, f.server); -} -GIVEN ( "A UDS server that sends an overly long message" ) -{ - UdsLoopbackFixture f(true, Json::id(), RawsockMaxLength::kB_32, - RawsockMaxLength::kB_64); - checkReceiveTooLong(f, f.server, f.client); -} -} - -#endif // #if CPPWAMP_TESTING_TRANSPORT diff --git a/test/test.pro b/test/test.pro index 81f9c64c..b55451ee 100644 --- a/test/test.pro +++ b/test/test.pro @@ -13,7 +13,6 @@ CONFIG -= qt SOURCES += \ codectestjson.cpp \ codectestmsgpack.cpp \ - legacytransporttest.cpp \ payloadtest.cpp \ transporttest.cpp \ varianttestassign.cpp \ diff --git a/test/varianttestconvertcontainers.cpp b/test/varianttestconvertcontainers.cpp index ef371d15..ea51f5c0 100644 --- a/test/varianttestconvertcontainers.cpp +++ b/test/varianttestconvertcontainers.cpp @@ -7,8 +7,11 @@ #if CPPWAMP_TESTING_VARIANT +#include #include +#include #include +#include using namespace wamp; @@ -68,12 +71,12 @@ GIVEN( "an empty std::unordered_map" ) auto v = Variant::from(map); THEN( "the variant is as expected" ) { - CHECK( v.is() ); + REQUIRE( v.is() ); CHECK( v.as().empty() ); } } } -GIVEN( "a invalid variant object type" ) +GIVEN( "an invalid variant object type" ) { Variant v(Object{{"a", 1},{"b", null}}); WHEN( "converting to a std::unordered_map" ) @@ -84,4 +87,143 @@ GIVEN( "a invalid variant object type" ) } } + +//------------------------------------------------------------------------------ +SCENARIO( "Converting to/from std::set", "[Variant]" ) +{ +GIVEN( "a valid variant array type" ) +{ + Variant v(Array{1, 3, 2}); + WHEN( "converting to a std::set" ) + { + auto set = v.to>(); + THEN( "the set is as expected" ) + { + REQUIRE( set.size() == 3 ); + auto iter = set.begin(); + CHECK( *iter++ == 1 ); + CHECK( *iter++ == 2 ); + CHECK( *iter++ == 3 ); + } + } +} +GIVEN( "an empty variant array type" ) +{ + Variant v(Array{}); + WHEN( "converting to a std::set" ) + { + auto set = v.to>(); + THEN( "the set is as expected" ) + { + CHECK( set.empty() ); + } + } +} +GIVEN( "a valid std::set type" ) +{ + std::set set{"a", "b", "c"}; + WHEN( "converting to a variant" ) + { + auto v = Variant::from(set); + THEN( "the variant is as expected" ) + { + CHECK( v == (Array{"a", "b", "c"}) ); + } + } +} +GIVEN( "an empty std::set" ) +{ + std::set set{}; + WHEN( "converting to a variant" ) + { + auto v = Variant::from(set); + THEN( "the variant is as expected" ) + { + REQUIRE( v.is() ); + CHECK( v.as().empty() ); + } + } +} +GIVEN( "an invalid variant array type" ) +{ + Variant v(Array{"a", null}); + WHEN( "converting to a std::set" ) + { + using SetType = std::set; + CHECK_THROWS_AS( v.to(), error::Conversion ); + } +} +} + + +//------------------------------------------------------------------------------ +SCENARIO( "Converting to/from std::unordered_set", "[Variant]" ) +{ +GIVEN( "a valid variant array type" ) +{ + Variant v(Array{1, 3, 2}); + WHEN( "converting to a std::unordered_set" ) + { + auto set = v.to>(); + THEN( "the set is as expected" ) + { + REQUIRE( set.size() == 3 ); + std::set sorted(set.begin(), set.end()); + auto iter = sorted.begin(); + CHECK( *iter++ == 1 ); + CHECK( *iter++ == 2 ); + CHECK( *iter++ == 3 ); + } + } +} +GIVEN( "an empty variant array type" ) +{ + Variant v(Array{}); + WHEN( "converting to a std::unordered_set" ) + { + auto set = v.to>(); + THEN( "the set is as expected" ) + { + CHECK( set.empty() ); + } + } +} +GIVEN( "a valid std::unordered_set type" ) +{ + std::unordered_set set{"a", "b", "c"}; + WHEN( "converting to a variant" ) + { + auto v = Variant::from(set); + THEN( "the variant is as expected" ) + { + auto array = v.as(); + std::sort(array.begin(), array.end()); + CHECK( array == (Array{"a", "b", "c"}) ); + } + } +} +GIVEN( "an empty std::unordered_set" ) +{ + std::unordered_set set{}; + WHEN( "converting to a variant" ) + { + auto v = Variant::from(set); + THEN( "the variant is as expected" ) + { + REQUIRE( v.is() ); + CHECK( v.as().empty() ); + } + } +} +GIVEN( "an invalid variant array type" ) +{ + Variant v(Array{"a", null}); + WHEN( "converting to a std::unordered_set" ) + { + using SetType = std::unordered_set; + CHECK_THROWS_AS( v.to(), error::Conversion ); + } +} +} + #endif // #if CPPWAMP_TESTING_VARIANT diff --git a/test/wamptest.cpp b/test/wamptest.cpp index 1ad19fe0..f28e1d4f 100644 --- a/test/wamptest.cpp +++ b/test/wamptest.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -34,32 +35,19 @@ const std::string testUdsPath = "./.crossbar/udstest"; Connector::Ptr tcp(AsioService& iosvc) { -#ifdef CPPWAMP_USE_LEGACY_CONNECTORS - return legacyConnector(iosvc, TcpHost("localhost", validPort)); -#else return connector(iosvc, TcpHost("localhost", validPort)); -#endif } Connector::Ptr invalidTcp(AsioService& iosvc) { -#ifdef CPPWAMP_USE_LEGACY_CONNECTORS - return legacyConnector(iosvc, TcpHost("localhost", invalidPort)); -#else return connector(iosvc, TcpHost("localhost", invalidPort)); -#endif } - #if CPPWAMP_HAS_UNIX_DOMAIN_SOCKETS - Connector::Ptr udsMsgpack(AsioService& iosvc) - { - #ifdef CPPWAMP_USE_LEGACY_CONNECTORS - return legacyConnector(iosvc, UdsPath(testUdsPath)); - #else - return connector(iosvc, UdsPath(testUdsPath)); - #endif - } +Connector::Ptr udsMsgpack(AsioService& iosvc) +{ + return connector(iosvc, UdsPath(testUdsPath)); +} #endif @@ -69,10 +57,11 @@ struct PubSubFixture using PubVec = std::vector; template - PubSubFixture(TConnector cnct) - : publisher(CoroSession<>::create(cnct)), - subscriber(CoroSession<>::create(cnct)), - otherSubscriber(CoroSession<>::create(cnct)) + PubSubFixture(AsioService& iosvc, TConnector cnct) + : iosvc(iosvc), + publisher(CoroSession<>::create(iosvc, cnct)), + subscriber(CoroSession<>::create(iosvc, cnct)), + otherSubscriber(CoroSession<>::create(iosvc, cnct)) {} void join(boost::asio::yield_context yield) @@ -110,6 +99,7 @@ struct PubSubFixture { INFO( "in onDynamicEvent" ); CHECK( event.pubId() <= 9007199254740992ull ); + CHECK( &event.iosvc() == &iosvc ); dynamicArgs = event.args(); dynamicPubs.push_back(event.pubId()); } @@ -118,6 +108,7 @@ struct PubSubFixture { INFO( "in onStaticEvent" ); CHECK( event.pubId() <= 9007199254740992ull ); + CHECK( &event.iosvc() == &iosvc ); staticArgs = Array{{str, num}}; staticPubs.push_back(event.pubId()); } @@ -126,9 +117,12 @@ struct PubSubFixture { INFO( "in onOtherEvent" ); CHECK( event.pubId() <= 9007199254740992ull ); + CHECK( &event.iosvc() == &iosvc ); otherPubs.push_back(event.pubId()); } + AsioService& iosvc; + CoroSession<>::Ptr publisher; CoroSession<>::Ptr subscriber; CoroSession<>::Ptr otherSubscriber; @@ -149,9 +143,10 @@ struct PubSubFixture struct RpcFixture { template - RpcFixture(TConnector cnct) - : caller(CoroSession<>::create(cnct)), - callee(CoroSession<>::create(cnct)) + RpcFixture(AsioService& iosvc, TConnector cnct) + : iosvc(iosvc), + caller(CoroSession<>::create(iosvc, cnct)), + callee(CoroSession<>::create(iosvc, cnct)) {} void join(boost::asio::yield_context yield) @@ -181,6 +176,7 @@ struct RpcFixture { INFO( "in RPC handler" ); CHECK( inv.requestId() <= 9007199254740992ull ); + CHECK( &inv.iosvc() == &iosvc ); ++dynamicCount; // Echo back the call arguments as the result. return Result().withArgList(inv.args()); @@ -190,11 +186,14 @@ struct RpcFixture { INFO( "in RPC handler" ); CHECK( inv.requestId() <= 9007199254740992ull ); + CHECK( &inv.iosvc() == &iosvc ); ++staticCount; // Echo back the call arguments as the yield result. return {str, num}; } + AsioService& iosvc; + CoroSession<>::Ptr caller; CoroSession<>::Ptr callee; @@ -213,7 +212,7 @@ void checkInvalidUri(TThrowDelegate&& throwDelegate, AsioService iosvc; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto session = CoroSession<>::create(tcp(iosvc)); + auto session = CoroSession<>::create(iosvc, tcp(iosvc)); session->connect(yield); if (joined) session->join(Realm(testRealm), yield); @@ -242,7 +241,7 @@ void checkDisconnect(TDelegate&& delegate) AsyncResult result; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto session = CoroSession<>::create(tcp(iosvc)); + auto session = CoroSession<>::create(iosvc, tcp(iosvc)); session->connect(yield); delegate(*session, yield, completed, result); session->disconnect(); @@ -311,7 +310,7 @@ void checkInvalidOps(CoroSession<>::Ptr session, CHECK_THROWS_AS( session->call(Rpc("rpc"), [](AsyncResult) {}), error::Logic ); CHECK_THROWS_AS( session->call(Rpc("rpc").withArgs(42), - [](AsyncResult) {}), + [](AsyncResult) {}), error::Logic ); CHECK_THROWS_AS( session->leave(Reason(), yield), error::Logic ); @@ -361,7 +360,7 @@ GIVEN( "an IO service and a TCP connector" ) { { // Connect and disconnect a session-> - auto s = CoroSession<>::create(cnct); + auto s = CoroSession<>::create(iosvc, cnct); CHECK( s->state() == SessionState::disconnected ); CHECK( s->connect(yield) == 0 ); CHECK( s->state() == SessionState::closed ); @@ -380,7 +379,7 @@ GIVEN( "an IO service and a TCP connector" ) } // Check that another client can connect and disconnect. - auto s2 = CoroSession<>::create(cnct); + auto s2 = CoroSession<>::create(iosvc, cnct); CHECK( s2->state() == SessionState::disconnected ); CHECK( s2->connect(yield) == 0 ); CHECK( s2->state() == SessionState::closed ); @@ -393,7 +392,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "joining and leaving" ) { - auto s = CoroSession<>::create(cnct); + auto s = CoroSession<>::create(iosvc, cnct); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { s->connect(yield); @@ -449,7 +448,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "connecting, joining, leaving, and disconnecting" ) { - auto s = CoroSession<>::create(cnct); + auto s = CoroSession<>::create(iosvc, cnct); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { { @@ -507,7 +506,8 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "disconnecting during connect" ) { std::error_code ec; - auto s = Session::create(ConnectorList({invalidTcp(iosvc), cnct})); + auto s = Session::create(iosvc, + ConnectorList({invalidTcp(iosvc), cnct})); bool connectHandlerInvoked = false; s->connect([&](AsyncResult result) { @@ -547,7 +547,7 @@ GIVEN( "an IO service and a TCP connector" ) { std::error_code ec; bool connected = false; - auto s = CoroSession<>::create(cnct); + auto s = CoroSession<>::create(iosvc, cnct); bool disconnectTriggered = false; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { @@ -580,7 +580,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "resetting during connect" ) { bool handlerWasInvoked = false; - auto s = Session::create(cnct); + auto s = Session::create(iosvc, cnct); s->connect([&handlerWasInvoked](AsyncResult) { handlerWasInvoked = true; @@ -594,7 +594,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "resetting during join" ) { bool handlerWasInvoked = false; - auto s = Session::create(cnct); + auto s = Session::create(iosvc, cnct); s->connect([&](AsyncResult) { s->join(Realm(testRealm), [&](AsyncResult) @@ -612,7 +612,7 @@ GIVEN( "an IO service and a TCP connector" ) { bool handlerWasInvoked = false; - auto session = Session::create(cnct); + auto session = Session::create(iosvc, cnct); std::weak_ptr weakClient(session); session->connect([&handlerWasInvoked](AsyncResult) @@ -646,7 +646,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "joining and leaving" ) { - auto s = CoroSession<>::create(cnct); + auto s = CoroSession<>::create(iosvc, cnct); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { s->connect(yield); @@ -715,7 +715,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.subscribe(yield); @@ -804,7 +804,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.staticSub = f.subscriber->subscribe( Topic("str.num"), @@ -838,7 +838,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.subscribe(yield); @@ -885,7 +885,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.subscribe(yield); @@ -919,7 +919,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.subscribe(yield); @@ -955,7 +955,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.subscribe(yield); @@ -991,7 +991,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.subscribe(yield); @@ -1021,7 +1021,7 @@ GIVEN( "an IO service and a TCP connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); f.join(yield); f.subscribe(yield); @@ -1084,7 +1084,7 @@ GIVEN( "an IO service and a TCP connector" ) { Result result; std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1134,7 +1134,7 @@ GIVEN( "an IO service and a TCP connector" ) { Result result; std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1183,12 +1183,12 @@ GIVEN( "an IO service and a TCP connector" ) { Result result; std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.staticReg = f.callee->enroll( Procedure("static"), - basicRpc([&](std::string s, int n) + basicRpc([&](std::string, int n) { ++f.staticCount; return n; // Echo back the integer argument @@ -1224,7 +1224,7 @@ GIVEN( "an IO service and a TCP connector" ) using namespace std::placeholders; f.staticReg = f.callee->enroll( Procedure("static"), - basicRpc([&](std::string s, int n) + basicRpc([&](std::string, int n) { ++f.staticCount; return n; // Echo back the integer argument @@ -1252,7 +1252,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1287,7 +1287,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1324,7 +1324,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1361,7 +1361,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1391,7 +1391,7 @@ GIVEN( "an IO service and a TCP connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1435,10 +1435,12 @@ SCENARIO( "Nested WAMP RPCs and Events", "[WAMP]" ) { GIVEN( "these test fixture objects" ) { + using Yield = boost::asio::yield_context; + AsioService iosvc; auto cnct = tcp(iosvc); - auto session1 = CoroSession<>::create(cnct); - auto session2 = CoroSession<>::create(cnct); + auto session1 = CoroSession<>::create(iosvc, cnct); + auto session2 = CoroSession<>::create(iosvc, cnct); // Regular RPC handler auto upperify = [](Invocation, std::string str) -> Outcome @@ -1450,22 +1452,14 @@ GIVEN( "these test fixture objects" ) WHEN( "calling remote procedures within an invocation" ) { - auto uppercat = [&iosvc, session2](Invocation inv, std::string str1, - std::string str2) -> Outcome + auto uppercat = [session2](std::string str1, std::string str2, + boost::asio::yield_context yield) -> String { - // We need a separate yield context here - boost::asio::spawn(iosvc, [session2, inv, str1, str2] - (boost::asio::yield_context yield) - { - auto upper1 = session2->call( - Rpc("upperify").withArgs(str1), yield); - auto upper2 = session2->call( - Rpc("upperify").withArgs(str2), yield); - auto concatted = upper1[0].to() + - upper2[0].to(); - inv.yield(Result({concatted})); - }); - return Outcome::deferred(); + auto upper1 = session2->call( + Rpc("upperify").withArgs(str1), yield); + auto upper2 = session2->call( + Rpc("upperify").withArgs(str2), yield); + return upper1[0].to() + upper2[0].to(); }; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) @@ -1478,9 +1472,10 @@ GIVEN( "these test fixture objects" ) session2->connect(yield); session2->join(Realm(testRealm), yield); - session2->enroll(Procedure("uppercat"), - unpackedRpc(uppercat), - yield); + session2->enroll( + Procedure("uppercat"), + basicCoroRpc(uppercat), + yield); std::string s1 = "hello "; std::string s2 = "world"; @@ -1500,17 +1495,13 @@ GIVEN( "these test fixture objects" ) auto subscriber = session2; std::string upperized; - auto onEvent = [&iosvc, &upperized, subscriber] - (Event event, std::string str) + auto onEvent = + [&upperized, subscriber](std::string str, + boost::asio::yield_context yield) { - // We need a separate yield context here - boost::asio::spawn(iosvc, [&upperized, subscriber, event, str] - (boost::asio::yield_context yield) - { - auto result = subscriber->call(Rpc("upperify") - .withArgs(str), yield); - upperized = result[0].to(); - }); + auto result = subscriber->call(Rpc("upperify").withArgs(str), + yield); + upperized = result[0].to(); }; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) @@ -1523,7 +1514,7 @@ GIVEN( "these test fixture objects" ) subscriber->connect(yield); subscriber->join(Realm(testRealm), yield); subscriber->subscribe(Topic("onEvent"), - unpackedEvent(onEvent), + basicCoroEvent(onEvent), yield); callee->publish(Pub("onEvent").withArgs("Hello"), yield); @@ -1549,21 +1540,13 @@ GIVEN( "these test fixture objects" ) }; auto shout = - [&iosvc, callee](Invocation inv, std::string str) -> Outcome + [callee](Invocation, std::string str, Yield yield) -> Outcome { - // We need a separate yield context here for a blocking - // publish. - boost::asio::spawn(iosvc, [callee, inv, str] - (boost::asio::yield_context yield) - { - std::string upper = str; - std::transform(upper.begin(), upper.end(), - upper.begin(), ::toupper); - callee->publish(Pub("grapevine").withArgs(upper), - yield); - inv.yield(Result({upper})); - }); - return Outcome::deferred(); + std::string upper = str; + std::transform(upper.begin(), upper.end(), + upper.begin(), ::toupper); + callee->publish(Pub("grapevine").withArgs(upper), yield); + return Result({upper}); }; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) @@ -1571,7 +1554,7 @@ GIVEN( "these test fixture objects" ) callee->connect(yield); callee->join(Realm(testRealm), yield); callee->enroll(Procedure("shout"), - unpackedRpc(shout), yield); + unpackedCoroRpc(shout), yield); subscriber->connect(yield); subscriber->join(Realm(testRealm), yield); @@ -1598,34 +1581,27 @@ GIVEN( "these test fixture objects" ) int callCount = 0; Registration reg; - auto oneShot = - [&iosvc, &callCount, ®, callee](Invocation inv) -> Outcome - { - // We need a separate yield context here for a blocking - // unregister. - boost::asio::spawn(iosvc, [&callCount, ®, callee, inv] - (boost::asio::yield_context yield) - { - ++callCount; - callee->unregister(reg, yield); - inv.yield(Result({callCount})); - }); - return Outcome::deferred(); - }; + auto oneShot = [&callCount, ®, callee](Yield yield) + { + // We need a yield context here for a blocking unregister. + ++callCount; + callee->unregister(reg, yield); + }; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { callee->connect(yield); callee->join(Realm(testRealm), yield); - reg = callee->enroll(Procedure("oneShot"), oneShot, yield); + reg = callee->enroll(Procedure("oneShot"), + basicCoroRpc(oneShot), yield); caller->connect(yield); caller->join(Realm(testRealm), yield); - auto result = caller->call(Rpc("oneShot"), yield); + caller->call(Rpc("oneShot"), yield); while (callCount == 0) caller->suspend(yield); - CHECK( result[0] == 1 ); + CHECK( callCount == 1 ); std::error_code ec; caller->call(Rpc("oneShot"), yield, &ec); @@ -1642,18 +1618,14 @@ GIVEN( "these test fixture objects" ) { std::string upperized; - auto onTalk = [&iosvc, session1](Event, std::string str) + auto onTalk = [session1](std::string str, Yield yield) { // We need a separate yield context here for a blocking // publish. - boost::asio::spawn(iosvc, [session1, str] - (boost::asio::yield_context yield) - { - std::string upper = str; - std::transform(upper.begin(), upper.end(), - upper.begin(), ::toupper); - session1->publish(Pub("onShout").withArgs(upper), yield); - }); + std::string upper = str; + std::transform(upper.begin(), upper.end(), + upper.begin(), ::toupper); + session1->publish(Pub("onShout").withArgs(upper), yield); }; auto onShout = [&upperized](Event, std::string str) @@ -1666,7 +1638,7 @@ GIVEN( "these test fixture objects" ) session1->connect(yield); session1->join(Realm(testRealm), yield); session1->subscribe(Topic("onTalk"), - unpackedEvent(onTalk), yield); + basicCoroEvent(onTalk), yield); session2->connect(yield); session2->join(Realm(testRealm), yield); @@ -1692,16 +1664,11 @@ GIVEN( "these test fixture objects" ) int eventCount = 0; Subscription sub; - auto onEvent = [&iosvc, &eventCount, &sub, subscriber](Event) + auto onEvent = [&eventCount, &sub, subscriber](Event, Yield yield) { - // We need a separate yield context here for a blocking - // unsubscribe. - boost::asio::spawn(iosvc, [&eventCount, &sub, subscriber] - (boost::asio::yield_context yield) - { - ++eventCount; - subscriber->unsubscribe(sub, yield); - }); + // We need a yield context here for a blocking unsubscribe. + ++eventCount; + subscriber->unsubscribe(sub, yield); }; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) @@ -1711,7 +1678,9 @@ GIVEN( "these test fixture objects" ) subscriber->connect(yield); subscriber->join(Realm(testRealm), yield); - sub = subscriber->subscribe(Topic("onEvent"), onEvent, yield); + sub = subscriber->subscribe(Topic("onEvent"), + unpackedCoroEvent(onEvent), + yield); // Dummy RPC used to end polling int rpcCount = 0; @@ -1758,7 +1727,7 @@ GIVEN( "an IO service, a valid TCP connector, and an invalid connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto session = CoroSession<>::create(badCnct); + auto session = CoroSession<>::create(iosvc, badCnct); bool throws = false; try { @@ -1786,7 +1755,7 @@ GIVEN( "an IO service, a valid TCP connector, and an invalid connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto s = CoroSession<>::create(connectors); + auto s = CoroSession<>::create(iosvc, connectors); { // Connect @@ -1849,7 +1818,7 @@ GIVEN( "an IO service and a TCP connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1874,7 +1843,7 @@ GIVEN( "an IO service and a TCP connector" ) { std::error_code ec; int callCount = 0; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1923,7 +1892,7 @@ GIVEN( "an IO service and a TCP connector" ) { std::error_code ec; int callCount = 0; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -1972,7 +1941,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { std::error_code ec; - RpcFixture f(cnct); + RpcFixture f(iosvc, cnct); f.join(yield); f.enroll(yield); @@ -2001,7 +1970,8 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { PublicationId pid = 0; - PubSubFixture f(cnct); + PubSubFixture f(iosvc, cnct); + f.subscriber->setWarningHandler( [](std::string){} ); f.join(yield); f.subscribe(yield); @@ -2132,7 +2102,7 @@ GIVEN( "an IO service and a TCP connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto session = CoroSession<>::create(cnct); + auto session = CoroSession<>::create(iosvc, cnct); session->connect(yield); bool throws = false; @@ -2169,7 +2139,8 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "constructing a session with an empty connector list" ) { - CHECK_THROWS_AS( Session::create(ConnectorList{}), error::Logic ); + CHECK_THROWS_AS( Session::create(iosvc, ConnectorList{}), + error::Logic ); } WHEN( "using invalid operations while disconnected" ) @@ -2177,7 +2148,7 @@ GIVEN( "an IO service and a TCP connector" ) boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { std::error_code ec; - auto session = CoroSession<>::create(cnct); + auto session = CoroSession<>::create(iosvc, cnct); REQUIRE( session->state() == SessionState::disconnected ); checkInvalidJoin(session, yield); checkInvalidLeave(session, yield); @@ -2189,7 +2160,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "using invalid operations while connecting" ) { - auto session = CoroSession<>::create(cnct); + auto session = CoroSession<>::create(iosvc, cnct); session->connect( [](AsyncResult){} ); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) @@ -2210,7 +2181,7 @@ GIVEN( "an IO service and a TCP connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto session = CoroSession<>::create(invalidTcp(iosvc)); + auto session = CoroSession<>::create(iosvc, invalidTcp(iosvc)); CHECK_THROWS( session->connect(yield) ); REQUIRE( session->state() == SessionState::failed ); checkInvalidJoin(session, yield); @@ -2225,7 +2196,7 @@ GIVEN( "an IO service and a TCP connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto session = CoroSession<>::create(cnct); + auto session = CoroSession<>::create(iosvc, cnct); session->connect(yield); REQUIRE( session->state() == SessionState::closed ); checkInvalidConnect(session, yield); @@ -2238,7 +2209,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "using invalid operations while establishing" ) { - auto session = CoroSession<>::create(cnct); + auto session = CoroSession<>::create(iosvc, cnct); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { session->connect(yield); @@ -2265,7 +2236,7 @@ GIVEN( "an IO service and a TCP connector" ) { boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto session = CoroSession<>::create(cnct); + auto session = CoroSession<>::create(iosvc, cnct); session->connect(yield); session->join(Realm(testRealm), yield); REQUIRE( session->state() == SessionState::established ); @@ -2278,7 +2249,7 @@ GIVEN( "an IO service and a TCP connector" ) WHEN( "using invalid operations while shutting down" ) { - auto session = CoroSession<>::create(cnct); + auto session = CoroSession<>::create(iosvc, cnct); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { session->connect(yield); @@ -2507,7 +2478,7 @@ GIVEN( "an IO service and a TCP connector" ) bool published = false; boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { - auto s = CoroSession<>::create(cnct); + auto s = CoroSession<>::create(iosvc, cnct); s->connect(yield); s->join(Realm(testRealm), yield); s->publish(Pub("topic"), diff --git a/test/wamptestadvanced.cpp b/test/wamptestadvanced.cpp index 830df1fc..8b1672d9 100644 --- a/test/wamptestadvanced.cpp +++ b/test/wamptestadvanced.cpp @@ -23,20 +23,16 @@ const short testPort = 12345; Connector::Ptr tcp(AsioService& iosvc) { -#ifdef CPPWAMP_USE_LEGACY_CONNECTORS - return legacyConnector(iosvc, TcpHost("localhost", testPort)); -#else return connector(iosvc, TcpHost("localhost", testPort)); -#endif } //------------------------------------------------------------------------------ struct RpcFixture { template - RpcFixture(TConnector cnct) - : caller(CoroSession<>::create(cnct)), - callee(CoroSession<>::create(cnct)) + RpcFixture(AsioService& iosvc, TConnector cnct) + : caller(CoroSession<>::create(iosvc, cnct)), + callee(CoroSession<>::create(iosvc, cnct)) {} void join(boost::asio::yield_context yield) @@ -63,9 +59,9 @@ struct RpcFixture struct PubSubFixture { template - PubSubFixture(TConnector cnct) - : publisher(CoroSession<>::create(cnct)), - subscriber(CoroSession<>::create(cnct)) + PubSubFixture(AsioService& iosvc, TConnector cnct) + : publisher(CoroSession<>::create(iosvc, cnct)), + subscriber(CoroSession<>::create(iosvc, cnct)) {} void join(boost::asio::yield_context yield) @@ -98,7 +94,7 @@ SCENARIO( "WAMP RPC advanced features", "[WAMP]" ) GIVEN( "a caller and a callee" ) { AsioService iosvc; - RpcFixture f(tcp(iosvc)); + RpcFixture f(iosvc, tcp(iosvc)); WHEN( "using caller identification" ) { @@ -175,7 +171,7 @@ SCENARIO( "WAMP pub/sub advanced features", "[WAMP]" ) GIVEN( "a publisher and a subscriber" ) { AsioService iosvc; - PubSubFixture f(tcp(iosvc)); + PubSubFixture f(iosvc, tcp(iosvc)); WHEN( "using publisher identification" ) { @@ -291,7 +287,7 @@ GIVEN( "a publisher and a subscriber" ) WHEN( "using subscriber black/white listing" ) { - auto subscriber2 = CoroSession<>::create(tcp(iosvc)); + auto subscriber2 = CoroSession<>::create(iosvc, tcp(iosvc)); boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield) { diff --git a/toolchain-arm-linux-gnueabihf.cmake b/toolchain-arm-linux-gnueabihf.cmake new file mode 100644 index 00000000..0c5a0987 --- /dev/null +++ b/toolchain-arm-linux-gnueabihf.cmake @@ -0,0 +1,18 @@ +# this one is important +SET(CMAKE_SYSTEM_NAME Linux) +#this one not so much +SET(CMAKE_SYSTEM_VERSION 1) + +# specify the cross compiler +SET(CMAKE_C_COMPILER /usr/bin/arm-linux-gnueabihf-gcc) +SET(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabihf-g++) + +# where is the target environment +SET(CMAKE_FIND_ROOT_PATH /usr/arm-linux-gnueabihf) + +# search for programs in the build host directories +SET(CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER) +# for libraries and headers in the target directories +SET(CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY) +SET(CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY) +