Skip to content

Commit

Permalink
Add a ability to negotiate a protocol version before commencing opera…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
eliran-stratoscale committed Jun 2, 2016
1 parent 71bf46e commit 664e96c
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 12 deletions.
8 changes: 6 additions & 2 deletions cpp/Osmosis/Chain/Remote/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ namespace Remote

Connection::Connection( const std::string & hostname, unsigned short port ) :
_connection( hostname, port ),
_labelOps( _connection.socket() )
{}
_labelOps( _connection.socket() ),
_protocolVersion( Tongue::MIN_SUPPORTED_PROTOCOL_VERSION )
{
ProtocolVersionNegotiator negotiator( _connection, Tongue::MIN_SUPPORTED_PROTOCOL_VERSION );
_protocolVersion = negotiator.negotiate();
}

void Connection::putString( const std::string & blob, const Hash & hash )
{
Expand Down
2 changes: 2 additions & 0 deletions cpp/Osmosis/Chain/Remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "Osmosis/Chain/Remote/LabelOps.h"
#include "Osmosis/TCPConnection.h"
#include "Osmosis/Hash.h"
#include "Osmosis/Chain/Remote/ProtocolVersionNegotiator.h"

namespace Osmosis {
namespace Chain {
Expand Down Expand Up @@ -42,6 +43,7 @@ class Connection : public ObjectStoreConnectionInterface
private:
TCPConnection _connection;
LabelOps _labelOps;
uint32_t _protocolVersion;

Connection( const Connection & rhs ) = delete;

Expand Down
67 changes: 67 additions & 0 deletions cpp/Osmosis/Chain/Remote/ProtocolVersionNegotiator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include <algorithm>
#include <boost/asio/error.hpp>
#include <Osmosis/Chain/Remote/ProtocolVersionNegotiator.h>
#include <Osmosis/TCPConnection.h>
#include <Osmosis/Tongue.h>
#include <Osmosis/Stream/AckOps.h>

namespace Osmosis {
namespace Chain {
namespace Remote
{

ProtocolVersionNegotiator::ProtocolVersionNegotiator( TCPConnection & connection,
uint32_t currentProtocol ) :
_connection( connection ),
_currentProtocol( currentProtocol )
{}

Tongue::SupportedProtocolVersions ProtocolVersionNegotiator::getSupportedVersionsInServer() {
const struct Tongue::Header header = { static_cast< unsigned char >(
Tongue::Opcode::GET_SUPPORTED_PROTOCOL_VERSIONS ) };
_connection.socket().sendAll( header );
struct Tongue::SupportedProtocolVersions response = { Tongue::FIRST_PROTOCOL_VERSION,
Tongue::FIRST_PROTOCOL_VERSION };
try {
response = _connection.socket().receiveAll< struct Tongue::SupportedProtocolVersions >();
} catch( boost::system::system_error &ex ) {
if ( ex.code().value() == boost::asio::error::misc_errors::eof ) {
// Nothing to do. The server does not support this op
_connection.connect();
} else {
TRACE_ERROR("An unknown error (code: " << ex.code() << ") has occurred while trying to get the "
<< "list of supported protocols from the server.");
throw ex;
}
}
return response;
}

uint32_t ProtocolVersionNegotiator::getMaxCommonSupportedProtocolVersion() {
Tongue::SupportedProtocolVersions serverVersions;
serverVersions = getSupportedVersionsInServer();
const uint32_t maxCommonSupportedVersion = std::min( serverVersions.max,
static_cast<uint32_t>(Tongue::MAX_SUPPORTED_PROTOCOL_VERSION) );
if ( maxCommonSupportedVersion < Tongue::MIN_SUPPORTED_PROTOCOL_VERSION or
maxCommonSupportedVersion < serverVersions.min ) {
return Tongue::FIRST_PROTOCOL_VERSION;
}
return maxCommonSupportedVersion;
}

uint32_t ProtocolVersionNegotiator::negotiate() {
const uint32_t maxCommonSupportedVersion = getMaxCommonSupportedProtocolVersion();
if ( maxCommonSupportedVersion > _currentProtocol ) {
ASSERT( maxCommonSupportedVersion > Tongue::FIRST_PROTOCOL_VERSION );
const struct Tongue::Header header =
{ static_cast< unsigned char >( Tongue::Opcode::UPGRADE_PROTOCOL_VERSION ) };
_connection.socket().sendAllConcated( header, maxCommonSupportedVersion );
Stream::AckOps( _connection.socket() ).wait( "Protocol upgrade confirmation" );
return maxCommonSupportedVersion;
}
return _currentProtocol;
}

} // namespace Remote
} // namespace Chain
} // namespace Osmosis
37 changes: 37 additions & 0 deletions cpp/Osmosis/Chain/Remote/ProtocolVersionNegotiator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef __OSMOSIS_PROTOCOL_NEGOTIATOR_H__
#define __OSMOSIS_PROTOCOL_NEGOTIATOR_H__

#include <cstdint>
#include <boost/filesystem/path.hpp>
#include "Osmosis/TCPConnection.h"
#include "Osmosis/Hash.h"

namespace Osmosis {
namespace Chain {
namespace Remote
{

class ProtocolVersionNegotiator
{
public:
ProtocolVersionNegotiator( TCPConnection & connection, uint32_t currentProtocol );

uint32_t negotiate();

private:
TCPConnection & _connection;
uint32_t _currentProtocol;

Tongue::SupportedProtocolVersions getSupportedVersionsInServer();

uint32_t getMaxCommonSupportedProtocolVersion();

ProtocolVersionNegotiator( const ProtocolVersionNegotiator & rhs ) = delete;
ProtocolVersionNegotiator & operator= ( const ProtocolVersionNegotiator & rhs ) = delete;
};

} // namespace Remote
} // namespace Chain
} // namespace Osmosis

#endif // __OSMOSIS_PROTOCOL_NEGOTIATOR_H__
24 changes: 24 additions & 0 deletions cpp/Osmosis/Server/GetSupportedProtocolVersionsOp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include <Osmosis/Server/GetSupportedProtocolVersionsOp.h>
#include <Osmosis/TCPSocket.h>
#include <Osmosis/Tongue.h>
#include "Common/Error.h"

namespace Osmosis {
namespace Server
{

GetSupportedProtocolVersionsOp::GetSupportedProtocolVersionsOp( TCPSocket & socket ):
_socket( socket )
{}

void GetSupportedProtocolVersionsOp::go()
{
BACKTRACE_BEGIN
Tongue::SupportedProtocolVersions message = { Tongue::MIN_SUPPORTED_PROTOCOL_VERSION,
Tongue::MAX_SUPPORTED_PROTOCOL_VERSION };
_socket.sendAll( message );
BACKTRACE_END
}

} // namespace Server
} // namespace Osmosis
28 changes: 28 additions & 0 deletions cpp/Osmosis/Server/GetSupportedProtocolVersionsOp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#ifndef OSMOSIS_SERVER_GET_SUPPORTED_PROTOCOL_VERSIONS_OP_H_
#define OSMOSIS_SERVER_GET_SUPPORTED_PROTOCOL_VERSIONS_OP_H_

#include <Osmosis/TCPSocket.h>
#include <Osmosis/Tongue.h>

namespace Osmosis {
namespace Server
{

class GetSupportedProtocolVersionsOp
{
public:
GetSupportedProtocolVersionsOp( TCPSocket & socket );

void go();

private:
TCPSocket & _socket;

GetSupportedProtocolVersionsOp( const GetSupportedProtocolVersionsOp & rhs ) = delete;
GetSupportedProtocolVersionsOp & operator= ( const GetSupportedProtocolVersionsOp & rhs ) = delete;
};

} // namespace Server
} // namespace Osmosis

#endif
18 changes: 15 additions & 3 deletions cpp/Osmosis/Server/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "Osmosis/Server/ListLabelsOp.h"
#include "Osmosis/Server/EraseLabelOp.h"
#include "Osmosis/Server/RenameLabelOp.h"
#include "Osmosis/Server/GetSupportedProtocolVersionsOp.h"
#include "Osmosis/Server/UpgradeProtocolOp.h"


namespace Osmosis {
Expand All @@ -25,6 +27,7 @@ Thread::Thread( boost::filesystem::path rootPath,
_labels( labels ),
_boostSocket( _ioService ),
_socket( _boostSocket ),
_protocolVersion( Tongue::MIN_SUPPORTED_PROTOCOL_VERSION ),
_tcpNoDelay( false )
{}

Expand Down Expand Up @@ -101,6 +104,12 @@ bool Thread::work()
case Tongue::Opcode::ACK:
THROW( Error, "Unexpected ack" );
break;
case Tongue::Opcode::GET_SUPPORTED_PROTOCOL_VERSIONS:
GetSupportedProtocolVersionsOp( _socket ).go();
break;
case Tongue::Opcode::UPGRADE_PROTOCOL_VERSION:
UpgradeProtocolOp( _socket, _protocolVersion ).go();
break;
default:
THROW( Error, "Protocol Error: unknown opcode: " << header.opcode );
break;
Expand All @@ -112,10 +121,13 @@ void Thread::handshake()
{
BACKTRACE_BEGIN
auto handshake = _socket.receiveAll< struct Tongue::Handshake >();
if ( handshake.protocolVersion != static_cast< unsigned >( Tongue::PROTOCOL_VERSION ) )
const auto minSupportedVersion = static_cast< unsigned >( Tongue::MIN_SUPPORTED_PROTOCOL_VERSION );
const auto maxSupportedVersion = static_cast< unsigned >( Tongue::MAX_SUPPORTED_PROTOCOL_VERSION );
if ( handshake.protocolVersion < minSupportedVersion or handshake.protocolVersion > maxSupportedVersion )
THROW( Error, "Client with protocol version " << handshake.protocolVersion <<
" attempted to connect to us, but our protocol version is " <<
Tongue::PROTOCOL_VERSION );
" attempted to connect to us, but the server only supports versions from " <<
Tongue::MIN_SUPPORTED_PROTOCOL_VERSION << " to " << Tongue::MAX_SUPPORTED_PROTOCOL_VERSION );
_protocolVersion = handshake.protocolVersion;
if ( handshake.compression != static_cast< unsigned >( Tongue::Compression::UNCOMPRESSED ) )
THROW( Error, "Compression not yet implemented" );
Stream::AckOps( _socket ).sendAck();
Expand Down
1 change: 1 addition & 0 deletions cpp/Osmosis/Server/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Thread : public std::enable_shared_from_this< Thread >
boost::asio::io_service _ioService;
boost::asio::ip::tcp::socket _boostSocket;
TCPSocket _socket;
uint32_t _protocolVersion;
bool _tcpNoDelay;

void thread();
Expand Down
31 changes: 31 additions & 0 deletions cpp/Osmosis/Server/UpgradeProtocolOp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include <Osmosis/Server/UpgradeProtocolOp.h>
#include <Osmosis/TCPSocket.h>
#include <Osmosis/Tongue.h>
#include <Osmosis/Debug.h>
#include <Osmosis/Stream/AckOps.h>
#include "Common/Error.h"

namespace Osmosis {
namespace Server
{

UpgradeProtocolOp::UpgradeProtocolOp( TCPSocket & socket, uint32_t &protocolVersion ):
_socket( socket ),
_protocolVersion( protocolVersion )
{}

void UpgradeProtocolOp::go()
{
auto requestedProtocol = _socket.receiveAll< uint32_t >();
const auto minSupportedVersion = static_cast< unsigned >( Tongue::MIN_SUPPORTED_PROTOCOL_VERSION );
const auto maxSupportedVersion = static_cast< unsigned >( Tongue::MAX_SUPPORTED_PROTOCOL_VERSION );
if ( requestedProtocol < minSupportedVersion or requestedProtocol > maxSupportedVersion )
THROW( Error, "Cannot upgrade protocol to version " << requestedProtocol << "."
" The server only supports versions from " <<
Tongue::MIN_SUPPORTED_PROTOCOL_VERSION << "to " << Tongue::MAX_SUPPORTED_PROTOCOL_VERSION );
_protocolVersion = requestedProtocol;
Stream::AckOps( _socket ).sendAck();
}

} // namespace Server
} // namespace Osmosis
29 changes: 29 additions & 0 deletions cpp/Osmosis/Server/UpgradeProtocolOp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef OSMOSIS_SERVER_UPGRADE_PROTOCOL_OP_H_
#define OSMOSIS_SERVER_UPGRADE_PROTOCOL_OP_H_

#include <Osmosis/TCPSocket.h>
#include <Osmosis/Tongue.h>

namespace Osmosis {
namespace Server
{

class UpgradeProtocolOp
{
public:
UpgradeProtocolOp( TCPSocket & socket, uint32_t &protocolVersion );

void go();

private:
TCPSocket & _socket;
uint32_t & _protocolVersion;

UpgradeProtocolOp( const UpgradeProtocolOp & rhs ) = delete;
UpgradeProtocolOp & operator= ( const UpgradeProtocolOp & rhs ) = delete;
};

} // namespace Server
} // namespace Osmosis

#endif
21 changes: 15 additions & 6 deletions cpp/Osmosis/TCPConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Osmosis

TCPConnection::TCPConnection( const std::string & hostname, unsigned short port ):
_socket( _ioService ),
_endpoint(),
_tcpSocket( _socket )
{
boost::asio::ip::tcp::resolver resolver( _ioService );
Expand All @@ -18,22 +19,30 @@ TCPConnection::TCPConnection( const std::string & hostname, unsigned short port
if ( first == boost::asio::ip::tcp::resolver::iterator() )
THROW( Error, "Unable to resolve the hostname '" << hostname << "'" );
boost::asio::ip::tcp::endpoint endpoint( first->endpoint().address(), port );
_socket.connect( endpoint );
ASSERT( _socket.is_open() );

setTCPNoDelay();
sendHandshake();
_endpoint = endpoint;
connect();
}

TCPSocket & TCPConnection::socket()
{
return _tcpSocket;
}

void TCPConnection::connect()
{
if ( _socket.is_open() ) {
_socket.close();
}
_socket.connect( _endpoint );
ASSERT( _socket.is_open() );
setTCPNoDelay();
sendHandshake();
}

void TCPConnection::sendHandshake()
{
struct Tongue::Handshake handshake = {
static_cast< unsigned >( Tongue::PROTOCOL_VERSION ),
static_cast< unsigned >( Tongue::MIN_SUPPORTED_PROTOCOL_VERSION ),
static_cast< unsigned >( Tongue::Compression::UNCOMPRESSED ) };
_tcpSocket.sendAll( handshake );
Stream::AckOps( _tcpSocket ).wait( "Handshake" );
Expand Down
3 changes: 3 additions & 0 deletions cpp/Osmosis/TCPConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ class TCPConnection

TCPSocket & socket();

void connect();

private:
boost::asio::io_service _ioService;
boost::asio::ip::tcp::socket _socket;
boost::asio::ip::tcp::endpoint _endpoint;
TCPSocket _tcpSocket;

void sendHandshake();
Expand Down
Loading

0 comments on commit 664e96c

Please sign in to comment.