diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 489c474261..acdd67480b 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -36,6 +36,8 @@ class ForwardInput < Input config_param :port, :integer, default: LISTEN_PORT desc 'The bind address to listen to.' config_param :bind, :string, default: '0.0.0.0' + desc 'Whether it accept only IPv6 connection with IPv6 bind address.' + config_param :bind_ipv6_only, :bool, default: true config_param :backlog, :integer, default: nil # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src @@ -176,6 +178,7 @@ def start resolve_name: @resolve_hostname, linger_timeout: @linger_timeout, send_keepalive_packet: @send_keepalive_packet, + bind_ipv6_only: @bind_ipv6_only, backlog: @backlog, &method(:handle_connection) ) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index bd22e5437b..c783580280 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -67,7 +67,7 @@ def server_wait_until_stop # conn.close # end # end - def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block) + def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, bind_ipv6_only: true, backlog: nil, tls_options: nil, **socket_options, &block) proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) @@ -91,7 +91,7 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t case proto when :tcp - server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) + server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only, &block) when :tls transport_config = if tls_options server_create_transport_section_object(tls_options) @@ -100,7 +100,7 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t else raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified" end - server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block) + server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only, &block) when :unix raise "not implemented yet" else @@ -121,7 +121,7 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t # sock.remote_port # # ... # end - def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) + def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, bind_ipv6_only: true, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) @@ -155,7 +155,7 @@ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket case proto when :tcp - server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn| + server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only) do |conn| conn.data(&callback) end when :tls @@ -166,7 +166,7 @@ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket else raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified" end - server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn| + server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only) do |conn| conn.data(&callback) end when :udp @@ -212,8 +212,8 @@ def server_attach(title, proto, port, bind, shared, server) event_loop_attach(server) end - def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) - sock = server_create_tcp_socket(shared, bind, port) + def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, bind_ipv6_only: true, &block) + sock = server_create_tcp_socket(shared, bind, port, bind_ipv6_only: bind_ipv6_only) socket_option_setter.call(sock) close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn| @@ -227,9 +227,9 @@ def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_ server end - def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block) + def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, bind_ipv6_only: true, &block) context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf) - sock = server_create_tcp_socket(shared, bind, port) + sock = server_create_tcp_socket(shared, bind, port, bind_ipv6_only: bind_ipv6_only) socket_option_setter.call(sock) close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn| @@ -379,13 +379,16 @@ def server_socket_manager_client ServerEngine::SocketManager::Client.new(socket_manager_path) end - def server_create_tcp_socket(shared, bind, port) + def server_create_tcp_socket(shared, bind, port, bind_ipv6_only: true) sock = if shared server_socket_manager_client.listen_tcp(bind, port) else - # TCPServer.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead. - # backlog will be set by the caller, we don't need to set backlog here - tsock = Addrinfo.tcp(bind, port).listen + addrinfo = Addrinfo.tcp(bind, port) + tsock = ::Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + tsock.ipv6only! if addrinfo.ipv6? && bind_ipv6_only + tsock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, 1) + tsock.bind(addrinfo) + tsock.listen(::Socket::SOMAXCONN) tsock.autoclose = false TCPServer.for_fd(tsock.fileno) end