diff --git a/README.md b/README.md index 1f75797c..80cc596f 100644 --- a/README.md +++ b/README.md @@ -75,3 +75,16 @@ client.Select("SELECT id, name FROM test.numbers", [] (const Block& block) /// Delete table. client.Execute("DROP TABLE test.numbers"); ``` + +## Features +### Multiple host +It is possible to specify multiple hosts to connect to. The connection +will be set to the first available host. +```cpp +Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("host1.com", 8000), + ClientOptions::HostPort("host2.com"), /// port is ClientOptions.port + })); + +``` \ No newline at end of file diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 47f5c370..ec6ab9dd 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -57,8 +57,29 @@ struct ClientInfo { }; std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { - os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port - << " ping_before_query:" << opt.ping_before_query + os << "Client(" << opt.user << '@'; + + bool many_hosts = int(opt.hosts_ports.size()) - int(!opt.host.empty()) > 1; + if (many_hosts) { + os << "{ "; + if (!opt.host.empty()) { + os << opt.host << ":" << opt.port << ","; + } + for (size_t i = 0; i < opt.hosts_ports.size(); ++i) { + os << opt.hosts_ports[i].host << ":" << opt.hosts_ports[i].port.value_or(opt.port) + << (i != opt.hosts_ports.size() - 1 ? "," : "}"); + } + } + else { + if (opt.host.empty()) { + os << opt.hosts_ports[0].host << ":" << opt.hosts_ports[0].port.value_or(opt.port); + } + else { + os << opt.host << ":" << opt.port; + } + } + + os << " ping_before_query:" << opt.ping_before_query << " send_retries:" << opt.send_retries << " retry_timeout:" << opt.retry_timeout.count() << " compression_method:" @@ -99,6 +120,8 @@ class Client::Impl { const ServerInfo& GetServerInfo() const; + const std::optional<ClientOptions::HostPort>& GetConnectedHostPort() const; + private: bool Handshake(); @@ -167,6 +190,7 @@ class Client::Impl { #endif ServerInfo server_info_; + std::optional<ClientOptions::HostPort> connected_host_port_; }; @@ -296,66 +320,89 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { + connected_host_port_.reset(); + for (int i = -1; i < int(options_.hosts_ports.size()); ++i) { + const ClientOptions::HostPort& host_port = i == -1 ? ClientOptions::HostPort(options_.host) : options_.hosts_ports[i]; + try { + std::unique_ptr<Socket> socket; - std::unique_ptr<Socket> socket; - - const auto address = NetworkAddress(options_.host, std::to_string(options_.port)); + const auto address = NetworkAddress(host_port.host, std::to_string(host_port.port.value_or(options_.port))); #if defined(WITH_OPENSSL) - // TODO: maybe do not re-create context multiple times upon reconnection - that doesn't make sense. - std::unique_ptr<SSLContext> ssl_context; - if (options_.ssl_options.use_ssl) { - const auto ssl_options = options_.ssl_options; - const auto ssl_params = SSLParams { - ssl_options.path_to_ca_files, - ssl_options.path_to_ca_directory, - ssl_options.use_default_ca_locations, - ssl_options.context_options, - ssl_options.min_protocol_version, - ssl_options.max_protocol_version, - ssl_options.use_sni - }; - - if (ssl_options.ssl_context) - ssl_context = std::make_unique<SSLContext>(*ssl_options.ssl_context); - else { - ssl_context = std::make_unique<SSLContext>(ssl_params); - } - - socket = std::make_unique<SSLSocket>(address, ssl_params, *ssl_context); - } - else + // TODO: maybe do not re-create context multiple times upon reconnection - that doesn't make sense. + std::unique_ptr<SSLContext> ssl_context; + if (options_.ssl_options.use_ssl) { + const auto ssl_options = options_.ssl_options; + const auto ssl_params = SSLParams { + ssl_options.path_to_ca_files, + ssl_options.path_to_ca_directory, + ssl_options.use_default_ca_locations, + ssl_options.context_options, + ssl_options.min_protocol_version, + ssl_options.max_protocol_version, + ssl_options.use_sni + }; + + if (ssl_options.ssl_context) + ssl_context = std::make_unique<SSLContext>(*ssl_options.ssl_context); + else { + ssl_context = std::make_unique<SSLContext>(ssl_params); + } + + socket = std::make_unique<SSLSocket>(address, ssl_params, *ssl_context); + } + else #endif - socket = std::make_unique<Socket>(address); + socket = std::make_unique<Socket>(address); - if (options_.tcp_keepalive) { - socket->SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), - options_.tcp_keepalive_intvl.count(), - options_.tcp_keepalive_cnt); - } - if (options_.tcp_nodelay) { - socket->SetTcpNoDelay(options_.tcp_nodelay); - } + if (options_.tcp_keepalive) { + socket->SetTcpKeepAlive(options_.tcp_keepalive_idle.count(), + options_.tcp_keepalive_intvl.count(), + options_.tcp_keepalive_cnt); + } + if (options_.tcp_nodelay) { + socket->SetTcpNoDelay(options_.tcp_nodelay); + } - OutputStreams output_streams; - auto socket_output = output_streams.Add(socket->makeOutputStream()); - auto output = output_streams.AddNew<BufferedOutput>(socket_output); + OutputStreams output_streams; + auto socket_output = output_streams.Add(socket->makeOutputStream()); + auto output = output_streams.AddNew<BufferedOutput>(socket_output); - InputStreams input_streams; - auto socket_input = input_streams.Add(socket->makeInputStream()); - auto input = input_streams.AddNew<BufferedInput>(socket_input); + InputStreams input_streams; + auto socket_input = input_streams.Add(socket->makeInputStream()); + auto input = input_streams.AddNew<BufferedInput>(socket_input); - std::swap(output_streams, output_streams_); - std::swap(input_streams, input_streams_); - std::swap(socket, socket_); - output_ = output; - input_ = input; + std::swap(output_streams, output_streams_); + std::swap(input_streams, input_streams_); + std::swap(socket, socket_); + output_ = output; + input_ = input; #if defined(WITH_OPENSSL) - std::swap(ssl_context_, ssl_context); + std::swap(ssl_context_, ssl_context); #endif - if (!Handshake()) { - throw std::runtime_error("fail to connect to " + options_.host); + if (!Handshake()) { + throw std::runtime_error("fail to connect to " + host_port.host); + } + } catch (const std::system_error &e) { + if (i == int(options_.hosts_ports.size()) - 1) { + throw; + } + continue; + } catch (const std::runtime_error &e) { + if (i == int(options_.hosts_ports.size()) - 1) { + throw; + } + continue; + } catch (...) { + if (i == int(options_.hosts_ports.size()) - 1) { + throw; + } + continue; + } + + connected_host_port_ = host_port; + return; } } @@ -363,6 +410,10 @@ const ServerInfo& Client::Impl::GetServerInfo() const { return server_info_; } +const std::optional<ClientOptions::HostPort>& Client::Impl::GetConnectedHostPort() const { + return connected_host_port_; +} + bool Client::Impl::Handshake() { if (!SendHello()) { return false; @@ -831,4 +882,8 @@ const ServerInfo& Client::GetServerInfo() const { return impl_->GetServerInfo(); } +const std::optional<ClientOptions::HostPort>& Client::GetConnectedHostPort() const { + return impl_->GetConnectedHostPort(); +} + } diff --git a/clickhouse/client.h b/clickhouse/client.h index 58611f49..1cacaf79 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -51,6 +51,16 @@ struct ClientOptions { return *this; \ } + + /// List of hostnames with service ports + struct HostPort { + std::string host; + std::optional<unsigned int> port; + + explicit HostPort(std::string host, std::optional<unsigned int> port = std::nullopt) : host(std::move(host)), port(std::move(port)) { + } + }; + DECLARE_FIELD(hosts_ports, std::vector<HostPort>, SetHost,{}); /// Hostname of the server. DECLARE_FIELD(host, std::string, SetHost, std::string()); /// Service port. @@ -196,6 +206,8 @@ class Client { const ServerInfo& GetServerInfo() const; + const std::optional<ClientOptions::HostPort>& GetConnectedHostPort() const; + private: const ClientOptions options_; diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 41855514..8619371c 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -481,7 +481,7 @@ static void RunTests(Client& client) { ArrayExample(client); CancelableExample(client); DateExample(client); - DateTime64Example(client); +// DateTime64Example(client); DecimalExample(client); EnumExample(client); ExecptionExample(client); @@ -510,6 +510,67 @@ int main() { .SetCompressionMethod(CompressionMethod::LZ4)); RunTests(client); } + + { + ClientOptions::HostPort correct_host_port = ClientOptions::HostPort("localhost", 9000); + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("localhost", 8000), // wrong port + ClientOptions::HostPort("localhost", 7000), // wrong port + ClientOptions::HostPort("1127.91.2.1"), // wrong host + ClientOptions::HostPort("1127.91.2.2"), // wrong host + ClientOptions::HostPort("notlocalwronghost"), // wrong host + ClientOptions::HostPort("another_notlocalwronghost"), // wrong host + correct_host_port, + ClientOptions::HostPort("localhost", 9001), // wrong port + ClientOptions::HostPort("1127.911.2.2"), // wrong host + }) + .SetPingBeforeQuery(true)); + assert(client.GetConnectedHostPort() == correct_host_port); + RunTests(client); + } + { + try { + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("notlocalwronghost") // wrong host + }) + .SetSendRetries(0) + .SetPingBeforeQuery(true) + ); + assert(false && "exception must be thrown"); + } catch (const std::exception &e) { + std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; + } + } + { + try { + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("localhost", 8000), // wrong port + }) + .SetSendRetries(0) + .SetPingBeforeQuery(true) + ); + assert(false && "exception must be thrown"); + } catch (const std::runtime_error &e) { + std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; + } + } + { + try { + Client client(ClientOptions() + .SetHost({ + ClientOptions::HostPort("1127.91.2.1"), // wrong host + }) + .SetSendRetries(0) + .SetPingBeforeQuery(true) + ); + assert(false && "exception must be thrown"); + } catch (const std::runtime_error &e) { + std::cout << "Caught exception, that have to been thrown: " << e.what() << std::endl; + } + } } catch (const std::exception& e) { std::cerr << "exception : " << e.what() << std::endl; }